Compare commits

...

2 commits

Author SHA1 Message Date
7f16e697e6
Fix lints 2021-05-20 13:42:08 -04:00
a3f3b5e3ab
Gracefully handle secretstream errors 2021-05-20 13:35:57 -04:00
2 changed files with 28 additions and 18 deletions

23
src/cache/fs.rs vendored
View file

@ -91,14 +91,21 @@ pub(super) async fn read_file(
return None; return None;
} }
Box::pin(EncryptedDiskReader::new( let header = if let Some(header) = Header::from_slice(&header_bytes) {
file, header
SecretStream::init_pull( } else {
&Header::from_slice(&header_bytes).expect("failed to get header"), warn!("Found file, but encrypted header was invalid. Assuming corrupted!");
key, return None;
) };
.expect("Failed to initialize decryption kesy"),
)) let secret_stream = if let Ok(stream) = SecretStream::init_pull(&header, key) {
stream
} else {
warn!("Failed to init secret stream with key and header. Assuming corrupted!");
return None;
};
Box::pin(EncryptedDiskReader::new(file, secret_stream))
} else { } else {
Box::pin(file) Box::pin(file)
}; };

23
src/cache/mem.rs vendored
View file

@ -74,22 +74,24 @@ impl InternalMemoryCache for Lru {
/// Memory accelerated disk cache. Uses the internal cache implementation in /// Memory accelerated disk cache. Uses the internal cache implementation in
/// memory to speed up reads. /// memory to speed up reads.
pub struct MemoryCache<InternalCacheImpl, InnerCache> { pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
inner: InnerCache, inner: ColdCache,
cur_mem_size: AtomicU64, cur_mem_size: AtomicU64,
mem_cache: Mutex<InternalCacheImpl>, mem_cache: Mutex<MemoryCacheImpl>,
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>, master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
} }
impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cache> impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
MemoryCache<InternalCacheImpl, InnerCache> where
MemoryCacheImpl: 'static + InternalMemoryCache,
ColdCache: 'static + Cache,
{ {
pub async fn new(inner: InnerCache, max_mem_size: u64) -> Arc<Self> { pub async fn new(inner: ColdCache, max_mem_size: u64) -> Arc<Self> {
let (tx, mut rx) = channel(100); let (tx, mut rx) = channel(100);
let new_self = Arc::new(Self { let new_self = Arc::new(Self {
inner, inner,
cur_mem_size: AtomicU64::new(0), cur_mem_size: AtomicU64::new(0),
mem_cache: Mutex::new(InternalCacheImpl::unbounded()), mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
master_sender: tx, master_sender: tx,
}); });
@ -99,6 +101,7 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
let max_mem_size = max_mem_size / 20 * 19; let max_mem_size = max_mem_size / 20 * 19;
while let Some((key, bytes, metadata, size)) = rx.recv().await { while let Some((key, bytes, metadata, size)) = rx.recv().await {
// Add to memory cache // Add to memory cache
// We can add first because we constrain our memory usage to 95%
new_self new_self
.cur_mem_size .cur_mem_size
.fetch_add(size as u64, Ordering::Release); .fetch_add(size as u64, Ordering::Release);
@ -132,10 +135,10 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
} }
#[async_trait] #[async_trait]
impl<InternalCacheImpl, InnerCache> Cache for MemoryCache<InternalCacheImpl, InnerCache> impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
where where
InternalCacheImpl: InternalMemoryCache, MemoryCacheImpl: InternalMemoryCache,
InnerCache: CallbackCache, ColdCache: CallbackCache,
{ {
#[inline] #[inline]
async fn get( async fn get(