cache trait accepts a bytestream instead

This commit is contained in:
Edward Shen 2021-04-14 23:44:13 -04:00
parent 8949e41bee
commit c25b8be45b
Signed by: edward
GPG key ID: 19182661E818369F
5 changed files with 40 additions and 26 deletions

16
src/cache/low_mem.rs vendored
View file

@ -7,17 +7,17 @@ use bytes::Bytes;
use futures::Stream; use futures::Stream;
use lru::LruCache; use lru::LruCache;
use super::{fs::FromFsStream, Cache, CacheKey}; use super::{fs::FromFsStream, ByteStream, Cache, CacheKey};
pub struct LowMemCache { pub struct LowMemCache {
on_disk: LruCache<CacheKey, ()>, on_disk: LruCache<CacheKey, ()>,
disk_path: PathBuf, disk_path: PathBuf,
disk_max_size: usize, disk_max_size: u64,
disk_cur_size: usize, disk_cur_size: u64,
} }
impl LowMemCache { impl LowMemCache {
pub fn new(disk_max_size: usize, disk_path: PathBuf) -> Self { pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Self {
Self { Self {
on_disk: LruCache::unbounded(), on_disk: LruCache::unbounded(),
disk_path, disk_path,
@ -37,10 +37,8 @@ impl Cache for LowMemCache {
} }
} }
async fn put_stream( async fn put_stream(&mut self, key: CacheKey, image: ByteStream) {
&mut self, // this call has a side effect and the returned future is for reading
key: CacheKey, let _ = super::fs::transparent_file_stream(&PathBuf::from(key), image);
image: impl Stream<Item = Result<Bytes, reqwest::Error>> + Unpin + Send + 'static,
) {
} }
} }

21
src/cache/mod.rs vendored
View file

@ -131,7 +131,7 @@ impl ImageMetadata {
} }
#[async_trait] #[async_trait]
pub trait Cache { pub trait Cache: Send + Sync {
async fn get(&mut self, _key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> { async fn get(&mut self, _key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> {
unimplemented!() unimplemented!()
} }
@ -147,11 +147,20 @@ pub trait Cache {
unimplemented!() unimplemented!()
} }
async fn put_stream( async fn put_stream(&mut self, _key: CacheKey, _image: ByteStream) {
&mut self,
_key: CacheKey,
_image: impl Stream<Item = Result<Bytes, reqwest::Error>> + Unpin + Send + 'static,
) {
unimplemented!() unimplemented!()
} }
} }
pub enum ByteStream {}
impl Stream for ByteStream {
type Item = Result<Bytes, reqwest::Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
todo!()
}
}

View file

@ -34,6 +34,6 @@ pub struct CliArgs {
/// reasons. /// reasons.
#[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)] #[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)]
pub enable_server_string: bool, pub enable_server_string: bool,
#[clap(short, long, conflicts_with("memory_quota"))] #[clap(short, long, conflicts_with("memory_quota"), env = "LOW_MEMORY_MODE")]
pub low_memory: bool, pub low_memory: bool,
} }

View file

@ -12,7 +12,7 @@ use std::{num::ParseIntError, sync::atomic::Ordering};
use actix_web::rt::{spawn, time, System}; use actix_web::rt::{spawn, time, System};
use actix_web::web::{self, Data}; use actix_web::web::{self, Data};
use actix_web::{App, HttpServer}; use actix_web::{App, HttpServer};
use cache::GenerationalCache; use cache::{Cache, GenerationalCache, LowMemCache};
use clap::Clap; use clap::Clap;
use config::CliArgs; use config::CliArgs;
use log::{debug, error, warn, LevelFilter}; use log::{debug, error, warn, LevelFilter};
@ -54,6 +54,7 @@ async fn main() -> Result<(), std::io::Error> {
let memory_max_size = cli_args.memory_quota.get(); let memory_max_size = cli_args.memory_quota.get();
let disk_quota = cli_args.disk_quota; let disk_quota = cli_args.disk_quota;
let cache_path = cli_args.cache_path.clone(); let cache_path = cli_args.cache_path.clone();
let low_mem_mode = cli_args.low_memory;
SimpleLogger::new() SimpleLogger::new()
.with_level(LevelFilter::Info) .with_level(LevelFilter::Info)
@ -106,16 +107,22 @@ async fn main() -> Result<(), std::io::Error> {
// Start HTTPS server // Start HTTPS server
HttpServer::new(move || { HttpServer::new(move || {
let cache: Box<dyn Cache> = if low_mem_mode {
Box::new(LowMemCache::new(disk_quota, cache_path.clone()))
} else {
Box::new(GenerationalCache::new(
memory_max_size,
disk_quota,
cache_path.clone(),
))
};
App::new() App::new()
.service(routes::token_data) .service(routes::token_data)
.service(routes::token_data_saver) .service(routes::token_data_saver)
.route("{tail:.*}", web::get().to(routes::default)) .route("{tail:.*}", web::get().to(routes::default))
.app_data(Data::from(Arc::clone(&data_1))) .app_data(Data::from(Arc::clone(&data_1)))
.app_data(Data::new(Mutex::new(GenerationalCache::new( .app_data(Data::new(Mutex::new(cache)))
memory_max_size,
disk_quota,
cache_path.clone(),
))))
}) })
.shutdown_timeout(60) .shutdown_timeout(60)
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)? .bind_rustls(format!("0.0.0.0:{}", port), tls_config)?

View file

@ -18,7 +18,7 @@ use serde::Deserialize;
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
use thiserror::Error; use thiserror::Error;
use crate::cache::{Cache, CacheKey, CachedImage, GenerationalCache, ImageMetadata}; use crate::cache::{Cache, CacheKey, CachedImage, ImageMetadata};
use crate::client_api_version; use crate::client_api_version;
use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS}; use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS};
use crate::state::RwLockServerState; use crate::state::RwLockServerState;
@ -52,7 +52,7 @@ impl Responder for ServerResponse {
#[get("/{token}/data/{chapter_hash}/{file_name}")] #[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data( async fn token_data(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Mutex<GenerationalCache>>, cache: Data<Mutex<Box<dyn Cache>>>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -68,7 +68,7 @@ async fn token_data(
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver( async fn token_data_saver(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Mutex<GenerationalCache>>, cache: Data<Mutex<Box<dyn Cache>>>,
path: Path<(String, String, String)>, path: Path<(String, String, String)>,
) -> impl Responder { ) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner(); let (token, chapter_hash, file_name) = path.into_inner();
@ -175,7 +175,7 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
async fn fetch_image( async fn fetch_image(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
cache: Data<Mutex<GenerationalCache>>, cache: Data<Mutex<Box<dyn Cache>>>,
chapter_hash: String, chapter_hash: String,
file_name: String, file_name: String,
is_data_saver: bool, is_data_saver: bool,