Remove cache from hot path

This commit is contained in:
Edward Shen 2021-04-22 12:44:02 -04:00
parent 53c0cb664a
commit efd6b5c60c
Signed by: edward
GPG key ID: 19182661E818369F
10 changed files with 494 additions and 108 deletions

312
Cargo.lock generated
View file

@ -234,6 +234,12 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.50" version = "0.1.50"
@ -245,6 +251,15 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "atoi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -289,6 +304,18 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.9.0" version = "0.9.0"
@ -318,12 +345,24 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "build_const"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7"
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.6.1" version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.0.1" version = "1.0.1"
@ -438,6 +477,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crc"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
dependencies = [
"build_const",
]
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
version = "1.2.1" version = "1.2.1"
@ -447,6 +495,37 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]] [[package]]
name = "ctrlc" name = "ctrlc"
version = "3.1.9" version = "3.1.9"
@ -533,6 +612,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.14" version = "0.3.14"
@ -674,6 +759,15 @@ dependencies = [
"ahash 0.4.7", "ahash 0.4.7",
] ]
[[package]]
name = "hashlink"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8"
dependencies = [
"hashbrown",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.3.2" version = "0.3.2"
@ -692,6 +786,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "http" name = "http"
version = "0.2.4" version = "0.2.4"
@ -828,6 +928,19 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.93" version = "0.2.93"
@ -845,6 +958,17 @@ dependencies = [
"pkg-config", "pkg-config",
] ]
[[package]]
name = "libsqlite3-sys"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19cb1effde5f834799ac5e5ef0e40d45027cd74f271b1de786ba8abb30e2164d"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "local-channel" name = "local-channel"
version = "0.1.2" version = "0.1.2"
@ -914,6 +1038,7 @@ dependencies = [
"serde_json", "serde_json",
"simple_logger", "simple_logger",
"sodiumoxide", "sodiumoxide",
"sqlx",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@ -921,6 +1046,12 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]] [[package]]
name = "matches" name = "matches"
version = "0.1.8" version = "0.1.8"
@ -983,6 +1114,19 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]] [[package]]
name = "ntapi" name = "ntapi"
version = "0.3.6" version = "0.3.6"
@ -1174,6 +1318,12 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.3" version = "0.8.3"
@ -1412,6 +1562,19 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
[[package]]
name = "sha2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
dependencies = [
"block-buffer",
"cfg-if",
"cpuid-bool",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.3.0" version = "1.3.0"
@ -1473,6 +1636,105 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlformat"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d86e3c77ff882a828346ba401a7ef4b8e440df804491c6064fe8295765de71c"
dependencies = [
"lazy_static",
"maplit",
"nom",
"regex",
"unicode_categories",
]
[[package]]
name = "sqlx"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d582b9bc04ec6c03084196efc42c2226b018e9941f03ee62bd88921d500917c0"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de52d1d473cebb2abb79c886ef6a8023e965e34c0676a99cfeac2cc7f0fde4c1"
dependencies = [
"ahash 0.7.2",
"atoi",
"bitflags",
"byteorder",
"bytes",
"chrono",
"crc",
"crossbeam-channel",
"crossbeam-queue",
"crossbeam-utils",
"either",
"futures-channel",
"futures-core",
"futures-util",
"hashlink",
"hex",
"itoa",
"libc",
"libsqlite3-sys",
"log",
"memchr",
"once_cell",
"parking_lot",
"percent-encoding",
"rustls",
"sha2",
"smallvec",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"time 0.2.26",
"tokio-stream",
"url",
"webpki",
"webpki-roots",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a40f0be97e704d3fbf059e7e3333c3735639146a72d586c5534c70e79da88a4"
dependencies = [
"dotenv",
"either",
"futures",
"heck",
"proc-macro2",
"quote",
"sha2",
"sqlx-core",
"sqlx-rt",
"syn",
"url",
]
[[package]]
name = "sqlx-rt"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6ae97ab05063ed515cdc23d90253213aa24dda0a288c5ec079af3d10f9771bc"
dependencies = [
"actix-rt",
"once_cell",
"tokio",
"tokio-rustls",
]
[[package]] [[package]]
name = "standback" name = "standback"
version = "0.2.17" version = "0.2.17"
@ -1482,6 +1744,12 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "stdweb" name = "stdweb"
version = "0.4.20" version = "0.4.20"
@ -1531,6 +1799,16 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"
@ -1548,6 +1826,12 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]] [[package]]
name = "termcolor" name = "termcolor"
version = "1.1.2" version = "1.1.2"
@ -1802,6 +2086,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.7.1"
@ -1821,6 +2111,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "vcpkg"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"
@ -1946,6 +2242,16 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "whoami"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4abacf325c958dfeaf1046931d37f2a901b6dfe0968ee965a29e94c6766b2af6"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@ -1985,3 +2291,9 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"

