From c25b8be45b7b3a397043d3525612ef7109913d23 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Wed, 14 Apr 2021 23:44:13 -0400 Subject: [PATCH] cache trait accepts a bytestream instead --- src/cache/low_mem.rs | 16 +++++++--------- src/cache/mod.rs | 21 +++++++++++++++------ src/config.rs | 2 +- src/main.rs | 19 +++++++++++++------ src/routes.rs | 8 ++++---- 5 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index 42265ed..eb17794 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -7,17 +7,17 @@ use bytes::Bytes; use futures::Stream; use lru::LruCache; -use super::{fs::FromFsStream, Cache, CacheKey}; +use super::{fs::FromFsStream, ByteStream, Cache, CacheKey}; pub struct LowMemCache { on_disk: LruCache, disk_path: PathBuf, - disk_max_size: usize, - disk_cur_size: usize, + disk_max_size: u64, + disk_cur_size: u64, } impl LowMemCache { - pub fn new(disk_max_size: usize, disk_path: PathBuf) -> Self { + pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Self { Self { on_disk: LruCache::unbounded(), disk_path, @@ -37,10 +37,8 @@ impl Cache for LowMemCache { } } - async fn put_stream( - &mut self, - key: CacheKey, - image: impl Stream> + Unpin + Send + 'static, - ) { + async fn put_stream(&mut self, key: CacheKey, image: ByteStream) { + // this call has a side effect and the returned future is for reading + let _ = super::fs::transparent_file_stream(&PathBuf::from(key), image); } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 50665cf..060766f 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -131,7 +131,7 @@ impl ImageMetadata { } #[async_trait] -pub trait Cache { +pub trait Cache: Send + Sync { async fn get(&mut self, _key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> { unimplemented!() } @@ -147,11 +147,20 @@ pub trait Cache { unimplemented!() } - async fn put_stream( - &mut self, - _key: CacheKey, - _image: impl Stream> + Unpin + Send + 'static, - ) { + async fn put_stream(&mut self, _key: CacheKey, _image: ByteStream) { unimplemented!() } } + +pub enum ByteStream {} + +impl Stream for ByteStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + todo!() + } +} diff --git a/src/config.rs b/src/config.rs index 08abb79..73231b2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -34,6 +34,6 @@ pub struct CliArgs { /// reasons. #[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)] pub enable_server_string: bool, - #[clap(short, long, conflicts_with("memory_quota"))] + #[clap(short, long, conflicts_with("memory_quota"), env = "LOW_MEMORY_MODE")] pub low_memory: bool, } diff --git a/src/main.rs b/src/main.rs index e5594d1..2a56377 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::{num::ParseIntError, sync::atomic::Ordering}; use actix_web::rt::{spawn, time, System}; use actix_web::web::{self, Data}; use actix_web::{App, HttpServer}; -use cache::GenerationalCache; +use cache::{Cache, GenerationalCache, LowMemCache}; use clap::Clap; use config::CliArgs; use log::{debug, error, warn, LevelFilter}; @@ -54,6 +54,7 @@ async fn main() -> Result<(), std::io::Error> { let memory_max_size = cli_args.memory_quota.get(); let disk_quota = cli_args.disk_quota; let cache_path = cli_args.cache_path.clone(); + let low_mem_mode = cli_args.low_memory; SimpleLogger::new() .with_level(LevelFilter::Info) @@ -106,16 +107,22 @@ async fn main() -> Result<(), std::io::Error> { // Start HTTPS server HttpServer::new(move || { + let cache: Box = if low_mem_mode { + Box::new(LowMemCache::new(disk_quota, cache_path.clone())) + } else { + Box::new(GenerationalCache::new( + memory_max_size, + disk_quota, + cache_path.clone(), + )) + }; + App::new() .service(routes::token_data) .service(routes::token_data_saver) .route("{tail:.*}", web::get().to(routes::default)) .app_data(Data::from(Arc::clone(&data_1))) - .app_data(Data::new(Mutex::new(GenerationalCache::new( - memory_max_size, - disk_quota, - cache_path.clone(), - )))) + .app_data(Data::new(Mutex::new(cache))) }) .shutdown_timeout(60) .bind_rustls(format!("0.0.0.0:{}", port), tls_config)? diff --git a/src/routes.rs b/src/routes.rs index 699e628..5f46beb 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -18,7 +18,7 @@ use serde::Deserialize; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use thiserror::Error; -use crate::cache::{Cache, CacheKey, CachedImage, GenerationalCache, ImageMetadata}; +use crate::cache::{Cache, CacheKey, CachedImage, ImageMetadata}; use crate::client_api_version; use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS}; use crate::state::RwLockServerState; @@ -52,7 +52,7 @@ impl Responder for ServerResponse { #[get("/{token}/data/{chapter_hash}/{file_name}")] async fn token_data( state: Data, - cache: Data>, + cache: Data>>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -68,7 +68,7 @@ async fn token_data( #[get("/{token}/data-saver/{chapter_hash}/{file_name}")] async fn token_data_saver( state: Data, - cache: Data>, + cache: Data>>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -175,7 +175,7 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder { async fn fetch_image( state: Data, - cache: Data>, + cache: Data>>, chapter_hash: String, file_name: String, is_data_saver: bool,