From 288872e84b3ca424d85b99d128d1ee2e9e19a0bb Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Thu, 22 Apr 2021 21:21:32 -0400 Subject: [PATCH] Add db queries --- db_queries/init.sql | 3 +- init_cache.sh | 1 + src/cache/fs.rs | 28 ++++++++----- src/cache/low_mem.rs | 95 +++++++++++++++++++++++++++++++++++++------- src/cache/mod.rs | 6 ++- 5 files changed, 104 insertions(+), 29 deletions(-) diff --git a/db_queries/init.sql b/db_queries/init.sql index ac2b552..a69d222 100644 --- a/db_queries/init.sql +++ b/db_queries/init.sql @@ -1,7 +1,6 @@ create table if not exists Images( id varchar primary key not null, size integer not null, - accessed timestamp not null default CURRENT_TIMESTAMP, - disk_size integer as ((size + 4095) / 4096 * 4096) + accessed timestamp not null default CURRENT_TIMESTAMP ); create index if not exists Images_accessed on Images(accessed); \ No newline at end of file diff --git a/init_cache.sh b/init_cache.sh index 1afd690..17ec19b 100755 --- a/init_cache.sh +++ b/init_cache.sh @@ -6,4 +6,5 @@ # We can trust that our program will initialize the db at runtime the same way # as it pulls from the same file for initialization +mkdir cache sqlite3 cache/metadata.sqlite < db_queries/init.sql \ No newline at end of file diff --git a/src/cache/fs.rs b/src/cache/fs.rs index c5d22fb..3954663 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -1,13 +1,13 @@ use actix_web::error::PayloadError; use bytes::{Buf, Bytes}; -use futures::{Stream, StreamExt}; +use futures::{Future, Stream, StreamExt}; use log::{debug, error}; use once_cell::sync::Lazy; use serde::Deserialize; use std::collections::HashMap; use std::fmt::Display; use std::io::SeekFrom; -use std::num::NonZeroU64; +use std::num::NonZeroU32; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -69,11 +69,15 @@ pub async fn read_file( /// Maps the input byte stream into one that writes to disk instead, returning /// a stream that reads from disk instead. -pub async fn write_file( +pub async fn write_file< + Fut: 'static + Send + Sync + Future, + F: 'static + Send + Sync + FnOnce(u32) -> Fut, +>( path: &Path, mut byte_stream: BoxedImageStream, metadata: ImageMetadata, - notifier: UnboundedSender, + notifier: UnboundedSender, + db_callback: F, ) -> Result { let (tx, rx) = channel(WritingStatus::NotDone); @@ -93,7 +97,7 @@ pub async fn write_file( tokio::spawn(async move { let path_buf = path_buf; // moves path buf into async let mut errored = false; - let mut bytes_written: u64 = 0; + let mut bytes_written: u32 = 0; file.write_all(&metadata.as_bytes()).await?; while let Some(bytes) = byte_stream.next().await { if let Ok(mut bytes) = bytes { @@ -103,7 +107,7 @@ pub async fn write_file( n => { bytes.advance(n); // We don't care if we don't have receivers - bytes_written += n as u64; + bytes_written += n as u32; let _ = tx.send(WritingStatus::NotDone); } } @@ -146,6 +150,8 @@ pub async fn write_file( ); } + tokio::spawn(db_callback(bytes_written)); + // We don't ever check this, so the return value doesn't matter Ok::<_, std::io::Error>(()) }); @@ -158,8 +164,8 @@ pub async fn write_file( pub struct ConcurrentFsStream { file: Pin>>, receiver: Pin>>, - bytes_read: u64, - bytes_total: Option, + bytes_read: u32, + bytes_total: Option, } impl ConcurrentFsStream { @@ -225,7 +231,7 @@ impl Stream for ConcurrentFsStream { if let Poll::Ready(Some(WritingStatus::Done(n))) = self.receiver.as_mut().poll_next_unpin(cx) { - self.bytes_total = Some(NonZeroU64::new(n).unwrap()) + self.bytes_total = Some(NonZeroU32::new(n).unwrap()) } // Okay, now we know if we've read enough bytes or not. If the @@ -248,7 +254,7 @@ impl Stream for ConcurrentFsStream { Poll::Ready(Some(Ok(Bytes::new()))) } else { // We have data! Give it to the reader! - self.bytes_read += filled as u64; + self.bytes_read += filled as u32; bytes.truncate(filled); Poll::Ready(Some(Ok(bytes.into()))) } @@ -265,6 +271,6 @@ impl From for actix_web::Error { #[derive(Debug, Clone, Copy)] enum WritingStatus { NotDone, - Done(u64), + Done(u32), Error, } diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index 73f0f32..0c90a91 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -1,29 +1,32 @@ //! Low memory caching stuff +use std::path::PathBuf; use std::str::FromStr; -use std::sync::{atomic::Ordering, Arc}; -use std::{path::PathBuf, sync::atomic::AtomicU64}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use async_trait::async_trait; use futures::StreamExt; -use log::LevelFilter; +use log::{warn, LevelFilter}; use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool}; -use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}; +use tokio::{ + fs::remove_file, + sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}, +}; use tokio_stream::wrappers::ReceiverStream; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; pub struct LowMemCache { disk_path: PathBuf, - disk_max_size: u64, disk_cur_size: AtomicU64, - file_size_channel_sender: UnboundedSender, + file_size_channel_sender: UnboundedSender, db_update_channel_sender: Sender, } enum DbMessage { Get(Arc), - Put(Arc, ImageMetadata), + Put(PathBuf, u32), } impl LowMemCache { @@ -53,7 +56,6 @@ impl LowMemCache { let new_self: Arc> = Arc::new(Box::new(Self { disk_path, - disk_max_size, disk_cur_size: AtomicU64::new(0), file_size_channel_sender: file_size_tx, db_update_channel_sender: db_tx, @@ -63,10 +65,30 @@ impl LowMemCache { // the channel, which informs the low memory cache the total size of the // item that was put into the cache. let new_self_0 = Arc::clone(&new_self); + let db_pool_0 = db_pool.clone(); tokio::spawn(async move { + let db_pool = db_pool_0; + let max_on_disk_size = disk_max_size / 20 * 19; // This will never return None, effectively a loop while let Some(new_size) = file_size_rx.recv().await { - new_self_0.increase_usage(new_size).await; + new_self_0.increase_usage(new_size); + if new_self_0.on_disk_size() >= max_on_disk_size { + let mut conn = db_pool.acquire().await.unwrap(); + let items = sqlx::query!( + "select id, size from Images order by accessed asc limit 1000" + ) + .fetch_all(&mut conn) + .await + .unwrap(); + + let mut size_freed = 0; + for item in items { + size_freed += item.size as u64; + tokio::spawn(remove_file(item.id)); + } + + new_self_0.decrease_usage(size_freed); + } } }); @@ -75,9 +97,39 @@ impl LowMemCache { let db_pool = db_pool; let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); while let Some(messages) = recv_stream.next().await { + let now = chrono::Utc::now(); let mut transaction = db_pool.begin().await.unwrap(); - for message in messages {} - + for message in messages { + match message { + DbMessage::Get(entry) => { + let key = entry.to_string(); + let query = sqlx::query!( + "update Images set accessed = ? where id = ?", + now, + key + ) + .execute(&mut transaction) + .await; + if let Err(e) = query { + warn!("Failed to update timestamp in db for {}: {}", key, e); + } + } + DbMessage::Put(entry, size) => { + let key = entry.as_os_str().to_str(); + let query = sqlx::query!( + "insert into Images (id, size, accessed) values (?, ?, ?)", + key, + size, + now, + ) + .execute(&mut transaction) + .await; + if let Err(e) = query { + warn!("Failed to add {:?} to db: {}", key, e); + } + } + } + } transaction.commit().await.unwrap(); } }); @@ -113,22 +165,35 @@ impl Cache for LowMemCache { metadata: ImageMetadata, ) -> Result { let channel = self.db_update_channel_sender.clone(); - let key_0 = Arc::clone(&key); - tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await }); let path = self.disk_path.clone().join(PathBuf::from(key.as_ref())); + let path_0 = path.clone(); + let db_callback = |size: u32| async move { + let _ = channel.send(DbMessage::Put(path_0, size)).await; + }; super::fs::write_file( &path, image, metadata, self.file_size_channel_sender.clone(), + db_callback, ) .await .map_err(Into::into) } - async fn increase_usage(&self, amt: u64) { - self.disk_cur_size.fetch_add(amt, Ordering::Release); + #[inline] + fn increase_usage(&self, amt: u32) { + self.disk_cur_size.fetch_add(amt as u64, Ordering::Release); + } + + #[inline] + fn on_disk_size(&self) -> u64 { + (self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096 + } + + fn decrease_usage(&self, amt: u64) { + self.disk_cur_size.fetch_sub(amt, Ordering::Release); } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index ec21e53..a467d54 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -164,7 +164,11 @@ pub trait Cache: Send + Sync { metadata: ImageMetadata, ) -> Result; - async fn increase_usage(&self, amt: u64); + fn increase_usage(&self, amt: u32); + + fn decrease_usage(&self, amt: u64); + + fn on_disk_size(&self) -> u64; } pub enum CacheStream {