Compare commits
No commits in common. "5e7a82a61032075a35dd9d9ac94f62b6ed47342c" and "54c8fe1cb32f8a533b868e4207cfabf9ecf85a39" have entirely different histories.
5e7a82a610
...
54c8fe1cb3
4 changed files with 5 additions and 156 deletions
2
src/cache/fs.rs
vendored
2
src/cache/fs.rs
vendored
|
@ -65,7 +65,7 @@ pub(super) async fn read_file(
|
||||||
|
|
||||||
let parsed_metadata;
|
let parsed_metadata;
|
||||||
let mut maybe_header = None;
|
let mut maybe_header = None;
|
||||||
let mut reader: Option<Pin<Box<dyn MetadataFetch + Send + Sync>>> = None;
|
let mut reader: Option<Pin<Box<dyn MetadataFetch + Send>>> = None;
|
||||||
if let Ok(metadata) = maybe_metadata {
|
if let Ok(metadata) = maybe_metadata {
|
||||||
// image is decrypted
|
// image is decrypted
|
||||||
if ENCRYPTION_KEY.get().is_some() {
|
if ENCRYPTION_KEY.get().is_some() {
|
||||||
|
|
153
src/cache/mem.rs
vendored
153
src/cache/mem.rs
vendored
|
@ -83,7 +83,7 @@ where
|
||||||
MemoryCacheImpl: 'static + InternalMemoryCache,
|
MemoryCacheImpl: 'static + InternalMemoryCache,
|
||||||
ColdCache: 'static + Cache,
|
ColdCache: 'static + Cache,
|
||||||
{
|
{
|
||||||
pub fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
||||||
let (tx, rx) = channel(100);
|
let (tx, rx) = channel(100);
|
||||||
let new_self = Arc::new(Self {
|
let new_self = Arc::new(Self {
|
||||||
inner,
|
inner,
|
||||||
|
@ -100,25 +100,6 @@ where
|
||||||
|
|
||||||
new_self
|
new_self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an instance of the cache with the receiver for callback events
|
|
||||||
/// Really only useful for inspecting the receiver, e.g. for testing
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn new_with_receiver(
|
|
||||||
inner: ColdCache,
|
|
||||||
_: crate::units::Bytes,
|
|
||||||
) -> (Self, Receiver<CacheEntry>) {
|
|
||||||
let (tx, rx) = channel(100);
|
|
||||||
(
|
|
||||||
Self {
|
|
||||||
inner,
|
|
||||||
cur_mem_size: AtomicU64::new(0),
|
|
||||||
mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
|
|
||||||
master_sender: tx,
|
|
||||||
},
|
|
||||||
rx,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
||||||
|
@ -199,135 +180,3 @@ where
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test_util {
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use super::{CacheValue, InternalMemoryCache};
|
|
||||||
use crate::cache::{
|
|
||||||
Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
|
|
||||||
};
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use tokio::sync::mpsc::Sender;
|
|
||||||
|
|
||||||
pub struct TestDiskCache(
|
|
||||||
pub HashMap<CacheKey, Result<(CacheStream, ImageMetadata), CacheError>>,
|
|
||||||
);
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Cache for TestDiskCache {
|
|
||||||
async fn get(
|
|
||||||
&self,
|
|
||||||
key: &CacheKey,
|
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
|
||||||
// todo: Actually implement nontrivial code
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn put(
|
|
||||||
&self,
|
|
||||||
key: CacheKey,
|
|
||||||
image: bytes::Bytes,
|
|
||||||
metadata: ImageMetadata,
|
|
||||||
) -> Result<(), CacheError> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl CallbackCache for TestDiskCache {
|
|
||||||
async fn put_with_on_completed_callback(
|
|
||||||
&self,
|
|
||||||
key: CacheKey,
|
|
||||||
image: bytes::Bytes,
|
|
||||||
metadata: ImageMetadata,
|
|
||||||
on_complete: Sender<CacheEntry>,
|
|
||||||
) -> Result<(), CacheError> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct TestMemoryCache(pub HashMap<CacheKey, CacheValue>);
|
|
||||||
|
|
||||||
impl InternalMemoryCache for TestMemoryCache {
|
|
||||||
fn unbounded() -> Self {
|
|
||||||
Self::default()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
|
||||||
self.0.get(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
|
||||||
self.0.insert(key, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
|
||||||
unimplemented!("shouldn't be needed for tests");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod cache_ops {
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::error::Error;
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use futures::FutureExt;
|
|
||||||
|
|
||||||
use crate::cache::mem::InternalMemoryCache;
|
|
||||||
use crate::cache::{Cache, CacheKey, CacheStream, ImageMetadata, MemStream};
|
|
||||||
|
|
||||||
use super::test_util::{TestDiskCache, TestMemoryCache};
|
|
||||||
use super::MemoryCache;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn get_mem_cached() -> Result<(), Box<dyn Error>> {
|
|
||||||
let (cache, mut rx) = MemoryCache::<TestMemoryCache, _>::new_with_receiver(
|
|
||||||
TestDiskCache(HashMap::new()),
|
|
||||||
crate::units::Bytes(10),
|
|
||||||
);
|
|
||||||
|
|
||||||
let key = CacheKey("a".to_string(), "b".to_string(), false);
|
|
||||||
let metadata = ImageMetadata {
|
|
||||||
content_type: None,
|
|
||||||
content_length: Some(1),
|
|
||||||
last_modified: None,
|
|
||||||
};
|
|
||||||
let bytes = Bytes::from_static(b"abcd");
|
|
||||||
let value = (bytes.clone(), metadata.clone(), 34);
|
|
||||||
|
|
||||||
// Populate the cache, need to drop the lock else it's considered locked
|
|
||||||
// when we actually call the cache
|
|
||||||
{
|
|
||||||
let mem_cache = &mut cache.mem_cache.lock().await;
|
|
||||||
mem_cache.push(key.clone(), value.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let (stream, ret_metadata) = cache.get(&key).await.unwrap()?;
|
|
||||||
assert_eq!(metadata, ret_metadata);
|
|
||||||
if let CacheStream::Memory(MemStream(ret_stream)) = stream {
|
|
||||||
assert_eq!(bytes, ret_stream);
|
|
||||||
} else {
|
|
||||||
panic!("wrong stream type");
|
|
||||||
}
|
|
||||||
|
|
||||||
assert!(rx.recv().now_or_never().is_none());
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn get_disk_cached() {}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn get_miss() {}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod db_listener {
|
|
||||||
use super::*;
|
|
||||||
}
|
|
||||||
|
|
2
src/cache/mod.rs
vendored
2
src/cache/mod.rs
vendored
|
@ -241,7 +241,7 @@ pub struct CacheEntry {
|
||||||
|
|
||||||
pub enum CacheStream {
|
pub enum CacheStream {
|
||||||
Memory(MemStream),
|
Memory(MemStream),
|
||||||
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send + Sync>>, BytesCodec>),
|
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<CachedImage> for CacheStream {
|
impl From<CachedImage> for CacheStream {
|
||||||
|
|
|
@ -172,8 +172,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
let cache = DiskCache::new(disk_quota.into(), cache_path.clone()).await;
|
let cache = DiskCache::new(disk_quota.into(), cache_path.clone()).await;
|
||||||
let cache: Arc<dyn Cache> = match cache_type {
|
let cache: Arc<dyn Cache> = match cache_type {
|
||||||
CacheType::OnDisk => cache,
|
CacheType::OnDisk => cache,
|
||||||
CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size),
|
CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size).await,
|
||||||
CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size),
|
CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
let cache_0 = Arc::clone(&cache);
|
let cache_0 = Arc::clone(&cache);
|
||||||
|
|
Loading…
Reference in a new issue