Clean up traits

This commit is contained in:
Edward Shen 2021-05-19 23:27:56 -04:00
parent 4f55380d23
commit c488eccc8d
Signed by: edward
GPG key ID: 19182661E818369F
5 changed files with 56 additions and 173 deletions

45
src/cache/disk.rs vendored
View file

@ -15,7 +15,9 @@ use tokio::fs::remove_file;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata}; use super::{
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
};
pub struct DiskCache { pub struct DiskCache {
disk_path: PathBuf, disk_path: PathBuf,
@ -112,7 +114,9 @@ async fn db_listener(
warn!("Failed to add {:?} to db: {}", key, e); warn!("Failed to add {:?} to db: {}", key, e);
} }
cache.increase_usage(size); cache
.disk_cur_size
.fetch_add(u64::from(size), Ordering::Release);
} }
} }
} }
@ -124,7 +128,8 @@ async fn db_listener(
); );
} }
if cache.on_disk_size() >= max_on_disk_size { let on_disk_size = (cache.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096;
if on_disk_size >= max_on_disk_size {
let mut conn = match db_pool.acquire().await { let mut conn = match db_pool.acquire().await {
Ok(conn) => conn, Ok(conn) => conn,
Err(e) => { Err(e) => {
@ -163,7 +168,7 @@ async fn db_listener(
tokio::spawn(remove_file(item.id)); tokio::spawn(remove_file(item.id));
} }
cache.decrease_usage(size_freed); cache.disk_cur_size.fetch_sub(size_freed, Ordering::Release);
} }
} }
} }
@ -211,28 +216,10 @@ impl Cache for DiskCache {
CacheStream::new(inner, maybe_header).map_err(|_| CacheError::DecryptionFailure) CacheStream::new(inner, maybe_header).map_err(|_| CacheError::DecryptionFailure)
}) })
} }
#[inline]
fn increase_usage(&self, amt: u32) {
self.disk_cur_size
.fetch_add(u64::from(amt), Ordering::Release);
}
#[inline]
fn decrease_usage(&self, amt: u64) {
self.disk_cur_size.fetch_sub(amt, Ordering::Release);
}
#[inline]
fn on_disk_size(&self) -> u64 {
(self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096
}
#[inline]
fn mem_size(&self) -> u64 {
0
} }
#[async_trait]
impl CallbackCache for DiskCache {
async fn put_with_on_completed_callback( async fn put_with_on_completed_callback(
&self, &self,
key: CacheKey, key: CacheKey,
@ -257,14 +244,4 @@ impl Cache for DiskCache {
CacheStream::new(inner, maybe_header).map_err(|_| CacheError::DecryptionFailure) CacheStream::new(inner, maybe_header).map_err(|_| CacheError::DecryptionFailure)
}) })
} }
#[inline]
async fn put_internal(&self, _: CacheKey, _: Bytes, _: ImageMetadata, _: usize) {
unimplemented!()
}
#[inline]
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> {
None
}
} }

2
src/cache/fs.rs vendored
View file

