Compare commits
2 commits
fc930285f0
...
54c8fe1cb3
Author | SHA1 | Date | |
---|---|---|---|
54c8fe1cb3 | |||
8556f37904 |
4 changed files with 81 additions and 51 deletions
6
src/cache/disk.rs
vendored
6
src/cache/disk.rs
vendored
|
@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, warn};
|
||||||
|
|
||||||
use crate::units::Bytes;
|
use crate::units::Bytes;
|
||||||
|
|
||||||
use super::{Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata};
|
use super::{Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DiskCache {
|
pub struct DiskCache {
|
||||||
|
@ -125,6 +125,8 @@ async fn db_listener(
|
||||||
db_pool: SqlitePool,
|
db_pool: SqlitePool,
|
||||||
max_on_disk_size: u64,
|
max_on_disk_size: u64,
|
||||||
) {
|
) {
|
||||||
|
// This is in a receiver stream to process up to 128 simultaneous db updates
|
||||||
|
// in one transaction
|
||||||
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
|
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
|
||||||
while let Some(messages) = recv_stream.next().await {
|
while let Some(messages) = recv_stream.next().await {
|
||||||
let mut transaction = match db_pool.begin().await {
|
let mut transaction = match db_pool.begin().await {
|
||||||
|
@ -384,7 +386,7 @@ impl CallbackCache for DiskCache {
|
||||||
key: CacheKey,
|
key: CacheKey,
|
||||||
image: bytes::Bytes,
|
image: bytes::Bytes,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
on_complete: Sender<(CacheKey, bytes::Bytes, ImageMetadata, u64)>,
|
on_complete: Sender<CacheEntry>,
|
||||||
) -> Result<(), CacheError> {
|
) -> Result<(), CacheError> {
|
||||||
let channel = self.db_update_channel_sender.clone();
|
let channel = self.db_update_channel_sender.clone();
|
||||||
|
|
||||||
|
|
23
src/cache/fs.rs
vendored
23
src/cache/fs.rs
vendored
|
@ -39,7 +39,7 @@ use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
use tracing::{debug, instrument, warn};
|
use tracing::{debug, instrument, warn};
|
||||||
|
|
||||||
use super::compat::LegacyImageMetadata;
|
use super::compat::LegacyImageMetadata;
|
||||||
use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
|
use super::{CacheEntry, CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
|
||||||
|
|
||||||
/// Attempts to lookup the file on disk, returning a byte stream if it exists.
|
/// Attempts to lookup the file on disk, returning a byte stream if it exists.
|
||||||
/// Note that this could return two types of streams, depending on if the file
|
/// Note that this could return two types of streams, depending on if the file
|
||||||
|
@ -220,11 +220,11 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
||||||
/// that is called with a completed cache entry.
|
/// that is called with a completed cache entry.
|
||||||
pub(super) async fn write_file<Fut, DbCallback>(
|
pub(super) async fn write_file<Fut, DbCallback>(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
cache_key: CacheKey,
|
key: CacheKey,
|
||||||
bytes: Bytes,
|
data: Bytes,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
db_callback: DbCallback,
|
db_callback: DbCallback,
|
||||||
on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, u64)>>,
|
on_complete: Option<Sender<CacheEntry>>,
|
||||||
) -> Result<(), std::io::Error>
|
) -> Result<(), std::io::Error>
|
||||||
where
|
where
|
||||||
Fut: 'static + Send + Sync + Future<Output = ()>,
|
Fut: 'static + Send + Sync + Future<Output = ()>,
|
||||||
|
@ -254,8 +254,8 @@ where
|
||||||
|
|
||||||
let mut error = writer.write_all(metadata_string.as_bytes()).await.err();
|
let mut error = writer.write_all(metadata_string.as_bytes()).await.err();
|
||||||
if error.is_none() {
|
if error.is_none() {
|
||||||
debug!("decrypted write {:x?}", &bytes[..40]);
|
debug!("decrypted write {:x?}", &data[..40]);
|
||||||
error = writer.write_all(&bytes).await.err();
|
error = writer.write_all(&data).await.err();
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(e) = error {
|
if let Some(e) = error {
|
||||||
|
@ -270,13 +270,18 @@ where
|
||||||
writer.flush().await?;
|
writer.flush().await?;
|
||||||
debug!("writing to file done");
|
debug!("writing to file done");
|
||||||
|
|
||||||
let bytes_written = (metadata_size + bytes.len()) as u64;
|
let on_disk_size = (metadata_size + data.len()) as u64;
|
||||||
tokio::spawn(db_callback(bytes_written));
|
tokio::spawn(db_callback(on_disk_size));
|
||||||
|
|
||||||
if let Some(sender) = on_complete {
|
if let Some(sender) = on_complete {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
sender
|
sender
|
||||||
.send((cache_key, bytes, metadata, bytes_written))
|
.send(CacheEntry {
|
||||||
|
key,
|
||||||
|
data,
|
||||||
|
metadata,
|
||||||
|
on_disk_size,
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
91
src/cache/mem.rs
vendored
91
src/cache/mem.rs
vendored
|
@ -1,13 +1,13 @@
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::{Cache, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream};
|
use super::{Cache, CacheEntry, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use lfu_cache::LfuCache;
|
use lfu_cache::LfuCache;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use tokio::sync::mpsc::{channel, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
type CacheValue = (Bytes, ImageMetadata, u64);
|
type CacheValue = (Bytes, ImageMetadata, u64);
|
||||||
|
@ -75,7 +75,7 @@ pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
|
||||||
inner: ColdCache,
|
inner: ColdCache,
|
||||||
cur_mem_size: AtomicU64,
|
cur_mem_size: AtomicU64,
|
||||||
mem_cache: Mutex<MemoryCacheImpl>,
|
mem_cache: Mutex<MemoryCacheImpl>,
|
||||||
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
|
master_sender: Sender<CacheEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
|
impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
|
||||||
|
@ -84,7 +84,7 @@ where
|
||||||
ColdCache: 'static + Cache,
|
ColdCache: 'static + Cache,
|
||||||
{
|
{
|
||||||
pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
||||||
let (tx, mut rx) = channel(100);
|
let (tx, rx) = channel(100);
|
||||||
let new_self = Arc::new(Self {
|
let new_self = Arc::new(Self {
|
||||||
inner,
|
inner,
|
||||||
cur_mem_size: AtomicU64::new(0),
|
cur_mem_size: AtomicU64::new(0),
|
||||||
|
@ -92,45 +92,60 @@ where
|
||||||
master_sender: tx,
|
master_sender: tx,
|
||||||
});
|
});
|
||||||
|
|
||||||
let new_self_0 = Arc::clone(&new_self);
|
tokio::spawn(internal_cache_listener(
|
||||||
tokio::spawn(async move {
|
Arc::clone(&new_self),
|
||||||
let new_self = new_self_0;
|
max_mem_size,
|
||||||
let max_mem_size = max_mem_size.get() / 20 * 19;
|
rx,
|
||||||
while let Some((key, bytes, metadata, size)) = rx.recv().await {
|
));
|
||||||
// Add to memory cache
|
|
||||||
// We can add first because we constrain our memory usage to 95%
|
|
||||||
new_self
|
|
||||||
.cur_mem_size
|
|
||||||
.fetch_add(size as u64, Ordering::Release);
|
|
||||||
new_self
|
|
||||||
.mem_cache
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.push(key, (bytes, metadata, size));
|
|
||||||
|
|
||||||
// Pop if too large
|
|
||||||
while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
|
|
||||||
let popped = new_self
|
|
||||||
.mem_cache
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.pop()
|
|
||||||
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
|
|
||||||
if let Some((_, _, _, size)) = popped {
|
|
||||||
new_self
|
|
||||||
.cur_mem_size
|
|
||||||
.fetch_sub(size as u64, Ordering::Release);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
new_self
|
new_self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
||||||
|
cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
|
||||||
|
max_mem_size: crate::units::Bytes,
|
||||||
|
mut rx: Receiver<CacheEntry>,
|
||||||
|
) where
|
||||||
|
MemoryCacheImpl: InternalMemoryCache,
|
||||||
|
ColdCache: Cache,
|
||||||
|
{
|
||||||
|
let max_mem_size = max_mem_size.get() / 20 * 19;
|
||||||
|
while let Some(CacheEntry {
|
||||||
|
key,
|
||||||
|
data,
|
||||||
|
metadata,
|
||||||
|
on_disk_size,
|
||||||
|
}) = rx.recv().await
|
||||||
|
{
|
||||||
|
// Add to memory cache
|
||||||
|
// We can add first because we constrain our memory usage to 95%
|
||||||
|
cache
|
||||||
|
.cur_mem_size
|
||||||
|
.fetch_add(on_disk_size as u64, Ordering::Release);
|
||||||
|
cache
|
||||||
|
.mem_cache
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.push(key, (data, metadata, on_disk_size));
|
||||||
|
|
||||||
|
// Pop if too large
|
||||||
|
while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
|
||||||
|
let popped = cache
|
||||||
|
.mem_cache
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.pop()
|
||||||
|
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
|
||||||
|
if let Some((_, _, _, size)) = popped {
|
||||||
|
cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
|
impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
|
||||||
where
|
where
|
||||||
|
|
12
src/cache/mod.rs
vendored
12
src/cache/mod.rs
vendored
|
@ -212,7 +212,7 @@ pub trait CallbackCache: Cache {
|
||||||
key: CacheKey,
|
key: CacheKey,
|
||||||
image: Bytes,
|
image: Bytes,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
|
on_complete: Sender<CacheEntry>,
|
||||||
) -> Result<(), CacheError>;
|
) -> Result<(), CacheError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,13 +224,21 @@ impl<T: CallbackCache> CallbackCache for Arc<T> {
|
||||||
key: CacheKey,
|
key: CacheKey,
|
||||||
image: Bytes,
|
image: Bytes,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
|
on_complete: Sender<CacheEntry>,
|
||||||
) -> Result<(), CacheError> {
|
) -> Result<(), CacheError> {
|
||||||
self.as_ref()
|
self.as_ref()
|
||||||
.put_with_on_completed_callback(key, image, metadata, on_complete)
|
.put_with_on_completed_callback(key, image, metadata, on_complete)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct CacheEntry {
|
||||||
|
key: CacheKey,
|
||||||
|
data: Bytes,
|
||||||
|
metadata: ImageMetadata,
|
||||||
|
on_disk_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
pub enum CacheStream {
|
pub enum CacheStream {
|
||||||
Memory(MemStream),
|
Memory(MemStream),
|
||||||
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),
|
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),
|
||||||
|
|
Loading…
Reference in a new issue