Move task to separate function

Edward Shen 2021-04-22 21:55:26 -04:00
parent a86cd3edf5
commit 77cd416849
Signed by: edward
GPG key ID: 19182661E818369F
3 changed files with 64 additions and 62 deletions

src/cache/low_mem.rs (vendored): 122 lines changed

@@ -10,8 +10,8 @@ use futures::StreamExt;
 use log::{warn, LevelFilter};
 use sqlx::sqlite::SqliteConnectOptions;
 use sqlx::{ConnectOptions, SqlitePool};
-use tokio::fs::remove_file;
 use tokio::sync::mpsc::{channel, Sender};
+use tokio::{fs::remove_file, sync::mpsc::Receiver};
 use tokio_stream::wrappers::ReceiverStream;
 
 use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
@@ -57,76 +57,77 @@ impl LowMemCache {
             db_update_channel_sender: db_tx,
         }));
 
-        // Spawns a new task that continuously listens for events received by
-        // the channel, which informs the low memory cache the total size of the
-        // item that was put into the cache.
-        let new_self_0 = Arc::clone(&new_self);
-
-        // Spawn a new task that will listen for updates to the db, pruning if
-        // the size becomes too large
-        tokio::spawn(async move {
-            let db_pool = db_pool;
-            let max_on_disk_size = disk_max_size / 20 * 19;
-            let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
-            while let Some(messages) = recv_stream.next().await {
-                let now = chrono::Utc::now();
-                let mut transaction = db_pool.begin().await.unwrap();
-                for message in messages {
-                    match message {
-                        DbMessage::Get(entry) => {
-                            let key = entry.as_os_str().to_str();
-                            let query = sqlx::query!(
-                                "update Images set accessed = ? where id = ?",
-                                now,
-                                key
-                            )
-                            .execute(&mut transaction)
-                            .await;
-                            if let Err(e) = query {
-                                warn!("Failed to update timestamp in db for {:?}: {}", key, e);
-                            }
-                        }
-                        DbMessage::Put(entry, size) => {
-                            let key = entry.as_os_str().to_str();
-                            let query = sqlx::query!(
-                                "insert into Images (id, size, accessed) values (?, ?, ?)",
-                                key,
-                                size,
-                                now,
-                            )
-                            .execute(&mut transaction)
-                            .await;
-                            if let Err(e) = query {
-                                warn!("Failed to add {:?} to db: {}", key, e);
-                            }
-                            new_self_0.increase_usage(size);
-                        }
-                    }
-                }
-                transaction.commit().await.unwrap();
-
-                if new_self_0.on_disk_size() >= max_on_disk_size {
-                    let mut conn = db_pool.acquire().await.unwrap();
-                    let items = sqlx::query!(
-                        "select id, size from Images order by accessed asc limit 1000"
-                    )
-                    .fetch_all(&mut conn)
-                    .await
-                    .unwrap();
-
-                    let mut size_freed = 0;
-                    for item in items {
-                        size_freed += item.size as u64;
-                        tokio::spawn(remove_file(item.id));
-                    }
-
-                    new_self_0.decrease_usage(size_freed);
-                }
-            }
-        });
+        tokio::spawn(db_listener(
+            Arc::clone(&new_self),
+            db_rx,
+            db_pool,
+            disk_max_size / 20 * 19,
+        ));
 
         new_self
     }
 }
+
+/// Spawn a new task that will listen for updates to the db, pruning if the size
+/// becomes too large.
+async fn db_listener(
+    cache: Arc<Box<dyn Cache>>,
+    db_rx: Receiver<DbMessage>,
+    db_pool: SqlitePool,
+    max_on_disk_size: u64,
+) {
+    let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
+    while let Some(messages) = recv_stream.next().await {
+        let now = chrono::Utc::now();
+        let mut transaction = db_pool.begin().await.unwrap();
+        for message in messages {
+            match message {
+                DbMessage::Get(entry) => {
+                    let key = entry.as_os_str().to_str();
+                    let query =
+                        sqlx::query!("update Images set accessed = ? where id = ?", now, key)
+                            .execute(&mut transaction)
+                            .await;
+                    if let Err(e) = query {
+                        warn!("Failed to update timestamp in db for {:?}: {}", key, e);
+                    }
+                }
+                DbMessage::Put(entry, size) => {
+                    let key = entry.as_os_str().to_str();
+                    let query = sqlx::query!(
+                        "insert into Images (id, size, accessed) values (?, ?, ?)",
+                        key,
+                        size,
+                        now,
+                    )
+                    .execute(&mut transaction)
+                    .await;
+                    if let Err(e) = query {
+                        warn!("Failed to add {:?} to db: {}", key, e);
+                    }
+                    cache.increase_usage(size);
+                }
+            }
+        }
+        transaction.commit().await.unwrap();
+
+        if cache.on_disk_size() >= max_on_disk_size {
+            let mut conn = db_pool.acquire().await.unwrap();
+            let items =
+                sqlx::query!("select id, size from Images order by accessed asc limit 1000")
+                    .fetch_all(&mut conn)
+                    .await
+                    .unwrap();
+
+            let mut size_freed = 0;
+            for item in items {
+                size_freed += item.size as u64;
+                tokio::spawn(remove_file(item.id));
+            }
+
+            cache.decrease_usage(size_freed);
+        }
+    }
+}
@@ -150,7 +151,7 @@ impl Cache for LowMemCache {
     async fn put(
         &self,
-        key: &CacheKey,
+        key: CacheKey,
         image: BoxedImageStream,
         metadata: ImageMetadata,
     ) -> Result<CacheStream, CacheError> {
@@ -162,6 +163,7 @@ impl Cache for LowMemCache {
         let db_callback = |size: u32| async move {
            let _ = channel.send(DbMessage::Put(path_0, size)).await;
         };
+
         super::fs::write_file(&path, image, metadata, db_callback)
             .await
             .map_err(Into::into)

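The extracted db_listener keeps the old inline task's batching strategy: ready_chunks(128) drains whatever messages are already buffered (up to 128 at a time), so each batch shares a single SQLite transaction instead of committing once per message. A minimal standalone sketch of that pattern (hypothetical producer and u32 message type, not code from this commit):

use futures::StreamExt;
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = channel::<u32>(128);

    // Producer: send a burst of messages, then drop the sender so the
    // stream ends and the consumer loop exits.
    tokio::spawn(async move {
        for i in 0..10 {
            let _ = tx.send(i).await;
        }
    });

    // Consumer: ready_chunks(128) yields a Vec of however many messages
    // are already buffered, so one transaction can cover the whole batch.
    let mut recv_stream = ReceiverStream::new(rx).ready_chunks(128);
    while let Some(batch) = recv_stream.next().await {
        println!("handling {} messages in one transaction", batch.len());
    }
}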
src/cache/mod.rs (vendored): 2 lines changed

@@ -157,7 +157,7 @@ pub trait Cache: Send + Sync {
     async fn put(
         &self,
-        key: &CacheKey,
+        key: CacheKey,
         image: BoxedImageStream,
         metadata: ImageMetadata,
     ) -> Result<CacheStream, CacheError>;
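The signature change from key: &CacheKey to key: CacheKey runs through all three files: the trait, the LowMemCache implementation, and the call site in fetch_image below. Taking the key by value lets the cache move it into its own storage without cloning. A toy illustration of the ownership difference (stand-in type, not the real CacheKey):

#[derive(Clone, Debug)]
struct CacheKey(String);

// Old shape: borrows the key, so the caller keeps ownership.
fn put_by_ref(_key: &CacheKey) {}

// New shape: takes the key by value, so the callee can store it without cloning.
fn put_by_value(_key: CacheKey) {}

fn main() {
    let key = CacheKey("a6/b2/deadbeef".into());
    put_by_ref(&key); // key is still usable here
    put_by_value(key); // key is moved; clone first if it is needed afterwards
}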


@@ -263,7 +263,7 @@ async fn fetch_image(
     let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
     let stream = {
-        match cache.put(&key, Box::new(body), metadata).await {
+        match cache.put(key, Box::new(body), metadata).await {
             Ok(stream) => stream,
             Err(e) => {
                 warn!("Failed to insert into cache: {}", e);