diff --git a/Cargo.lock b/Cargo.lock index 9ac93cc..206a0d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,140 +234,16 @@ dependencies = [ ] [[package]] -name = "async-channel" -version = "1.6.1" +name = "async-trait" +version = "0.1.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" +checksum = "36ea56748e10732c49404c153638a15ec3d6211ec5ff35d9bb20e13b93576adf" dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", + "proc-macro2", + "quote", + "syn", ] -[[package]] -name = "async-executor" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb877970c7b440ead138f6321a3b5395d6061183af779340b65e20c0fede9146" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "once_cell", - "vec-arena", -] - -[[package]] -name = "async-global-executor" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" -dependencies = [ - "async-channel", - "async-executor", - "async-io", - "async-mutex", - "blocking", - "futures-lite", - "num_cpus", - "once_cell", -] - -[[package]] -name = "async-io" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd" -dependencies = [ - "concurrent-queue", - "fastrand", - "futures-lite", - "libc", - "log", - "nb-connect", - "once_cell", - "parking", - "polling", - "vec-arena", - "waker-fn", - "winapi", -] - -[[package]] -name = "async-lock" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-mutex" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-process" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef37b86e2fa961bae5a4d212708ea0154f904ce31d1a4a7f47e1bbc33a0c040b" -dependencies = [ - "async-io", - "blocking", - "cfg-if", - "event-listener", - "futures-lite", - "once_cell", - "signal-hook", - "winapi", -] - -[[package]] -name = "async-std" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f06685bad74e0570f5213741bea82158279a4103d988e57bfada11ad230341" -dependencies = [ - "async-channel", - "async-global-executor", - "async-io", - "async-lock", - "async-process", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "num_cpus", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" - -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - [[package]] name = "atty" version = "0.2.14" @@ -452,20 +328,6 @@ dependencies = [ "byte-tools", ] -[[package]] -name = "blocking" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - [[package]] name = "brotli-sys" version = "0.3.2" @@ -519,35 +381,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "cacache" -version = "8.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159f24b3d31d4498536a317776f6d766c0cbf22a4627a607d03ec8903d910c80" -dependencies = [ - "async-std", - "digest 0.8.1", - "either", - "futures", - "hex 0.4.3", - "memmap", - "serde", - "serde_derive", - "serde_json", - "sha-1 0.8.2", - "sha2", - "ssri", - "tempfile", - "thiserror", - "walkdir", -] - -[[package]] -name = "cache-padded" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" - [[package]] name = "cc" version = "1.0.67" @@ -618,15 +451,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - [[package]] name = "const_fn" version = "0.4.6" @@ -681,27 +505,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-utils" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" -dependencies = [ - "autocfg", - "cfg-if", - "lazy_static", -] - -[[package]] -name = "ctor" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "ctrlc" version = "3.1.8" @@ -712,6 +515,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "dashmap" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +dependencies = [ + "cfg-if", + "num_cpus", +] + [[package]] name = "derive_more" version = "0.99.13" @@ -769,27 +582,12 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "event-listener" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" - [[package]] name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -[[package]] -name = "fastrand" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" -dependencies = [ - "instant", -] - [[package]] name = "flate2" version = "1.0.20" @@ -881,21 +679,6 @@ version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59" -[[package]] -name = "futures-lite" -version = "1.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.13" @@ -970,19 +753,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "gloo-timers" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "h2" version = "0.3.2" @@ -1035,12 +805,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - [[package]] name = "http" version = "0.2.4" @@ -1163,15 +927,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "language-tags" version = "0.2.2" @@ -1235,7 +990,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ "cfg-if", - "value-bag", ] [[package]] @@ -1252,13 +1006,14 @@ name = "mangadex-home" version = "0.2.1" dependencies = [ "actix-web", + "async-trait", "base64 0.13.0", "bincode", "bytes", - "cacache", "chrono", "clap", "ctrlc", + "dashmap", "dotenv", "futures", "log", @@ -1289,16 +1044,6 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "mime" version = "0.3.16" @@ -1355,16 +1100,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nb-connect" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d" -dependencies = [ - "libc", - "socket2", -] - [[package]] name = "nix" version = "0.20.0" @@ -1472,12 +1207,6 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85" -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.11.1" @@ -1547,19 +1276,6 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" -[[package]] -name = "polling" -version = "2.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fc12d774e799ee9ebae13f4076ca003b40d18a11ac0f3641e6f899618580b7b" -dependencies = [ - "cfg-if", - "libc", - "log", - "wepoll-sys", - "winapi", -] - [[package]] name = "ppv-lite86" version = "0.2.10" @@ -1773,15 +1489,6 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.19" @@ -1932,16 +1639,6 @@ dependencies = [ "opaque-debug 0.2.3", ] -[[package]] -name = "signal-hook" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef33d6d0cd06e0840fba9985aab098c147e67e05cee14d412d3345ed14ff30ac" -dependencies = [ - "libc", - "signal-hook-registry", -] - [[package]] name = "signal-hook-registry" version = "1.3.0" @@ -2011,7 +1708,7 @@ checksum = "e875e58f12c33f832d09cff94b079aee84a48d8188d7a7132b3434358afb70c9" dependencies = [ "base64 0.10.1", "digest 0.8.1", - "hex 0.3.2", + "hex", "serde", "serde_derive", "sha-1 0.8.2", @@ -2379,27 +2076,12 @@ dependencies = [ "serde", ] -[[package]] -name = "value-bag" -version = "1.0.0-alpha.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b676010e055c99033117c2343b33a40a30b91fecd6c49055ac9cd2d6c305ab1" -dependencies = [ - "ctor", -] - [[package]] name = "vcpkg" version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb" -[[package]] -name = "vec-arena" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34b2f665b594b07095e3ac3f718e13c2197143416fae4c5706cffb7b1af8d7f1" - [[package]] name = "vec_map" version = "0.8.2" @@ -2412,23 +2094,6 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - -[[package]] -name = "walkdir" -version = "2.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" -dependencies = [ - "same-file", - "winapi", - "winapi-util", -] - [[package]] name = "want" version = "0.3.0" @@ -2542,15 +2207,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "wepoll-sys" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff" -dependencies = [ - "cc", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index e89fdc2..cbf1fc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,13 +10,14 @@ repository = "https://github.com/edward-shen/mangadex-home-rs" [dependencies] actix-web = { version = "4.0.0-beta.4", features = [ "rustls" ] } +async-trait = "0.1" base64 = "0.13" bincode = "1" bytes = "1" -cacache = "8" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] } ctrlc = "3" +dashmap = "4" dotenv = "0.15" futures = "0.3" once_cell = "1" diff --git a/src/cache.rs b/src/cache.rs deleted file mode 100644 index 4e234e6..0000000 --- a/src/cache.rs +++ /dev/null @@ -1,203 +0,0 @@ -use std::{fmt::Display, path::PathBuf}; - -use futures::future::join_all; -use log::warn; -use lru::LruCache; -use serde::{Deserialize, Serialize}; -use ssri::Integrity; - -#[derive(PartialEq, Eq, Hash, Clone)] -pub struct CacheKey(pub String, pub String, pub bool); - -impl Display for CacheKey { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.2 { - write!(f, "saver/{}/{}", self.0, self.1) - } else { - write!(f, "data/{}/{}", self.0, self.1) - } - } -} - -#[derive(Serialize, Deserialize)] -pub struct CachedImage { - pub data: Vec, - pub content_type: Option>, - pub content_length: Option>, - pub last_modified: Option>, -} - -impl CachedImage { - #[inline] - fn len(&self) -> usize { - self.data.capacity() - + self - .content_type - .as_ref() - .map(Vec::capacity) - .unwrap_or_default() - + self - .content_length - .as_ref() - .map(Vec::capacity) - .unwrap_or_default() - + self - .last_modified - .as_ref() - .map(Vec::capacity) - .unwrap_or_default() - } - - #[inline] - fn shrink_to_fit(&mut self) { - self.data.shrink_to_fit(); - self.content_length.as_mut().map(Vec::shrink_to_fit); - self.last_modified.as_mut().map(Vec::shrink_to_fit); - } -} - -pub struct Cache { - in_memory: LruCache, - memory_max_size: usize, - memory_cur_size: usize, - - on_disk: LruCache, - disk_path: PathBuf, - disk_max_size: usize, - disk_cur_size: usize, -} - -impl Cache { - pub fn new(memory_max_size: usize, disk_max_size: usize, disk_path: PathBuf) -> Self { - Self { - in_memory: LruCache::unbounded(), - memory_max_size, - memory_cur_size: 0, - on_disk: LruCache::unbounded(), - disk_path, - disk_max_size, - disk_cur_size: 0, - } - } - - pub async fn get(&mut self, key: &CacheKey) -> Option<&CachedImage> { - if self.in_memory.contains(key) { - return self.in_memory.get(key); - } - - if let Some(disk_key) = self.on_disk.pop(key) { - // extract from disk, if it exists - let bytes = cacache::read_hash(&self.disk_path, &disk_key).await.ok()?; - // We don't particularly care if we fail to delete from disk since - // if it's an error it means it's already been dropped. - cacache::remove_hash(&self.disk_path, &disk_key).await.ok(); - self.disk_cur_size -= bytes.len(); - let cached_image: CachedImage = match bincode::deserialize(&bytes) { - Ok(image) => image, - Err(e) => { - warn!("Failed to serialize on-disk data?! {}", e); - return None; - } - }; - - // put into in-memory - self.put(key.clone(), cached_image).await; - } - - None - } - - pub async fn put(&mut self, key: CacheKey, mut image: CachedImage) { - image.shrink_to_fit(); - let mut hot_evicted = vec![]; - let new_img_size = image.len(); - - if self.memory_max_size >= new_img_size { - // Evict oldest entires to make space for new image. - while new_img_size + self.memory_cur_size > self.memory_max_size { - match self.in_memory.pop_lru() { - Some((key, evicted_image)) => { - self.memory_cur_size -= evicted_image.len(); - hot_evicted.push((key, evicted_image)); - } - None => unreachable!(concat!( - "Invariant violated. Cache is empty but we already ", - "guaranteed we can remove items from cache to make space." - )), - } - } - - self.in_memory.put(key, image); - self.memory_cur_size += new_img_size; - } else { - // Image was larger than memory capacity, push directly into cold - // storage. - self.push_into_cold(key, image).await; - } - - // Push evicted hot entires into cold storage. - for (key, image) in hot_evicted { - self.push_into_cold(key, image).await; - } - } - - async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage) { - let image = bincode::serialize(&image).unwrap(); // This should never fail. - let new_img_size = image.len(); - let mut to_drop = vec![]; - - if self.disk_max_size >= new_img_size { - // Add images to drop from cold cache into a queue - while new_img_size + self.disk_cur_size > self.disk_max_size { - match self.on_disk.pop_lru() { - Some((key, disk_key)) => { - // Basically, the assumption here is that if the meta - // data was deleted, we can't assume that deleting the - // value will yield any free space back, so we assume a - // size of zero while this is the case. This also means - // that we automatically drop broken links as well. - let on_disk_size = cacache::metadata(&self.disk_path, key.to_string()) - .await - .unwrap_or_default() - .map(|metadata| metadata.size) - .unwrap_or_default(); - self.disk_cur_size -= on_disk_size; - - to_drop.push(disk_key); - } - None => unreachable!(concat!( - "Invariant violated. Cache is empty but we already ", - "guaranteed we can remove items from cache to make space." - )), - } - } - } else { - warn!( - "Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}", - key - ); - return; - } - - let mut futs = vec![]; - for key in &to_drop { - futs.push(cacache::remove_hash(&self.disk_path, key)); - } - // Run all cold caching in parallel, we don't care if the removal fails - // because it just means it's already been removed. - join_all(futs).await; - - let new_disk_key = match cacache::write(&self.disk_path, key.to_string(), image).await { - Ok(key) => key, - Err(e) => { - warn!( - "failed to write to disk cache, dropping value instead: {}", - e - ); - return; - } - }; - self.on_disk.put(key.clone(), new_disk_key); - self.disk_cur_size += new_img_size; - } -} diff --git a/src/fs.rs b/src/cache/fs.rs similarity index 90% rename from src/fs.rs rename to src/cache/fs.rs index 4aae4b8..ce7ee65 100644 --- a/src/fs.rs +++ b/src/cache/fs.rs @@ -35,15 +35,15 @@ static WRITING_STATUS: Lazy>>> = Lazy::new(|| RwLock::new(HashMap::new())); /// Tries to read from the file, returning a byte stream if it exists -pub async fn read_file( - path: &Path, -) -> Option>, std::io::Error>> { +pub async fn read_file(path: &Path) -> Option> { if path.exists() { - if let Some(status) = WRITING_STATUS.read().get(path) { - Some(FromFsStream::new(path, Arc::clone(status)).await) - } else { - Some(FromFsStream::new(path, Arc::new(CacheStatus::done())).await) - } + let status = WRITING_STATUS + .read() + .get(path) + .map(Arc::clone) + .unwrap_or_else(|| Arc::new(CacheStatus::done())); + + Some(FromFsStream::new(path, status).await) } else { None } @@ -54,7 +54,7 @@ pub async fn read_file( pub async fn transparent_file_stream( path: &Path, mut byte_stream: impl Stream> + Unpin + Send + 'static, -) -> Result>, std::io::Error> { +) -> Result { let done_writing_flag = Arc::new(CacheStatus::new()); let mut file = { @@ -73,13 +73,16 @@ pub async fn transparent_file_stream( while let Some(bytes) = byte_stream.next().await { match bytes { Ok(bytes) => file.write_all(&bytes).await?, - Err(_) => was_errored = true, + Err(_) => { + was_errored = true; + break; + } } } if was_errored { // It's ok if the deleting the file fails, since we truncate on - // create anyways + // create anyways, but it should be best effort let _ = remove_file(&path_buf).await; } else { file.flush().await?; @@ -103,7 +106,7 @@ pub async fn transparent_file_stream( Ok(FromFsStream::new(path, done_writing_flag).await?) } -struct FromFsStream { +pub struct FromFsStream { file: Pin>, sleep: Pin>, is_file_done_writing: Arc, diff --git a/src/cache/generational.rs b/src/cache/generational.rs new file mode 100644 index 0000000..6628ec2 --- /dev/null +++ b/src/cache/generational.rs @@ -0,0 +1,222 @@ +use std::path::PathBuf; + +use async_trait::async_trait; +use bytes::Bytes; +use log::{debug, warn}; +use lru::LruCache; +use tokio::fs::{remove_file, File}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use super::{Cache, CacheKey, CachedImage, ImageMetadata}; + +pub struct GenerationalCache { + in_memory: LruCache, + memory_max_size: usize, + memory_cur_size: usize, + + on_disk: LruCache, + disk_path: PathBuf, + disk_max_size: usize, + disk_cur_size: usize, +} + +impl GenerationalCache { + pub fn new(memory_max_size: usize, disk_max_size: usize, disk_path: PathBuf) -> Self { + Self { + in_memory: LruCache::unbounded(), + memory_max_size, + memory_cur_size: 0, + + on_disk: LruCache::unbounded(), + disk_path, + disk_max_size, + disk_cur_size: 0, + } + } + + async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) { + let new_img_size = image.0.len(); + let mut to_drop = vec![]; + + if self.disk_max_size >= new_img_size { + // Add images to drop from cold cache into a queue + while new_img_size + self.disk_cur_size > self.disk_max_size { + match self.on_disk.pop_lru() { + Some((key, _)) => { + let mut path = self.disk_path.clone(); + path.push(PathBuf::from(key)); + let async_file = File::open(&path).await; + + // Basically, the assumption here is that if the meta + // data was deleted, we can't assume that deleting the + // value will yield any free space back, so we assume a + // size of zero while this is the case. This also means + // that we automatically drop broken links as well. + if let Ok(file) = async_file { + let file_size = file + .into_std() + .await + .metadata() + .map(|metadata| { + #[cfg(target_os = "windows")] + { + use std::os::windows::fs::MetadataExt; + metadata.file_size() + } + + #[cfg(target_os = "linux")] + { + use std::os::unix::fs::MetadataExt; + metadata.size() + } + }) + .unwrap_or_default(); + + self.disk_cur_size -= file_size as usize; + } + + to_drop.push(path); + } + None => unreachable!(concat!( + "Invariant violated. Cache is empty but we already ", + "guaranteed we can remove items from cache to make space." + )), + } + } + } else { + warn!( + "Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}", + key + ); + return; + } + + // Run all cold caching in parallel, we don't care if the removal fails + // because it just means it's already been removed. + // + // We also don't care when it happens, so just spawn tasks to do them + // later, whenever is convenient. + for to_drop in to_drop { + tokio::spawn(remove_file(to_drop)); + } + + let new_key_path = { + let mut root = self.disk_path.clone(); + root.push(PathBuf::from(&key)); + root + }; + + match File::open(&new_key_path).await { + Ok(mut file) => { + debug!("Starting to write to file: {:?}", &new_key_path); + match file.write_all(&image.0).await { + Ok(_) => { + self.on_disk.put(key, metadata); + self.disk_cur_size += new_img_size; + debug!( + "Successfully written data to disk for file {:?}", + new_key_path + ); + } + Err(e) => { + warn!("Failed to write to {:?}: {}", new_key_path, e); + } + } + } + Err(e) => { + warn!("Failed to open {:?}: {}", new_key_path, e); + } + } + } +} + +#[async_trait] +impl Cache for GenerationalCache { + async fn get(&mut self, key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> { + if self.in_memory.contains(key) { + return self.in_memory.get(key); + } + + if let Some(metadata) = self.on_disk.pop(key) { + let new_key_path = { + let mut root = self.disk_path.clone(); + root.push(PathBuf::from(key)); + root + }; + + // extract from disk, if it exists + let file = File::open(new_key_path.clone()).await; + + let mut buffer = if let Some(size) = metadata.content_length { + Vec::with_capacity(size) + } else { + Vec::new() + }; + + match file { + Ok(mut file) => { + match file.read_to_end(&mut buffer).await { + Ok(_) => { + // We don't particularly care if we fail to delete from disk since + // if it's an error it means it's already been dropped. + tokio::spawn(remove_file(new_key_path.clone())); + } + Err(e) => { + warn!("Failed to read from {:?}: {}", new_key_path, e); + } + } + } + Err(e) => { + warn!("Failed to open {:?}: {}", new_key_path, e); + return None; + } + } + + buffer.shrink_to_fit(); + + self.disk_cur_size -= buffer.len(); + let image = CachedImage(Bytes::from(buffer)); + + // Since we just put it in the in-memory cache it should be there + // when we retrieve it + self.put(key.clone(), image, metadata).await; + return self.get(key).await; + } + + None + } + + #[inline] + async fn put(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) { + let mut hot_evicted = vec![]; + let new_img_size = image.0.len(); + + if self.memory_max_size >= new_img_size { + // Evict oldest entires to make space for new image. + while new_img_size + self.memory_cur_size > self.memory_max_size { + match self.in_memory.pop_lru() { + Some((key, (image, metadata))) => { + self.memory_cur_size -= image.0.len(); + hot_evicted.push((key, image, metadata)); + } + None => unreachable!(concat!( + "Invariant violated. Cache is empty but we already ", + "guaranteed we can remove items from cache to make space." + )), + } + } + + self.in_memory.put(key, (image, metadata)); + self.memory_cur_size += new_img_size; + } else { + // Image was larger than memory capacity, push directly into cold + // storage. + self.push_into_cold(key, image, metadata).await; + }; + + // Push evicted hot entires into cold storage. + for (key, image, metadata) in hot_evicted { + self.push_into_cold(key, image, metadata).await; + } + } +} diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs new file mode 100644 index 0000000..7e2c572 --- /dev/null +++ b/src/cache/low_mem.rs @@ -0,0 +1,46 @@ +//! Low memory caching stuff + +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::Stream; +use lru::LruCache; + +use super::{fs::FromFsStream, Cache, CacheKey}; + +pub struct LowMemCache { + on_disk: LruCache, + disk_path: PathBuf, + disk_max_size: usize, + disk_cur_size: usize, +} + +impl LowMemCache { + pub fn new(disk_max_size: usize, disk_path: PathBuf) -> Self { + Self { + on_disk: LruCache::unbounded(), + disk_path, + disk_max_size, + disk_cur_size: 0, + } + } +} + +#[async_trait] +impl Cache for LowMemCache { + async fn get_stream(&mut self, key: &CacheKey) -> Option> { + if self.on_disk.get(key).is_some() { + super::fs::read_file(&Path::new(&key.to_string())).await + } else { + None + } + } + + async fn put_stream( + &mut self, + key: CacheKey, + image: impl Stream> + Unpin + Send + 'static, + ) { + } +} diff --git a/src/cache/mod.rs b/src/cache/mod.rs new file mode 100644 index 0000000..15a166a --- /dev/null +++ b/src/cache/mod.rs @@ -0,0 +1,119 @@ +use std::fmt::Display; +use std::path::PathBuf; + +use actix_web::http::HeaderValue; +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, FixedOffset}; +use futures::Stream; + +pub use generational::GenerationalCache; +pub use low_mem::LowMemCache; + +use self::fs::FromFsStream; + +mod fs; +mod generational; +mod low_mem; + +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct CacheKey(pub String, pub String, pub bool); + +impl Display for CacheKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.2 { + write!(f, "saver/{}/{}", self.0, self.1) + } else { + write!(f, "data/{}/{}", self.0, self.1) + } + } +} + +impl From for PathBuf { + #[inline] + fn from(key: CacheKey) -> Self { + key.to_string().into() + } +} + +impl From<&CacheKey> for PathBuf { + #[inline] + fn from(key: &CacheKey) -> Self { + key.to_string().into() + } +} + +pub struct CachedImage(pub Bytes); + +pub struct ImageMetadata { + pub content_type: Option, + pub content_length: Option, + pub last_modified: Option>, +} + +#[derive(Debug)] +pub enum ImageRequestError { + InvalidContentType, + InvalidContentLength, + InvalidLastModified, +} + +impl ImageMetadata { + pub fn new( + content_type: Option, + content_length: Option, + last_modified: Option, + ) -> Result { + Ok(Self { + content_type: content_type + .map(|v| v.to_str().map(|v| v.to_string())) + .transpose() + .map_err(|_| ImageRequestError::InvalidContentType)?, + content_length: content_length + .map(|header_val| { + header_val + .to_str() + .map_err(|_| ImageRequestError::InvalidContentLength)? + .parse() + .map_err(|_| ImageRequestError::InvalidContentLength) + }) + .transpose()?, + last_modified: last_modified + .map(|header_val| { + DateTime::parse_from_rfc2822( + header_val + .to_str() + .map_err(|_| ImageRequestError::InvalidLastModified)?, + ) + .map_err(|_| ImageRequestError::InvalidLastModified) + }) + .transpose()?, + }) + } +} + +#[async_trait] +pub trait Cache { + async fn get(&mut self, _key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> { + unimplemented!() + } + + async fn put(&mut self, _key: CacheKey, _image: CachedImage, _metadata: ImageMetadata) { + unimplemented!() + } + + async fn get_stream( + &mut self, + _key: &CacheKey, + ) -> Option> { + unimplemented!() + } + + async fn put_stream( + &mut self, + _key: CacheKey, + _image: impl Stream> + Unpin + Send + 'static, + ) { + unimplemented!() + } +} diff --git a/src/config.rs b/src/config.rs index eeb44f5..0d4d228 100644 --- a/src/config.rs +++ b/src/config.rs @@ -34,4 +34,6 @@ pub struct CliArgs { /// reasons. #[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)] pub enable_server_string: bool, + #[clap(short, long, conflicts_with("memory_quota"))] + pub low_memory: bool, } diff --git a/src/main.rs b/src/main.rs index 279282d..e5594d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::{num::ParseIntError, sync::atomic::Ordering}; use actix_web::rt::{spawn, time, System}; use actix_web::web::{self, Data}; use actix_web::{App, HttpServer}; -use cache::Cache; +use cache::GenerationalCache; use clap::Clap; use config::CliArgs; use log::{debug, error, warn, LevelFilter}; @@ -25,7 +25,6 @@ use thiserror::Error; mod cache; mod config; -mod fs; mod ping; mod routes; mod state; @@ -112,7 +111,7 @@ async fn main() -> Result<(), std::io::Error> { .service(routes::token_data_saver) .route("{tail:.*}", web::get().to(routes::default)) .app_data(Data::from(Arc::clone(&data_1))) - .app_data(Data::new(Mutex::new(Cache::new( + .app_data(Data::new(Mutex::new(GenerationalCache::new( memory_max_size, disk_quota, cache_path.clone(), diff --git a/src/routes.rs b/src/routes.rs index 7382431..17aa9c7 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -18,7 +18,7 @@ use serde::Deserialize; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use thiserror::Error; -use crate::cache::{Cache, CacheKey, CachedImage}; +use crate::cache::{Cache, CacheKey, CachedImage, GenerationalCache, ImageMetadata}; use crate::client_api_version; use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS}; use crate::state::RwLockServerState; @@ -52,7 +52,7 @@ impl Responder for ServerResponse { #[get("/{token}/data/{chapter_hash}/{file_name}")] async fn token_data( state: Data, - cache: Data>, + cache: Data>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -68,7 +68,7 @@ async fn token_data( #[get("/{token}/data-saver/{chapter_hash}/{file_name}")] async fn token_data_saver( state: Data, - cache: Data>, + cache: Data>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -175,15 +175,15 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder { async fn fetch_image( state: Data, - cache: Data>, + cache: Data>, chapter_hash: String, file_name: String, is_data_saver: bool, ) -> ServerResponse { let key = CacheKey(chapter_hash, file_name, is_data_saver); - if let Some(cached) = cache.lock().get(&key).await { - return construct_response(cached); + if let Some((image, metadata)) = cache.lock().get(&key).await { + return construct_response(image, metadata); } // It's important to not get a write lock before this request, else we're @@ -207,7 +207,7 @@ async fn fetch_image( .await; match resp { - Ok(resp) => { + Ok(mut resp) => { let content_type = resp.headers().get(CONTENT_TYPE); let is_image = content_type @@ -230,24 +230,21 @@ async fn fetch_image( ); } - let headers = resp.headers(); - let content_type = headers.get(CONTENT_TYPE).map(AsRef::as_ref).map(Vec::from); - let content_length = headers - .get(CONTENT_LENGTH) - .map(AsRef::as_ref) - .map(Vec::from); - let last_modified = headers.get(LAST_MODIFIED).map(AsRef::as_ref).map(Vec::from); + let (content_type, length, last_mod) = { + let headers = resp.headers_mut(); + ( + headers.remove(CONTENT_TYPE), + headers.remove(CONTENT_LENGTH), + headers.remove(LAST_MODIFIED), + ) + }; let body = resp.bytes().await; match body { Ok(bytes) => { - let cached = CachedImage { - data: bytes.to_vec(), - content_type, - content_length, - last_modified, - }; - let resp = construct_response(&cached); - cache.lock().put(key, cached).await; + let cached = ImageMetadata::new(content_type, length, last_mod).unwrap(); + let image = CachedImage(bytes); + let resp = construct_response(&image, &cached); + cache.lock().put(key, image, cached).await; return resp; } Err(e) => { @@ -267,22 +264,22 @@ async fn fetch_image( } } -fn construct_response(cached: &CachedImage) -> ServerResponse { +fn construct_response(cached: &CachedImage, metadata: &ImageMetadata) -> ServerResponse { let data: Vec> = cached - .data + .0 .to_vec() .chunks(1460) // TCP MSS default size .map(|v| Ok(Bytes::from(v.to_vec()))) .collect(); let mut resp = HttpResponse::Ok(); - if let Some(content_type) = &cached.content_type { + if let Some(content_type) = &metadata.content_type { resp.append_header((CONTENT_TYPE, &**content_type)); } - if let Some(content_length) = &cached.content_length { - resp.append_header((CONTENT_LENGTH, &**content_length)); + if let Some(content_length) = &metadata.content_length { + resp.append_header((CONTENT_LENGTH, content_length.to_string())); } - if let Some(last_modified) = &cached.last_modified { - resp.append_header((LAST_MODIFIED, &**last_modified)); + if let Some(last_modified) = &metadata.last_modified { + resp.append_header((LAST_MODIFIED, last_modified.to_rfc2822())); } ServerResponse::HttpResponse(push_headers(&mut resp).streaming(stream::iter(data)))