Compare commits

..

No commits in common. "eab0449a02b220ea276daa55c557dbe1e20a53c6" and "53c0cb664a2b6b37af68027c5992295be9cddd7d" have entirely different histories.

10 changed files with 159 additions and 557 deletions

312
Cargo.lock generated
View file

@ -234,12 +234,6 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.50" version = "0.1.50"
@ -251,15 +245,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "atoi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -304,18 +289,6 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.9.0" version = "0.9.0"
@ -345,24 +318,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "build_const"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7"
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.6.1" version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.0.1" version = "1.0.1"
@ -477,15 +438,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crc"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
dependencies = [
"build_const",
]
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
version = "1.2.1" version = "1.2.1"
@ -495,37 +447,6 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]] [[package]]
name = "ctrlc" name = "ctrlc"
version = "3.1.9" version = "3.1.9"
@ -612,12 +533,6 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.14" version = "0.3.14"
@ -759,15 +674,6 @@ dependencies = [
"ahash 0.4.7", "ahash 0.4.7",
] ]
[[package]]
name = "hashlink"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8"
dependencies = [
"hashbrown",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.3.2" version = "0.3.2"
@ -786,12 +692,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "http" name = "http"
version = "0.2.4" version = "0.2.4"
@ -928,19 +828,6 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.93" version = "0.2.93"
@ -958,17 +845,6 @@ dependencies = [
"pkg-config", "pkg-config",
] ]
[[package]]
name = "libsqlite3-sys"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19cb1effde5f834799ac5e5ef0e40d45027cd74f271b1de786ba8abb30e2164d"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "local-channel" name = "local-channel"
version = "0.1.2" version = "0.1.2"
@ -1038,7 +914,6 @@ dependencies = [
"serde_json", "serde_json",
"simple_logger", "simple_logger",
"sodiumoxide", "sodiumoxide",
"sqlx",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@ -1046,12 +921,6 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]] [[package]]
name = "matches" name = "matches"
version = "0.1.8" version = "0.1.8"
@ -1114,19 +983,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]] [[package]]
name = "ntapi" name = "ntapi"
version = "0.3.6" version = "0.3.6"
@ -1318,12 +1174,6 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.3" version = "0.8.3"
@ -1562,19 +1412,6 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
[[package]]
name = "sha2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
dependencies = [
"block-buffer",
"cfg-if",
"cpuid-bool",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.3.0" version = "1.3.0"
@ -1636,105 +1473,6 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlformat"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d86e3c77ff882a828346ba401a7ef4b8e440df804491c6064fe8295765de71c"
dependencies = [
"lazy_static",
"maplit",
"nom",
"regex",
"unicode_categories",
]
[[package]]
name = "sqlx"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d582b9bc04ec6c03084196efc42c2226b018e9941f03ee62bd88921d500917c0"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de52d1d473cebb2abb79c886ef6a8023e965e34c0676a99cfeac2cc7f0fde4c1"
dependencies = [
"ahash 0.7.2",
"atoi",
"bitflags",
"byteorder",
"bytes",
"chrono",
"crc",
"crossbeam-channel",
"crossbeam-queue",
"crossbeam-utils",
"either",
"futures-channel",
"futures-core",
"futures-util",
"hashlink",
"hex",
"itoa",
"libc",
"libsqlite3-sys",
"log",
"memchr",
"once_cell",
"parking_lot",
"percent-encoding",
"rustls",
"sha2",
"smallvec",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"time 0.2.26",
"tokio-stream",
"url",
"webpki",
"webpki-roots",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a40f0be97e704d3fbf059e7e3333c3735639146a72d586c5534c70e79da88a4"
dependencies = [
"dotenv",
"either",
"futures",
"heck",
"proc-macro2",
"quote",
"sha2",
"sqlx-core",
"sqlx-rt",
"syn",
"url",
]
[[package]]
name = "sqlx-rt"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6ae97ab05063ed515cdc23d90253213aa24dda0a288c5ec079af3d10f9771bc"
dependencies = [
"actix-rt",
"once_cell",
"tokio",
"tokio-rustls",
]
[[package]] [[package]]
name = "standback" name = "standback"
version = "0.2.17" version = "0.2.17"
@ -1744,12 +1482,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "stdweb" name = "stdweb"
version = "0.4.20" version = "0.4.20"
@ -1799,16 +1531,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"
@ -1826,12 +1548,6 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]] [[package]]
name = "termcolor" name = "termcolor"
version = "1.1.2" version = "1.1.2"
@ -2086,12 +1802,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.7.1"
@ -2111,12 +1821,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "vcpkg"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"
@ -2242,16 +1946,6 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "whoami"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4abacf325c958dfeaf1046931d37f2a901b6dfe0968ee965a29e94c6766b2af6"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@ -2291,9 +1985,3 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"

