diff --git a/src/cache/disk.rs b/src/cache/disk.rs index 09f1a78..f8aca83 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -15,7 +15,9 @@ use tokio::fs::remove_file; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_stream::wrappers::ReceiverStream; -use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; +use super::{ + BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata, +}; pub struct DiskCache { disk_path: PathBuf, @@ -112,7 +114,9 @@ async fn db_listener( warn!("Failed to add {:?} to db: {}", key, e); } - cache.increase_usage(size); + cache + .disk_cur_size + .fetch_add(u64::from(size), Ordering::Release); } } } @@ -124,7 +128,8 @@ async fn db_listener( ); } - if cache.on_disk_size() >= max_on_disk_size { + let on_disk_size = (cache.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096; + if on_disk_size >= max_on_disk_size { let mut conn = match db_pool.acquire().await { Ok(conn) => conn, Err(e) => { @@ -163,7 +168,7 @@ async fn db_listener( tokio::spawn(remove_file(item.id)); } - cache.decrease_usage(size_freed); + cache.disk_cur_size.fetch_sub(size_freed, Ordering::Release); } } } @@ -211,28 +216,10 @@ impl Cache for DiskCache { CacheStream::new(inner, maybe_header).map_err(|_| CacheError::DecryptionFailure) }) } +} - #[inline] - fn increase_usage(&self, amt: u32) { - self.disk_cur_size - .fetch_add(u64::from(amt), Ordering::Release); - } - - #[inline] - fn decrease_usage(&self, amt: u64) { - self.disk_cur_size.fetch_sub(amt, Ordering::Release); - } - - #[inline] - fn on_disk_size(&self) -> u64 { - (self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096 - } - - #[inline] - fn mem_size(&self) -> u64 { - 0 - } - +#[async_trait] +impl CallbackCache for DiskCache { async fn put_with_on_completed_callback( &self, key: CacheKey, @@ -257,14 +244,4 @@ impl Cache for DiskCache { CacheStream::new(inner, maybe_header).map_err(|_| CacheError::DecryptionFailure) }) } - - #[inline] - async fn put_internal(&self, _: CacheKey, _: Bytes, _: ImageMetadata, _: usize) { - unimplemented!() - } - - #[inline] - async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> { - None - } } diff --git a/src/cache/fs.rs b/src/cache/fs.rs index e67359c..e4527da 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -236,7 +236,7 @@ where tokio::spawn(async move { let path_buf = path_buf; // moves path buf into async let mut errored = false; - let mut bytes_written: u32 = 0; + let mut bytes_written = 0; let mut acc_bytes = BytesMut::new(); let accumulate = on_complete.is_some(); writer.write_all(metadata_string.as_bytes()).await?; diff --git a/src/cache/mem.rs b/src/cache/mem.rs index bc0c2d0..d4e50dc 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -2,8 +2,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use super::{ - BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata, InnerStream, - MemStream, + BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata, + InnerStream, MemStream, }; use async_trait::async_trait; use bytes::Bytes; @@ -98,14 +98,28 @@ impl= max_mem_size { - if let Some((_, _, _, size)) = new_self.pop_memory().await { - new_self.decrease_usage(size as u64); + .cur_mem_size + .fetch_add(size as u64, Ordering::Release); + new_self + .mem_cache + .lock() + .await + .push(key, (bytes, metadata, size)); + + // Pop if too large + while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size { + let popped = new_self + .mem_cache + .lock() + .await + .pop() + .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)); + if let Some((_, _, _, size)) = popped { + new_self + .cur_mem_size + .fetch_sub(size as u64, Ordering::Release); } else { break; } @@ -118,8 +132,10 @@ impl Cache - for MemoryCache +impl Cache for MemoryCache +where + InternalCacheImpl: InternalMemoryCache, + InnerCache: CallbackCache, { #[inline] async fn get( @@ -152,61 +168,4 @@ impl Cache .put_with_on_completed_callback(key, image, metadata, self.master_sender.clone()) .await } - - #[inline] - fn increase_usage(&self, amt: u32) { - self.cur_mem_size - .fetch_add(u64::from(amt), Ordering::Release); - } - - #[inline] - fn decrease_usage(&self, amt: u64) { - self.cur_mem_size.fetch_sub(amt, Ordering::Release); - } - - #[inline] - fn on_disk_size(&self) -> u64 { - self.inner.on_disk_size() - } - - #[inline] - fn mem_size(&self) -> u64 { - self.cur_mem_size.load(Ordering::Acquire) - } - - #[inline] - async fn put_with_on_completed_callback( - &self, - key: CacheKey, - image: BoxedImageStream, - metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, - ) -> Result { - self.inner - .put_with_on_completed_callback(key, image, metadata, on_complete) - .await - } - - #[inline] - async fn put_internal( - &self, - key: CacheKey, - image: Bytes, - metadata: ImageMetadata, - size: usize, - ) { - self.mem_cache - .lock() - .await - .push(key, (image, metadata, size)); - } - - #[inline] - async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> { - self.mem_cache - .lock() - .await - .pop() - .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)) - } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index c3457d2..30968eb 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -172,37 +172,6 @@ pub trait Cache: Send + Sync { image: BoxedImageStream, metadata: ImageMetadata, ) -> Result; - - /// Increases the size of the cache. This is a double-dispatch method, so - /// see specific implementations for complete detail. This only accepts a - /// u32 as all files should be smaller than a u32 and some cache - /// implementations can only handle up to a u32. - fn increase_usage(&self, amt: u32); - - /// Decreases the size of the cache. This is a double-dispatch method, so - /// see specific implementations for complete detail. - fn decrease_usage(&self, amt: u64); - - /// Reports the on-disk size of the cache. - fn on_disk_size(&self) -> u64; - - /// Reports the memory size of the cache. - fn mem_size(&self) -> u64; - - async fn put_with_on_completed_callback( - &self, - key: CacheKey, - image: BoxedImageStream, - metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, - ) -> Result; - - /// Double-dispatch method. Used by cache implementations that require a - /// completed entry to put items into their cache. - async fn put_internal(&self, key: CacheKey, image: Bytes, metadata: ImageMetadata, size: usize); - - /// Pops an entry from the memory cache, if one exists. - async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>; } #[async_trait] @@ -224,27 +193,21 @@ impl Cache for std::sync::Arc { ) -> Result { self.as_ref().put(key, image, metadata).await } +} - #[inline] - fn increase_usage(&self, amt: u32) { - self.as_ref().increase_usage(amt) - } - - #[inline] - fn decrease_usage(&self, amt: u64) { - self.as_ref().decrease_usage(amt) - } - - #[inline] - fn on_disk_size(&self) -> u64 { - self.as_ref().on_disk_size() - } - - #[inline] - fn mem_size(&self) -> u64 { - self.as_ref().mem_size() - } +#[async_trait] +pub trait CallbackCache: Cache { + async fn put_with_on_completed_callback( + &self, + key: CacheKey, + image: BoxedImageStream, + metadata: ImageMetadata, + on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, + ) -> Result; +} +#[async_trait] +impl CallbackCache for std::sync::Arc { #[inline] async fn put_with_on_completed_callback( &self, @@ -257,22 +220,6 @@ impl Cache for std::sync::Arc { .put_with_on_completed_callback(key, image, metadata, on_complete) .await } - - #[inline] - async fn put_internal( - &self, - key: CacheKey, - image: Bytes, - metadata: ImageMetadata, - size: usize, - ) { - self.as_ref().put_internal(key, image, metadata, size).await - } - - #[inline] - async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> { - self.as_ref().pop_memory().await - } } pub struct CacheStream { diff --git a/src/routes.rs b/src/routes.rs index 0bd41b9..56f6895 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -55,7 +55,7 @@ impl Responder for ServerResponse { #[get("/{token}/data/{chapter_hash}/{file_name}")] async fn token_data( state: Data, - cache: Data>, + cache: Data, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -71,7 +71,7 @@ async fn token_data( #[get("/{token}/data-saver/{chapter_hash}/{file_name}")] async fn token_data_saver( state: Data, - cache: Data>, + cache: Data, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -189,7 +189,7 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder { #[allow(clippy::future_not_send)] async fn fetch_image( state: Data, - cache: Data>, + cache: Data, chapter_hash: String, file_name: String, is_data_saver: bool,