View file

@ -29,6 +29,7 @@ serde = "1"
serde_json = "1" serde_json = "1"
simple_logger = "1" simple_logger = "1"
sodiumoxide = "0.2" sodiumoxide = "0.2"
sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros" ] }
thiserror = "1" thiserror = "1"
tokio = { version = "1", features = [ "full", "parking_lot" ] } tokio = { version = "1", features = [ "full", "parking_lot" ] }
tokio-stream = { version = "0.1", features = [ "sync" ] } tokio-stream = { version = "0.1", features = [ "sync" ] }
@ -38,3 +39,4 @@ url = { version = "2", features = [ "serde" ] }
[profile.release] [profile.release]
lto = true lto = true
codegen-units = 1 codegen-units = 1
debug = true

7
db_queries/init.sql Normal file
View file

@ -0,0 +1,7 @@
create table if not exists Images(
id varchar primary key not null,
size integer not null,
accessed timestamp not null default CURRENT_TIMESTAMP,
disk_size integer as ((size + 4095) / 4096 * 4096)
);
create index if not exists Images_accessed on Images(accessed);

9
init_cache.sh Executable file
View file

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# This script needs to be run once in order for compile time macros to not
# complain about a missing DB
# We can trust that our program will initialize the db at runtime the same way
# as it pulls from the same file for initialization
sqlite3 cache/metadata.sqlite < db_queries/init.sql

84
src/cache/fs.rs vendored
View file

