From 9904ba2cfcafc70d1fb4ec5536f84bfda9b36ea6 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Mon, 19 Apr 2021 22:01:32 -0400 Subject: [PATCH] use callback based pruning --- .gitignore | 4 +- Cargo.lock | 152 ++++++-------------------------------- Cargo.toml | 4 +- src/cache/fs.rs | 131 ++++++++++++++++---------------- src/cache/generational.rs | 3 + src/cache/low_mem.rs | 41 ++++++++-- src/cache/mod.rs | 5 +- src/main.rs | 19 ++--- 8 files changed, 136 insertions(+), 223 deletions(-) diff --git a/.gitignore b/.gitignore index 00f8953..13f631d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target .env -cache \ No newline at end of file +cache +flamegraph*.svg +perf.data* \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 4d852f4..9acb15b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ "actix-tls", "actix-utils", "ahash 0.7.2", - "base64 0.13.0", + "base64", "bitflags", "brotli2", "bytes", @@ -54,7 +54,7 @@ dependencies = [ "rand", "regex", "serde", - "sha-1 0.9.4", + "sha-1", "smallvec", "time 0.2.26", "tokio", @@ -268,15 +268,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b" -[[package]] -name = "base64" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" -dependencies = [ - "byteorder", -] - [[package]] name = "base64" version = "0.13.0" @@ -298,34 +289,13 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" -[[package]] -name = "block-buffer" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" -dependencies = [ - "block-padding", - "byte-tools", - "byteorder", - "generic-array 0.12.4", -] - [[package]] name = "block-buffer" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "generic-array 0.14.4", -] - -[[package]] -name = "block-padding" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" -dependencies = [ - "byte-tools", + "generic-array", ] [[package]] @@ -354,18 +324,6 @@ version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" -[[package]] -name = "byte-tools" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" - -[[package]] -name = "byteorder" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" - [[package]] name = "bytes" version = "1.0.1" @@ -511,22 +469,13 @@ dependencies = [ "syn", ] -[[package]] -name = "digest" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" -dependencies = [ - "generic-array 0.12.4", -] - [[package]] name = "digest" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" dependencies = [ - "generic-array 0.14.4", + "generic-array", ] [[package]] @@ -556,12 +505,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "fake-simd" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" - [[package]] name = "flate2" version = "1.0.20" @@ -682,15 +625,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" -dependencies = [ - "typenum", -] - [[package]] name = "generic-array" version = "0.14.4" @@ -758,12 +692,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hex" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" - [[package]] name = "http" version = "0.2.4" @@ -968,7 +896,7 @@ version = "0.2.1" dependencies = [ "actix-web", "async-trait", - "base64 0.13.0", + "base64", "bincode", "bytes", "chrono", @@ -986,9 +914,9 @@ dependencies = [ "serde_json", "simple_logger", "sodiumoxide", - "ssri", "thiserror", "tokio", + "tokio-stream", "tokio-util", "url", ] @@ -1099,12 +1027,6 @@ version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" -[[package]] -name = "opaque-debug" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" - [[package]] name = "opaque-debug" version = "0.3.0" @@ -1324,7 +1246,7 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2296f2fac53979e8ccbc4a1136b25dcefd37be9ed7e4a1f6b05a6029c84ff124" dependencies = [ - "base64 0.13.0", + "base64", "bytes", "encoding_rs", "futures-core", @@ -1384,7 +1306,7 @@ version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" dependencies = [ - "base64 0.13.0", + "base64", "log", "ring", "sct", @@ -1471,29 +1393,17 @@ dependencies = [ "serde", ] -[[package]] -name = "sha-1" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" -dependencies = [ - "block-buffer 0.7.3", - "digest 0.8.1", - "fake-simd", - "opaque-debug 0.2.3", -] - [[package]] name = "sha-1" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" dependencies = [ - "block-buffer 0.9.0", + "block-buffer", "cfg-if", "cpuid-bool", - "digest 0.9.0", - "opaque-debug 0.3.0", + "digest", + "opaque-debug", ] [[package]] @@ -1502,18 +1412,6 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" -[[package]] -name = "sha2" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69" -dependencies = [ - "block-buffer 0.7.3", - "digest 0.8.1", - "fake-simd", - "opaque-debug 0.2.3", -] - [[package]] name = "signal-hook-registry" version = "1.3.0" @@ -1575,22 +1473,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "ssri" -version = "5.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e875e58f12c33f832d09cff94b079aee84a48d8188d7a7132b3434358afb70c9" -dependencies = [ - "base64 0.10.1", - "digest 0.8.1", - "hex", - "serde", - "serde_derive", - "sha-1 0.8.2", - "sha2", - "thiserror", -] - [[package]] name = "standback" version = "0.2.17" @@ -1820,6 +1702,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.6.6" diff --git a/Cargo.toml b/Cargo.toml index 024f8b2..edb9d4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,12 +29,12 @@ serde = "1" serde_json = "1" simple_logger = "1" sodiumoxide = "0.2" -ssri = "5" thiserror = "1" tokio = { version = "1", features = [ "full", "parking_lot" ] } +tokio-stream = { version = "0.1", features = [ "sync" ] } tokio-util = { version = "0.6", features = [ "codec" ] } url = { version = "2", features = [ "serde" ] } [profile.release] lto = true -codegen-units = 1 +codegen-units = 1 \ No newline at end of file diff --git a/src/cache/fs.rs b/src/cache/fs.rs index f845950..f5b8342 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -1,19 +1,19 @@ use actix_web::error::PayloadError; +use bytes::Buf; use futures::{Stream, StreamExt}; -use log::debug; +use log::{debug, error}; use once_cell::sync::Lazy; use std::collections::HashMap; use std::fmt::Display; use std::path::{Path, PathBuf}; use std::pin::Pin; -use std::sync::atomic::{AtomicU8, Ordering}; -use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Duration; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::watch::{channel, Receiver}; use tokio::sync::RwLock; -use tokio::time::Interval; +use tokio_stream::wrappers::WatchStream; use tokio_util::codec::{BytesCodec, FramedRead}; use super::{BoxedImageStream, CacheStream, CacheStreamItem}; @@ -34,17 +34,17 @@ use super::{BoxedImageStream, CacheStream, CacheStreamItem}; /// We effectively use `WRITING_STATUS` as a status relay to ensure concurrent /// reads to the file while it's being written to will wait for writing to be /// completed. -static WRITING_STATUS: Lazy>>> = +static WRITING_STATUS: Lazy>>> = Lazy::new(|| RwLock::new(HashMap::new())); /// Tries to read from the file, returning a byte stream if it exists pub async fn read_file(path: &Path) -> Option> { if path.exists() { - let status = WRITING_STATUS.read().await.get(path).map(Arc::clone); + let status = WRITING_STATUS.read().await.get(path).map(Clone::clone); if let Some(status) = status { Some( - ConcurrentFsStream::new(path, status) + ConcurrentFsStream::new(path, WatchStream::new(status)) .await .map(CacheStream::Concurrent), ) @@ -65,27 +65,38 @@ pub async fn read_file(path: &Path) -> Option, ) -> Result { - let done_writing_flag = Arc::new(CacheStatus::new()); + let (tx, rx) = channel(WritingStatus::NotDone); let mut file = { let mut write_lock = WRITING_STATUS.write().await; let parent = path.parent().unwrap(); create_dir_all(parent).await?; - let file = File::create(path).await?; // we need to make sure the file exists and is truncated. - write_lock.insert(path.to_path_buf(), Arc::clone(&done_writing_flag)); + let file = File::create(path).await?; // we need to make sure the file exists and is truncated. + write_lock.insert(path.to_path_buf(), rx.clone()); file }; - let write_flag = Arc::clone(&done_writing_flag); // need owned variant because async lifetime let path_buf = path.to_path_buf(); tokio::spawn(async move { let path_buf = path_buf; // moves path buf into async let mut errored = false; + let mut bytes_written: u64 = 0; while let Some(bytes) = byte_stream.next().await { - if let Ok(bytes) = bytes { - file.write_all(&bytes).await? + if let Ok(mut bytes) = bytes { + loop { + match file.write(&bytes).await? { + 0 => break, + n => { + bytes.advance(n); + // We don't care if we don't have receivers + bytes_written += n as u64; + let _ = tx.send(WritingStatus::NotDone); + } + } + } } else { errored = true; break; @@ -105,35 +116,45 @@ pub async fn write_file( let mut write_lock = WRITING_STATUS.write().await; // This needs to be written atomically with the write lock, else // it's possible we have an inconsistent state + // + // We don't really care if we have no receivers if errored { - write_flag.store(WritingStatus::Error); + let _ = tx.send(WritingStatus::Error); } else { - write_flag.store(WritingStatus::Done); + let _ = tx.send(WritingStatus::Done); } write_lock.remove(&path_buf); + // notify + if let Err(e) = notifier.send(bytes_written) { + error!( + "Failed to notify cache of new entry size: {}. Cache no longer can prune FS!", + e + ); + } + // We don't ever check this, so the return value doesn't matter Ok::<_, std::io::Error>(()) }); Ok(CacheStream::Concurrent( - ConcurrentFsStream::new(path, done_writing_flag).await?, + ConcurrentFsStream::new(path, WatchStream::new(rx)).await?, )) } pub struct ConcurrentFsStream { file: Pin>, - sleep: Pin>, - is_file_done_writing: Arc, + receiver: Pin>>, } impl ConcurrentFsStream { - async fn new(path: &Path, is_done: Arc) -> Result { + async fn new( + path: &Path, + receiver: WatchStream, + ) -> Result { Ok(Self { file: Box::pin(File::open(path).await?), - // 0.5ms - sleep: Box::pin(tokio::time::interval(Duration::from_micros(250))), - is_file_done_writing: is_done, + receiver: Box::pin(receiver), }) } } @@ -154,30 +175,29 @@ impl Stream for ConcurrentFsStream { type Item = CacheStreamItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let status = self.is_file_done_writing.load(); - - let mut bytes = [0; 1460].to_vec(); - let mut buffer = ReadBuf::new(&mut bytes); - let polled_result = self.file.as_mut().poll_read(cx, &mut buffer); - let filled = buffer.filled().len(); - match (status, filled) { - // Prematurely reached EOF, schedule a poll in the future - (WritingStatus::NotDone, 0) => { - let _ = self.sleep.as_mut().poll_tick(cx); - Poll::Pending - } - // We got an error, abort the read. - (WritingStatus::Error, _) => Poll::Ready(Some(Err(UpstreamError))), - _ => { - bytes.truncate(filled); - polled_result.map(|_| { - if bytes.is_empty() { - None - } else { - Some(Ok(bytes.into())) + match self.receiver.as_mut().poll_next_unpin(cx) { + Poll::Ready(status) => { + let mut bytes = [0; 1460].to_vec(); + let mut buffer = ReadBuf::new(&mut bytes); + let polled_result = self.file.as_mut().poll_read(cx, &mut buffer); + let filled = buffer.filled().len(); + match (status, filled) { + (Some(WritingStatus::NotDone), 0) => Poll::Pending, + // We got an error, abort the read. + (Some(WritingStatus::Error), _) => Poll::Ready(Some(Err(UpstreamError))), + _ => { + bytes.truncate(filled); + polled_result.map(|_| { + if bytes.is_empty() { + None + } else { + Some(Ok(bytes.into())) + } + }) } - }) + } } + Poll::Pending => Poll::Pending, } } } @@ -189,26 +209,7 @@ impl From for actix_web::Error { } } -struct CacheStatus(AtomicU8); - -impl CacheStatus { - #[inline] - const fn new() -> Self { - Self(AtomicU8::new(WritingStatus::NotDone as u8)) - } - - #[inline] - fn store(&self, status: WritingStatus) { - self.0.store(status as u8, Ordering::Release); - } - - #[inline] - fn load(&self) -> WritingStatus { - self.0.load(Ordering::Acquire).into() - } -} - -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum WritingStatus { NotDone = 0, Done, diff --git a/src/cache/generational.rs b/src/cache/generational.rs index 5630fd8..7cf595f 100644 --- a/src/cache/generational.rs +++ b/src/cache/generational.rs @@ -239,4 +239,7 @@ impl Cache for GenerationalCache { self.get(&key).await.unwrap() } + + // noop + async fn increase_usage(&mut self, _amt: u64) {} } diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index 485a699..a7f7218 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -1,10 +1,11 @@ //! Low memory caching stuff -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; use async_trait::async_trait; -use log::warn; use lru::LruCache; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::sync::RwLock; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; @@ -13,16 +14,37 @@ pub struct LowMemCache { disk_path: PathBuf, disk_max_size: u64, disk_cur_size: u64, + master_sender: UnboundedSender, } impl LowMemCache { - pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Self { - Self { + pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc>> { + let (tx, mut rx) = unbounded_channel(); + let new_self: Arc>> = Arc::new(RwLock::new(Box::new(Self { on_disk: LruCache::unbounded(), disk_path, disk_max_size, disk_cur_size: 0, - } + master_sender: tx, + }))); + + let new_self_0 = Arc::clone(&new_self); + tokio::spawn(async move { + loop { + let new_size = match rx.recv().await { + Some(v) => v, + None => break, + }; + + new_self_0.write().await.increase_usage(new_size).await; + } + }); + + new_self.clone() + } + + async fn prune(&mut self) { + todo!() } } @@ -47,13 +69,16 @@ impl Cache for LowMemCache { ) -> Result<(CacheStream, &ImageMetadata), CacheError> { let path = self.disk_path.clone().join(PathBuf::from(key.clone())); self.on_disk.put(key.clone(), metadata); - super::fs::write_file(&path, image) + super::fs::write_file(&path, image, self.master_sender.clone()) .await .map(move |stream| (stream, self.on_disk.get(&key).unwrap())) .map_err(Into::into) } - async fn prune(&mut self) { - warn!("Trimming has not been implemented yet. Cache is unbounded!"); + async fn increase_usage(&mut self, amt: u64) { + self.disk_cur_size += amt; + if self.disk_cur_size > self.disk_max_size { + self.prune().await; + } } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 8d41945..aa0a2a2 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -10,7 +10,6 @@ use bytes::Bytes; use chrono::{DateTime, FixedOffset}; use fs::ConcurrentFsStream; use futures::{Stream, StreamExt}; -use log::debug; use thiserror::Error; pub use fs::UpstreamError; @@ -159,9 +158,7 @@ pub trait Cache: Send + Sync { metadata: ImageMetadata, ) -> Result<(CacheStream, &ImageMetadata), CacheError>; - async fn prune(&mut self) { - debug!("Would trim but cache does not implement trimming!"); - } + async fn increase_usage(&mut self, amt: u64); } pub enum CacheStream { diff --git a/src/main.rs b/src/main.rs index 16c95b7..4d069dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -120,27 +120,18 @@ async fn main() -> Result<(), std::io::Error> { } }); - let cache: Box = if low_mem_mode { - Box::new(LowMemCache::new(disk_quota, cache_path.clone())) + let cache: Arc>> = if low_mem_mode { + LowMemCache::new(disk_quota, cache_path.clone()) } else { - Box::new(GenerationalCache::new( + Arc::new(TokioRwLock::new(Box::new(GenerationalCache::new( memory_max_size, disk_quota, cache_path.clone(), - )) + )))) }; - let cache = Arc::new(TokioRwLock::new(cache)); + let cache = Arc::clone(&cache); let cache1 = Arc::clone(&cache); - // Spawn periodic cache trimming - spawn(async move { - let mut interval = time::interval(Duration::from_secs(3 * 60)); - loop { - interval.tick().await; - cache.write().await.prune().await; - } - }); - // Start HTTPS server HttpServer::new(move || { App::new()