@ -236,7 +236,7 @@ where
tokio::spawn(async move { tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut errored = false; let mut errored = false;
let mut bytes_written: u32 = 0; let mut bytes_written = 0;
let mut acc_bytes = BytesMut::new(); let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some(); let accumulate = on_complete.is_some();
writer.write_all(metadata_string.as_bytes()).await?; writer.write_all(metadata_string.as_bytes()).await?;

95
src/cache/mem.rs vendored
View file

@ -2,8 +2,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use super::{ use super::{
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata, InnerStream, BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
MemStream, InnerStream, MemStream,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@ -98,14 +98,28 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
let new_self = new_self_0; let new_self = new_self_0;
let max_mem_size = max_mem_size / 20 * 19; let max_mem_size = max_mem_size / 20 * 19;
while let Some((key, bytes, metadata, size)) = rx.recv().await { while let Some((key, bytes, metadata, size)) = rx.recv().await {
new_self.inner.increase_usage(size as u32); // Add to memory cache
new_self new_self
.inner .cur_mem_size
.put_internal(key, bytes, metadata, size) .fetch_add(size as u64, Ordering::Release);
.await; new_self
while new_self.mem_size() >= max_mem_size { .mem_cache
if let Some((_, _, _, size)) = new_self.pop_memory().await { .lock()
new_self.decrease_usage(size as u64); .await
.push(key, (bytes, metadata, size));
// Pop if too large
while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size {
let popped = new_self
.mem_cache
.lock()
.await
.pop()
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
if let Some((_, _, _, size)) = popped {
new_self
.cur_mem_size
.fetch_sub(size as u64, Ordering::Release);
} else { } else {
break; break;
} }
@ -118,8 +132,10 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
} }
#[async_trait] #[async_trait]
impl<InternalCacheImpl: InternalMemoryCache, InnerCache: Cache> Cache impl<InternalCacheImpl, InnerCache> Cache for MemoryCache<InternalCacheImpl, InnerCache>
for MemoryCache<InternalCacheImpl, InnerCache> where
InternalCacheImpl: InternalMemoryCache,
InnerCache: CallbackCache,
{ {
#[inline] #[inline]
async fn get( async fn get(
@ -152,61 +168,4 @@ impl<InternalCacheImpl: InternalMemoryCache, InnerCache: Cache> Cache
.put_with_on_completed_callback(key, image, metadata, self.master_sender.clone()) .put_with_on_completed_callback(key, image, metadata, self.master_sender.clone())
.await .await
} }
#[inline]
fn increase_usage(&self, amt: u32) {
self.cur_mem_size
.fetch_add(u64::from(amt), Ordering::Release);
}
#[inline]
fn decrease_usage(&self, amt: u64) {
self.cur_mem_size.fetch_sub(amt, Ordering::Release);
}
#[inline]
fn on_disk_size(&self) -> u64 {
self.inner.on_disk_size()
}
#[inline]
fn mem_size(&self) -> u64 {
self.cur_mem_size.load(Ordering::Acquire)
}
#[inline]
async fn put_with_on_completed_callback(
&self,
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
) -> Result<CacheStream, super::CacheError> {
self.inner
.put_with_on_completed_callback(key, image, metadata, on_complete)
.await
}
#[inline]
async fn put_internal(
&self,
key: CacheKey,
image: Bytes,
metadata: ImageMetadata,
size: usize,
) {
self.mem_cache
.lock()
.await
.push(key, (image, metadata, size));
}
#[inline]
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> {
self.mem_cache
.lock()
.await
.pop()
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size))
}
} }

75
src/cache/mod.rs vendored
View file

@ -172,37 +172,6 @@ pub trait Cache: Send + Sync {
image: BoxedImageStream, image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError>; ) -> Result<CacheStream, CacheError>;
/// Increases the size of the cache. This is a double-dispatch method, so
/// see specific implementations for complete detail. This only accepts a
/// u32 as all files should be smaller than a u32 and some cache
/// implementations can only handle up to a u32.
fn increase_usage(&self, amt: u32);
/// Decreases the size of the cache. This is a double-dispatch method, so
/// see specific implementations for complete detail.
fn decrease_usage(&self, amt: u64);
/// Reports the on-disk size of the cache.
fn on_disk_size(&self) -> u64;
/// Reports the memory size of the cache.
fn mem_size(&self) -> u64;
async fn put_with_on_completed_callback(
&self,
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
) -> Result<CacheStream, CacheError>;
/// Double-dispatch method. Used by cache implementations that require a
/// completed entry to put items into their cache.
async fn put_internal(&self, key: CacheKey, image: Bytes, metadata: ImageMetadata, size: usize);
/// Pops an entry from the memory cache, if one exists.
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>;
} }
#[async_trait] #[async_trait]
@ -224,27 +193,21 @@ impl<T: Cache> Cache for std::sync::Arc<T> {
) -> Result<CacheStream, CacheError> { ) -> Result<CacheStream, CacheError> {
self.as_ref().put(key, image, metadata).await self.as_ref().put(key, image, metadata).await
} }
#[inline]
fn increase_usage(&self, amt: u32) {
self.as_ref().increase_usage(amt)
} }
#[inline] #[async_trait]
fn decrease_usage(&self, amt: u64) { pub trait CallbackCache: Cache {
self.as_ref().decrease_usage(amt) async fn put_with_on_completed_callback(
} &self,
key: CacheKey,
#[inline] image: BoxedImageStream,
fn on_disk_size(&self) -> u64 { metadata: ImageMetadata,
self.as_ref().on_disk_size() on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
} ) -> Result<CacheStream, CacheError>;
#[inline]
fn mem_size(&self) -> u64 {
self.as_ref().mem_size()
} }
#[async_trait]
impl<T: CallbackCache> CallbackCache for std::sync::Arc<T> {
#[inline] #[inline]
async fn put_with_on_completed_callback( async fn put_with_on_completed_callback(
&self, &self,
@ -257,22 +220,6 @@ impl<T: Cache> Cache for std::sync::Arc<T> {
.put_with_on_completed_callback(key, image, metadata, on_complete) .put_with_on_completed_callback(key, image, metadata, on_complete)
.await .await
} }
#[inline]
async fn put_internal(
&self,
key: CacheKey,
image: Bytes,
metadata: ImageMetadata,
size: usize,
) {
self.as_ref().put_internal(key, image, metadata, size).await
}
#[inline]
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> {
self.as_ref().pop_memory().await
}
} }
pub struct CacheStream { pub struct CacheStream {

View file

@ -55,7 +55,7 @@ impl Responder for ServerResponse {
#[get("/{token}/data/{chapter_hash}/{file_name}")] #[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data( async fn token_data(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Box<dyn Cache>>, cache: Data<dyn Cache>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -71,7 +71,7 @@ async fn token_data(
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver( async fn token_data_saver(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Box<dyn Cache>>, cache: Data<dyn Cache>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -189,7 +189,7 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
#[allow(clippy::future_not_send)] #[allow(clippy::future_not_send)]
async fn fetch_image( async fn fetch_image(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Box<dyn Cache>>, cache: Data<dyn Cache>,
chapter_hash: String, chapter_hash: String,
file_name: String, file_name: String,
is_data_saver: bool, is_data_saver: bool,