diff --git a/src/cache/fs.rs b/src/cache/fs.rs index cc9b844..07fddc5 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -4,12 +4,12 @@ use futures::{Stream, StreamExt}; use log::{debug, error}; use once_cell::sync::Lazy; use serde::Deserialize; +use std::collections::HashMap; use std::fmt::Display; use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{collections::HashMap, sync::Arc}; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::sync::mpsc::UnboundedSender; @@ -43,8 +43,8 @@ static WRITING_STATUS: Lazy>>> = pub async fn read_file( path: &Path, ) -> Option> { - let file = File::open(path).await.ok()?; - let std_file = file.try_clone().await.unwrap().into_std().await; + let std_file = std::fs::File::open(path).ok()?; + let file = File::from_std(std_file.try_clone().ok()?); let metadata = { let mut de = serde_json::Deserializer::from_reader(std_file); diff --git a/src/cache/generational.rs b/src/cache/generational.rs deleted file mode 100644 index 250b953..0000000 --- a/src/cache/generational.rs +++ /dev/null @@ -1,251 +0,0 @@ -use std::{path::PathBuf, sync::Arc}; - -use async_trait::async_trait; -use bytes::Bytes; -use futures::{stream::StreamExt, TryStreamExt}; -use log::{debug, warn}; -use lru::LruCache; -use tokio::fs::{remove_file, File}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -use super::{ - BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CachedImage, ImageMetadata, -}; - -pub struct GenerationalCache { - in_memory: LruCache, - memory_max_size: u64, - memory_cur_size: u64, - - on_disk: LruCache, - disk_path: PathBuf, - disk_max_size: u64, - disk_cur_size: u64, -} - -impl GenerationalCache { - pub fn new(memory_max_size: u64, disk_max_size: u64, disk_path: PathBuf) -> Self { - Self { - in_memory: LruCache::unbounded(), - memory_max_size, - memory_cur_size: 0, - - on_disk: LruCache::unbounded(), - disk_path, - disk_max_size, - disk_cur_size: 0, - } - } - - async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) { - let new_img_size = image.0.len(); - let mut to_drop = vec![]; - - if self.disk_max_size >= new_img_size as u64 { - // Add images to drop from cold cache into a queue - while new_img_size as u64 + self.disk_cur_size > self.disk_max_size { - match self.on_disk.pop_lru() { - Some((key, _)) => { - let mut path = self.disk_path.clone(); - path.push(PathBuf::from(key)); - let async_file = File::open(&path).await; - - // Basically, the assumption here is that if the meta - // data was deleted, we can't assume that deleting the - // value will yield any free space back, so we assume a - // size of zero while this is the case. This also means - // that we automatically drop broken links as well. - if let Ok(file) = async_file { - let file_size = file - .into_std() - .await - .metadata() - .map(|metadata| { - #[cfg(target_os = "windows")] - { - use std::os::windows::fs::MetadataExt; - metadata.file_size() - } - - #[cfg(target_os = "linux")] - { - use std::os::unix::fs::MetadataExt; - metadata.size() - } - }) - .unwrap_or_default(); - - self.disk_cur_size -= file_size; - } - - to_drop.push(path); - } - None => unreachable!(concat!( - "Invariant violated. Cache is empty but we already ", - "guaranteed we can remove items from cache to make space." - )), - } - } - } else { - warn!( - "Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}", - key - ); - return; - } - - // Run all cold caching in parallel, we don't care if the removal fails - // because it just means it's already been removed. - // - // We also don't care when it happens, so just spawn tasks to do them - // later, whenever is convenient. - for to_drop in to_drop { - tokio::spawn(remove_file(to_drop)); - } - - let new_key_path = { - let mut root = self.disk_path.clone(); - root.push(PathBuf::from(key.clone())); - root - }; - - match File::open(&new_key_path).await { - Ok(mut file) => { - debug!("Starting to write to file: {:?}", &new_key_path); - match file.write_all(&image.0).await { - Ok(_) => { - self.on_disk.put(key, metadata); - self.disk_cur_size += new_img_size as u64; - debug!( - "Successfully written data to disk for file {:?}", - new_key_path - ); - } - Err(e) => { - warn!("Failed to write to {:?}: {}", new_key_path, e); - } - } - } - Err(e) => { - warn!("Failed to open {:?}: {}", new_key_path, e); - } - } - } -} - -#[async_trait] -impl Cache for GenerationalCache { - async fn get( - &self, - key: Arc, - ) -> Option> { - todo!(); - // if self.in_memory.contains(key) { - // return self - // .in_memory - // .get(key) - // // TODO: get rid of clone? - // .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone()))); - // } - - // if let Some(metadata) = self.on_disk.pop(key) { - // let new_key_path = { - // let mut root = self.disk_path.clone(); - // root.push(PathBuf::from(key.clone())); - // root - // }; - - // // extract from disk, if it exists - // let file = File::open(&new_key_path).await; - - // let mut buffer = metadata - // .content_length - // .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize)); - - // match file { - // Ok(mut file) => { - // match file.read_to_end(&mut buffer).await { - // Ok(_) => { - // // We don't particularly care if we fail to delete from disk since - // // if it's an error it means it's already been dropped. - // tokio::spawn(remove_file(new_key_path)); - // } - // Err(e) => { - // warn!("Failed to read from {:?}: {}", new_key_path, e); - // } - // } - // } - // Err(e) => { - // warn!("Failed to open {:?}: {}", new_key_path, e); - // return None; - // } - // } - - // buffer.shrink_to_fit(); - - // todo!(); - // self.disk_cur_size -= buffer.len() as u64; - // // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into()); - - // // return Some(self.put(key.clone(), Box::new(image), metadata).await); - // } - - None - } - - async fn put( - &self, - key: Arc, - mut image: BoxedImageStream, - metadata: ImageMetadata, - ) -> Result { - todo!(); - let mut hot_evicted = vec![]; - - let image = { - let mut resolved = vec![]; - while let Some(bytes) = image.next().await { - resolved.extend(bytes?); - } - CachedImage(Bytes::from(resolved)) - }; - - let new_img_size = image.0.len() as u64; - - if self.memory_max_size >= new_img_size { - // Evict oldest entires to make space for new image. - while new_img_size + self.memory_cur_size > self.memory_max_size { - match self.in_memory.pop_lru() { - Some((key, (image, metadata))) => { - self.memory_cur_size -= image.0.len() as u64; - hot_evicted.push((key, image, metadata)); - } - None => unreachable!(concat!( - "Invariant violated. Cache is empty but we already ", - "guaranteed we can remove items from cache to make space." - )), - } - } - - todo!(); - // self.in_memory.put(key.clone(), (image, metadata)); - self.memory_cur_size += new_img_size; - } else { - // Image was larger than memory capacity, push directly into cold - // storage. - todo!(); - // self.push_into_cold(key.clone(), image, metadata).await; - }; - - // Push evicted hot entires into cold storage. - for (key, image, metadata) in hot_evicted { - self.push_into_cold(key, image, metadata).await; - } - - todo!(); - // self.get(&key).await.unwrap() - } - - // noop - async fn increase_usage(&self, _amt: u64) {} -} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 9d3b112..bd778a4 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -16,11 +16,9 @@ use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; pub use fs::UpstreamError; -pub use generational::GenerationalCache; pub use low_mem::LowMemCache; mod fs; -mod generational; mod low_mem; #[derive(PartialEq, Eq, Hash, Clone)] diff --git a/src/main.rs b/src/main.rs index c8d245a..b5a7515 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::{num::ParseIntError, sync::atomic::Ordering}; use actix_web::rt::{spawn, time, System}; use actix_web::web::{self, Data}; use actix_web::{App, HttpServer}; -use cache::{Cache, GenerationalCache, LowMemCache}; +use cache::{Cache, LowMemCache}; use clap::Clap; use config::CliArgs; use log::{debug, error, warn, LevelFilter}; @@ -123,17 +123,18 @@ async fn main() -> Result<(), std::io::Error> { } }); - let cache: Arc> = if low_mem_mode { - LowMemCache::new(disk_quota, cache_path.clone()).await - } else { - Arc::new(Box::new(GenerationalCache::new( - memory_max_size, - disk_quota, - cache_path.clone(), - ))) - }; - let cache = Arc::clone(&cache); - let cache1 = Arc::clone(&cache); + // let cache: Arc> = if low_mem_mode { + // LowMemCache::new(disk_quota, cache_path.clone()).await + // } else { + // Arc::new(Box::new(GenerationalCache::new( + // memory_max_size, + // disk_quota, + // cache_path.clone(), + // ))) + // }; + + let cache = LowMemCache::new(disk_quota, cache_path.clone()).await; + let cache_0 = Arc::clone(&cache); // Start HTTPS server HttpServer::new(move || { @@ -142,7 +143,7 @@ async fn main() -> Result<(), std::io::Error> { .service(routes::token_data_saver) .route("{tail:.*}", web::get().to(routes::default)) .app_data(Data::from(Arc::clone(&data_1))) - .app_data(Data::from(Arc::clone(&cache1))) + .app_data(Data::from(Arc::clone(&cache_0))) }) .shutdown_timeout(60) .bind_rustls(format!("0.0.0.0:{}", port), tls_config)?