Extract internal cache listener to function

This commit is contained in:
Edward Shen 2021-07-15 21:33:52 -04:00
parent fc930285f0
commit 8556f37904
Signed by: edward
GPG key ID: 19182661E818369F
2 changed files with 45 additions and 36 deletions

2
src/cache/disk.rs vendored
View file

@ -125,6 +125,8 @@ async fn db_listener(
db_pool: SqlitePool, db_pool: SqlitePool,
max_on_disk_size: u64, max_on_disk_size: u64,
) { ) {
// This is in a receiver stream to process up to 128 simultaneous db updates
// in one transaction
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
while let Some(messages) = recv_stream.next().await { while let Some(messages) = recv_stream.next().await {
let mut transaction = match db_pool.begin().await { let mut transaction = match db_pool.begin().await {

79
src/cache/mem.rs vendored
View file

@ -7,7 +7,7 @@ use bytes::Bytes;
use futures::FutureExt; use futures::FutureExt;
use lfu_cache::LfuCache; use lfu_cache::LfuCache;
use lru::LruCache; use lru::LruCache;
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
type CacheValue = (Bytes, ImageMetadata, u64); type CacheValue = (Bytes, ImageMetadata, u64);
@ -84,7 +84,7 @@ where
ColdCache: 'static + Cache, ColdCache: 'static + Cache,
{ {
pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> { pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
let (tx, mut rx) = channel(100); let (tx, rx) = channel(100);
let new_self = Arc::new(Self { let new_self = Arc::new(Self {
inner, inner,
cur_mem_size: AtomicU64::new(0), cur_mem_size: AtomicU64::new(0),
@ -92,45 +92,52 @@ where
master_sender: tx, master_sender: tx,
}); });
let new_self_0 = Arc::clone(&new_self); tokio::spawn(internal_cache_listener(
tokio::spawn(async move { Arc::clone(&new_self),
let new_self = new_self_0; max_mem_size,
let max_mem_size = max_mem_size.get() / 20 * 19; rx,
while let Some((key, bytes, metadata, size)) = rx.recv().await { ));
// Add to memory cache
// We can add first because we constrain our memory usage to 95%
new_self
.cur_mem_size
.fetch_add(size as u64, Ordering::Release);
new_self
.mem_cache
.lock()
.await
.push(key, (bytes, metadata, size));
// Pop if too large
while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
let popped = new_self
.mem_cache
.lock()
.await
.pop()
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
if let Some((_, _, _, size)) = popped {
new_self
.cur_mem_size
.fetch_sub(size as u64, Ordering::Release);
} else {
break;
}
}
}
});
new_self new_self
} }
} }
async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
max_mem_size: crate::units::Bytes,
mut rx: Receiver<(CacheKey, Bytes, ImageMetadata, u64)>,
) where
MemoryCacheImpl: InternalMemoryCache,
ColdCache: Cache,
{
let max_mem_size = max_mem_size.get() / 20 * 19;
while let Some((key, bytes, metadata, size)) = rx.recv().await {
// Add to memory cache
// We can add first because we constrain our memory usage to 95%
cache.cur_mem_size.fetch_add(size as u64, Ordering::Release);
cache
.mem_cache
.lock()
.await
.push(key, (bytes, metadata, size));
// Pop if too large
while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
let popped = cache
.mem_cache
.lock()
.await
.pop()
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
if let Some((_, _, _, size)) = popped {
cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
} else {
break;
}
}
}
}
#[async_trait] #[async_trait]
impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache> impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
where where