remove unnecessary channel

This commit is contained in:
Edward Shen 2021-04-22 21:34:31 -04:00
parent 758b0ec78d
commit 8d95fe3f07
Signed by: edward
GPG key ID: 19182661E818369F
2 changed files with 26 additions and 50 deletions

12
src/cache/fs.rs vendored
View file

@ -1,7 +1,7 @@
use actix_web::error::PayloadError; use actix_web::error::PayloadError;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use futures::{Future, Stream, StreamExt}; use futures::{Future, Stream, StreamExt};
use log::{debug, error}; use log::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
@ -13,7 +13,6 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch::{channel, Receiver}; use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
@ -76,7 +75,6 @@ pub async fn write_file<
path: &Path, path: &Path,
mut byte_stream: BoxedImageStream, mut byte_stream: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
notifier: UnboundedSender<u32>,
db_callback: F, db_callback: F,
) -> Result<CacheStream, std::io::Error> { ) -> Result<CacheStream, std::io::Error> {
let (tx, rx) = channel(WritingStatus::NotDone); let (tx, rx) = channel(WritingStatus::NotDone);
@ -142,14 +140,6 @@ pub async fn write_file<
write_lock.remove(&path_buf); write_lock.remove(&path_buf);
} }
// notify
if let Err(e) = notifier.send(bytes_written) {
error!(
"Failed to notify cache of new entry size: {}. Cache no longer can prune FS!",
e
);
}
tokio::spawn(db_callback(bytes_written)); tokio::spawn(db_callback(bytes_written));
// We don't ever check this, so the return value doesn't matter // We don't ever check this, so the return value doesn't matter

60
src/cache/low_mem.rs vendored
View file

@ -11,7 +11,7 @@ use log::{warn, LevelFilter};
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool}; use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
use tokio::{ use tokio::{
fs::remove_file, fs::remove_file,
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}, sync::mpsc::{channel, Sender},
}; };
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
@ -20,7 +20,6 @@ use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMet
pub struct LowMemCache { pub struct LowMemCache {
disk_path: PathBuf, disk_path: PathBuf,
disk_cur_size: AtomicU64, disk_cur_size: AtomicU64,
file_size_channel_sender: UnboundedSender<u32>,
db_update_channel_sender: Sender<DbMessage>, db_update_channel_sender: Sender<DbMessage>,
} }
@ -35,7 +34,6 @@ impl LowMemCache {
/// notifications when a file has been written. /// notifications when a file has been written.
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> { pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> {
let (file_size_tx, mut file_size_rx) = unbounded_channel();
let (db_tx, db_rx) = channel(128); let (db_tx, db_rx) = channel(128);
let db_pool = { let db_pool = {
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_str().unwrap()); let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_str().unwrap());
@ -57,7 +55,6 @@ impl LowMemCache {
let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self { let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
disk_path, disk_path,
disk_cur_size: AtomicU64::new(0), disk_cur_size: AtomicU64::new(0),
file_size_channel_sender: file_size_tx,
db_update_channel_sender: db_tx, db_update_channel_sender: db_tx,
})); }));
@ -65,36 +62,11 @@ impl LowMemCache {
// the channel, which informs the low memory cache the total size of the // the channel, which informs the low memory cache the total size of the
// item that was put into the cache. // item that was put into the cache.
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
let db_pool_0 = db_pool.clone();
tokio::spawn(async move {
let db_pool = db_pool_0;
let max_on_disk_size = disk_max_size / 20 * 19;
// This will never return None, effectively a loop
while let Some(new_size) = file_size_rx.recv().await {
new_self_0.increase_usage(new_size);
if new_self_0.on_disk_size() >= max_on_disk_size {
let mut conn = db_pool.acquire().await.unwrap();
let items = sqlx::query!(
"select id, size from Images order by accessed asc limit 1000"
)
.fetch_all(&mut conn)
.await
.unwrap();
let mut size_freed = 0;
for item in items {
size_freed += item.size as u64;
tokio::spawn(remove_file(item.id));
}
new_self_0.decrease_usage(size_freed);
}
}
});
// Spawn a new task that will listen for updates to the db. // Spawn a new task that will listen for updates to the db.
tokio::spawn(async move { tokio::spawn(async move {
let db_pool = db_pool; let db_pool = db_pool;
let max_on_disk_size = disk_max_size / 20 * 19;
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
while let Some(messages) = recv_stream.next().await { while let Some(messages) = recv_stream.next().await {
let now = chrono::Utc::now(); let now = chrono::Utc::now();
@ -127,10 +99,30 @@ impl LowMemCache {
if let Err(e) = query { if let Err(e) = query {
warn!("Failed to add {:?} to db: {}", key, e); warn!("Failed to add {:?} to db: {}", key, e);
} }
new_self_0.increase_usage(size);
} }
} }
} }
transaction.commit().await.unwrap(); transaction.commit().await.unwrap();
if new_self_0.on_disk_size() >= max_on_disk_size {
let mut conn = db_pool.acquire().await.unwrap();
let items = sqlx::query!(
"select id, size from Images order by accessed asc limit 1000"
)
.fetch_all(&mut conn)
.await
.unwrap();
let mut size_freed = 0;
for item in items {
size_freed += item.size as u64;
tokio::spawn(remove_file(item.id));
}
new_self_0.decrease_usage(size_freed);
}
} }
}); });
@ -174,13 +166,7 @@ impl Cache for LowMemCache {
let db_callback = |size: u32| async move { let db_callback = |size: u32| async move {
let _ = channel.send(DbMessage::Put(path_0, size)).await; let _ = channel.send(DbMessage::Put(path_0, size)).await;
}; };
super::fs::write_file( super::fs::write_file(&path, image, metadata, db_callback)
&path,
image,
metadata,
self.file_size_channel_sender.clone(),
db_callback,
)
.await .await
.map_err(Into::into) .map_err(Into::into)
} }