View file

@ -29,7 +29,6 @@ serde = "1"
serde_json = "1" serde_json = "1"
simple_logger = "1" simple_logger = "1"
sodiumoxide = "0.2" sodiumoxide = "0.2"
sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros" ] }
thiserror = "1" thiserror = "1"
tokio = { version = "1", features = [ "full", "parking_lot" ] } tokio = { version = "1", features = [ "full", "parking_lot" ] }
tokio-stream = { version = "0.1", features = [ "sync" ] } tokio-stream = { version = "0.1", features = [ "sync" ] }
@ -39,4 +38,3 @@ url = { version = "2", features = [ "serde" ] }
[profile.release] [profile.release]
lto = true lto = true
codegen-units = 1 codegen-units = 1
debug = true

View file

@ -1,7 +0,0 @@
create table if not exists Images(
id varchar primary key not null,
size integer not null,
accessed timestamp not null default CURRENT_TIMESTAMP,
disk_size integer as ((size + 4095) / 4096 * 4096)
);
create index if not exists Images_accessed on Images(accessed);

View file

@ -1,9 +0,0 @@
#!/usr/bin/env bash
# This script needs to be run once in order for compile time macros to not
# complain about a missing DB
# We can trust that our program will initialize the db at runtime the same way
# as it pulls from the same file for initialization
sqlite3 cache/metadata.sqlite < db_queries/init.sql

84
src/cache/fs.rs vendored
View file

