Compare commits

2 commits

Author  SHA1        Message                                 Date
        8949e41bee  Run clippy                              2021-04-14 22:52:54 -04:00
        c75b5063af  remove cacahe, use disk cache instead   2021-04-14 22:11:00 -04:00
11 changed files with 502 additions and 626 deletions

Cargo.lock (generated), 382 changed lines
View file

@ -234,140 +234,16 @@ dependencies = [
]
[[package]]
name = "async-channel"
version = "1.6.1"
name = "async-trait"
version = "0.1.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
checksum = "36ea56748e10732c49404c153638a15ec3d6211ec5ff35d9bb20e13b93576adf"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-executor"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb877970c7b440ead138f6321a3b5395d6061183af779340b65e20c0fede9146"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"vec-arena",
]
[[package]]
name = "async-global-executor"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6"
dependencies = [
"async-channel",
"async-executor",
"async-io",
"async-mutex",
"blocking",
"futures-lite",
"num_cpus",
"once_cell",
]
[[package]]
name = "async-io"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd"
dependencies = [
"concurrent-queue",
"fastrand",
"futures-lite",
"libc",
"log",
"nb-connect",
"once_cell",
"parking",
"polling",
"vec-arena",
"waker-fn",
"winapi",
]
[[package]]
name = "async-lock"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb"
dependencies = [
"event-listener",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-process"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef37b86e2fa961bae5a4d212708ea0154f904ce31d1a4a7f47e1bbc33a0c040b"
dependencies = [
"async-io",
"blocking",
"cfg-if",
"event-listener",
"futures-lite",
"once_cell",
"signal-hook",
"winapi",
]
[[package]]
name = "async-std"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f06685bad74e0570f5213741bea82158279a4103d988e57bfada11ad230341"
dependencies = [
"async-channel",
"async-global-executor",
"async-io",
"async-lock",
"async-process",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log",
"memchr",
"num_cpus",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]]
name = "atty"
version = "0.2.14"
@ -452,20 +328,6 @@ dependencies = [
"byte-tools",
]
[[package]]
name = "blocking"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]]
name = "brotli-sys"
version = "0.3.2"
@ -519,35 +381,6 @@ dependencies = [
"bytes",
]
[[package]]
name = "cacache"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159f24b3d31d4498536a317776f6d766c0cbf22a4627a607d03ec8903d910c80"
dependencies = [
"async-std",
"digest 0.8.1",
"either",
"futures",
"hex 0.4.3",
"memmap",
"serde",
"serde_derive",
"serde_json",
"sha-1 0.8.2",
"sha2",
"ssri",
"tempfile",
"thiserror",
"walkdir",
]
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "cc"
version = "1.0.67"
@ -618,15 +451,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "const_fn"
version = "0.4.6"
@ -681,27 +505,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]]
name = "ctor"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e98e2ad1a782e33928b96fc3948e7c355e5af34ba4de7670fe8bac2a3b2006d"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "ctrlc"
version = "3.1.8"
@ -712,6 +515,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]]
name = "derive_more"
version = "0.99.13"
@ -769,27 +582,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fastrand"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3"
dependencies = [
"instant",
]
[[package]]
name = "flate2"
version = "1.0.20"
@ -881,21 +679,6 @@ version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59"
[[package]]
name = "futures-lite"
version = "1.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.13"
@ -970,19 +753,6 @@ dependencies = [
"wasi",
]
[[package]]
name = "gloo-timers"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "h2"
version = "0.3.2"
@ -1035,12 +805,6 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "0.2.4"
@ -1163,15 +927,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]]
name = "language-tags"
version = "0.2.2"
@ -1235,7 +990,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if",
"value-bag",
]
[[package]]
@ -1252,13 +1006,14 @@ name = "mangadex-home"
version = "0.2.1"
dependencies = [
"actix-web",
"async-trait",
"base64 0.13.0",
"bincode",
"bytes",
"cacache",
"chrono",
"clap",
"ctrlc",
"dashmap",
"dotenv",
"futures",
"log",
@ -1289,16 +1044,6 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "mime"
version = "0.3.16"
@ -1355,16 +1100,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nb-connect"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d"
dependencies = [
"libc",
"socket2",
]
[[package]]
name = "nix"
version = "0.20.0"
@ -1472,12 +1207,6 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.11.1"
@ -1547,19 +1276,6 @@ version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
[[package]]
name = "polling"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fc12d774e799ee9ebae13f4076ca003b40d18a11ac0f3641e6f899618580b7b"
dependencies = [
"cfg-if",
"libc",
"log",
"wepoll-sys",
"winapi",
]
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -1773,15 +1489,6 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.19"
@ -1932,16 +1639,6 @@ dependencies = [
"opaque-debug 0.2.3",
]
[[package]]
name = "signal-hook"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef33d6d0cd06e0840fba9985aab098c147e67e05cee14d412d3345ed14ff30ac"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-registry"
version = "1.3.0"
@ -2011,7 +1708,7 @@ checksum = "e875e58f12c33f832d09cff94b079aee84a48d8188d7a7132b3434358afb70c9"
dependencies = [
"base64 0.10.1",
"digest 0.8.1",
"hex 0.3.2",
"hex",
"serde",
"serde_derive",
"sha-1 0.8.2",
@ -2379,27 +2076,12 @@ dependencies = [
"serde",
]
[[package]]
name = "value-bag"
version = "1.0.0-alpha.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b676010e055c99033117c2343b33a40a30b91fecd6c49055ac9cd2d6c305ab1"
dependencies = [
"ctor",
]
[[package]]
name = "vcpkg"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
[[package]]
name = "vec-arena"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34b2f665b594b07095e3ac3f718e13c2197143416fae4c5706cffb7b1af8d7f1"
[[package]]
name = "vec_map"
version = "0.8.2"
@ -2412,23 +2094,6 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.0"
@ -2542,15 +2207,6 @@ dependencies = [
"webpki",
]
[[package]]
name = "wepoll-sys"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff"
dependencies = [
"cc",
]
[[package]]
name = "winapi"
version = "0.3.9"

View file

@ -10,13 +10,14 @@ repository = "https://github.com/edward-shen/mangadex-home-rs"
[dependencies]
actix-web = { version = "4.0.0-beta.4", features = [ "rustls" ] }
async-trait = "0.1"
base64 = "0.13"
bincode = "1"
bytes = "1"
cacache = "8"
chrono = { version = "0.4", features = [ "serde" ] }
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
ctrlc = "3"
dashmap = "4"
dotenv = "0.15"
futures = "0.3"
once_cell = "1"

View file

@ -1,203 +0,0 @@
use std::{fmt::Display, path::PathBuf};
use futures::future::join_all;
use log::warn;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use ssri::Integrity;
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct CacheKey(pub String, pub String, pub bool);
impl Display for CacheKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.2 {
write!(f, "saver/{}/{}", self.0, self.1)
} else {
write!(f, "data/{}/{}", self.0, self.1)
}
}
}
#[derive(Serialize, Deserialize)]
pub struct CachedImage {
pub data: Vec<u8>,
pub content_type: Option<Vec<u8>>,
pub content_length: Option<Vec<u8>>,
pub last_modified: Option<Vec<u8>>,
}
impl CachedImage {
#[inline]
fn len(&self) -> usize {
self.data.capacity()
+ self
.content_type
.as_ref()
.map(Vec::capacity)
.unwrap_or_default()
+ self
.content_length
.as_ref()
.map(Vec::capacity)
.unwrap_or_default()
+ self
.last_modified
.as_ref()
.map(Vec::capacity)
.unwrap_or_default()
}
#[inline]
fn shrink_to_fit(&mut self) {
self.data.shrink_to_fit();
self.content_length.as_mut().map(Vec::shrink_to_fit);
self.last_modified.as_mut().map(Vec::shrink_to_fit);
}
}
pub struct Cache {
in_memory: LruCache<CacheKey, CachedImage>,
memory_max_size: usize,
memory_cur_size: usize,
on_disk: LruCache<CacheKey, Integrity>,
disk_path: PathBuf,
disk_max_size: usize,
disk_cur_size: usize,
}
impl Cache {
pub fn new(memory_max_size: usize, disk_max_size: usize, disk_path: PathBuf) -> Self {
Self {
in_memory: LruCache::unbounded(),
memory_max_size,
memory_cur_size: 0,
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
}
pub async fn get(&mut self, key: &CacheKey) -> Option<&CachedImage> {
if self.in_memory.contains(key) {
return self.in_memory.get(key);
}
if let Some(disk_key) = self.on_disk.pop(key) {
// extract from disk, if it exists
let bytes = cacache::read_hash(&self.disk_path, &disk_key).await.ok()?;
// We don't particularly care if we fail to delete from disk since
// if it's an error it means it's already been dropped.
cacache::remove_hash(&self.disk_path, &disk_key).await.ok();
self.disk_cur_size -= bytes.len();
let cached_image: CachedImage = match bincode::deserialize(&bytes) {
Ok(image) => image,
Err(e) => {
warn!("Failed to serialize on-disk data?! {}", e);
return None;
}
};
// put into in-memory
self.put(key.clone(), cached_image).await;
}
None
}
pub async fn put(&mut self, key: CacheKey, mut image: CachedImage) {
image.shrink_to_fit();
let mut hot_evicted = vec![];
let new_img_size = image.len();
if self.memory_max_size >= new_img_size {
// Evict oldest entires to make space for new image.
while new_img_size + self.memory_cur_size > self.memory_max_size {
match self.in_memory.pop_lru() {
Some((key, evicted_image)) => {
self.memory_cur_size -= evicted_image.len();
hot_evicted.push((key, evicted_image));
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
self.in_memory.put(key, image);
self.memory_cur_size += new_img_size;
} else {
// Image was larger than memory capacity, push directly into cold
// storage.
self.push_into_cold(key, image).await;
}
// Push evicted hot entires into cold storage.
for (key, image) in hot_evicted {
self.push_into_cold(key, image).await;
}
}
async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage) {
let image = bincode::serialize(&image).unwrap(); // This should never fail.
let new_img_size = image.len();
let mut to_drop = vec![];
if self.disk_max_size >= new_img_size {
// Add images to drop from cold cache into a queue
while new_img_size + self.disk_cur_size > self.disk_max_size {
match self.on_disk.pop_lru() {
Some((key, disk_key)) => {
// Basically, the assumption here is that if the meta
// data was deleted, we can't assume that deleting the
// value will yield any free space back, so we assume a
// size of zero while this is the case. This also means
// that we automatically drop broken links as well.
let on_disk_size = cacache::metadata(&self.disk_path, key.to_string())
.await
.unwrap_or_default()
.map(|metadata| metadata.size)
.unwrap_or_default();
self.disk_cur_size -= on_disk_size;
to_drop.push(disk_key);
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
} else {
warn!(
"Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}",
key
);
return;
}
let mut futs = vec![];
for key in &to_drop {
futs.push(cacache::remove_hash(&self.disk_path, key));
}
// Run all cold caching in parallel, we don't care if the removal fails
// because it just means it's already been removed.
join_all(futs).await;
let new_disk_key = match cacache::write(&self.disk_path, key.to_string(), image).await {
Ok(key) => key,
Err(e) => {
warn!(
"failed to write to disk cache, dropping value instead: {}",
e
);
return;
}
};
self.on_disk.put(key.clone(), new_disk_key);
self.disk_cur_size += new_img_size;
}
}

View file

@ -28,22 +28,21 @@ use tokio::time::Sleep;
/// up to Client A's request, then Client B could receive a broken image, as it
/// thinks it's done reading the file.
///
/// We effectively use WRITING_STATUS as a status relay to ensure concurrent
/// We effectively use `WRITING_STATUS` as a status relay to ensure concurrent
/// reads to the file while it's being written to will wait for writing to be
/// completed.
static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Arc<CacheStatus>>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
/// Tries to read from the file, returning a byte stream if it exists
pub async fn read_file(
path: &Path,
) -> Option<Result<impl Stream<Item = Result<Bytes, UpstreamError>>, std::io::Error>> {
pub async fn read_file(path: &Path) -> Option<Result<FromFsStream, std::io::Error>> {
if path.exists() {
if let Some(status) = WRITING_STATUS.read().get(path) {
Some(FromFsStream::new(path, Arc::clone(status)).await)
} else {
Some(FromFsStream::new(path, Arc::new(CacheStatus::done())).await)
}
let status = WRITING_STATUS
.read()
.get(path)
.map_or_else(|| Arc::new(CacheStatus::done()), Arc::clone);
Some(FromFsStream::new(path, status).await)
} else {
None
}
@ -54,7 +53,7 @@ pub async fn read_file(
pub async fn transparent_file_stream(
path: &Path,
mut byte_stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static,
) -> Result<impl Stream<Item = Result<Bytes, UpstreamError>>, std::io::Error> {
) -> Result<FromFsStream, std::io::Error> {
let done_writing_flag = Arc::new(CacheStatus::new());
let mut file = {
@ -69,17 +68,19 @@ pub async fn transparent_file_stream(
let path_buf = path.to_path_buf();
tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async
let mut was_errored = false;
let mut errored = false;
while let Some(bytes) = byte_stream.next().await {
match bytes {
Ok(bytes) => file.write_all(&bytes).await?,
Err(_) => was_errored = true,
if let Ok(bytes) = bytes {
file.write_all(&bytes).await?
} else {
errored = true;
break;
}
}
if was_errored {
if errored {
// It's ok if the deleting the file fails, since we truncate on
// create anyways
// create anyways, but it should be best effort
let _ = remove_file(&path_buf).await;
} else {
file.flush().await?;
@ -89,7 +90,7 @@ pub async fn transparent_file_stream(
let mut write_lock = WRITING_STATUS.write();
// This needs to be written atomically with the write lock, else
// it's possible we have an inconsistent state
if was_errored {
if errored {
write_flag.store(WritingStatus::Error);
} else {
write_flag.store(WritingStatus::Done);
@ -103,7 +104,7 @@ pub async fn transparent_file_stream(
Ok(FromFsStream::new(path, done_writing_flag).await?)
}
struct FromFsStream {
pub struct FromFsStream {
file: Pin<Box<File>>,
sleep: Pin<Box<Sleep>>,
is_file_done_writing: Arc<CacheStatus>,
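
The doc comment above describes the mechanism: a writer registers the file path in WRITING_STATUS before streaming bytes to disk, and a concurrent reader consults that map so that hitting EOF while the writer is still busy means "wait", not "the image is complete". The crate implements this with its own CacheStatus type, which FromFsStream polls. The sketch below is an editorial illustration of the same status-relay idea using only std, once_cell, and tokio's watch channel (assuming tokio's sync feature is available); the names WRITING, begin_write, and wait_until_written are invented and not part of the repository.

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::RwLock;

use once_cell::sync::Lazy;
use tokio::sync::watch;

// Path -> "has the writer finished?"; readers check this before trusting EOF.
static WRITING: Lazy<RwLock<HashMap<PathBuf, watch::Receiver<bool>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

// Writer side: register the path before the first byte hits disk, then send
// `true` through the returned sender once the file is fully written.
fn begin_write(path: PathBuf) -> watch::Sender<bool> {
    let (tx, rx) = watch::channel(false);
    WRITING.write().unwrap().insert(path, rx);
    tx
}

// Reader side: if the path is still being written, wait for the writer to flip
// the flag instead of treating an early EOF as the end of the image.
async fn wait_until_written(path: &Path) {
    let rx = WRITING.read().unwrap().get(path).cloned();
    if let Some(mut rx) = rx {
        while !*rx.borrow() {
            if rx.changed().await.is_err() {
                break; // writer dropped without finishing; stop waiting
            }
        }
    }
    // Cleanup of the map entry is omitted; the real code removes it under a
    // write lock once writing succeeds or errors.
}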

src/cache/generational.rs (new file), 220 added lines
View file

@ -0,0 +1,220 @@
use std::path::PathBuf;
use async_trait::async_trait;
use bytes::Bytes;
use log::{debug, warn};
use lru::LruCache;
use tokio::fs::{remove_file, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{Cache, CacheKey, CachedImage, ImageMetadata};
pub struct GenerationalCache {
in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>,
memory_max_size: u64,
memory_cur_size: u64,
on_disk: LruCache<CacheKey, ImageMetadata>,
disk_path: PathBuf,
disk_max_size: u64,
disk_cur_size: u64,
}
impl GenerationalCache {
pub fn new(memory_max_size: u64, disk_max_size: u64, disk_path: PathBuf) -> Self {
Self {
in_memory: LruCache::unbounded(),
memory_max_size,
memory_cur_size: 0,
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
}
async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) {
let new_img_size = image.0.len();
let mut to_drop = vec![];
if self.disk_max_size >= new_img_size as u64 {
// Add images to drop from cold cache into a queue
while new_img_size as u64 + self.disk_cur_size > self.disk_max_size {
match self.on_disk.pop_lru() {
Some((key, _)) => {
let mut path = self.disk_path.clone();
path.push(PathBuf::from(key));
let async_file = File::open(&path).await;
// Basically, the assumption here is that if the meta
// data was deleted, we can't assume that deleting the
// value will yield any free space back, so we assume a
// size of zero while this is the case. This also means
// that we automatically drop broken links as well.
if let Ok(file) = async_file {
let file_size = file
.into_std()
.await
.metadata()
.map(|metadata| {
#[cfg(target_os = "windows")]
{
use std::os::windows::fs::MetadataExt;
metadata.file_size()
}
#[cfg(target_os = "linux")]
{
use std::os::unix::fs::MetadataExt;
metadata.size()
}
})
.unwrap_or_default();
self.disk_cur_size -= file_size;
}
to_drop.push(path);
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
} else {
warn!(
"Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}",
key
);
return;
}
// Run all cold caching in parallel, we don't care if the removal fails
// because it just means it's already been removed.
//
// We also don't care when it happens, so just spawn tasks to do them
// later, whenever is convenient.
for to_drop in to_drop {
tokio::spawn(remove_file(to_drop));
}
let new_key_path = {
let mut root = self.disk_path.clone();
root.push(PathBuf::from(key.clone()));
root
};
match File::open(&new_key_path).await {
Ok(mut file) => {
debug!("Starting to write to file: {:?}", &new_key_path);
match file.write_all(&image.0).await {
Ok(_) => {
self.on_disk.put(key, metadata);
self.disk_cur_size += new_img_size as u64;
debug!(
"Successfully written data to disk for file {:?}",
new_key_path
);
}
Err(e) => {
warn!("Failed to write to {:?}: {}", new_key_path, e);
}
}
}
Err(e) => {
warn!("Failed to open {:?}: {}", new_key_path, e);
}
}
}
}
#[async_trait]
impl Cache for GenerationalCache {
async fn get(&mut self, key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> {
if self.in_memory.contains(key) {
return self.in_memory.get(key);
}
if let Some(metadata) = self.on_disk.pop(key) {
let new_key_path = {
let mut root = self.disk_path.clone();
root.push(PathBuf::from(key.clone()));
root
};
// extract from disk, if it exists
let file = File::open(&new_key_path).await;
let mut buffer = metadata
.content_length
.map_or_else(Vec::new, Vec::with_capacity);
match file {
Ok(mut file) => {
match file.read_to_end(&mut buffer).await {
Ok(_) => {
// We don't particularly care if we fail to delete from disk since
// if it's an error it means it's already been dropped.
tokio::spawn(remove_file(new_key_path));
}
Err(e) => {
warn!("Failed to read from {:?}: {}", new_key_path, e);
}
}
}
Err(e) => {
warn!("Failed to open {:?}: {}", new_key_path, e);
return None;
}
}
buffer.shrink_to_fit();
self.disk_cur_size -= buffer.len() as u64;
let image = CachedImage(Bytes::from(buffer));
// Since we just put it in the in-memory cache it should be there
// when we retrieve it
self.put(key.clone(), image, metadata).await;
return self.get(key).await;
}
None
}
#[inline]
async fn put(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) {
let mut hot_evicted = vec![];
let new_img_size = image.0.len() as u64;
if self.memory_max_size >= new_img_size {
// Evict oldest entires to make space for new image.
while new_img_size + self.memory_cur_size > self.memory_max_size {
match self.in_memory.pop_lru() {
Some((key, (image, metadata))) => {
self.memory_cur_size -= image.0.len() as u64;
hot_evicted.push((key, image, metadata));
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
self.in_memory.put(key, (image, metadata));
self.memory_cur_size += new_img_size;
} else {
// Image was larger than memory capacity, push directly into cold
// storage.
self.push_into_cold(key, image, metadata).await;
};
// Push evicted hot entires into cold storage.
for (key, image, metadata) in hot_evicted {
self.push_into_cold(key, image, metadata).await;
}
}
}
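
For orientation, here is a short usage sketch of the new generational cache as the route handlers drive it: construct it with a hot (in-memory) and a cold (on-disk) quota, put an image together with its typed metadata, and read it back through the Cache trait. This is editorial illustration, not code from the diff; the function name, quotas, key, and byte contents are invented, and it assumes the code sits inside this crate so the cache module is reachable.

use bytes::Bytes;

use crate::cache::{Cache, CacheKey, CachedImage, GenerationalCache, ImageMetadata};

async fn cache_round_trip() {
    // 64 MiB hot tier, 1 GiB cold tier, rooted at ./cache (all made-up values).
    let mut cache = GenerationalCache::new(
        64 * 1024 * 1024,
        1024 * 1024 * 1024,
        "./cache".into(),
    );

    let key = CacheKey("chapter-hash".into(), "file-name.png".into(), false);
    let metadata = ImageMetadata {
        content_type: None,
        content_length: Some(3),
        last_modified: None,
    };

    // Small enough to stay in the hot tier, so no disk I/O happens here.
    cache
        .put(key.clone(), CachedImage(Bytes::from_static(b"png")), metadata)
        .await;
    assert!(cache.get(&key).await.is_some());
}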

src/cache/low_mem.rs (new file), 46 added lines
View file

@ -0,0 +1,46 @@
//! Low memory caching stuff
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use lru::LruCache;
use super::{fs::FromFsStream, Cache, CacheKey};
pub struct LowMemCache {
on_disk: LruCache<CacheKey, ()>,
disk_path: PathBuf,
disk_max_size: usize,
disk_cur_size: usize,
}
impl LowMemCache {
pub fn new(disk_max_size: usize, disk_path: PathBuf) -> Self {
Self {
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
}
}
#[async_trait]
impl Cache for LowMemCache {
async fn get_stream(&mut self, key: &CacheKey) -> Option<Result<FromFsStream, std::io::Error>> {
if self.on_disk.get(key).is_some() {
super::fs::read_file(Path::new(&key.to_string())).await
} else {
None
}
}
async fn put_stream(
&mut self,
key: CacheKey,
image: impl Stream<Item = Result<Bytes, reqwest::Error>> + Unpin + Send + 'static,
) {
}
}

src/cache/mod.rs (new file), 157 added lines
View file

@ -0,0 +1,157 @@
use std::path::PathBuf;
use std::{fmt::Display, str::FromStr};
use actix_web::http::HeaderValue;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, FixedOffset};
use futures::Stream;
pub use generational::GenerationalCache;
pub use low_mem::LowMemCache;
use self::fs::FromFsStream;
mod fs;
mod generational;
mod low_mem;
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct CacheKey(pub String, pub String, pub bool);
impl Display for CacheKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.2 {
write!(f, "saver/{}/{}", self.0, self.1)
} else {
write!(f, "data/{}/{}", self.0, self.1)
}
}
}
impl From<CacheKey> for PathBuf {
#[inline]
fn from(key: CacheKey) -> Self {
key.to_string().into()
}
}
pub struct CachedImage(pub Bytes);
#[derive(Copy, Clone)]
pub struct ImageMetadata {
pub content_type: Option<ImageContentType>,
pub content_length: Option<usize>,
pub last_modified: Option<DateTime<FixedOffset>>,
}
// Note to self: If these are wrong blame Triscuit 9
#[derive(Copy, Clone)]
pub enum ImageContentType {
Png,
Jpeg,
Gif,
Bmp,
Tif,
}
pub struct InvalidContentType;
impl FromStr for ImageContentType {
type Err = InvalidContentType;
#[inline]
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"image/png" => Ok(Self::Png),
"image/jpeg" => Ok(Self::Jpeg),
"image/gif" => Ok(Self::Gif),
"image/bmp" => Ok(Self::Bmp),
"image/tif" => Ok(Self::Tif),
_ => Err(InvalidContentType),
}
}
}
impl AsRef<str> for ImageContentType {
#[inline]
fn as_ref(&self) -> &str {
match self {
Self::Png => "image/png",
Self::Jpeg => "image/jpeg",
Self::Gif => "image/gif",
Self::Bmp => "image/bmp",
Self::Tif => "image/tif",
}
}
}
#[derive(Debug)]
pub enum ImageRequestError {
InvalidContentType,
InvalidContentLength,
InvalidLastModified,
}
impl ImageMetadata {
pub fn new(
content_type: Option<HeaderValue>,
content_length: Option<HeaderValue>,
last_modified: Option<HeaderValue>,
) -> Result<Self, ImageRequestError> {
Ok(Self {
content_type: content_type
.map(|v| match v.to_str() {
Ok(v) => ImageContentType::from_str(v),
Err(_) => Err(InvalidContentType),
})
.transpose()
.map_err(|_| ImageRequestError::InvalidContentType)?,
content_length: content_length
.map(|header_val| {
header_val
.to_str()
.map_err(|_| ImageRequestError::InvalidContentLength)?
.parse()
.map_err(|_| ImageRequestError::InvalidContentLength)
})
.transpose()?,
last_modified: last_modified
.map(|header_val| {
DateTime::parse_from_rfc2822(
header_val
.to_str()
.map_err(|_| ImageRequestError::InvalidLastModified)?,
)
.map_err(|_| ImageRequestError::InvalidLastModified)
})
.transpose()?,
})
}
}
#[async_trait]
pub trait Cache {
async fn get(&mut self, _key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> {
unimplemented!()
}
async fn put(&mut self, _key: CacheKey, _image: CachedImage, _metadata: ImageMetadata) {
unimplemented!()
}
async fn get_stream(
&mut self,
_key: &CacheKey,
) -> Option<Result<FromFsStream, std::io::Error>> {
unimplemented!()
}
async fn put_stream(
&mut self,
_key: CacheKey,
_image: impl Stream<Item = Result<Bytes, reqwest::Error>> + Unpin + Send + 'static,
) {
unimplemented!()
}
}
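
The new ImageMetadata::new replaces the old approach of copying raw header bytes into Vec<u8> fields: the upstream Content-Type, Content-Length, and Last-Modified values are parsed into typed fields up front. A small illustrative call follows; it is not code from the diff, and the header values and function name are made up.

use actix_web::http::HeaderValue;

use crate::cache::{ImageContentType, ImageMetadata};

fn parse_upstream_headers() {
    let meta = ImageMetadata::new(
        Some(HeaderValue::from_static("image/png")),
        Some(HeaderValue::from_static("1024")),
        Some(HeaderValue::from_static("Sat, 03 Apr 2021 18:00:00 +0000")),
    )
    .expect("all three headers are well-formed");

    assert!(matches!(meta.content_type, Some(ImageContentType::Png)));
    assert_eq!(meta.content_length, Some(1024));
    assert!(meta.last_modified.is_some());
}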

View file

@ -1,4 +1,4 @@
use std::num::{NonZeroU16, NonZeroUsize};
use std::num::{NonZeroU16, NonZeroU64};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
@ -18,20 +18,22 @@ pub struct CliArgs {
/// How large, in bytes, the in-memory cache should be. Note that this does
/// not include runtime memory usage.
#[clap(long, env = "MEM_CACHE_QUOTA_BYTES")]
pub memory_quota: NonZeroUsize,
pub memory_quota: NonZeroU64,
/// How large, in bytes, the on-disk cache should be. Note that actual
/// values may be larger for metadata information.
#[clap(long, env = "DISK_CACHE_QUOTA_BYTES")]
pub disk_quota: usize,
pub disk_quota: u64,
/// Sets the location of the disk cache.
#[clap(long, default_value = "./cache", env = "DISK_CACHE_PATH")]
pub cache_path: PathBuf,
/// The network speed to advertise to Mangadex@Home control server.
#[clap(long, env = "MAX_NETWORK_SPEED")]
pub network_speed: NonZeroUsize,
pub network_speed: NonZeroU64,
/// Whether or not to provide the Server HTTP header to clients. This is
/// useful for debugging, but is generally not recommended for security
/// reasons.
#[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)]
pub enable_server_string: bool,
#[clap(short, long, conflicts_with("memory_quota"))]
pub low_memory: bool,
}

View file

@ -12,7 +12,7 @@ use std::{num::ParseIntError, sync::atomic::Ordering};
use actix_web::rt::{spawn, time, System};
use actix_web::web::{self, Data};
use actix_web::{App, HttpServer};
use cache::Cache;
use cache::GenerationalCache;
use clap::Clap;
use config::CliArgs;
use log::{debug, error, warn, LevelFilter};
@ -25,7 +25,6 @@ use thiserror::Error;
mod cache;
mod config;
mod fs;
mod ping;
mod routes;
mod state;
@ -112,7 +111,7 @@ async fn main() -> Result<(), std::io::Error> {
.service(routes::token_data_saver)
.route("{tail:.*}", web::get().to(routes::default))
.app_data(Data::from(Arc::clone(&data_1)))
.app_data(Data::new(Mutex::new(Cache::new(
.app_data(Data::new(Mutex::new(GenerationalCache::new(
memory_max_size,
disk_quota,
cache_path.clone(),

View file

@ -1,6 +1,6 @@
use std::{io::BufReader, sync::Arc};
use std::{
num::{NonZeroU16, NonZeroUsize},
num::{NonZeroU16, NonZeroU64},
sync::atomic::Ordering,
};
@ -27,9 +27,9 @@ pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
pub struct Request<'a> {
secret: &'a str,
port: NonZeroU16,
disk_space: usize,
network_speed: NonZeroUsize,
build_version: usize,
disk_space: u64,
network_speed: NonZeroU64,
build_version: u64,
tls_created_at: Option<String>,
}

View file

@ -18,7 +18,7 @@ use serde::Deserialize;
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
use thiserror::Error;
use crate::cache::{Cache, CacheKey, CachedImage};
use crate::cache::{Cache, CacheKey, CachedImage, GenerationalCache, ImageMetadata};
use crate::client_api_version;
use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS};
use crate::state::RwLockServerState;
@ -52,7 +52,7 @@ impl Responder for ServerResponse {
#[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data(
state: Data<RwLockServerState>,
cache: Data<Mutex<Cache>>,
cache: Data<Mutex<GenerationalCache>>,
path: Path<(String, String, String)>,
) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner();
@ -68,7 +68,7 @@ async fn token_data(
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver(
state: Data<RwLockServerState>,
cache: Data<Mutex<Cache>>,
cache: Data<Mutex<GenerationalCache>>,
path: Path<(String, String, String)>,
) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner();
@ -175,15 +175,15 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
async fn fetch_image(
state: Data<RwLockServerState>,
cache: Data<Mutex<Cache>>,
cache: Data<Mutex<GenerationalCache>>,
chapter_hash: String,
file_name: String,
is_data_saver: bool,
) -> ServerResponse {
let key = CacheKey(chapter_hash, file_name, is_data_saver);
if let Some(cached) = cache.lock().get(&key).await {
return construct_response(cached);
if let Some((image, metadata)) = cache.lock().get(&key).await {
return construct_response(image, metadata);
}
// It's important to not get a write lock before this request, else we're
@ -207,7 +207,7 @@ async fn fetch_image(
.await;
match resp {
Ok(resp) => {
Ok(mut resp) => {
let content_type = resp.headers().get(CONTENT_TYPE);
let is_image = content_type
@ -230,24 +230,21 @@ async fn fetch_image(
);
}
let headers = resp.headers();
let content_type = headers.get(CONTENT_TYPE).map(AsRef::as_ref).map(Vec::from);
let content_length = headers
.get(CONTENT_LENGTH)
.map(AsRef::as_ref)
.map(Vec::from);
let last_modified = headers.get(LAST_MODIFIED).map(AsRef::as_ref).map(Vec::from);
let (content_type, length, last_mod) = {
let headers = resp.headers_mut();
(
headers.remove(CONTENT_TYPE),
headers.remove(CONTENT_LENGTH),
headers.remove(LAST_MODIFIED),
)
};
let body = resp.bytes().await;
match body {
Ok(bytes) => {
let cached = CachedImage {
data: bytes.to_vec(),
content_type,
content_length,
last_modified,
};
let resp = construct_response(&cached);
cache.lock().put(key, cached).await;
let cached = ImageMetadata::new(content_type, length, last_mod).unwrap();
let image = CachedImage(bytes);
let resp = construct_response(&image, &cached);
cache.lock().put(key, image, cached).await;
return resp;
}
Err(e) => {
@ -267,22 +264,22 @@ async fn fetch_image(
}
}
fn construct_response(cached: &CachedImage) -> ServerResponse {
fn construct_response(cached: &CachedImage, metadata: &ImageMetadata) -> ServerResponse {
let data: Vec<Result<Bytes, Infallible>> = cached
.data
.0
.to_vec()
.chunks(1460) // TCP MSS default size
.map(|v| Ok(Bytes::from(v.to_vec())))
.collect();
let mut resp = HttpResponse::Ok();
if let Some(content_type) = &cached.content_type {
resp.append_header((CONTENT_TYPE, &**content_type));
if let Some(content_type) = &metadata.content_type {
resp.append_header((CONTENT_TYPE, content_type.as_ref()));
}
if let Some(content_length) = &cached.content_length {
resp.append_header((CONTENT_LENGTH, &**content_length));
if let Some(content_length) = &metadata.content_length {
resp.append_header((CONTENT_LENGTH, content_length.to_string()));
}
if let Some(last_modified) = &cached.last_modified {
resp.append_header((LAST_MODIFIED, &**last_modified));
if let Some(last_modified) = &metadata.last_modified {
resp.append_header((LAST_MODIFIED, last_modified.to_rfc2822()));
}
ServerResponse::HttpResponse(push_headers(&mut resp).streaming(stream::iter(data)))