@ -3,11 +3,13 @@ use bytes::{Buf, Bytes};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::{debug, error}; use log::{debug, error};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::Deserialize;
use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{collections::HashMap, num::NonZeroU64};
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
@ -16,7 +18,7 @@ use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
use super::{BoxedImageStream, CacheStream, CacheStreamItem}; use super::{BoxedImageStream, CacheStream, CacheStreamItem, ImageMetadata};
/// Keeps track of files that are currently being written to. /// Keeps track of files that are currently being written to.
/// ///
@ -38,26 +40,27 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
Lazy::new(|| RwLock::new(HashMap::new())); Lazy::new(|| RwLock::new(HashMap::new()));
/// Tries to read from the file, returning a byte stream if it exists /// Tries to read from the file, returning a byte stream if it exists
pub async fn read_file(path: &Path) -> Option<Result<CacheStream, std::io::Error>> { pub async fn read_file(
if path.exists() { path: &Path,
let status = WRITING_STATUS.read().await.get(path).map(Clone::clone); ) -> Option<Result<(CacheStream, ImageMetadata), std::io::Error>> {
let file = File::open(path).await.ok()?;
let std_file = file.try_clone().await.unwrap().into_std().await;
if let Some(status) = status { let metadata = {
Some( let mut de = serde_json::Deserializer::from_reader(std_file);
ConcurrentFsStream::new(path, WatchStream::new(status)) ImageMetadata::deserialize(&mut de).ok()?
.await };
.map(CacheStream::Concurrent),
) let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
} else { CacheStream::Concurrent(ConcurrentFsStream::from_file(
Some( file,
File::open(path) WatchStream::new(status),
.await ))
.map(|f| CacheStream::Completed(FramedRead::new(f, BytesCodec::new()))),
)
}
} else { } else {
None CacheStream::Completed(FramedRead::new(file, BytesCodec::new()))
} };
Some(Ok((stream, metadata)))
} }
/// Maps the input byte stream into one that writes to disk instead, returning /// Maps the input byte stream into one that writes to disk instead, returning
@ -65,6 +68,7 @@ pub async fn read_file(path: &Path) -> Option<Result<CacheStream, std::io::Error
pub async fn write_file( pub async fn write_file(
path: &Path, path: &Path,
mut byte_stream: BoxedImageStream, mut byte_stream: BoxedImageStream,
metadata: ImageMetadata,
notifier: UnboundedSender<u64>, notifier: UnboundedSender<u64>,
) -> Result<CacheStream, std::io::Error> { ) -> Result<CacheStream, std::io::Error> {
let (tx, rx) = channel(WritingStatus::NotDone); let (tx, rx) = channel(WritingStatus::NotDone);
@ -73,7 +77,7 @@ pub async fn write_file(
let mut write_lock = WRITING_STATUS.write().await; let mut write_lock = WRITING_STATUS.write().await;
let parent = path.parent().unwrap(); let parent = path.parent().unwrap();
create_dir_all(parent).await?; create_dir_all(parent).await?;
let file = File::create(path).await?; // we need to make sure the file exists and is truncated. let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
write_lock.insert(path.to_path_buf(), rx.clone()); write_lock.insert(path.to_path_buf(), rx.clone());
file file
}; };
@ -84,6 +88,8 @@ pub async fn write_file(
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut errored = false; let mut errored = false;
let mut bytes_written: u64 = 0; let mut bytes_written: u64 = 0;
file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes())
.await?;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes { if let Ok(mut bytes) = bytes {
loop { loop {
@ -113,17 +119,19 @@ pub async fn write_file(
debug!("writing to file done"); debug!("writing to file done");
} }
let mut write_lock = WRITING_STATUS.write().await; {
// This needs to be written atomically with the write lock, else let mut write_lock = WRITING_STATUS.write().await;
// it's possible we have an inconsistent state // This needs to be written atomically with the write lock, else
// // it's possible we have an inconsistent state
// We don't really care if we have no receivers //
if errored { // We don't really care if we have no receivers
let _ = tx.send(WritingStatus::Error); if errored {
} else { let _ = tx.send(WritingStatus::Error);
let _ = tx.send(WritingStatus::Done(bytes_written)); } else {
let _ = tx.send(WritingStatus::Done(bytes_written));
}
write_lock.remove(&path_buf);
} }
write_lock.remove(&path_buf);
// notify // notify
if let Err(e) = notifier.send(bytes_written) { if let Err(e) = notifier.send(bytes_written) {
@ -154,12 +162,18 @@ impl ConcurrentFsStream {
path: &Path, path: &Path,
receiver: WatchStream<WritingStatus>, receiver: WatchStream<WritingStatus>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { File::open(path)
file: Box::pin(File::open(path).await?), .await
.map(|file| Self::from_file(file, receiver))
}
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
Self {
file: Box::pin(file),
receiver: Box::pin(receiver), receiver: Box::pin(receiver),
bytes_read: 0, bytes_read: 0,
bytes_total: None, bytes_total: None,
}) }
} }
} }
@ -182,7 +196,7 @@ impl Stream for ConcurrentFsStream {
// First, try to read from the file... // First, try to read from the file...
// TODO: Might be more efficient to have a larger buffer // TODO: Might be more efficient to have a larger buffer
let mut bytes = [0; 8 * 1024].to_vec(); let mut bytes = [0; 4 * 1024].to_vec();
let mut buffer = ReadBuf::new(&mut bytes); let mut buffer = ReadBuf::new(&mut bytes);
match self.file.as_mut().poll_read(cx, &mut buffer) { match self.file.as_mut().poll_read(cx, &mut buffer) {
Poll::Ready(Ok(_)) => (), Poll::Ready(Ok(_)) => (),

View file

@ -136,15 +136,16 @@ impl GenerationalCache {
#[async_trait] #[async_trait]
impl Cache for GenerationalCache { impl Cache for GenerationalCache {
async fn get( async fn get(
&mut self, &self,
key: &CacheKey, key: &CacheKey,
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> { ) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
todo!();
if self.in_memory.contains(key) { if self.in_memory.contains(key) {
return self return self
.in_memory .in_memory
.get(key) .get(key)
// TODO: get rid of clone? // TODO: get rid of clone?
.map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata))); .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone())));
} }
if let Some(metadata) = self.on_disk.pop(key) { if let Some(metadata) = self.on_disk.pop(key) {
@ -182,21 +183,23 @@ impl Cache for GenerationalCache {
buffer.shrink_to_fit(); buffer.shrink_to_fit();
todo!();
self.disk_cur_size -= buffer.len() as u64; self.disk_cur_size -= buffer.len() as u64;
let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into()); // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
return Some(self.put(key.clone(), Box::new(image), metadata).await); // return Some(self.put(key.clone(), Box::new(image), metadata).await);
} }
None None
} }
async fn put( async fn put(
&mut self, &self,
key: CacheKey, key: CacheKey,
mut image: BoxedImageStream, mut image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError> { ) -> Result<CacheStream, CacheError> {
todo!();
let mut hot_evicted = vec![]; let mut hot_evicted = vec![];
let image = { let image = {
@ -237,9 +240,10 @@ impl Cache for GenerationalCache {
self.push_into_cold(key, image, metadata).await; self.push_into_cold(key, image, metadata).await;
} }
self.get(&key).await.unwrap() todo!();
// self.get(&key).await.unwrap()
} }
// noop // noop
async fn increase_usage(&mut self, _amt: u64) {} async fn increase_usage(&self, _amt: u64) {}
} }

120
src/cache/low_mem.rs vendored
View file

@ -1,88 +1,128 @@
//! Low memory caching stuff //! Low memory caching stuff
use std::{path::PathBuf, sync::Arc}; use std::sync::{atomic::Ordering, Arc};
use std::{path::PathBuf, sync::atomic::AtomicU64};
use async_trait::async_trait; use async_trait::async_trait;
use lru::LruCache; use futures::StreamExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use sqlx::SqlitePool;
use tokio::sync::RwLock; use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender};
use tokio_stream::wrappers::ReceiverStream;
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
pub struct LowMemCache { pub struct LowMemCache {
on_disk: LruCache<CacheKey, ImageMetadata>,
disk_path: PathBuf, disk_path: PathBuf,
disk_max_size: u64, disk_max_size: u64,
disk_cur_size: u64, disk_cur_size: AtomicU64,
master_sender: UnboundedSender<u64>, file_size_channel_sender: UnboundedSender<u64>,
db_update_channel_sender: Sender<DbMessage>,
}
enum DbMessage {
Get(CacheKey),
Put(CacheKey, ImageMetadata),
} }
impl LowMemCache { impl LowMemCache {
/// Constructs a new low memory cache at the provided path and capacity. /// Constructs a new low memory cache at the provided path and capaci ty.
/// This internally spawns a task that will wait for filesystem /// This internally spawns a task that will wait for filesystem
/// notifications when a file has been written. /// notifications when a file has been written.
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<RwLock<Box<dyn Cache>>> { pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> {
let (tx, mut rx) = unbounded_channel(); let (file_size_tx, mut file_size_rx) = unbounded_channel();
let new_self: Arc<RwLock<Box<dyn Cache>>> = Arc::new(RwLock::new(Box::new(Self { let (db_tx, db_rx) = channel(128);
on_disk: LruCache::unbounded(), let db_pool = {
let db_url = format!(
"sqlite:{}/metadata.sqlite?mode=rwc",
disk_path.to_str().unwrap()
);
let db = SqlitePool::connect(&db_url).await.unwrap();
// Run db init
sqlx::query_file!("./db_queries/init.sql")
.execute(&mut db.acquire().await.unwrap())
.await
.unwrap();
db
};
let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
disk_path, disk_path,
disk_max_size, disk_max_size,
disk_cur_size: 0, disk_cur_size: AtomicU64::new(0),
master_sender: tx, file_size_channel_sender: file_size_tx,
}))); db_update_channel_sender: db_tx,
}));
// Spawns a new task that continuously listens for events received by // Spawns a new task that continuously listens for events received by
// the channel, which informs the low memory cache the total size of the // the channel, which informs the low memory cache the total size of the
// item that was put into the cache. // item that was put into the cache.
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(new_size) = rx.recv().await { // This will never return None, effectively a loop
new_self_0.write().await.increase_usage(new_size).await; while let Some(new_size) = file_size_rx.recv().await {
new_self_0.increase_usage(new_size).await;
}
});
// Spawn a new task that will listen for updates to the db.
tokio::spawn(async move {
let db_pool = db_pool;
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
while let Some(messages) = recv_stream.next().await {
let mut transaction = db_pool.begin().await.unwrap();
for message in messages {}
transaction.commit().await.unwrap();
} }
}); });
new_self new_self
} }
async fn prune(&mut self) {
todo!()
}
} }
#[async_trait] #[async_trait]
impl Cache for LowMemCache { impl Cache for LowMemCache {
async fn get( async fn get(
&mut self, &self,
key: &CacheKey, key: &CacheKey,
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> { ) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
let metadata = self.on_disk.get(key)?; let channel = self.db_update_channel_sender.clone();
let key_0 = key.clone();
tokio::spawn(async move { channel.send(DbMessage::Get(key_0)).await });
let path = self.disk_path.clone().join(PathBuf::from(key.clone())); let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
super::fs::read_file(&path) super::fs::read_file(&path)
.await .await
.map(|res| res.map(|stream| (stream, metadata)).map_err(Into::into)) .map(|res| res.map_err(Into::into))
} }
async fn put( async fn put(
&mut self, &self,
key: CacheKey, key: CacheKey,
image: BoxedImageStream, image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError> { ) -> Result<CacheStream, CacheError> {
let path = self.disk_path.clone().join(PathBuf::from(key.clone())); let channel = self.db_update_channel_sender.clone();
self.on_disk.put(key.clone(), metadata); let key_0 = key.clone();
super::fs::write_file(&path, image, self.master_sender.clone()) tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await });
.await
.map(move |stream| (stream, self.on_disk.get(&key).unwrap())) let path = self.disk_path.clone().join(PathBuf::from(key));
.map_err(Into::into)
super::fs::write_file(
&path,
image,
metadata,
self.file_size_channel_sender.clone(),
)
.await
.map_err(Into::into)
} }
/// Increments the internal size counter, pruning if the value exceeds the async fn increase_usage(&self, amt: u64) {
/// user-defined capacity. self.disk_cur_size.fetch_add(amt, Ordering::Release);
async fn increase_usage(&mut self, amt: u64) {
self.disk_cur_size += amt;
if self.disk_cur_size > self.disk_max_size {
self.prune().await;
}
} }
} }

