diff --git a/src/cache/disk.rs b/src/cache/disk.rs index 34d7bee..bbf8707 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, warn}; use crate::units::Bytes; -use super::{Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata}; +use super::{Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata}; #[derive(Debug)] pub struct DiskCache { @@ -386,7 +386,7 @@ impl CallbackCache for DiskCache { key: CacheKey, image: bytes::Bytes, metadata: ImageMetadata, - on_complete: Sender<(CacheKey, bytes::Bytes, ImageMetadata, u64)>, + on_complete: Sender<CacheEntry>, ) -> Result<(), CacheError> { let channel = self.db_update_channel_sender.clone(); diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 44b9c5d..bae294e 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -39,7 +39,7 @@ use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::{debug, instrument, warn}; use super::compat::LegacyImageMetadata; -use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY}; +use super::{CacheEntry, CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY}; /// Attempts to lookup the file on disk, returning a byte stream if it exists. /// Note that this could return two types of streams, depending on if the file @@ -220,11 +220,11 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> { /// that is called with a completed cache entry. 
pub(super) async fn write_file<Fut, DbCallback>( path: &Path, - cache_key: CacheKey, - bytes: Bytes, + key: CacheKey, + data: Bytes, metadata: ImageMetadata, db_callback: DbCallback, - on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, u64)>>, + on_complete: Option<Sender<CacheEntry>>, ) -> Result<(), std::io::Error> where Fut: 'static + Send + Sync + Future<Output = ()>, @@ -254,8 +254,8 @@ where let mut error = writer.write_all(metadata_string.as_bytes()).await.err(); if error.is_none() { - debug!("decrypted write {:x?}", &bytes[..40]); - error = writer.write_all(&bytes).await.err(); + debug!("decrypted write {:x?}", &data[..40]); + error = writer.write_all(&data).await.err(); } if let Some(e) = error { @@ -270,13 +270,18 @@ where writer.flush().await?; debug!("writing to file done"); - let bytes_written = (metadata_size + bytes.len()) as u64; - tokio::spawn(db_callback(bytes_written)); + let on_disk_size = (metadata_size + data.len()) as u64; + tokio::spawn(db_callback(on_disk_size)); if let Some(sender) = on_complete { tokio::spawn(async move { sender - .send((cache_key, bytes, metadata, bytes_written)) + .send(CacheEntry { + key, + data, + metadata, + on_disk_size, + }) .await }); } diff --git a/src/cache/mem.rs b/src/cache/mem.rs index 96c2fed..5e235d2 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -1,7 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use super::{Cache, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream}; +use super::{Cache, CacheEntry, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream}; use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; @@ -75,7 +75,7 @@ pub struct MemoryCache<MemoryCacheImpl, ColdCache> { inner: ColdCache, cur_mem_size: AtomicU64, mem_cache: Mutex<MemoryCacheImpl>, - master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, + master_sender: Sender<CacheEntry>, } impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache> @@ -105,21 +105,29 @@ where async fn internal_cache_listener<MemoryCacheImpl, ColdCache>( cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>, max_mem_size: crate::units::Bytes, - mut rx: Receiver<(CacheKey, Bytes, ImageMetadata, u64)>, + mut rx: Receiver<CacheEntry>, ) where 
MemoryCacheImpl: InternalMemoryCache, ColdCache: Cache, { let max_mem_size = max_mem_size.get() / 20 * 19; - while let Some((key, bytes, metadata, size)) = rx.recv().await { + while let Some(CacheEntry { + key, + data, + metadata, + on_disk_size, + }) = rx.recv().await + { // Add to memory cache // We can add first because we constrain our memory usage to 95% - cache.cur_mem_size.fetch_add(size as u64, Ordering::Release); + cache + .cur_mem_size + .fetch_add(on_disk_size as u64, Ordering::Release); cache .mem_cache .lock() .await - .push(key, (bytes, metadata, size)); + .push(key, (data, metadata, on_disk_size)); // Pop if too large while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 { diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 0562ddc..a17a8df 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -212,7 +212,7 @@ pub trait CallbackCache: Cache { key: CacheKey, image: Bytes, metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, + on_complete: Sender<CacheEntry>, ) -> Result<(), CacheError>; } @@ -224,13 +224,21 @@ impl<T: CallbackCache> CallbackCache for Arc<T> { key: CacheKey, image: Bytes, metadata: ImageMetadata, - on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, + on_complete: Sender<CacheEntry>, ) -> Result<(), CacheError> { self.as_ref() .put_with_on_completed_callback(key, image, metadata, on_complete) .await } } + +pub struct CacheEntry { + key: CacheKey, + data: Bytes, + metadata: ImageMetadata, + on_disk_size: u64, +} + pub enum CacheStream { Memory(MemStream), Completed(FramedRead<Pin<Box<dyn AsyncRead + Send>>, BytesCodec>),