add pruning
This commit is contained in:
parent
525aef91bf
commit
ae216b2410
4 changed files with 40 additions and 14 deletions
7
src/cache/low_mem.rs
vendored
7
src/cache/low_mem.rs
vendored
|
@ -3,6 +3,7 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use log::warn;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
|
|
||||||
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
|
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
|
||||||
|
@ -25,8 +26,6 @@ impl LowMemCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: schedule eviction
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Cache for LowMemCache {
|
impl Cache for LowMemCache {
|
||||||
async fn get(
|
async fn get(
|
||||||
|
@ -55,4 +54,8 @@ impl Cache for LowMemCache {
|
||||||
.map(move |stream| (stream, self.on_disk.get(&key).unwrap()))
|
.map(move |stream| (stream, self.on_disk.get(&key).unwrap()))
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn prune(&mut self) {
|
||||||
|
warn!("Trimming has not been implemented yet. Cache is unbounded!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
6
src/cache/mod.rs
vendored
6
src/cache/mod.rs
vendored
|
@ -10,6 +10,7 @@ use bytes::Bytes;
|
||||||
use chrono::{DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset};
|
||||||
use fs::FsStream;
|
use fs::FsStream;
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
|
use log::debug;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
pub use fs::UpstreamError;
|
pub use fs::UpstreamError;
|
||||||
|
@ -148,12 +149,17 @@ pub trait Cache: Send + Sync {
|
||||||
&mut self,
|
&mut self,
|
||||||
key: &CacheKey,
|
key: &CacheKey,
|
||||||
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>>;
|
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>>;
|
||||||
|
|
||||||
async fn put(
|
async fn put(
|
||||||
&mut self,
|
&mut self,
|
||||||
key: CacheKey,
|
key: CacheKey,
|
||||||
image: BoxedImageStream,
|
image: BoxedImageStream,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
) -> Result<(CacheStream, &ImageMetadata), CacheError>;
|
) -> Result<(CacheStream, &ImageMetadata), CacheError>;
|
||||||
|
|
||||||
|
async fn prune(&mut self) {
|
||||||
|
debug!("Would trim but cache does not implement trimming!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum CacheStream {
|
pub enum CacheStream {
|
||||||
|
|
|
@ -35,7 +35,13 @@ pub struct CliArgs {
|
||||||
/// reasons.
|
/// reasons.
|
||||||
#[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)]
|
#[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)]
|
||||||
pub enable_server_string: bool,
|
pub enable_server_string: bool,
|
||||||
#[clap(short, long, conflicts_with("memory-quota"), env = "LOW_MEMORY_MODE")]
|
#[clap(
|
||||||
|
short,
|
||||||
|
long,
|
||||||
|
conflicts_with("memory-quota"),
|
||||||
|
env = "LOW_MEMORY_MODE",
|
||||||
|
takes_value = false
|
||||||
|
)]
|
||||||
pub low_memory: bool,
|
pub low_memory: bool,
|
||||||
#[clap(short, long, parse(from_occurrences))]
|
#[clap(short, long, parse(from_occurrences))]
|
||||||
pub verbose: usize,
|
pub verbose: usize,
|
||||||
|
|
33
src/main.rs
33
src/main.rs
|
@ -130,24 +130,35 @@ async fn main() -> Result<(), std::io::Error> {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let cache: Box<dyn Cache> = if low_mem_mode {
|
||||||
|
Box::new(LowMemCache::new(disk_quota, cache_path.clone()))
|
||||||
|
} else {
|
||||||
|
Box::new(GenerationalCache::new(
|
||||||
|
memory_max_size,
|
||||||
|
disk_quota,
|
||||||
|
cache_path.clone(),
|
||||||
|
))
|
||||||
|
};
|
||||||
|
let cache = Arc::new(Mutex::new(cache));
|
||||||
|
let cache1 = Arc::clone(&cache);
|
||||||
|
|
||||||
|
// Spawn periodic cache trimming
|
||||||
|
spawn(async move {
|
||||||
|
let mut interval = time::interval(Duration::from_secs(3 * 60));
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
cache.lock().prune().await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Start HTTPS server
|
// Start HTTPS server
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
let cache: Box<dyn Cache> = if low_mem_mode {
|
|
||||||
Box::new(LowMemCache::new(disk_quota, cache_path.clone()))
|
|
||||||
} else {
|
|
||||||
Box::new(GenerationalCache::new(
|
|
||||||
memory_max_size,
|
|
||||||
disk_quota,
|
|
||||||
cache_path.clone(),
|
|
||||||
))
|
|
||||||
};
|
|
||||||
|
|
||||||
App::new()
|
App::new()
|
||||||
.service(routes::token_data)
|
.service(routes::token_data)
|
||||||
.service(routes::token_data_saver)
|
.service(routes::token_data_saver)
|
||||||
.route("{tail:.*}", web::get().to(routes::default))
|
.route("{tail:.*}", web::get().to(routes::default))
|
||||||
.app_data(Data::from(Arc::clone(&data_1)))
|
.app_data(Data::from(Arc::clone(&data_1)))
|
||||||
.app_data(Data::new(Mutex::new(cache)))
|
.app_data(Data::from(Arc::clone(&cache1)))
|
||||||
})
|
})
|
||||||
.shutdown_timeout(60)
|
.shutdown_timeout(60)
|
||||||
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
|
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
|
||||||
|
|
Loading…
Reference in a new issue