diff --git a/Cargo.lock b/Cargo.lock index 9acb15b..c5f32d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,6 +234,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "async-trait" version = "0.1.50" @@ -245,6 +251,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +dependencies = [ + "num-traits", +] + [[package]] name = "atty" version = "0.2.14" @@ -289,6 +304,18 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "bitvec" +version = "0.19.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -318,12 +345,24 @@ dependencies = [ "libc", ] +[[package]] +name = "build_const" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" + [[package]] name = "bumpalo" version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.0.1" @@ -438,6 +477,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" +[[package]] +name = "crc" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +dependencies = [ + "build_const", +] + [[package]] name = "crc32fast" version = "1.2.1" @@ -447,6 +495,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + [[package]] name = "ctrlc" version = "3.1.9" @@ -533,6 +612,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "funty" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" + [[package]] name = "futures" version = "0.3.14" @@ -674,6 +759,15 @@ dependencies = [ "ahash 0.4.7", ] +[[package]] +name = "hashlink" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8" +dependencies = [ + "hashbrown", +] + [[package]] name = "heck" version = "0.3.2" @@ -692,6 +786,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.4" @@ -828,6 +928,19 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374" +dependencies = [ + "arrayvec", + "bitflags", + "cfg-if", + "ryu", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.93" @@ -845,6 +958,17 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libsqlite3-sys" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19cb1effde5f834799ac5e5ef0e40d45027cd74f271b1de786ba8abb30e2164d" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "local-channel" version = "0.1.2" @@ -914,6 +1038,7 @@ dependencies = [ "serde_json", "simple_logger", "sodiumoxide", + "sqlx", "thiserror", "tokio", "tokio-stream", @@ -921,6 +1046,12 @@ dependencies = [ "url", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "matches" version = "0.1.8" @@ -983,6 +1114,19 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "6.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" +dependencies = [ + "bitvec", + "funty", + "lexical-core", + "memchr", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1174,6 +1318,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" + [[package]] name = "rand" version = "0.8.3" @@ -1412,6 +1562,19 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" +[[package]] +name = "sha2" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de" +dependencies = [ + "block-buffer", + "cfg-if", + "cpuid-bool", + "digest", + "opaque-debug", +] + [[package]] name = "signal-hook-registry" version = "1.3.0" @@ -1473,6 +1636,105 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "sqlformat" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d86e3c77ff882a828346ba401a7ef4b8e440df804491c6064fe8295765de71c" +dependencies = [ + "lazy_static", + "maplit", + "nom", + "regex", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d582b9bc04ec6c03084196efc42c2226b018e9941f03ee62bd88921d500917c0" +dependencies = [ + "sqlx-core", + "sqlx-macros", +] + +[[package]] +name = "sqlx-core" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de52d1d473cebb2abb79c886ef6a8023e965e34c0676a99cfeac2cc7f0fde4c1" +dependencies = [ + "ahash 0.7.2", + "atoi", + "bitflags", + "byteorder", + "bytes", + "chrono", + "crc", + "crossbeam-channel", + "crossbeam-queue", + "crossbeam-utils", + "either", + "futures-channel", + "futures-core", + "futures-util", + "hashlink", + "hex", + "itoa", + "libc", + "libsqlite3-sys", + "log", + "memchr", + "once_cell", + "parking_lot", + "percent-encoding", + "rustls", + "sha2", + "smallvec", + "sqlformat", + "sqlx-rt", + "stringprep", + "thiserror", + "time 0.2.26", + "tokio-stream", + "url", + "webpki", + "webpki-roots", + "whoami", +] + +[[package]] +name = "sqlx-macros" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a40f0be97e704d3fbf059e7e3333c3735639146a72d586c5534c70e79da88a4" +dependencies = [ + "dotenv", + "either", + "futures", + "heck", + "proc-macro2", + "quote", + "sha2", + "sqlx-core", + "sqlx-rt", + "syn", + "url", +] + +[[package]] +name = "sqlx-rt" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6ae97ab05063ed515cdc23d90253213aa24dda0a288c5ec079af3d10f9771bc" +dependencies = [ + "actix-rt", + "once_cell", + "tokio", + "tokio-rustls", +] + [[package]] name = "standback" version = "0.2.17" @@ -1482,6 +1744,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stdweb" version = "0.4.20" @@ -1531,6 +1799,16 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.10.0" @@ -1548,6 +1826,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "termcolor" version = "1.1.2" @@ -1802,6 +2086,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "untrusted" version = "0.7.1" @@ -1821,6 +2111,12 @@ dependencies = [ "serde", ] +[[package]] +name = "vcpkg" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d" + [[package]] name = "vec_map" version = "0.8.2" @@ -1946,6 +2242,16 @@ dependencies = [ "webpki", ] +[[package]] +name = "whoami" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4abacf325c958dfeaf1046931d37f2a901b6dfe0968ee965a29e94c6766b2af6" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1985,3 +2291,9 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" dependencies = [ "winapi", ] + +[[package]] +name = "wyz" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" diff --git a/Cargo.toml b/Cargo.toml index 8c6818c..acbd37d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ serde = "1" serde_json = "1" simple_logger = "1" sodiumoxide = "0.2" +sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros" ] } thiserror = "1" tokio = { version = "1", features = [ "full", "parking_lot" ] } tokio-stream = { version = "0.1", features = [ "sync" ] } @@ -38,3 +39,4 @@ url = { version = "2", features = [ "serde" ] } [profile.release] lto = true codegen-units = 1 +debug = true \ No newline at end of file diff --git a/db_queries/init.sql b/db_queries/init.sql new file mode 100644 index 0000000..ac2b552 --- /dev/null +++ b/db_queries/init.sql @@ -0,0 +1,7 @@ +create table if not exists Images( + id varchar primary key not null, + size integer not null, + accessed timestamp not null default CURRENT_TIMESTAMP, + disk_size integer as ((size + 4095) / 4096 * 4096) +); +create index if not exists Images_accessed on Images(accessed); \ No newline at end of file diff --git a/init_cache.sh b/init_cache.sh new file mode 100755 index 0000000..1afd690 --- /dev/null +++ b/init_cache.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +# This script needs to be run once in order for compile time macros to not +# complain about a missing DB + +# We can trust that our program will initialize the db at runtime the same way +# as it pulls from the same file for initialization + +sqlite3 cache/metadata.sqlite < db_queries/init.sql \ No newline at end of file diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 1085c1f..9b5c91c 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -3,11 +3,13 @@ use bytes::{Buf, Bytes}; use futures::{Stream, StreamExt}; use log::{debug, error}; use once_cell::sync::Lazy; +use serde::Deserialize; +use std::collections::HashMap; use std::fmt::Display; +use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{collections::HashMap, num::NonZeroU64}; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::sync::mpsc::UnboundedSender; @@ -16,7 +18,7 @@ use tokio::sync::RwLock; use tokio_stream::wrappers::WatchStream; use tokio_util::codec::{BytesCodec, FramedRead}; -use super::{BoxedImageStream, CacheStream, CacheStreamItem}; +use super::{BoxedImageStream, CacheStream, CacheStreamItem, ImageMetadata}; /// Keeps track of files that are currently being written to. /// @@ -38,26 +40,27 @@ static WRITING_STATUS: Lazy>>> = Lazy::new(|| RwLock::new(HashMap::new())); /// Tries to read from the file, returning a byte stream if it exists -pub async fn read_file(path: &Path) -> Option> { - if path.exists() { - let status = WRITING_STATUS.read().await.get(path).map(Clone::clone); +pub async fn read_file( + path: &Path, +) -> Option> { + let file = File::open(path).await.ok()?; + let std_file = file.try_clone().await.unwrap().into_std().await; - if let Some(status) = status { - Some( - ConcurrentFsStream::new(path, WatchStream::new(status)) - .await - .map(CacheStream::Concurrent), - ) - } else { - Some( - File::open(path) - .await - .map(|f| CacheStream::Completed(FramedRead::new(f, BytesCodec::new()))), - ) - } + let metadata = { + let mut de = serde_json::Deserializer::from_reader(std_file); + ImageMetadata::deserialize(&mut de).ok()? + }; + + let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) { + CacheStream::Concurrent(ConcurrentFsStream::from_file( + file, + WatchStream::new(status), + )) } else { - None - } + CacheStream::Completed(FramedRead::new(file, BytesCodec::new())) + }; + + Some(Ok((stream, metadata))) } /// Maps the input byte stream into one that writes to disk instead, returning @@ -65,6 +68,7 @@ pub async fn read_file(path: &Path) -> Option, ) -> Result { let (tx, rx) = channel(WritingStatus::NotDone); @@ -73,7 +77,7 @@ pub async fn write_file( let mut write_lock = WRITING_STATUS.write().await; let parent = path.parent().unwrap(); create_dir_all(parent).await?; - let file = File::create(path).await?; // we need to make sure the file exists and is truncated. + let file = File::create(path).await?; // we need to make sure the file exists and is truncated. write_lock.insert(path.to_path_buf(), rx.clone()); file }; @@ -84,6 +88,8 @@ pub async fn write_file( let path_buf = path_buf; // moves path buf into async let mut errored = false; let mut bytes_written: u64 = 0; + file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes()) + .await?; while let Some(bytes) = byte_stream.next().await { if let Ok(mut bytes) = bytes { loop { @@ -113,17 +119,19 @@ pub async fn write_file( debug!("writing to file done"); } - let mut write_lock = WRITING_STATUS.write().await; - // This needs to be written atomically with the write lock, else - // it's possible we have an inconsistent state - // - // We don't really care if we have no receivers - if errored { - let _ = tx.send(WritingStatus::Error); - } else { - let _ = tx.send(WritingStatus::Done(bytes_written)); + { + let mut write_lock = WRITING_STATUS.write().await; + // This needs to be written atomically with the write lock, else + // it's possible we have an inconsistent state + // + // We don't really care if we have no receivers + if errored { + let _ = tx.send(WritingStatus::Error); + } else { + let _ = tx.send(WritingStatus::Done(bytes_written)); + } + write_lock.remove(&path_buf); } - write_lock.remove(&path_buf); // notify if let Err(e) = notifier.send(bytes_written) { @@ -154,12 +162,18 @@ impl ConcurrentFsStream { path: &Path, receiver: WatchStream, ) -> Result { - Ok(Self { - file: Box::pin(File::open(path).await?), + File::open(path) + .await + .map(|file| Self::from_file(file, receiver)) + } + + fn from_file(file: File, receiver: WatchStream) -> Self { + Self { + file: Box::pin(file), receiver: Box::pin(receiver), bytes_read: 0, bytes_total: None, - }) + } } } @@ -182,7 +196,7 @@ impl Stream for ConcurrentFsStream { // First, try to read from the file... // TODO: Might be more efficient to have a larger buffer - let mut bytes = [0; 8 * 1024].to_vec(); + let mut bytes = [0; 4 * 1024].to_vec(); let mut buffer = ReadBuf::new(&mut bytes); match self.file.as_mut().poll_read(cx, &mut buffer) { Poll::Ready(Ok(_)) => (), diff --git a/src/cache/generational.rs b/src/cache/generational.rs index 7cf595f..9f63c31 100644 --- a/src/cache/generational.rs +++ b/src/cache/generational.rs @@ -136,15 +136,16 @@ impl GenerationalCache { #[async_trait] impl Cache for GenerationalCache { async fn get( - &mut self, + &self, key: &CacheKey, - ) -> Option> { + ) -> Option> { + todo!(); if self.in_memory.contains(key) { return self .in_memory .get(key) // TODO: get rid of clone? - .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata))); + .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone()))); } if let Some(metadata) = self.on_disk.pop(key) { @@ -182,21 +183,23 @@ impl Cache for GenerationalCache { buffer.shrink_to_fit(); + todo!(); self.disk_cur_size -= buffer.len() as u64; - let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into()); + // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into()); - return Some(self.put(key.clone(), Box::new(image), metadata).await); + // return Some(self.put(key.clone(), Box::new(image), metadata).await); } None } async fn put( - &mut self, + &self, key: CacheKey, mut image: BoxedImageStream, metadata: ImageMetadata, - ) -> Result<(CacheStream, &ImageMetadata), CacheError> { + ) -> Result { + todo!(); let mut hot_evicted = vec![]; let image = { @@ -237,9 +240,10 @@ impl Cache for GenerationalCache { self.push_into_cold(key, image, metadata).await; } - self.get(&key).await.unwrap() + todo!(); + // self.get(&key).await.unwrap() } // noop - async fn increase_usage(&mut self, _amt: u64) {} + async fn increase_usage(&self, _amt: u64) {} } diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index 8f92fbb..2b1c127 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -1,88 +1,128 @@ //! Low memory caching stuff -use std::{path::PathBuf, sync::Arc}; +use std::sync::{atomic::Ordering, Arc}; +use std::{path::PathBuf, sync::atomic::AtomicU64}; use async_trait::async_trait; -use lru::LruCache; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; -use tokio::sync::RwLock; +use futures::StreamExt; +use sqlx::SqlitePool; +use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}; +use tokio_stream::wrappers::ReceiverStream; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; pub struct LowMemCache { - on_disk: LruCache, disk_path: PathBuf, disk_max_size: u64, - disk_cur_size: u64, - master_sender: UnboundedSender, + disk_cur_size: AtomicU64, + file_size_channel_sender: UnboundedSender, + db_update_channel_sender: Sender, +} + +enum DbMessage { + Get(CacheKey), + Put(CacheKey, ImageMetadata), } impl LowMemCache { - /// Constructs a new low memory cache at the provided path and capacity. + /// Constructs a new low memory cache at the provided path and capaci ty. /// This internally spawns a task that will wait for filesystem /// notifications when a file has been written. #[allow(clippy::new_ret_no_self)] - pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc>> { - let (tx, mut rx) = unbounded_channel(); - let new_self: Arc>> = Arc::new(RwLock::new(Box::new(Self { - on_disk: LruCache::unbounded(), + pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc> { + let (file_size_tx, mut file_size_rx) = unbounded_channel(); + let (db_tx, db_rx) = channel(128); + let db_pool = { + let db_url = format!( + "sqlite:{}/metadata.sqlite?mode=rwc", + disk_path.to_str().unwrap() + ); + let db = SqlitePool::connect(&db_url).await.unwrap(); + + // Run db init + sqlx::query_file!("./db_queries/init.sql") + .execute(&mut db.acquire().await.unwrap()) + .await + .unwrap(); + + db + }; + + let new_self: Arc> = Arc::new(Box::new(Self { disk_path, disk_max_size, - disk_cur_size: 0, - master_sender: tx, - }))); + disk_cur_size: AtomicU64::new(0), + file_size_channel_sender: file_size_tx, + db_update_channel_sender: db_tx, + })); // Spawns a new task that continuously listens for events received by // the channel, which informs the low memory cache the total size of the // item that was put into the cache. let new_self_0 = Arc::clone(&new_self); tokio::spawn(async move { - while let Some(new_size) = rx.recv().await { - new_self_0.write().await.increase_usage(new_size).await; + // This will never return None, effectively a loop + while let Some(new_size) = file_size_rx.recv().await { + new_self_0.increase_usage(new_size).await; + } + }); + + // Spawn a new task that will listen for updates to the db. + tokio::spawn(async move { + let db_pool = db_pool; + let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); + while let Some(messages) = recv_stream.next().await { + let mut transaction = db_pool.begin().await.unwrap(); + for message in messages {} + + transaction.commit().await.unwrap(); } }); new_self } - - async fn prune(&mut self) { - todo!() - } } #[async_trait] impl Cache for LowMemCache { async fn get( - &mut self, + &self, key: &CacheKey, - ) -> Option> { - let metadata = self.on_disk.get(key)?; + ) -> Option> { + let channel = self.db_update_channel_sender.clone(); + let key_0 = key.clone(); + + tokio::spawn(async move { channel.send(DbMessage::Get(key_0)).await }); + let path = self.disk_path.clone().join(PathBuf::from(key.clone())); super::fs::read_file(&path) .await - .map(|res| res.map(|stream| (stream, metadata)).map_err(Into::into)) + .map(|res| res.map_err(Into::into)) } async fn put( - &mut self, + &self, key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, - ) -> Result<(CacheStream, &ImageMetadata), CacheError> { - let path = self.disk_path.clone().join(PathBuf::from(key.clone())); - self.on_disk.put(key.clone(), metadata); - super::fs::write_file(&path, image, self.master_sender.clone()) - .await - .map(move |stream| (stream, self.on_disk.get(&key).unwrap())) - .map_err(Into::into) + ) -> Result { + let channel = self.db_update_channel_sender.clone(); + let key_0 = key.clone(); + tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await }); + + let path = self.disk_path.clone().join(PathBuf::from(key)); + + super::fs::write_file( + &path, + image, + metadata, + self.file_size_channel_sender.clone(), + ) + .await + .map_err(Into::into) } - /// Increments the internal size counter, pruning if the value exceeds the - /// user-defined capacity. - async fn increase_usage(&mut self, amt: u64) { - self.disk_cur_size += amt; - if self.disk_cur_size > self.disk_max_size { - self.prune().await; - } + async fn increase_usage(&self, amt: u64) { + self.disk_cur_size.fetch_add(amt, Ordering::Release); } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 6cd5313..4371453 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -10,13 +10,14 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, FixedOffset}; use fs::ConcurrentFsStream; use futures::{Stream, StreamExt}; +use serde::{Deserialize, Serialize}; use thiserror::Error; +use tokio::fs::File; +use tokio_util::codec::{BytesCodec, FramedRead}; pub use fs::UpstreamError; pub use generational::GenerationalCache; pub use low_mem::LowMemCache; -use tokio::fs::File; -use tokio_util::codec::{BytesCodec, FramedRead}; mod fs; mod generational; @@ -45,16 +46,15 @@ impl From for PathBuf { #[derive(Clone)] pub struct CachedImage(pub Bytes); -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Serialize, Deserialize)] pub struct ImageMetadata { pub content_type: Option, - // If we can guarantee a non-zero u32 here we can save 4 bytes pub content_length: Option, pub last_modified: Option>, } // Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0 -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Serialize, Deserialize)] pub enum ImageContentType { Png, Jpeg, @@ -147,19 +147,17 @@ pub enum CacheError { #[async_trait] pub trait Cache: Send + Sync { - async fn get( - &mut self, - key: &CacheKey, - ) -> Option>; + async fn get(&self, key: &CacheKey) + -> Option>; async fn put( - &mut self, + &self, key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, - ) -> Result<(CacheStream, &ImageMetadata), CacheError>; + ) -> Result; - async fn increase_usage(&mut self, amt: u64); + async fn increase_usage(&self, amt: u64); } pub enum CacheStream { diff --git a/src/main.rs b/src/main.rs index 989a83b..5698671 100644 --- a/src/main.rs +++ b/src/main.rs @@ -124,14 +124,14 @@ async fn main() -> Result<(), std::io::Error> { } }); - let cache: Arc>> = if low_mem_mode { - LowMemCache::new(disk_quota, cache_path.clone()) + let cache: Arc> = if low_mem_mode { + LowMemCache::new(disk_quota, cache_path.clone()).await } else { - Arc::new(TokioRwLock::new(Box::new(GenerationalCache::new( + Arc::new(Box::new(GenerationalCache::new( memory_max_size, disk_quota, cache_path.clone(), - )))) + ))) }; let cache = Arc::clone(&cache); let cache1 = Arc::clone(&cache); diff --git a/src/routes.rs b/src/routes.rs index 8af5fd2..bc6283d 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -52,7 +52,7 @@ impl Responder for ServerResponse { #[get("/{token}/data/{chapter_hash}/{file_name}")] async fn token_data( state: Data, - cache: Data>>, + cache: Data>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -69,7 +69,7 @@ async fn token_data( #[get("/{token}/data-saver/{chapter_hash}/{file_name}")] async fn token_data_saver( state: Data, - cache: Data>>, + cache: Data>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -187,16 +187,16 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder { #[allow(clippy::future_not_send)] async fn fetch_image( state: Data, - cache: Data>>, + cache: Data>, chapter_hash: String, file_name: String, is_data_saver: bool, ) -> ServerResponse { let key = CacheKey(chapter_hash, file_name, is_data_saver); - match cache.write().await.get(&key).await { + match cache.get(&key).await { Some(Ok((image, metadata))) => { - return construct_response(image, metadata); + return construct_response(image, &metadata); } Some(Err(_)) => { return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish()); @@ -263,9 +263,9 @@ async fn fetch_image( debug!("Inserting into cache"); let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap(); - let (stream, metadata) = { - match cache.write().await.put(key, Box::new(body), metadata).await { - Ok((stream, metadata)) => (stream, *metadata), + let stream = { + match cache.put(key, Box::new(body), metadata).await { + Ok(stream) => stream, Err(e) => { warn!("Failed to insert into cache: {}", e); return ServerResponse::HttpResponse(