Compare commits

54c8fe1cb32f8a533b868e4207cfabf9ecf85a39..fc930285f003ef133617e6e7561fcb52cf6cc972

No commits in common. "54c8fe1cb32f8a533b868e4207cfabf9ecf85a39" and "fc930285f003ef133617e6e7561fcb52cf6cc972" have entirely different histories.

4 changed files with 51 additions and 81 deletions

src/cache/disk.rs vendored (6 changed lines)

@@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, warn};
 use crate::units::Bytes;
-use super::{Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata};
+use super::{Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata};
 #[derive(Debug)]
 pub struct DiskCache {
@@ -125,8 +125,6 @@ async fn db_listener(
     db_pool: SqlitePool,
     max_on_disk_size: u64,
 ) {
-    // This is in a receiver stream to process up to 128 simultaneous db updates
-    // in one transaction
     let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
     while let Some(messages) = recv_stream.next().await {
         let mut transaction = match db_pool.begin().await {
@@ -386,7 +384,7 @@ impl CallbackCache for DiskCache {
         key: CacheKey,
         image: bytes::Bytes,
         metadata: ImageMetadata,
-        on_complete: Sender<CacheEntry>,
+        on_complete: Sender<(CacheKey, bytes::Bytes, ImageMetadata, u64)>,
     ) -> Result<(), CacheError> {
         let channel = self.db_update_channel_sender.clone();
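For context, the comment deleted from db_listener above describes a batching pattern: the mpsc receiver is wrapped in a ReceiverStream and ready_chunks(128) drains up to 128 queued updates per wakeup so they can be handled together (in the real code, inside a single SQLite transaction). Below is a minimal, self-contained sketch of that pattern, not taken from this repository; it assumes the tokio, tokio-stream, and futures crates, and the u64 payload and channel capacity are illustrative.

// Batching sketch: drain up to 128 pending messages per wakeup and process
// them in one pass instead of one at a time.
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u64>(256);

    // Producer: queue 500 "db updates", then drop the sender.
    tokio::spawn(async move {
        for size in 0..500u64 {
            let _ = tx.send(size).await;
        }
    });

    // Consumer: take whatever is ready, up to 128 items at a time.
    let mut batches = ReceiverStream::new(rx).ready_chunks(128);
    while let Some(batch) = batches.next().await {
        // One pass (one "transaction") per batch instead of one per message.
        println!("processing {} queued updates", batch.len());
    }
}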

src/cache/fs.rs vendored (23 changed lines)

@@ -39,7 +39,7 @@ use tokio_util::codec::{BytesCodec, FramedRead};
 use tracing::{debug, instrument, warn};
 use super::compat::LegacyImageMetadata;
-use super::{CacheEntry, CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
+use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
 /// Attempts to lookup the file on disk, returning a byte stream if it exists.
 /// Note that this could return two types of streams, depending on if the file
@@ -220,11 +220,11 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
 /// that is called with a completed cache entry.
 pub(super) async fn write_file<Fut, DbCallback>(
     path: &Path,
-    key: CacheKey,
-    data: Bytes,
+    cache_key: CacheKey,
+    bytes: Bytes,
     metadata: ImageMetadata,
     db_callback: DbCallback,
-    on_complete: Option<Sender<CacheEntry>>,
+    on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, u64)>>,
 ) -> Result<(), std::io::Error>
 where
     Fut: 'static + Send + Sync + Future<Output = ()>,
@@ -254,8 +254,8 @@ where
     let mut error = writer.write_all(metadata_string.as_bytes()).await.err();
     if error.is_none() {
-        debug!("decrypted write {:x?}", &data[..40]);
-        error = writer.write_all(&data).await.err();
+        debug!("decrypted write {:x?}", &bytes[..40]);
+        error = writer.write_all(&bytes).await.err();
     }
     if let Some(e) = error {
@@ -270,18 +270,13 @@ where
     writer.flush().await?;
     debug!("writing to file done");
-    let on_disk_size = (metadata_size + data.len()) as u64;
-    tokio::spawn(db_callback(on_disk_size));
+    let bytes_written = (metadata_size + bytes.len()) as u64;
+    tokio::spawn(db_callback(bytes_written));
     if let Some(sender) = on_complete {
         tokio::spawn(async move {
             sender
-                .send(CacheEntry {
-                    key,
-                    data,
-                    metadata,
-                    on_disk_size,
-                })
+                .send((cache_key, bytes, metadata, bytes_written))
                 .await
         });
     }
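The write_file change above keeps the optional completion sender but now sends a plain tuple instead of a CacheEntry, and it fires the send from a spawned task so the writer never waits on the receiver. The sketch below shows only that general shape; notify_complete, the String key, and the (String, u64) payload are made up for illustration and are not part of this codebase.

use tokio::sync::mpsc::{channel, Sender};

// If a completion sender was supplied, deliver the payload from a spawned
// task so the caller is never blocked on the receiver.
async fn notify_complete(on_complete: Option<Sender<(String, u64)>>, name: String, written: u64) {
    if let Some(sender) = on_complete {
        tokio::spawn(async move {
            // A send error only means the receiver has already gone away.
            let _ = sender.send((name, written)).await;
        });
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel(8);
    notify_complete(Some(tx), "data/a/b".to_string(), 4096).await;
    if let Some((name, written)) = rx.recv().await {
        println!("{name}: {written} bytes written");
    }
}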

src/cache/mem.rs vendored (91 changed lines)

@@ -1,13 +1,13 @@
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use super::{Cache, CacheEntry, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream};
+use super::{Cache, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::FutureExt;
 use lfu_cache::LfuCache;
 use lru::LruCache;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::mpsc::{channel, Sender};
 use tokio::sync::Mutex;
 type CacheValue = (Bytes, ImageMetadata, u64);
@@ -75,7 +75,7 @@ pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
     inner: ColdCache,
     cur_mem_size: AtomicU64,
     mem_cache: Mutex<MemoryCacheImpl>,
-    master_sender: Sender<CacheEntry>,
+    master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
 }
 impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
@@ -84,7 +84,7 @@ where
     ColdCache: 'static + Cache,
 {
     pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
-        let (tx, rx) = channel(100);
+        let (tx, mut rx) = channel(100);
         let new_self = Arc::new(Self {
             inner,
             cur_mem_size: AtomicU64::new(0),
@@ -92,60 +92,45 @@ where
             master_sender: tx,
         });
-        tokio::spawn(internal_cache_listener(
-            Arc::clone(&new_self),
-            max_mem_size,
-            rx,
-        ));
+        let new_self_0 = Arc::clone(&new_self);
+        tokio::spawn(async move {
+            let new_self = new_self_0;
+            let max_mem_size = max_mem_size.get() / 20 * 19;
+            while let Some((key, bytes, metadata, size)) = rx.recv().await {
+                // Add to memory cache
+                // We can add first because we constrain our memory usage to 95%
+                new_self
+                    .cur_mem_size
+                    .fetch_add(size as u64, Ordering::Release);
+                new_self
+                    .mem_cache
+                    .lock()
+                    .await
+                    .push(key, (bytes, metadata, size));
+                // Pop if too large
+                while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
+                    let popped = new_self
+                        .mem_cache
+                        .lock()
+                        .await
+                        .pop()
+                        .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
+                    if let Some((_, _, _, size)) = popped {
+                        new_self
+                            .cur_mem_size
+                            .fetch_sub(size as u64, Ordering::Release);
+                    } else {
+                        break;
+                    }
+                }
+            }
+        });
         new_self
     }
 }
-async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
-    cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
-    max_mem_size: crate::units::Bytes,
-    mut rx: Receiver<CacheEntry>,
-) where
-    MemoryCacheImpl: InternalMemoryCache,
-    ColdCache: Cache,
-{
-    let max_mem_size = max_mem_size.get() / 20 * 19;
-    while let Some(CacheEntry {
-        key,
-        data,
-        metadata,
-        on_disk_size,
-    }) = rx.recv().await
-    {
-        // Add to memory cache
-        // We can add first because we constrain our memory usage to 95%
-        cache
-            .cur_mem_size
-            .fetch_add(on_disk_size as u64, Ordering::Release);
-        cache
-            .mem_cache
-            .lock()
-            .await
-            .push(key, (data, metadata, on_disk_size));
-        // Pop if too large
-        while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
-            let popped = cache
-                .mem_cache
-                .lock()
-                .await
-                .pop()
-                .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
-            if let Some((_, _, _, size)) = popped {
-                cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
-            } else {
-                break;
-            }
-        }
-    }
-}
 #[async_trait]
 impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
 where
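The listener that gets inlined into MemoryCache::new accounts for the new entry first and then evicts until usage falls back under a 95% watermark (max_mem_size.get() / 20 * 19). A simplified, self-contained sketch of that accounting rule follows; MemBudget and its VecDeque of (key, size) pairs are illustrative stand-ins for the real atomic counter and the LFU/LRU cache behind the mutex.

use std::collections::VecDeque;

struct MemBudget {
    used: u64,
    watermark: u64,                    // 95% of the configured maximum
    entries: VecDeque<(String, u64)>,  // (key, size), oldest first
}

impl MemBudget {
    fn new(max: u64) -> Self {
        Self { used: 0, watermark: max / 20 * 19, entries: VecDeque::new() }
    }

    fn insert(&mut self, key: String, size: u64) {
        // Add first; keeping the watermark below the maximum leaves headroom
        // for this brief overshoot.
        self.used += size;
        self.entries.push_back((key, size));
        // Then pop oldest entries until usage drops back under the watermark.
        while self.used >= self.watermark {
            match self.entries.pop_front() {
                Some((_, popped)) => self.used -= popped,
                None => break,
            }
        }
    }
}

fn main() {
    let mut budget = MemBudget::new(1000); // watermark = 950
    for i in 0..10 {
        budget.insert(format!("img-{i}"), 200);
    }
    println!("used {} of watermark {}", budget.used, budget.watermark);
}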

src/cache/mod.rs vendored (12 changed lines)

@@ -212,7 +212,7 @@ pub trait CallbackCache: Cache {
         key: CacheKey,
         image: Bytes,
         metadata: ImageMetadata,
-        on_complete: Sender<CacheEntry>,
+        on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
     ) -> Result<(), CacheError>;
 }
@@ -224,21 +224,13 @@ impl<T: CallbackCache> CallbackCache for Arc<T> {
         key: CacheKey,
         image: Bytes,
         metadata: ImageMetadata,
-        on_complete: Sender<CacheEntry>,
+        on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
     ) -> Result<(), CacheError> {
         self.as_ref()
             .put_with_on_completed_callback(key, image, metadata, on_complete)
             .await
     }
 }
-pub struct CacheEntry {
-    key: CacheKey,
-    data: Bytes,
-    metadata: ImageMetadata,
-    on_disk_size: u64,
-}
 pub enum CacheStream {
     Memory(MemStream),
     Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),