Gracefully handle secretstream errors
This commit is contained in:
parent
a70f4bfdc3
commit
a3f3b5e3ab
2 changed files with 30 additions and 18 deletions
25
src/cache/fs.rs
vendored
25
src/cache/fs.rs
vendored
|
@ -91,14 +91,23 @@ pub(super) async fn read_file(
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
Box::pin(EncryptedDiskReader::new(
|
let header = match Header::from_slice(&header_bytes) {
|
||||||
file,
|
Some(header) => header,
|
||||||
SecretStream::init_pull(
|
None => {
|
||||||
&Header::from_slice(&header_bytes).expect("failed to get header"),
|
warn!("Found file, but encrypted header was invalid. Assuming corrupted!");
|
||||||
key,
|
return None;
|
||||||
)
|
}
|
||||||
.expect("Failed to initialize decryption kesy"),
|
};
|
||||||
))
|
|
||||||
|
let secret_stream = match SecretStream::init_pull(&header, key) {
|
||||||
|
Ok(stream) => stream,
|
||||||
|
Err(_) => {
|
||||||
|
warn!("Failed to init secret stream with key and header. Assuming corrupted!");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::pin(EncryptedDiskReader::new(file, secret_stream))
|
||||||
} else {
|
} else {
|
||||||
Box::pin(file)
|
Box::pin(file)
|
||||||
};
|
};
|
||||||
|
|
23
src/cache/mem.rs
vendored
23
src/cache/mem.rs
vendored
|
@ -74,22 +74,24 @@ impl InternalMemoryCache for Lru {
|
||||||
|
|
||||||
/// Memory accelerated disk cache. Uses the internal cache implementation in
|
/// Memory accelerated disk cache. Uses the internal cache implementation in
|
||||||
/// memory to speed up reads.
|
/// memory to speed up reads.
|
||||||
pub struct MemoryCache<InternalCacheImpl, InnerCache> {
|
pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
|
||||||
inner: InnerCache,
|
inner: ColdCache,
|
||||||
cur_mem_size: AtomicU64,
|
cur_mem_size: AtomicU64,
|
||||||
mem_cache: Mutex<InternalCacheImpl>,
|
mem_cache: Mutex<MemoryCacheImpl>,
|
||||||
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
|
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cache>
|
impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
|
||||||
MemoryCache<InternalCacheImpl, InnerCache>
|
where
|
||||||
|
MemoryCacheImpl: 'static + InternalMemoryCache,
|
||||||
|
ColdCache: 'static + Cache,
|
||||||
{
|
{
|
||||||
pub async fn new(inner: InnerCache, max_mem_size: u64) -> Arc<Self> {
|
pub async fn new(inner: ColdCache, max_mem_size: u64) -> Arc<Self> {
|
||||||
let (tx, mut rx) = channel(100);
|
let (tx, mut rx) = channel(100);
|
||||||
let new_self = Arc::new(Self {
|
let new_self = Arc::new(Self {
|
||||||
inner,
|
inner,
|
||||||
cur_mem_size: AtomicU64::new(0),
|
cur_mem_size: AtomicU64::new(0),
|
||||||
mem_cache: Mutex::new(InternalCacheImpl::unbounded()),
|
mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
|
||||||
master_sender: tx,
|
master_sender: tx,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -99,6 +101,7 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
|
||||||
let max_mem_size = max_mem_size / 20 * 19;
|
let max_mem_size = max_mem_size / 20 * 19;
|
||||||
while let Some((key, bytes, metadata, size)) = rx.recv().await {
|
while let Some((key, bytes, metadata, size)) = rx.recv().await {
|
||||||
// Add to memory cache
|
// Add to memory cache
|
||||||
|
// We can add first because we constrain our memory usage to 95%
|
||||||
new_self
|
new_self
|
||||||
.cur_mem_size
|
.cur_mem_size
|
||||||
.fetch_add(size as u64, Ordering::Release);
|
.fetch_add(size as u64, Ordering::Release);
|
||||||
|
@ -132,10 +135,10 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<InternalCacheImpl, InnerCache> Cache for MemoryCache<InternalCacheImpl, InnerCache>
|
impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
|
||||||
where
|
where
|
||||||
InternalCacheImpl: InternalMemoryCache,
|
MemoryCacheImpl: InternalMemoryCache,
|
||||||
InnerCache: CallbackCache,
|
ColdCache: CallbackCache,
|
||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn get(
|
async fn get(
|
||||||
|
|
Loading…
Reference in a new issue