diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index 1f9c447..7a7ebf6 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -10,8 +10,8 @@ use futures::StreamExt; use log::{warn, LevelFilter}; use sqlx::sqlite::SqliteConnectOptions; use sqlx::{ConnectOptions, SqlitePool}; -use tokio::fs::remove_file; use tokio::sync::mpsc::{channel, Sender}; +use tokio::{fs::remove_file, sync::mpsc::Receiver}; use tokio_stream::wrappers::ReceiverStream; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; @@ -57,76 +57,77 @@ impl LowMemCache { db_update_channel_sender: db_tx, })); - // Spawns a new task that continuously listens for events received by - // the channel, which informs the low memory cache the total size of the - // item that was put into the cache. - let new_self_0 = Arc::clone(&new_self); + tokio::spawn(db_listener( + Arc::clone(&new_self), + db_rx, + db_pool, + disk_max_size / 20 * 19, + )); - // Spawn a new task that will listen for updates to the db, pruning if - // the size becomes too large - tokio::spawn(async move { - let db_pool = db_pool; - let max_on_disk_size = disk_max_size / 20 * 19; - let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); - while let Some(messages) = recv_stream.next().await { - let now = chrono::Utc::now(); - let mut transaction = db_pool.begin().await.unwrap(); - for message in messages { - match message { - DbMessage::Get(entry) => { - let key = entry.as_os_str().to_str(); - let query = sqlx::query!( - "update Images set accessed = ? where id = ?", - now, - key - ) + new_self + } +} + +/// Spawn a new task that will listen for updates to the db, pruning if the size +/// becomes too large. +async fn db_listener( + cache: Arc>, + db_rx: Receiver, + db_pool: SqlitePool, + max_on_disk_size: u64, +) { + let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); + while let Some(messages) = recv_stream.next().await { + let now = chrono::Utc::now(); + let mut transaction = db_pool.begin().await.unwrap(); + for message in messages { + match message { + DbMessage::Get(entry) => { + let key = entry.as_os_str().to_str(); + let query = + sqlx::query!("update Images set accessed = ? where id = ?", now, key) .execute(&mut transaction) .await; - if let Err(e) = query { - warn!("Failed to update timestamp in db for {:?}: {}", key, e); - } - } - DbMessage::Put(entry, size) => { - let key = entry.as_os_str().to_str(); - let query = sqlx::query!( - "insert into Images (id, size, accessed) values (?, ?, ?)", - key, - size, - now, - ) - .execute(&mut transaction) - .await; - if let Err(e) = query { - warn!("Failed to add {:?} to db: {}", key, e); - } - - new_self_0.increase_usage(size); - } + if let Err(e) = query { + warn!("Failed to update timestamp in db for {:?}: {}", key, e); } } - transaction.commit().await.unwrap(); - - if new_self_0.on_disk_size() >= max_on_disk_size { - let mut conn = db_pool.acquire().await.unwrap(); - let items = sqlx::query!( - "select id, size from Images order by accessed asc limit 1000" + DbMessage::Put(entry, size) => { + let key = entry.as_os_str().to_str(); + let query = sqlx::query!( + "insert into Images (id, size, accessed) values (?, ?, ?)", + key, + size, + now, ) + .execute(&mut transaction) + .await; + if let Err(e) = query { + warn!("Failed to add {:?} to db: {}", key, e); + } + + cache.increase_usage(size); + } + } + } + transaction.commit().await.unwrap(); + + if cache.on_disk_size() >= max_on_disk_size { + let mut conn = db_pool.acquire().await.unwrap(); + let items = + sqlx::query!("select id, size from Images order by accessed asc limit 1000") .fetch_all(&mut conn) .await .unwrap(); - let mut size_freed = 0; - for item in items { - size_freed += item.size as u64; - tokio::spawn(remove_file(item.id)); - } - - new_self_0.decrease_usage(size_freed); - } + let mut size_freed = 0; + for item in items { + size_freed += item.size as u64; + tokio::spawn(remove_file(item.id)); } - }); - new_self + cache.decrease_usage(size_freed); + } } } @@ -150,7 +151,7 @@ impl Cache for LowMemCache { async fn put( &self, - key: &CacheKey, + key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, ) -> Result { @@ -162,6 +163,7 @@ impl Cache for LowMemCache { let db_callback = |size: u32| async move { let _ = channel.send(DbMessage::Put(path_0, size)).await; }; + super::fs::write_file(&path, image, metadata, db_callback) .await .map_err(Into::into) diff --git a/src/cache/mod.rs b/src/cache/mod.rs index e4d33df..cb65ab7 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -157,7 +157,7 @@ pub trait Cache: Send + Sync { async fn put( &self, - key: &CacheKey, + key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, ) -> Result; diff --git a/src/routes.rs b/src/routes.rs index be88540..27a8a83 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -263,7 +263,7 @@ async fn fetch_image( let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap(); let stream = { - match cache.put(&key, Box::new(body), metadata).await { + match cache.put(key, Box::new(body), metadata).await { Ok(stream) => stream, Err(e) => { warn!("Failed to insert into cache: {}", e);