From 8556f379044bf448f9f964c072d36edc9b743ecd Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Thu, 15 Jul 2021 21:33:52 -0400 Subject: [PATCH] Extract internal cache listener to function --- src/cache/disk.rs | 2 ++ src/cache/mem.rs | 79 ++++++++++++++++++++++++++--------------------- 2 files changed, 45 insertions(+), 36 deletions(-) diff --git a/src/cache/disk.rs b/src/cache/disk.rs index cf779e7..34d7bee 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -125,6 +125,8 @@ async fn db_listener( db_pool: SqlitePool, max_on_disk_size: u64, ) { + // This is in a receiver stream to process up to 128 simultaneous db updates + // in one transaction let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); while let Some(messages) = recv_stream.next().await { let mut transaction = match db_pool.begin().await { diff --git a/src/cache/mem.rs b/src/cache/mem.rs index 49d87c2..96c2fed 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -7,7 +7,7 @@ use bytes::Bytes; use futures::FutureExt; use lfu_cache::LfuCache; use lru::LruCache; -use tokio::sync::mpsc::{channel, Sender}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; type CacheValue = (Bytes, ImageMetadata, u64); @@ -84,7 +84,7 @@ where ColdCache: 'static + Cache, { pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc { - let (tx, mut rx) = channel(100); + let (tx, rx) = channel(100); let new_self = Arc::new(Self { inner, cur_mem_size: AtomicU64::new(0), @@ -92,45 +92,52 @@ where master_sender: tx, }); - let new_self_0 = Arc::clone(&new_self); - tokio::spawn(async move { - let new_self = new_self_0; - let max_mem_size = max_mem_size.get() / 20 * 19; - while let Some((key, bytes, metadata, size)) = rx.recv().await { - // Add to memory cache - // We can add first because we constrain our memory usage to 95% - new_self - .cur_mem_size - .fetch_add(size as u64, Ordering::Release); - new_self - .mem_cache - .lock() - .await - .push(key, (bytes, metadata, size)); - - // Pop if too large - while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 { - let popped = new_self - .mem_cache - .lock() - .await - .pop() - .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)); - if let Some((_, _, _, size)) = popped { - new_self - .cur_mem_size - .fetch_sub(size as u64, Ordering::Release); - } else { - break; - } - } - } - }); + tokio::spawn(internal_cache_listener( + Arc::clone(&new_self), + max_mem_size, + rx, + )); new_self } } +async fn internal_cache_listener( + cache: Arc>, + max_mem_size: crate::units::Bytes, + mut rx: Receiver<(CacheKey, Bytes, ImageMetadata, u64)>, +) where + MemoryCacheImpl: InternalMemoryCache, + ColdCache: Cache, +{ + let max_mem_size = max_mem_size.get() / 20 * 19; + while let Some((key, bytes, metadata, size)) = rx.recv().await { + // Add to memory cache + // We can add first because we constrain our memory usage to 95% + cache.cur_mem_size.fetch_add(size as u64, Ordering::Release); + cache + .mem_cache + .lock() + .await + .push(key, (bytes, metadata, size)); + + // Pop if too large + while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 { + let popped = cache + .mem_cache + .lock() + .await + .pop() + .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)); + if let Some((_, _, _, size)) = popped { + cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release); + } else { + break; + } + } + } +} + #[async_trait] impl Cache for MemoryCache where