@ -3,13 +3,11 @@ use bytes::{Buf, Bytes};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::{debug, error}; use log::{debug, error};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::Deserialize;
use std::fmt::Display; use std::fmt::Display;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, num::NonZeroU64};
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
@ -18,7 +16,7 @@ use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
use super::{BoxedImageStream, CacheStream, CacheStreamItem, ImageMetadata}; use super::{BoxedImageStream, CacheStream, CacheStreamItem};
/// Keeps track of files that are currently being written to. /// Keeps track of files that are currently being written to.
/// ///
@ -40,27 +38,26 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
Lazy::new(|| RwLock::new(HashMap::new())); Lazy::new(|| RwLock::new(HashMap::new()));
/// Tries to read from the file, returning a byte stream if it exists /// Tries to read from the file, returning a byte stream if it exists
pub async fn read_file( pub async fn read_file(path: &Path) -> Option<Result<CacheStream, std::io::Error>> {
path: &Path, if path.exists() {
) -> Option<Result<(CacheStream, ImageMetadata), std::io::Error>> { let status = WRITING_STATUS.read().await.get(path).map(Clone::clone);
let file = File::open(path).await.ok()?;
let std_file = file.try_clone().await.unwrap().into_std().await;
let metadata = { if let Some(status) = status {
let mut de = serde_json::Deserializer::from_reader(std_file); Some(
ImageMetadata::deserialize(&mut de).ok()? ConcurrentFsStream::new(path, WatchStream::new(status))
}; .await
.map(CacheStream::Concurrent),
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) { )
CacheStream::Concurrent(ConcurrentFsStream::from_file( } else {
file, Some(
WatchStream::new(status), File::open(path)
)) .await
.map(|f| CacheStream::Completed(FramedRead::new(f, BytesCodec::new()))),
)
}
} else { } else {
CacheStream::Completed(FramedRead::new(file, BytesCodec::new())) None
}; }
Some(Ok((stream, metadata)))
} }
/// Maps the input byte stream into one that writes to disk instead, returning /// Maps the input byte stream into one that writes to disk instead, returning
@ -68,7 +65,6 @@ pub async fn read_file(
pub async fn write_file( pub async fn write_file(
path: &Path, path: &Path,
mut byte_stream: BoxedImageStream, mut byte_stream: BoxedImageStream,
metadata: ImageMetadata,
notifier: UnboundedSender<u64>, notifier: UnboundedSender<u64>,
) -> Result<CacheStream, std::io::Error> { ) -> Result<CacheStream, std::io::Error> {
let (tx, rx) = channel(WritingStatus::NotDone); let (tx, rx) = channel(WritingStatus::NotDone);
@ -77,7 +73,7 @@ pub async fn write_file(
let mut write_lock = WRITING_STATUS.write().await; let mut write_lock = WRITING_STATUS.write().await;
let parent = path.parent().unwrap(); let parent = path.parent().unwrap();
create_dir_all(parent).await?; create_dir_all(parent).await?;
let file = File::create(path).await?; // we need to make sure the file exists and is truncated. let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
write_lock.insert(path.to_path_buf(), rx.clone()); write_lock.insert(path.to_path_buf(), rx.clone());
file file
}; };
@ -88,8 +84,6 @@ pub async fn write_file(
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut errored = false; let mut errored = false;
let mut bytes_written: u64 = 0; let mut bytes_written: u64 = 0;
file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes())
.await?;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes { if let Ok(mut bytes) = bytes {
loop { loop {
@ -119,19 +113,17 @@ pub async fn write_file(
debug!("writing to file done"); debug!("writing to file done");
} }
{ let mut write_lock = WRITING_STATUS.write().await;
let mut write_lock = WRITING_STATUS.write().await; // This needs to be written atomically with the write lock, else
// This needs to be written atomically with the write lock, else // it's possible we have an inconsistent state
// it's possible we have an inconsistent state //
// // We don't really care if we have no receivers
// We don't really care if we have no receivers if errored {
if errored { let _ = tx.send(WritingStatus::Error);
let _ = tx.send(WritingStatus::Error); } else {
} else { let _ = tx.send(WritingStatus::Done(bytes_written));
let _ = tx.send(WritingStatus::Done(bytes_written));
}
write_lock.remove(&path_buf);
} }
write_lock.remove(&path_buf);
// notify // notify
if let Err(e) = notifier.send(bytes_written) { if let Err(e) = notifier.send(bytes_written) {
@ -162,18 +154,12 @@ impl ConcurrentFsStream {
path: &Path, path: &Path,
receiver: WatchStream<WritingStatus>, receiver: WatchStream<WritingStatus>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
File::open(path) Ok(Self {
.await file: Box::pin(File::open(path).await?),
.map(|file| Self::from_file(file, receiver))
}
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
Self {
file: Box::pin(file),
receiver: Box::pin(receiver), receiver: Box::pin(receiver),
bytes_read: 0, bytes_read: 0,
bytes_total: None, bytes_total: None,
} })
} }
} }
@ -196,7 +182,7 @@ impl Stream for ConcurrentFsStream {
// First, try to read from the file... // First, try to read from the file...
// TODO: Might be more efficient to have a larger buffer // TODO: Might be more efficient to have a larger buffer
let mut bytes = [0; 4 * 1024].to_vec(); let mut bytes = [0; 8 * 1024].to_vec();
let mut buffer = ReadBuf::new(&mut bytes); let mut buffer = ReadBuf::new(&mut bytes);
match self.file.as_mut().poll_read(cx, &mut buffer) { match self.file.as_mut().poll_read(cx, &mut buffer) {
Poll::Ready(Ok(_)) => (), Poll::Ready(Ok(_)) => (),

View file

@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc}; use std::path::PathBuf;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@ -136,70 +136,67 @@ impl GenerationalCache {
#[async_trait] #[async_trait]
impl Cache for GenerationalCache { impl Cache for GenerationalCache {
async fn get( async fn get(
&self, &mut self,
key: Arc<CacheKey>, key: &CacheKey,
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> { ) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
todo!(); if self.in_memory.contains(key) {
// if self.in_memory.contains(key) { return self
// return self .in_memory
// .in_memory .get(key)
// .get(key) // TODO: get rid of clone?
// // TODO: get rid of clone? .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata)));
// .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone()))); }
// }
// if let Some(metadata) = self.on_disk.pop(key) { if let Some(metadata) = self.on_disk.pop(key) {
// let new_key_path = { let new_key_path = {
// let mut root = self.disk_path.clone(); let mut root = self.disk_path.clone();
// root.push(PathBuf::from(key.clone())); root.push(PathBuf::from(key.clone()));
// root root
// }; };
// // extract from disk, if it exists // extract from disk, if it exists
// let file = File::open(&new_key_path).await; let file = File::open(&new_key_path).await;
// let mut buffer = metadata let mut buffer = metadata
// .content_length .content_length
// .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize)); .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
// match file { match file {
// Ok(mut file) => { Ok(mut file) => {
// match file.read_to_end(&mut buffer).await { match file.read_to_end(&mut buffer).await {
// Ok(_) => { Ok(_) => {
// // We don't particularly care if we fail to delete from disk since // We don't particularly care if we fail to delete from disk since
// // if it's an error it means it's already been dropped. // if it's an error it means it's already been dropped.
// tokio::spawn(remove_file(new_key_path)); tokio::spawn(remove_file(new_key_path));
// } }
// Err(e) => { Err(e) => {
// warn!("Failed to read from {:?}: {}", new_key_path, e); warn!("Failed to read from {:?}: {}", new_key_path, e);
// } }
// } }
// } }
// Err(e) => { Err(e) => {
// warn!("Failed to open {:?}: {}", new_key_path, e); warn!("Failed to open {:?}: {}", new_key_path, e);
// return None; return None;
// } }
// } }
// buffer.shrink_to_fit(); buffer.shrink_to_fit();
// todo!(); self.disk_cur_size -= buffer.len() as u64;
// self.disk_cur_size -= buffer.len() as u64; let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
// // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
// // return Some(self.put(key.clone(), Box::new(image), metadata).await); return Some(self.put(key.clone(), Box::new(image), metadata).await);
// } }
None None
} }
async fn put( async fn put(
&self, &mut self,
key: Arc<CacheKey>, key: CacheKey,
mut image: BoxedImageStream, mut image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError> { ) -> Result<(CacheStream, &ImageMetadata), CacheError> {
todo!();
let mut hot_evicted = vec![]; let mut hot_evicted = vec![];
let image = { let image = {
@ -227,14 +224,12 @@ impl Cache for GenerationalCache {
} }
} }
todo!(); self.in_memory.put(key.clone(), (image, metadata));
// self.in_memory.put(key.clone(), (image, metadata));
self.memory_cur_size += new_img_size; self.memory_cur_size += new_img_size;
} else { } else {
// Image was larger than memory capacity, push directly into cold // Image was larger than memory capacity, push directly into cold
// storage. // storage.
todo!(); self.push_into_cold(key.clone(), image, metadata).await;
// self.push_into_cold(key.clone(), image, metadata).await;
}; };
// Push evicted hot entires into cold storage. // Push evicted hot entires into cold storage.
@ -242,10 +237,9 @@ impl Cache for GenerationalCache {
self.push_into_cold(key, image, metadata).await; self.push_into_cold(key, image, metadata).await;
} }
todo!(); self.get(&key).await.unwrap()
// self.get(&key).await.unwrap()
} }
// noop // noop
async fn increase_usage(&self, _amt: u64) {} async fn increase_usage(&mut self, _amt: u64) {}
} }

129
src/cache/low_mem.rs vendored
View file

@ -1,131 +1,88 @@
//! Low memory caching stuff //! Low memory caching stuff
use std::sync::{atomic::Ordering, Arc}; use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::atomic::AtomicU64};
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use lru::LruCache;
use sqlx::SqlitePool; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}; use tokio::sync::RwLock;
use tokio_stream::wrappers::ReceiverStream;
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
pub struct LowMemCache { pub struct LowMemCache {
on_disk: LruCache<CacheKey, ImageMetadata>,
disk_path: PathBuf, disk_path: PathBuf,
disk_max_size: u64, disk_max_size: u64,
disk_cur_size: AtomicU64, disk_cur_size: u64,
file_size_channel_sender: UnboundedSender<u64>, master_sender: UnboundedSender<u64>,
db_update_channel_sender: Sender<DbMessage>,
}
enum DbMessage {
Get(Arc<CacheKey>),
Put(Arc<CacheKey>, ImageMetadata),
} }
impl LowMemCache { impl LowMemCache {
/// Constructs a new low memory cache at the provided path and capaci ty. /// Constructs a new low memory cache at the provided path and capacity.
/// This internally spawns a task that will wait for filesystem /// This internally spawns a task that will wait for filesystem
/// notifications when a file has been written. /// notifications when a file has been written.
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> { pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<RwLock<Box<dyn Cache>>> {
let (file_size_tx, mut file_size_rx) = unbounded_channel(); let (tx, mut rx) = unbounded_channel();
let (db_tx, db_rx) = channel(128); let new_self: Arc<RwLock<Box<dyn Cache>>> = Arc::new(RwLock::new(Box::new(Self {
let db_pool = { on_disk: LruCache::unbounded(),
let db_url = format!(
"sqlite:{}/metadata.sqlite?mode=rwc",
disk_path.to_str().unwrap()
);
let db = SqlitePool::connect(&db_url).await.unwrap();
// Run db init
sqlx::query_file!("./db_queries/init.sql")
.execute(&mut db.acquire().await.unwrap())
.await
.unwrap();
db
};
let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
disk_path, disk_path,
disk_max_size, disk_max_size,
disk_cur_size: AtomicU64::new(0), disk_cur_size: 0,
file_size_channel_sender: file_size_tx, master_sender: tx,
db_update_channel_sender: db_tx, })));
}));
// Spawns a new task that continuously listens for events received by // Spawns a new task that continuously listens for events received by
// the channel, which informs the low memory cache the total size of the // the channel, which informs the low memory cache the total size of the
// item that was put into the cache. // item that was put into the cache.
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
tokio::spawn(async move { tokio::spawn(async move {
// This will never return None, effectively a loop while let Some(new_size) = rx.recv().await {
while let Some(new_size) = file_size_rx.recv().await { new_self_0.write().await.increase_usage(new_size).await;
new_self_0.increase_usage(new_size).await;
}
});
// Spawn a new task that will listen for updates to the db.
tokio::spawn(async move {
let db_pool = db_pool;
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
while let Some(messages) = recv_stream.next().await {
let mut transaction = db_pool.begin().await.unwrap();
for message in messages {}
transaction.commit().await.unwrap();
} }
}); });
new_self new_self
} }
async fn prune(&mut self) {
todo!()
}
} }
#[async_trait] #[async_trait]
impl Cache for LowMemCache { impl Cache for LowMemCache {
async fn get( async fn get(
&self, &mut self,
key: Arc<CacheKey>, key: &CacheKey,
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> { ) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
let channel = self.db_update_channel_sender.clone(); let metadata = self.on_disk.get(key)?;
let key_0 = Arc::clone(&key); let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
tokio::spawn(async move { channel.send(DbMessage::Get(key_0)).await });
let path = self
.disk_path
.clone()
.join(PathBuf::from(Arc::clone(&key).as_ref()));
super::fs::read_file(&path) super::fs::read_file(&path)
.await .await
.map(|res| res.map_err(Into::into)) .map(|res| res.map(|stream| (stream, metadata)).map_err(Into::into))
} }
async fn put( async fn put(
&self, &mut self,
key: Arc<CacheKey>, key: CacheKey,
image: BoxedImageStream, image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError> { ) -> Result<(CacheStream, &ImageMetadata), CacheError> {
let channel = self.db_update_channel_sender.clone(); let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
let key_0 = Arc::clone(&key); self.on_disk.put(key.clone(), metadata);
tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await }); super::fs::write_file(&path, image, self.master_sender.clone())
.await
let path = self.disk_path.clone().join(PathBuf::from(key.as_ref())); .map(move |stream| (stream, self.on_disk.get(&key).unwrap()))
.map_err(Into::into)
super::fs::write_file(
&path,
image,
metadata,
self.file_size_channel_sender.clone(),
)
.await
.map_err(Into::into)
} }
async fn increase_usage(&self, amt: u64) { /// Increments the internal size counter, pruning if the value exceeds the
self.disk_cur_size.fetch_add(amt, Ordering::Release); /// user-defined capacity.
async fn increase_usage(&mut self, amt: u64) {
self.disk_cur_size += amt;
if self.disk_cur_size > self.disk_max_size {
self.prune().await;
}
} }
} }

