diff --git a/src/cache/mem_lfu.rs b/src/cache/mem.rs similarity index 70% rename from src/cache/mem_lfu.rs rename to src/cache/mem.rs index 9311d77..6bcd428 100644 --- a/src/cache/mem_lfu.rs +++ b/src/cache/mem.rs @@ -9,18 +9,31 @@ use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use lfu_cache::LfuCache; +use lru::LruCache; use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::Mutex; +type CacheValue = (Bytes, ImageMetadata, usize); + +pub type Lru = LruCache; +pub type Lfu = LfuCache; + +pub trait InternalMemoryCache: Sync + Send { + fn unbounded() -> Self; + fn get(&mut self, key: &CacheKey) -> Option<&CacheValue>; + fn push(&mut self, key: CacheKey, data: CacheValue); + fn pop(&mut self) -> Option<(CacheKey, CacheValue)>; +} + /// Memory accelerated disk cache. Uses an LRU in memory to speed up reads. -pub struct MemoryLfuCache { +pub struct MemoryCache { inner: Arc>, cur_mem_size: AtomicU64, - mem_cache: Mutex>, + mem_cache: Mutex, master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, } -impl MemoryLfuCache { +impl MemoryCache { #[allow(clippy::new_ret_no_self)] pub async fn new( disk_max_size: u64, @@ -31,7 +44,7 @@ impl MemoryLfuCache { let new_self = Arc::new(Box::new(Self { inner: DiskCache::new(disk_max_size, disk_path).await, cur_mem_size: AtomicU64::new(0), - mem_cache: Mutex::new(LfuCache::unbounded()), + mem_cache: Mutex::new(InternalCacheImpl::unbounded()), master_sender: tx, }) as Box); @@ -57,7 +70,7 @@ impl MemoryLfuCache { } #[async_trait] -impl Cache for MemoryLfuCache { +impl Cache for MemoryCache { #[inline] async fn get( &self, @@ -131,7 +144,7 @@ impl Cache for MemoryLfuCache { self.mem_cache .lock() .await - .insert(key, (image, metadata, size)); + .push(key, (image, metadata, size)); } #[inline] @@ -139,7 +152,51 @@ impl Cache for MemoryLfuCache { self.mem_cache .lock() .await - .pop_lfu_key_value() + .pop() .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)) } } + +impl InternalMemoryCache for Lfu { + #[inline] + fn unbounded() -> Self { + Self::unbounded() + } + + #[inline] + fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> { + self.get(key) + } + + #[inline] + fn push(&mut self, key: CacheKey, data: CacheValue) { + self.insert(key, data); + } + + #[inline] + fn pop(&mut self) -> Option<(CacheKey, CacheValue)> { + self.pop_lfu_key_value() + } +} + +impl InternalMemoryCache for Lru { + #[inline] + fn unbounded() -> Self { + Self::unbounded() + } + + #[inline] + fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> { + self.get(key) + } + + #[inline] + fn push(&mut self, key: CacheKey, data: CacheValue) { + self.put(key, data); + } + + #[inline] + fn pop(&mut self) -> Option<(CacheKey, CacheValue)> { + self.pop_lru() + } +} diff --git a/src/cache/mem_lru.rs b/src/cache/mem_lru.rs deleted file mode 100644 index 9f868a2..0000000 --- a/src/cache/mem_lru.rs +++ /dev/null @@ -1,145 +0,0 @@ -use std::path::PathBuf; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; - -use crate::cache::DiskCache; - -use super::{BoxedImageStream, Cache, CacheKey, CacheStream, ImageMetadata, MemStream}; -use async_trait::async_trait; -use bytes::Bytes; -use futures::FutureExt; -use lru::LruCache; -use tokio::sync::mpsc::{channel, Sender}; -use tokio::sync::Mutex; - -/// Memory accelerated disk cache. Uses an LRU in memory to speed up reads. -pub struct MemoryLruCache { - inner: Arc>, - cur_mem_size: AtomicU64, - mem_cache: Mutex>, - master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, -} - -impl MemoryLruCache { - #[allow(clippy::new_ret_no_self)] - pub async fn new( - disk_max_size: u64, - disk_path: PathBuf, - max_mem_size: u64, - ) -> Arc> { - let (tx, mut rx) = channel(100); - let new_self = Arc::new(Box::new(Self { - inner: DiskCache::new(disk_max_size, disk_path).await, - cur_mem_size: AtomicU64::new(0), - mem_cache: Mutex::new(LruCache::unbounded()), - master_sender: tx, - }) as Box); - - let new_self_0 = Arc::clone(&new_self); - tokio::spawn(async move { - let new_self = new_self_0; - let max_mem_size = max_mem_size / 20 * 19; - while let Some((key, bytes, metadata, size)) = rx.recv().await { - new_self.increase_usage(size as u32); - new_self.put_internal(key, bytes, metadata, size).await; - while new_self.mem_size() >= max_mem_size { - if let Some((_, _, _, size)) = new_self.pop_memory().await { - new_self.decrease_usage(size as u64); - } else { - break; - } - } - } - }); - - new_self - } -} - -#[async_trait] -impl Cache for MemoryLruCache { - #[inline] - async fn get( - &self, - key: &CacheKey, - ) -> Option> { - match self.mem_cache.lock().now_or_never() { - Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| { - Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata)) - }) { - Some(v) => Some(v), - None => self.inner.get(key).await, - }, - None => self.inner.get(key).await, - } - } - - #[inline] - async fn put( - &self, - key: CacheKey, - image: BoxedImageStream, - metadata: ImageMetadata, - ) -> Result { - self.inner - .put_with_on_completed_callback(key, image, metadata, self.master_sender.clone()) - .await - } - - #[inline] - fn increase_usage(&self, amt: u32) { - self.cur_mem_size - .fetch_add(u64::from(amt), Ordering::Release); - } - - #[inline] - fn decrease_usage(&self, amt: u64) { - self.cur_mem_size.fetch_sub(amt, Ordering::Release); - } - - #[inline] - fn on_disk_size(&self) -> u64 { - self.inner.on_disk_size() - } - - #[inline] - fn mem_size(&self) -> u64 { - self.cur_mem_size.load(Ordering::Acquire) - } - - #[inline] - async fn put_with_on_completed_callback( - &self, - key: CacheKey, - image: BoxedImageStream, - metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, - ) -> Result { - self.inner - .put_with_on_completed_callback(key, image, metadata, on_complete) - .await - } - - #[inline] - async fn put_internal( - &self, - key: CacheKey, - image: Bytes, - metadata: ImageMetadata, - size: usize, - ) { - self.mem_cache - .lock() - .await - .put(key, (image, metadata, size)); - } - - #[inline] - async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> { - self.mem_cache - .lock() - .await - .pop_lru() - .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)) - } -} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index b79821a..ce5a6d9 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -19,13 +19,11 @@ use tokio_util::codec::{BytesCodec, FramedRead}; pub use disk::DiskCache; pub use fs::UpstreamError; -pub use mem_lfu::MemoryLfuCache; -pub use mem_lru::MemoryLruCache; +pub use mem::MemoryCache; mod disk; mod fs; -mod mem_lfu; -mod mem_lru; +pub mod mem; #[derive(PartialEq, Eq, Hash, Clone)] pub struct CacheKey(pub String, pub String, pub bool); diff --git a/src/main.rs b/src/main.rs index b4921c6..ae5f9ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,11 +27,9 @@ use state::{RwLockServerState, ServerState}; use stop::send_stop; use thiserror::Error; +use crate::cache::{mem, MemoryCache}; +use crate::config::UnstableOptions; use crate::state::DynamicServerCert; -use crate::{ - cache::{MemoryLfuCache, MemoryLruCache}, - config::UnstableOptions, -}; mod cache; mod config; @@ -141,9 +139,9 @@ async fn main() -> Result<(), Box> { DiskCache::new(disk_quota, cache_path.clone()).await } else { if use_lfu { - MemoryLfuCache::new(disk_quota, cache_path.clone(), memory_max_size).await + MemoryCache::::new(disk_quota, cache_path.clone(), memory_max_size).await } else { - MemoryLruCache::new(disk_quota, cache_path.clone(), memory_max_size).await + MemoryCache::::new(disk_quota, cache_path.clone(), memory_max_size).await } };