diff --git a/src/cache/disk.rs b/src/cache/disk.rs index f8aca83..5ba5f74 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -27,7 +27,7 @@ pub struct DiskCache { enum DbMessage { Get(Arc), - Put(Arc, u32), + Put(Arc, u64), } impl DiskCache { @@ -53,9 +53,21 @@ impl DiskCache { db }; + // This is intentional. + #[allow(clippy::cast_sign_loss)] + let disk_cur_size = { + let mut conn = db_pool.acquire().await.unwrap(); + sqlx::query!("SELECT IFNULL(SUM(size), 0) AS size FROM Images") + .fetch_one(&mut conn) + .await + .map(|record| record.size) + .unwrap_or_default() + .unwrap_or_default() as u64 + }; + let new_self = Arc::new(Self { disk_path, - disk_cur_size: AtomicU64::new(0), + disk_cur_size: AtomicU64::new(disk_cur_size), db_update_channel_sender: db_tx, }); @@ -102,21 +114,24 @@ async fn db_listener( } DbMessage::Put(entry, size) => { let key = entry.as_os_str().to_str(); - let query = sqlx::query!( - "insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing", - key, - size, - now, - ) - .execute(&mut transaction) - .await; - if let Err(e) = query { - warn!("Failed to add {:?} to db: {}", key, e); + { + // This is intentional. + #[allow(clippy::cast_possible_wrap)] + let size = size as i64; + let query = sqlx::query!( + "insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing", + key, + size, + now, + ) + .execute(&mut transaction) + .await; + if let Err(e) = query { + warn!("Failed to add {:?} to db: {}", key, e); + } } - cache - .disk_cur_size - .fetch_add(u64::from(size), Ordering::Release); + cache.disk_cur_size.fetch_add(size, Ordering::Release); } } } @@ -205,7 +220,7 @@ impl Cache for DiskCache { let path = Arc::new(self.disk_path.clone().join(PathBuf::from(&key))); let path_0 = Arc::clone(&path); - let db_callback = |size: u32| async move { + let db_callback = |size: u64| async move { std::mem::drop(channel.send(DbMessage::Put(path_0, size)).await); }; @@ -225,14 +240,14 @@ impl CallbackCache for DiskCache { key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, + on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, ) -> Result { let channel = self.db_update_channel_sender.clone(); let path = Arc::new(self.disk_path.clone().join(PathBuf::from(&key))); let path_0 = Arc::clone(&path); - let db_callback = |size: u32| async move { + let db_callback = |size: u64| async move { // We don't care about the result of the send std::mem::drop(channel.send(DbMessage::Put(path_0, size)).await); }; diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 37b3fa4..25863dd 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -15,11 +15,10 @@ //! misses are treated as closer as a cache hit. use std::collections::HashMap; -use std::convert::TryFrom; use std::error::Error; use std::fmt::Display; use std::io::SeekFrom; -use std::num::NonZeroUsize; +use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -27,7 +26,7 @@ use std::task::{Context, Poll}; use actix_web::error::PayloadError; use bytes::{Buf, Bytes, BytesMut}; use futures::{Future, Stream, StreamExt}; -use log::{debug, error, warn}; +use log::{debug, warn}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::secretstream::{ @@ -202,11 +201,11 @@ pub(super) async fn write_file( mut byte_stream: BoxedImageStream, metadata: ImageMetadata, db_callback: DbCallback, - on_complete: Option>, + on_complete: Option>, ) -> Result<(InnerStream, Option
), std::io::Error> where Fut: 'static + Send + Sync + Future, - DbCallback: 'static + Send + Sync + FnOnce(u32) -> Fut, + DbCallback: 'static + Send + Sync + FnOnce(u64) -> Fut, { let (tx, rx) = channel(WritingStatus::NotDone); @@ -237,7 +236,7 @@ where tokio::spawn(async move { let path_buf = path_buf; // moves path buf into async let mut errored = false; - let mut bytes_written: u32 = 0; + let mut bytes_written: u64 = 0; let mut acc_bytes = BytesMut::new(); let accumulate = on_complete.is_some(); writer.write_all(metadata_string.as_bytes()).await?; @@ -254,20 +253,7 @@ where 0 => break, n => { bytes.advance(n); - let n = if let Ok(n) = u32::try_from(n) { - n - } else { - error!("Tried to advance larger than an u32?"); - errored = true; - break; - }; - let (new_size, overflowed) = bytes_written.overflowing_add(n); - if overflowed { - error!("File size was larger than u32! Aborting!"); - errored = true; - break; - } - bytes_written = new_size; + bytes_written += n as u64; // We don't care if we don't have receivers let _ = tx.send(WritingStatus::NotDone); @@ -311,12 +297,7 @@ where if let Some(sender) = on_complete { tokio::spawn(async move { sender - .send(( - cache_key, - acc_bytes.freeze(), - metadata, - bytes_written as usize, - )) + .send((cache_key, acc_bytes.freeze(), metadata, bytes_written)) .await }); } @@ -408,10 +389,10 @@ pub struct ConcurrentFsStream { /// this reader will never complete. receiver: Pin>>, /// The number of bytes the reader has read - bytes_read: usize, + bytes_read: u64, /// The number of bytes that the writer has reported it has written. If the /// writer has not reported yet, this value is None. - bytes_total: Option, + bytes_total: Option, } impl ConcurrentFsStream { @@ -480,8 +461,7 @@ impl Stream for ConcurrentFsStream { if let Poll::Ready(Some(WritingStatus::Done(n))) = self.receiver.as_mut().poll_next_unpin(cx) { - self.bytes_total = - Some(NonZeroUsize::new(n as usize).expect("Stored a 0 byte image?")) + self.bytes_total = Some(NonZeroU64::new(n).expect("Stored a 0 byte image?")) } // Okay, now we know if we've read enough bytes or not. If the @@ -504,7 +484,7 @@ impl Stream for ConcurrentFsStream { Poll::Ready(Some(Ok(Bytes::new()))) } else { // We have data! Give it to the reader! - self.bytes_read += filled; + self.bytes_read += filled as u64; bytes.truncate(filled); Poll::Ready(Some(Ok(bytes.into()))) } @@ -521,7 +501,7 @@ impl From for actix_web::Error { #[derive(Debug, Clone, Copy)] enum WritingStatus { NotDone, - Done(u32), + Done(u64), Error, } diff --git a/src/cache/mem.rs b/src/cache/mem.rs index d4e50dc..36b3fc8 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -13,7 +13,7 @@ use lru::LruCache; use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::Mutex; -type CacheValue = (Bytes, ImageMetadata, usize); +type CacheValue = (Bytes, ImageMetadata, u64); /// Use LRU as the eviction strategy pub type Lru = LruCache; @@ -78,7 +78,7 @@ pub struct MemoryCache { inner: InnerCache, cur_mem_size: AtomicU64, mem_cache: Mutex, - master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, + master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, } impl diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 83296d0..39846f9 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -203,7 +203,7 @@ pub trait CallbackCache: Cache { key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, + on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, ) -> Result; } @@ -215,7 +215,7 @@ impl CallbackCache for Arc { key: CacheKey, image: BoxedImageStream, metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, + on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, ) -> Result { self.as_ref() .put_with_on_completed_callback(key, image, metadata, on_complete)