Add db queries

This commit is contained in:
Edward Shen 2021-04-22 21:21:32 -04:00
parent b01fa618b4
commit 288872e84b
Signed by: edward
GPG key ID: 19182661E818369F
5 changed files with 104 additions and 29 deletions

View file

@ -1,7 +1,6 @@
create table if not exists Images( create table if not exists Images(
id varchar primary key not null, id varchar primary key not null,
size integer not null, size integer not null,
accessed timestamp not null default CURRENT_TIMESTAMP, accessed timestamp not null default CURRENT_TIMESTAMP
disk_size integer as ((size + 4095) / 4096 * 4096)
); );
create index if not exists Images_accessed on Images(accessed); create index if not exists Images_accessed on Images(accessed);

View file

@ -6,4 +6,5 @@
# We can trust that our program will initialize the db at runtime the same way # We can trust that our program will initialize the db at runtime the same way
# as it pulls from the same file for initialization # as it pulls from the same file for initialization
mkdir cache
sqlite3 cache/metadata.sqlite < db_queries/init.sql sqlite3 cache/metadata.sqlite < db_queries/init.sql

28
src/cache/fs.rs vendored
View file

@ -1,13 +1,13 @@
use actix_web::error::PayloadError; use actix_web::error::PayloadError;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use futures::{Stream, StreamExt}; use futures::{Future, Stream, StreamExt};
use log::{debug, error}; use log::{debug, error};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::num::NonZeroU64; use std::num::NonZeroU32;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -69,11 +69,15 @@ pub async fn read_file(
/// Maps the input byte stream into one that writes to disk instead, returning /// Maps the input byte stream into one that writes to disk instead, returning
/// a stream that reads from disk instead. /// a stream that reads from disk instead.
pub async fn write_file( pub async fn write_file<
Fut: 'static + Send + Sync + Future<Output = ()>,
F: 'static + Send + Sync + FnOnce(u32) -> Fut,
>(
path: &Path, path: &Path,
mut byte_stream: BoxedImageStream, mut byte_stream: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
notifier: UnboundedSender<u64>, notifier: UnboundedSender<u32>,
db_callback: F,
) -> Result<CacheStream, std::io::Error> { ) -> Result<CacheStream, std::io::Error> {
let (tx, rx) = channel(WritingStatus::NotDone); let (tx, rx) = channel(WritingStatus::NotDone);
@ -93,7 +97,7 @@ pub async fn write_file(
tokio::spawn(async move { tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut errored = false; let mut errored = false;
let mut bytes_written: u64 = 0; let mut bytes_written: u32 = 0;
file.write_all(&metadata.as_bytes()).await?; file.write_all(&metadata.as_bytes()).await?;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes { if let Ok(mut bytes) = bytes {
@ -103,7 +107,7 @@ pub async fn write_file(
n => { n => {
bytes.advance(n); bytes.advance(n);
// We don't care if we don't have receivers // We don't care if we don't have receivers
bytes_written += n as u64; bytes_written += n as u32;
let _ = tx.send(WritingStatus::NotDone); let _ = tx.send(WritingStatus::NotDone);
} }
} }
@ -146,6 +150,8 @@ pub async fn write_file(
); );
} }
tokio::spawn(db_callback(bytes_written));
// We don't ever check this, so the return value doesn't matter // We don't ever check this, so the return value doesn't matter
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
}); });
@ -158,8 +164,8 @@ pub async fn write_file(
pub struct ConcurrentFsStream { pub struct ConcurrentFsStream {
file: Pin<Box<BufReader<File>>>, file: Pin<Box<BufReader<File>>>,
receiver: Pin<Box<WatchStream<WritingStatus>>>, receiver: Pin<Box<WatchStream<WritingStatus>>>,
bytes_read: u64, bytes_read: u32,
bytes_total: Option<NonZeroU64>, bytes_total: Option<NonZeroU32>,
} }
impl ConcurrentFsStream { impl ConcurrentFsStream {
@ -225,7 +231,7 @@ impl Stream for ConcurrentFsStream {
if let Poll::Ready(Some(WritingStatus::Done(n))) = if let Poll::Ready(Some(WritingStatus::Done(n))) =
self.receiver.as_mut().poll_next_unpin(cx) self.receiver.as_mut().poll_next_unpin(cx)
{ {
self.bytes_total = Some(NonZeroU64::new(n).unwrap()) self.bytes_total = Some(NonZeroU32::new(n).unwrap())
} }
// Okay, now we know if we've read enough bytes or not. If the // Okay, now we know if we've read enough bytes or not. If the
@ -248,7 +254,7 @@ impl Stream for ConcurrentFsStream {
Poll::Ready(Some(Ok(Bytes::new()))) Poll::Ready(Some(Ok(Bytes::new())))
} else { } else {
// We have data! Give it to the reader! // We have data! Give it to the reader!
self.bytes_read += filled as u64; self.bytes_read += filled as u32;
bytes.truncate(filled); bytes.truncate(filled);
Poll::Ready(Some(Ok(bytes.into()))) Poll::Ready(Some(Ok(bytes.into())))
} }
@ -265,6 +271,6 @@ impl From<UpstreamError> for actix_web::Error {
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum WritingStatus { enum WritingStatus {
NotDone, NotDone,
Done(u64), Done(u32),
Error, Error,
} }

95
src/cache/low_mem.rs vendored
View file

@ -1,29 +1,32 @@
//! Low memory caching stuff //! Low memory caching stuff
use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{atomic::Ordering, Arc}; use std::sync::atomic::{AtomicU64, Ordering};
use std::{path::PathBuf, sync::atomic::AtomicU64}; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use log::LevelFilter; use log::{warn, LevelFilter};
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool}; use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}; use tokio::{
fs::remove_file,
sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender},
};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
pub struct LowMemCache { pub struct LowMemCache {
disk_path: PathBuf, disk_path: PathBuf,
disk_max_size: u64,
disk_cur_size: AtomicU64, disk_cur_size: AtomicU64,
file_size_channel_sender: UnboundedSender<u64>, file_size_channel_sender: UnboundedSender<u32>,
db_update_channel_sender: Sender<DbMessage>, db_update_channel_sender: Sender<DbMessage>,
} }
enum DbMessage { enum DbMessage {
Get(Arc<CacheKey>), Get(Arc<CacheKey>),
Put(Arc<CacheKey>, ImageMetadata), Put(PathBuf, u32),
} }
impl LowMemCache { impl LowMemCache {
@ -53,7 +56,6 @@ impl LowMemCache {
let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self { let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
disk_path, disk_path,
disk_max_size,
disk_cur_size: AtomicU64::new(0), disk_cur_size: AtomicU64::new(0),
file_size_channel_sender: file_size_tx, file_size_channel_sender: file_size_tx,
db_update_channel_sender: db_tx, db_update_channel_sender: db_tx,
@ -63,10 +65,30 @@ impl LowMemCache {
// the channel, which informs the low memory cache the total size of the // the channel, which informs the low memory cache the total size of the
// item that was put into the cache. // item that was put into the cache.
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
let db_pool_0 = db_pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
let db_pool = db_pool_0;
let max_on_disk_size = disk_max_size / 20 * 19;
// This will never return None, effectively a loop // This will never return None, effectively a loop
while let Some(new_size) = file_size_rx.recv().await { while let Some(new_size) = file_size_rx.recv().await {
new_self_0.increase_usage(new_size).await; new_self_0.increase_usage(new_size);
if new_self_0.on_disk_size() >= max_on_disk_size {
let mut conn = db_pool.acquire().await.unwrap();
let items = sqlx::query!(
"select id, size from Images order by accessed asc limit 1000"
)
.fetch_all(&mut conn)
.await
.unwrap();
let mut size_freed = 0;
for item in items {
size_freed += item.size as u64;
tokio::spawn(remove_file(item.id));
}
new_self_0.decrease_usage(size_freed);
}
} }
}); });
@ -75,9 +97,39 @@ impl LowMemCache {
let db_pool = db_pool; let db_pool = db_pool;
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128); let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
while let Some(messages) = recv_stream.next().await { while let Some(messages) = recv_stream.next().await {
let now = chrono::Utc::now();
let mut transaction = db_pool.begin().await.unwrap(); let mut transaction = db_pool.begin().await.unwrap();
for message in messages {} for message in messages {
match message {
DbMessage::Get(entry) => {
let key = entry.to_string();
let query = sqlx::query!(
"update Images set accessed = ? where id = ?",
now,
key
)
.execute(&mut transaction)
.await;
if let Err(e) = query {
warn!("Failed to update timestamp in db for {}: {}", key, e);
}
}
DbMessage::Put(entry, size) => {
let key = entry.as_os_str().to_str();
let query = sqlx::query!(
"insert into Images (id, size, accessed) values (?, ?, ?)",
key,
size,
now,
)
.execute(&mut transaction)
.await;
if let Err(e) = query {
warn!("Failed to add {:?} to db: {}", key, e);
}
}
}
}
transaction.commit().await.unwrap(); transaction.commit().await.unwrap();
} }
}); });
@ -113,22 +165,35 @@ impl Cache for LowMemCache {
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError> { ) -> Result<CacheStream, CacheError> {
let channel = self.db_update_channel_sender.clone(); let channel = self.db_update_channel_sender.clone();
let key_0 = Arc::clone(&key);
tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await });
let path = self.disk_path.clone().join(PathBuf::from(key.as_ref())); let path = self.disk_path.clone().join(PathBuf::from(key.as_ref()));
let path_0 = path.clone();
let db_callback = |size: u32| async move {
let _ = channel.send(DbMessage::Put(path_0, size)).await;
};
super::fs::write_file( super::fs::write_file(
&path, &path,
image, image,
metadata, metadata,
self.file_size_channel_sender.clone(), self.file_size_channel_sender.clone(),
db_callback,
) )
.await .await
.map_err(Into::into) .map_err(Into::into)
} }
async fn increase_usage(&self, amt: u64) { #[inline]
self.disk_cur_size.fetch_add(amt, Ordering::Release); fn increase_usage(&self, amt: u32) {
self.disk_cur_size.fetch_add(amt as u64, Ordering::Release);
}
#[inline]
fn on_disk_size(&self) -> u64 {
(self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096
}
fn decrease_usage(&self, amt: u64) {
self.disk_cur_size.fetch_sub(amt, Ordering::Release);
} }
} }

6
src/cache/mod.rs vendored
View file

@ -164,7 +164,11 @@ pub trait Cache: Send + Sync {
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError>; ) -> Result<CacheStream, CacheError>;
async fn increase_usage(&self, amt: u64); fn increase_usage(&self, amt: u32);
fn decrease_usage(&self, amt: u64);
fn on_disk_size(&self) -> u64;
} }
pub enum CacheStream { pub enum CacheStream {