Compare commits

..

2 commits

Author SHA1 Message Date
8f3430fb77
Add support for reading old db image ids 2021-07-11 23:33:22 -04:00
ec9473fa78
clippy lint 2021-07-11 23:25:17 -04:00
4 changed files with 97 additions and 9 deletions

12
Cargo.lock generated
View file

@ -1184,6 +1184,7 @@ dependencies = [
"lfu_cache", "lfu_cache",
"log", "log",
"lru", "lru",
"md-5",
"once_cell", "once_cell",
"parking_lot", "parking_lot",
"prometheus", "prometheus",
@ -1217,6 +1218,17 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "md-5"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15"
dependencies = [
"block-buffer",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.4.0" version = "2.4.0"

View file

@ -29,6 +29,7 @@ once_cell = "1"
log = { version = "0.4", features = [ "serde" ] } log = { version = "0.4", features = [ "serde" ] }
lfu_cache = "1" lfu_cache = "1"
lru = "0.6" lru = "0.6"
md-5 = "0.9"
parking_lot = "0.11" parking_lot = "0.11"
prometheus = { version = "0.12", features = [ "process" ] } prometheus = { version = "0.12", features = [ "process" ] }
reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] } reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] }

83
src/cache/disk.rs vendored
View file

@ -1,6 +1,7 @@
//! Low memory caching stuff //! Low memory caching stuff
use std::path::PathBuf; use std::os::unix::prelude::OsStrExt;
use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
@ -8,6 +9,8 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use log::{debug, error, warn, LevelFilter}; use log::{debug, error, warn, LevelFilter};
use md5::digest::generic_array::GenericArray;
use md5::{Digest, Md5};
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
use sqlx::{ConnectOptions, SqlitePool}; use sqlx::{ConnectOptions, SqlitePool};
use tokio::fs::remove_file; use tokio::fs::remove_file;
@ -120,9 +123,16 @@ async fn db_listener(
for message in messages { for message in messages {
match message { match message {
DbMessage::Get(entry) => { DbMessage::Get(entry) => {
let hash = Md5Hash::from(entry.as_path());
let hash_str = hash.to_hex_string();
let key = entry.as_os_str().to_str(); let key = entry.as_os_str().to_str();
let query = // let legacy_key = key.map();
sqlx::query!("update Images set accessed = ? where id = ?", now, key) let query = sqlx::query!(
"update Images set accessed = ? where id = ? or id = ?",
now,
key,
hash_str
)
.execute(&mut transaction) .execute(&mut transaction)
.await; .await;
if let Err(e) = query { if let Err(e) = query {
@ -195,7 +205,27 @@ async fn db_listener(
for item in items { for item in items {
debug!("deleting file due to exceeding cache size"); debug!("deleting file due to exceeding cache size");
size_freed += item.size as u64; size_freed += item.size as u64;
tokio::spawn(remove_file(item.id)); tokio::spawn(async move {
let key = item.id;
if let Err(e) = remove_file(key.clone()).await {
match e.kind() {
std::io::ErrorKind::NotFound => {
let hash = Md5Hash(*GenericArray::from_slice(key.as_bytes()));
let path: PathBuf = hash.into();
if let Err(e) = remove_file(&path).await {
warn!(
"Failed to delete file `{}` from cache: {}",
path.to_string_lossy(),
e
);
}
}
_ => {
warn!("Failed to delete file `{}` from cache: {}", &key, e);
}
}
}
});
} }
cache.disk_cur_size.fetch_sub(size_freed, Ordering::Release); cache.disk_cur_size.fetch_sub(size_freed, Ordering::Release);
@ -203,6 +233,51 @@ async fn db_listener(
} }
} }
/// Represents a Md5 hash that can be converted to and from a path. This is used
/// for compatibility with the official client, where the image id and on-disk
/// path is determined by file path.
#[derive(Clone, Copy)]
struct Md5Hash(GenericArray<u8, <Md5 as md5::Digest>::OutputSize>);
impl Md5Hash {
fn to_hex_string(self) -> String {
format!("{:x}", self.0)
}
}
impl From<&Path> for Md5Hash {
fn from(path: &Path) -> Self {
let mut iter = path.iter();
let file_name = iter.next_back().unwrap();
let chapter_hash = iter.next_back().unwrap();
let is_data_saver = iter.next_back().unwrap() == "saver";
let mut hasher = Md5::new();
if is_data_saver {
hasher.update("saver");
}
hasher.update(chapter_hash.as_bytes());
hasher.update(".");
hasher.update(file_name.as_bytes());
Self(hasher.finalize())
}
}
// Lint is overly aggressive here, as Md5Hash guarantees there to be at least 3
// bytes.
#[allow(clippy::fallible_impl_from)]
impl From<Md5Hash> for PathBuf {
fn from(hash: Md5Hash) -> Self {
let hex_value = hash.to_hex_string();
hex_value[0..3]
.chars()
.rev()
.map(|char| Self::from(char.to_string()))
.reduce(|first, second| first.join(second))
.unwrap() // literally not possible
.join(hex_value)
}
}
#[async_trait] #[async_trait]
impl Cache for DiskCache { impl Cache for DiskCache {
async fn get( async fn get(

View file

@ -119,9 +119,9 @@ pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl R
} }
// push_headers(&mut resp_builder); // push_headers(&mut resp_builder);
let mut ret = resp_builder.body(resp.bytes().await.unwrap_or_default()); let mut resp = resp_builder.body(resp.bytes().await.unwrap_or_default());
*ret.headers_mut() = headers; *resp.headers_mut() = headers;
ServerResponse::HttpResponse(ret) ServerResponse::HttpResponse(resp)
} }
#[allow(clippy::future_not_send, clippy::unused_async)] #[allow(clippy::future_not_send, clippy::unused_async)]