This commit is contained in:
Edward Shen 2021-04-19 22:14:57 -04:00
parent 9904ba2cfc
commit db8473d5bf
Signed by: edward
GPG key ID: 19182661E818369F
4 changed files with 19 additions and 10 deletions

18
src/cache/low_mem.rs vendored
View file

@ -18,6 +18,10 @@ pub struct LowMemCache {
} }
impl LowMemCache { impl LowMemCache {
/// Constructs a new low memory cache at the provided path and capacity.
/// This internally spawns a task that will wait for filesystem
/// notifications when a file has been written.
#[allow(clippy::new_ret_no_self)]
pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<RwLock<Box<dyn Cache>>> { pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<RwLock<Box<dyn Cache>>> {
let (tx, mut rx) = unbounded_channel(); let (tx, mut rx) = unbounded_channel();
let new_self: Arc<RwLock<Box<dyn Cache>>> = Arc::new(RwLock::new(Box::new(Self { let new_self: Arc<RwLock<Box<dyn Cache>>> = Arc::new(RwLock::new(Box::new(Self {
@ -28,19 +32,17 @@ impl LowMemCache {
master_sender: tx, master_sender: tx,
}))); })));
// Spawns a new task that continuously listens for events received by
// the channel, which informs the low memory cache the total size of the
// item that was put into the cache.
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
tokio::spawn(async move { tokio::spawn(async move {
loop { while let Some(new_size) = rx.recv().await {
let new_size = match rx.recv().await {
Some(v) => v,
None => break,
};
new_self_0.write().await.increase_usage(new_size).await; new_self_0.write().await.increase_usage(new_size).await;
} }
}); });
new_self.clone() new_self
} }
async fn prune(&mut self) { async fn prune(&mut self) {
@ -75,6 +77,8 @@ impl Cache for LowMemCache {
.map_err(Into::into) .map_err(Into::into)
} }
/// Increments the internal size counter, pruning if the value exceeds the
/// user-defined capacity.
async fn increase_usage(&mut self, amt: u64) { async fn increase_usage(&mut self, amt: u64) {
self.disk_cur_size += amt; self.disk_cur_size += amt;
if self.disk_cur_size > self.disk_max_size { if self.disk_cur_size > self.disk_max_size {

5
src/cache/mod.rs vendored
View file

@ -6,7 +6,7 @@ use std::task::{Context, Poll};
use actix_web::http::HeaderValue; use actix_web::http::HeaderValue;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::{Bytes, BytesMut};
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use fs::ConcurrentFsStream; use fs::ConcurrentFsStream;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
@ -88,6 +88,7 @@ impl AsRef<str> for ImageContentType {
} }
} }
#[allow(clippy::pub_enum_variant_names)]
#[derive(Debug)] #[derive(Debug)]
pub enum ImageRequestError { pub enum ImageRequestError {
InvalidContentType, InvalidContentType,
@ -184,7 +185,7 @@ impl Stream for CacheStream {
Self::Memory(stream) => stream.poll_next_unpin(cx), Self::Memory(stream) => stream.poll_next_unpin(cx),
Self::Completed(stream) => stream Self::Completed(stream) => stream
.poll_next_unpin(cx) .poll_next_unpin(cx)
.map_ok(|v| v.freeze()) .map_ok(BytesMut::freeze)
.map_err(|_| UpstreamError), .map_err(|_| UpstreamError),
} }
} }

View file

@ -48,6 +48,7 @@ impl Responder for ServerResponse {
} }
} }
#[allow(clippy::future_not_send)]
#[get("/{token}/data/{chapter_hash}/{file_name}")] #[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data( async fn token_data(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
@ -64,6 +65,7 @@ async fn token_data(
fetch_image(state, cache, chapter_hash, file_name, false).await fetch_image(state, cache, chapter_hash, file_name, false).await
} }
#[allow(clippy::future_not_send)]
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver( async fn token_data_saver(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
@ -80,6 +82,7 @@ async fn token_data_saver(
fetch_image(state, cache, chapter_hash, file_name, true).await fetch_image(state, cache, chapter_hash, file_name, true).await
} }
#[allow(clippy::future_not_send)]
pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl Responder { pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl Responder {
let path = &format!( let path = &format!(
"{}{}", "{}{}",
@ -181,6 +184,7 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
builder builder
} }
#[allow(clippy::future_not_send)]
async fn fetch_image( async fn fetch_image(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<RwLock<Box<dyn Cache>>>, cache: Data<RwLock<Box<dyn Cache>>>,