diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 3954663..1335eb5 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -1,7 +1,7 @@ use actix_web::error::PayloadError; use bytes::{Buf, Bytes}; use futures::{Future, Stream, StreamExt}; -use log::{debug, error}; +use log::debug; use once_cell::sync::Lazy; use serde::Deserialize; use std::collections::HashMap; @@ -13,7 +13,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf}; -use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::{channel, Receiver}; use tokio::sync::RwLock; use tokio_stream::wrappers::WatchStream; @@ -76,7 +75,6 @@ pub async fn write_file< path: &Path, mut byte_stream: BoxedImageStream, metadata: ImageMetadata, - notifier: UnboundedSender, db_callback: F, ) -> Result { let (tx, rx) = channel(WritingStatus::NotDone); @@ -142,14 +140,6 @@ pub async fn write_file< write_lock.remove(&path_buf); } - // notify - if let Err(e) = notifier.send(bytes_written) { - error!( - "Failed to notify cache of new entry size: {}. Cache no longer can prune FS!", - e - ); - } - tokio::spawn(db_callback(bytes_written)); // We don't ever check this, so the return value doesn't matter diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index ea9f353..7b42d21 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -11,7 +11,7 @@ use log::{warn, LevelFilter}; use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool}; use tokio::{ fs::remove_file, - sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}, + sync::mpsc::{channel, Sender}, }; use tokio_stream::wrappers::ReceiverStream; @@ -20,7 +20,6 @@ use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMet pub struct LowMemCache { disk_path: PathBuf, disk_cur_size: AtomicU64, - file_size_channel_sender: UnboundedSender, db_update_channel_sender: Sender, } @@ -35,7 +34,6 @@ impl LowMemCache { /// notifications when a file has been written. #[allow(clippy::new_ret_no_self)] pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc> { - let (file_size_tx, mut file_size_rx) = unbounded_channel(); let (db_tx, db_rx) = channel(128); let db_pool = { let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_str().unwrap()); @@ -57,7 +55,6 @@ impl LowMemCache { let new_self: Arc> = Arc::new(Box::new(Self { disk_path, disk_cur_size: AtomicU64::new(0), - file_size_channel_sender: file_size_tx, db_update_channel_sender: db_tx, })); @@ -65,36 +62,11 @@ impl LowMemCache { // the channel, which informs the low memory cache the total size of the // item that was put into the cache. let new_self_0 = Arc::clone(&new_self); - let db_pool_0 = db_pool.clone(); - tokio::spawn(async move { - let db_pool = db_pool_0; - let max_on_disk_size = disk_max_size / 20 * 19; - // This will never return None, effectively a loop - while let Some(new_size) = file_size_rx.recv().await { - new_self_0.increase_usage(new_size); - if new_self_0.on_disk_size() >= max_on_disk_size { - let mut conn = db_pool.acquire().await.unwrap(); - let items = sqlx::query!( - "select id, size from Images order by accessed asc limit 1000" - ) - .fetch_all(&mut conn) - .await - .unwrap(); - - let mut size_freed = 0; - for item in items { - size_freed += item.size as u64; - tokio::spawn(remove_file(item.id)); - } - - new_self_0.decrease_usage(size_freed); - } - } - }); // Spawn a new task that will listen for updates to the db. tokio::spawn(async move { let db_pool = db_pool; + let max_on_disk_size = disk_max_size / 20 * 19; let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); while let Some(messages) = recv_stream.next().await { let now = chrono::Utc::now(); @@ -127,10 +99,30 @@ impl LowMemCache { if let Err(e) = query { warn!("Failed to add {:?} to db: {}", key, e); } + + new_self_0.increase_usage(size); } } } transaction.commit().await.unwrap(); + + if new_self_0.on_disk_size() >= max_on_disk_size { + let mut conn = db_pool.acquire().await.unwrap(); + let items = sqlx::query!( + "select id, size from Images order by accessed asc limit 1000" + ) + .fetch_all(&mut conn) + .await + .unwrap(); + + let mut size_freed = 0; + for item in items { + size_freed += item.size as u64; + tokio::spawn(remove_file(item.id)); + } + + new_self_0.decrease_usage(size_freed); + } } }); @@ -174,15 +166,9 @@ impl Cache for LowMemCache { let db_callback = |size: u32| async move { let _ = channel.send(DbMessage::Put(path_0, size)).await; }; - super::fs::write_file( - &path, - image, - metadata, - self.file_size_channel_sender.clone(), - db_callback, - ) - .await - .map_err(Into::into) + super::fs::write_file(&path, image, metadata, db_callback) + .await + .map_err(Into::into) } #[inline]