Support loading disk size from db

This commit is contained in:
Edward Shen 2021-05-20 00:38:28 -04:00
parent 6daadefaf1
commit a70f4bfdc3
Signed by: edward
GPG key ID: 19182661E818369F
4 changed files with 49 additions and 54 deletions

51
src/cache/disk.rs vendored
View file

@ -27,7 +27,7 @@ pub struct DiskCache {
enum DbMessage {
Get(Arc<PathBuf>),
Put(Arc<PathBuf>, u32),
Put(Arc<PathBuf>, u64),
}
impl DiskCache {
@ -53,9 +53,21 @@ impl DiskCache {
db
};
// This is intentional.
#[allow(clippy::cast_sign_loss)]
let disk_cur_size = {
let mut conn = db_pool.acquire().await.unwrap();
sqlx::query!("SELECT IFNULL(SUM(size), 0) AS size FROM Images")
.fetch_one(&mut conn)
.await
.map(|record| record.size)
.unwrap_or_default()
.unwrap_or_default() as u64
};
let new_self = Arc::new(Self {
disk_path,
disk_cur_size: AtomicU64::new(0),
disk_cur_size: AtomicU64::new(disk_cur_size),
db_update_channel_sender: db_tx,
});
@ -102,21 +114,24 @@ async fn db_listener(
}
DbMessage::Put(entry, size) => {
let key = entry.as_os_str().to_str();
let query = sqlx::query!(
"insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing",
key,
size,
now,
)
.execute(&mut transaction)
.await;
if let Err(e) = query {
warn!("Failed to add {:?} to db: {}", key, e);
{
// This is intentional.
#[allow(clippy::cast_possible_wrap)]
let size = size as i64;
let query = sqlx::query!(
"insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing",
key,
size,
now,
)
.execute(&mut transaction)
.await;
if let Err(e) = query {
warn!("Failed to add {:?} to db: {}", key, e);
}
}
cache
.disk_cur_size
.fetch_add(u64::from(size), Ordering::Release);
cache.disk_cur_size.fetch_add(size, Ordering::Release);
}
}
}
@ -205,7 +220,7 @@ impl Cache for DiskCache {
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(&key)));
let path_0 = Arc::clone(&path);
let db_callback = |size: u32| async move {
let db_callback = |size: u64| async move {
std::mem::drop(channel.send(DbMessage::Put(path_0, size)).await);
};
@ -225,14 +240,14 @@ impl CallbackCache for DiskCache {
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
) -> Result<CacheStream, CacheError> {
let channel = self.db_update_channel_sender.clone();
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(&key)));
let path_0 = Arc::clone(&path);
let db_callback = |size: u32| async move {
let db_callback = |size: u64| async move {
// We don't care about the result of the send
std::mem::drop(channel.send(DbMessage::Put(path_0, size)).await);
};

44
src/cache/fs.rs vendored
View file

@ -15,11 +15,10 @@
//! misses are treated as closer as a cache hit.
use std::collections::HashMap;
use std::convert::TryFrom;
use std::error::Error;
use std::fmt::Display;
use std::io::SeekFrom;
use std::num::NonZeroUsize;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
@ -27,7 +26,7 @@ use std::task::{Context, Poll};
use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt};
use log::{debug, error, warn};
use log::{debug, warn};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::secretstream::{
@ -202,11 +201,11 @@ pub(super) async fn write_file<Fut, DbCallback>(
mut byte_stream: BoxedImageStream,
metadata: ImageMetadata,
db_callback: DbCallback,
on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, usize)>>,
on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, u64)>>,
) -> Result<(InnerStream, Option<Header>), std::io::Error>
where
Fut: 'static + Send + Sync + Future<Output = ()>,
DbCallback: 'static + Send + Sync + FnOnce(u32) -> Fut,
DbCallback: 'static + Send + Sync + FnOnce(u64) -> Fut,
{
let (tx, rx) = channel(WritingStatus::NotDone);
@ -237,7 +236,7 @@ where
tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async
let mut errored = false;
let mut bytes_written: u32 = 0;
let mut bytes_written: u64 = 0;
let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some();
writer.write_all(metadata_string.as_bytes()).await?;
@ -254,20 +253,7 @@ where
0 => break,
n => {
bytes.advance(n);
let n = if let Ok(n) = u32::try_from(n) {
n
} else {
error!("Tried to advance larger than an u32?");
errored = true;
break;
};
let (new_size, overflowed) = bytes_written.overflowing_add(n);
if overflowed {
error!("File size was larger than u32! Aborting!");
errored = true;
break;
}
bytes_written = new_size;
bytes_written += n as u64;
// We don't care if we don't have receivers
let _ = tx.send(WritingStatus::NotDone);
@ -311,12 +297,7 @@ where
if let Some(sender) = on_complete {
tokio::spawn(async move {
sender
.send((
cache_key,
acc_bytes.freeze(),
metadata,
bytes_written as usize,
))
.send((cache_key, acc_bytes.freeze(), metadata, bytes_written))
.await
});
}
@ -408,10 +389,10 @@ pub struct ConcurrentFsStream {
/// this reader will never complete.
receiver: Pin<Box<WatchStream<WritingStatus>>>,
/// The number of bytes the reader has read
bytes_read: usize,
bytes_read: u64,
/// The number of bytes that the writer has reported it has written. If the
/// writer has not reported yet, this value is None.
bytes_total: Option<NonZeroUsize>,
bytes_total: Option<NonZeroU64>,
}
impl ConcurrentFsStream {
@ -480,8 +461,7 @@ impl Stream for ConcurrentFsStream {
if let Poll::Ready(Some(WritingStatus::Done(n))) =
self.receiver.as_mut().poll_next_unpin(cx)
{
self.bytes_total =
Some(NonZeroUsize::new(n as usize).expect("Stored a 0 byte image?"))
self.bytes_total = Some(NonZeroU64::new(n).expect("Stored a 0 byte image?"))
}
// Okay, now we know if we've read enough bytes or not. If the
@ -504,7 +484,7 @@ impl Stream for ConcurrentFsStream {
Poll::Ready(Some(Ok(Bytes::new())))
} else {
// We have data! Give it to the reader!
self.bytes_read += filled;
self.bytes_read += filled as u64;
bytes.truncate(filled);
Poll::Ready(Some(Ok(bytes.into())))
}
@ -521,7 +501,7 @@ impl From<UpstreamError> for actix_web::Error {
#[derive(Debug, Clone, Copy)]
enum WritingStatus {
NotDone,
Done(u32),
Done(u64),
Error,
}

4
src/cache/mem.rs vendored
View file

@ -13,7 +13,7 @@ use lru::LruCache;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex;
type CacheValue = (Bytes, ImageMetadata, usize);
type CacheValue = (Bytes, ImageMetadata, u64);
/// Use LRU as the eviction strategy
pub type Lru = LruCache<CacheKey, CacheValue>;
@ -78,7 +78,7 @@ pub struct MemoryCache<InternalCacheImpl, InnerCache> {
inner: InnerCache,
cur_mem_size: AtomicU64,
mem_cache: Mutex<InternalCacheImpl>,
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
}
impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cache>

4
src/cache/mod.rs vendored
View file

@ -203,7 +203,7 @@ pub trait CallbackCache: Cache {
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
) -> Result<CacheStream, CacheError>;
}
@ -215,7 +215,7 @@ impl<T: CallbackCache> CallbackCache for Arc<T> {
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
) -> Result<CacheStream, CacheError> {
self.as_ref()
.put_with_on_completed_callback(key, image, metadata, on_complete)