33
src/cache/mod.rs vendored
View file

@ -1,8 +1,8 @@
use std::fmt::Display;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt::Display, sync::Arc};
use actix_web::http::HeaderValue; use actix_web::http::HeaderValue;
use async_trait::async_trait; use async_trait::async_trait;
@ -10,14 +10,13 @@ use bytes::{Bytes, BytesMut};
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use fs::ConcurrentFsStream; use fs::ConcurrentFsStream;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
pub use fs::UpstreamError; pub use fs::UpstreamError;
pub use generational::GenerationalCache; pub use generational::GenerationalCache;
pub use low_mem::LowMemCache; pub use low_mem::LowMemCache;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
mod fs; mod fs;
mod generational; mod generational;
@ -43,25 +42,19 @@ impl From<CacheKey> for PathBuf {
} }
} }
impl From<&CacheKey> for PathBuf {
#[inline]
fn from(key: &CacheKey) -> Self {
key.to_string().into()
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct CachedImage(pub Bytes); pub struct CachedImage(pub Bytes);
#[derive(Copy, Clone, Serialize, Deserialize)] #[derive(Copy, Clone)]
pub struct ImageMetadata { pub struct ImageMetadata {
pub content_type: Option<ImageContentType>, pub content_type: Option<ImageContentType>,
// If we can guarantee a non-zero u32 here we can save 4 bytes
pub content_length: Option<u32>, pub content_length: Option<u32>,
pub last_modified: Option<DateTime<FixedOffset>>, pub last_modified: Option<DateTime<FixedOffset>>,
} }
// Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0 // Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0
#[derive(Copy, Clone, Serialize, Deserialize)] #[derive(Copy, Clone)]
pub enum ImageContentType { pub enum ImageContentType {
Png, Png,
Jpeg, Jpeg,
@ -155,18 +148,18 @@ pub enum CacheError {
#[async_trait] #[async_trait]
pub trait Cache: Send + Sync { pub trait Cache: Send + Sync {
async fn get( async fn get(
&self, &mut self,
key: Arc<CacheKey>, key: &CacheKey,
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>>; ) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>>;
async fn put( async fn put(
&self, &mut self,
key: Arc<CacheKey>, key: CacheKey,
image: BoxedImageStream, image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError>; ) -> Result<(CacheStream, &ImageMetadata), CacheError>;
async fn increase_usage(&self, amt: u64); async fn increase_usage(&mut self, amt: u64);
} }
pub enum CacheStream { pub enum CacheStream {

View file

@ -22,6 +22,7 @@ use simple_logger::SimpleLogger;
use state::{RwLockServerState, ServerState}; use state::{RwLockServerState, ServerState};
use stop::send_stop; use stop::send_stop;
use thiserror::Error; use thiserror::Error;
use tokio::sync::RwLock as TokioRwLock;
mod cache; mod cache;
mod config; mod config;
@ -123,14 +124,14 @@ async fn main() -> Result<(), std::io::Error> {
} }
}); });
let cache: Arc<Box<dyn Cache>> = if low_mem_mode { let cache: Arc<TokioRwLock<Box<dyn Cache>>> = if low_mem_mode {
LowMemCache::new(disk_quota, cache_path.clone()).await LowMemCache::new(disk_quota, cache_path.clone())
} else { } else {
Arc::new(Box::new(GenerationalCache::new( Arc::new(TokioRwLock::new(Box::new(GenerationalCache::new(
memory_max_size, memory_max_size,
disk_quota, disk_quota,
cache_path.clone(), cache_path.clone(),
))) ))))
}; };
let cache = Arc::clone(&cache); let cache = Arc::clone(&cache);
let cache1 = Arc::clone(&cache); let cache1 = Arc::clone(&cache);

View file

@ -1,4 +1,4 @@
use std::sync::{atomic::Ordering, Arc}; use std::sync::atomic::Ordering;
use actix_web::http::header::{ use actix_web::http::header::{
ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH,
@ -15,6 +15,7 @@ use log::{debug, error, info, warn};
use serde::Deserialize; use serde::Deserialize;
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
use thiserror::Error; use thiserror::Error;
use tokio::sync::RwLock;
use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError}; use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
use crate::client_api_version; use crate::client_api_version;
@ -51,7 +52,7 @@ impl Responder for ServerResponse {
#[get("/{token}/data/{chapter_hash}/{file_name}")] #[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data( async fn token_data(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Box<dyn Cache>>, cache: Data<RwLock<Box<dyn Cache>>>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -68,7 +69,7 @@ async fn token_data(
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver( async fn token_data_saver(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Box<dyn Cache>>, cache: Data<RwLock<Box<dyn Cache>>>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -186,16 +187,16 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
#[allow(clippy::future_not_send)] #[allow(clippy::future_not_send)]
async fn fetch_image( async fn fetch_image(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Box<dyn Cache>>, cache: Data<RwLock<Box<dyn Cache>>>,
chapter_hash: String, chapter_hash: String,
file_name: String, file_name: String,
is_data_saver: bool, is_data_saver: bool,
) -> ServerResponse { ) -> ServerResponse {
let key = Arc::new(CacheKey(chapter_hash, file_name, is_data_saver)); let key = CacheKey(chapter_hash, file_name, is_data_saver);
match cache.get(Arc::clone(&key)).await { match cache.write().await.get(&key).await {
Some(Ok((image, metadata))) => { Some(Ok((image, metadata))) => {
return construct_response(image, &metadata); return construct_response(image, metadata);
} }
Some(Err(_)) => { Some(Err(_)) => {
return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish()); return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish());
@ -262,9 +263,9 @@ async fn fetch_image(
debug!("Inserting into cache"); debug!("Inserting into cache");
let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap(); let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
let stream = { let (stream, metadata) = {
match cache.put(key, Box::new(body), metadata).await { match cache.write().await.put(key, Box::new(body), metadata).await {
Ok(stream) => stream, Ok((stream, metadata)) => (stream, *metadata),
Err(e) => { Err(e) => {
warn!("Failed to insert into cache: {}", e); warn!("Failed to insert into cache: {}", e);
return ServerResponse::HttpResponse( return ServerResponse::HttpResponse(