22
src/cache/mod.rs vendored
View file

@ -10,13 +10,14 @@ use bytes::{Bytes, BytesMut};
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use fs::ConcurrentFsStream; use fs::ConcurrentFsStream;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
pub use fs::UpstreamError; pub use fs::UpstreamError;
pub use generational::GenerationalCache; pub use generational::GenerationalCache;
pub use low_mem::LowMemCache; pub use low_mem::LowMemCache;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
mod fs; mod fs;
mod generational; mod generational;
@ -45,16 +46,15 @@ impl From<CacheKey> for PathBuf {
#[derive(Clone)] #[derive(Clone)]
pub struct CachedImage(pub Bytes); pub struct CachedImage(pub Bytes);
#[derive(Copy, Clone)] #[derive(Copy, Clone, Serialize, Deserialize)]
pub struct ImageMetadata { pub struct ImageMetadata {
pub content_type: Option<ImageContentType>, pub content_type: Option<ImageContentType>,
// If we can guarantee a non-zero u32 here we can save 4 bytes
pub content_length: Option<u32>, pub content_length: Option<u32>,
pub last_modified: Option<DateTime<FixedOffset>>, pub last_modified: Option<DateTime<FixedOffset>>,
} }
// Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0 // Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0
#[derive(Copy, Clone)] #[derive(Copy, Clone, Serialize, Deserialize)]
pub enum ImageContentType { pub enum ImageContentType {
Png, Png,
Jpeg, Jpeg,
@ -147,19 +147,17 @@ pub enum CacheError {
#[async_trait] #[async_trait]
pub trait Cache: Send + Sync { pub trait Cache: Send + Sync {
async fn get( async fn get(&self, key: &CacheKey)
&mut self, -> Option<Result<(CacheStream, ImageMetadata), CacheError>>;
key: &CacheKey,
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>>;
async fn put( async fn put(
&mut self, &self,
key: CacheKey, key: CacheKey,
image: BoxedImageStream, image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError>; ) -> Result<CacheStream, CacheError>;
async fn increase_usage(&mut self, amt: u64); async fn increase_usage(&self, amt: u64);
} }
pub enum CacheStream { pub enum CacheStream {

View file

@ -124,14 +124,14 @@ async fn main() -> Result<(), std::io::Error> {
} }
}); });
let cache: Arc<TokioRwLock<Box<dyn Cache>>> = if low_mem_mode { let cache: Arc<Box<dyn Cache>> = if low_mem_mode {
LowMemCache::new(disk_quota, cache_path.clone()) LowMemCache::new(disk_quota, cache_path.clone()).await
} else { } else {
Arc::new(TokioRwLock::new(Box::new(GenerationalCache::new( Arc::new(Box::new(GenerationalCache::new(
memory_max_size, memory_max_size,
disk_quota, disk_quota,
cache_path.clone(), cache_path.clone(),
)))) )))
}; };
let cache = Arc::clone(&cache); let cache = Arc::clone(&cache);
let cache1 = Arc::clone(&cache); let cache1 = Arc::clone(&cache);

View file

@ -52,7 +52,7 @@ impl Responder for ServerResponse {
#[get("/{token}/data/{chapter_hash}/{file_name}")] #[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data( async fn token_data(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<RwLock<Box<dyn Cache>>>, cache: Data<Box<dyn Cache>>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -69,7 +69,7 @@ async fn token_data(
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver( async fn token_data_saver(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<RwLock<Box<dyn Cache>>>, cache: Data<Box<dyn Cache>>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -187,16 +187,16 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
#[allow(clippy::future_not_send)] #[allow(clippy::future_not_send)]
async fn fetch_image( async fn fetch_image(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<RwLock<Box<dyn Cache>>>, cache: Data<Box<dyn Cache>>,
chapter_hash: String, chapter_hash: String,
file_name: String, file_name: String,
is_data_saver: bool, is_data_saver: bool,
) -> ServerResponse { ) -> ServerResponse {
let key = CacheKey(chapter_hash, file_name, is_data_saver); let key = CacheKey(chapter_hash, file_name, is_data_saver);
match cache.write().await.get(&key).await { match cache.get(&key).await {
Some(Ok((image, metadata))) => { Some(Ok((image, metadata))) => {
return construct_response(image, metadata); return construct_response(image, &metadata);
} }
Some(Err(_)) => { Some(Err(_)) => {
return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish()); return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish());
@ -263,9 +263,9 @@ async fn fetch_image(
debug!("Inserting into cache"); debug!("Inserting into cache");
let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap(); let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
let (stream, metadata) = { let stream = {
match cache.write().await.put(key, Box::new(body), metadata).await { match cache.put(key, Box::new(body), metadata).await {
Ok((stream, metadata)) => (stream, *metadata), Ok(stream) => stream,
Err(e) => { Err(e) => {
warn!("Failed to insert into cache: {}", e); warn!("Failed to insert into cache: {}", e);
return ServerResponse::HttpResponse( return ServerResponse::HttpResponse(