temporary remove generational

This commit is contained in:
Edward Shen 2021-04-22 13:11:08 -04:00
parent eab0449a02
commit 5099666322
Signed by: edward
GPG key ID: 19182661E818369F
4 changed files with 17 additions and 269 deletions

6
src/cache/fs.rs vendored
View file

@ -4,12 +4,12 @@ use futures::{Stream, StreamExt};
use log::{debug, error};
use once_cell::sync::Lazy;
use serde::Deserialize;
use std::collections::HashMap;
use std::fmt::Display;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{collections::HashMap, sync::Arc};
use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::sync::mpsc::UnboundedSender;
@ -43,8 +43,8 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
pub async fn read_file(
path: &Path,
) -> Option<Result<(CacheStream, ImageMetadata), std::io::Error>> {
let file = File::open(path).await.ok()?;
let std_file = file.try_clone().await.unwrap().into_std().await;
let std_file = std::fs::File::open(path).ok()?;
let file = File::from_std(std_file.try_clone().ok()?);
let metadata = {
let mut de = serde_json::Deserializer::from_reader(std_file);

View file

@ -1,251 +0,0 @@
use std::{path::PathBuf, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::StreamExt, TryStreamExt};
use log::{debug, warn};
use lru::LruCache;
use tokio::fs::{remove_file, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CachedImage, ImageMetadata,
};
pub struct GenerationalCache {
in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>,
memory_max_size: u64,
memory_cur_size: u64,
on_disk: LruCache<CacheKey, ImageMetadata>,
disk_path: PathBuf,
disk_max_size: u64,
disk_cur_size: u64,
}
impl GenerationalCache {
pub fn new(memory_max_size: u64, disk_max_size: u64, disk_path: PathBuf) -> Self {
Self {
in_memory: LruCache::unbounded(),
memory_max_size,
memory_cur_size: 0,
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
}
async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) {
let new_img_size = image.0.len();
let mut to_drop = vec![];
if self.disk_max_size >= new_img_size as u64 {
// Add images to drop from cold cache into a queue
while new_img_size as u64 + self.disk_cur_size > self.disk_max_size {
match self.on_disk.pop_lru() {
Some((key, _)) => {
let mut path = self.disk_path.clone();
path.push(PathBuf::from(key));
let async_file = File::open(&path).await;
// Basically, the assumption here is that if the meta
// data was deleted, we can't assume that deleting the
// value will yield any free space back, so we assume a
// size of zero while this is the case. This also means
// that we automatically drop broken links as well.
if let Ok(file) = async_file {
let file_size = file
.into_std()
.await
.metadata()
.map(|metadata| {
#[cfg(target_os = "windows")]
{
use std::os::windows::fs::MetadataExt;
metadata.file_size()
}
#[cfg(target_os = "linux")]
{
use std::os::unix::fs::MetadataExt;
metadata.size()
}
})
.unwrap_or_default();
self.disk_cur_size -= file_size;
}
to_drop.push(path);
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
} else {
warn!(
"Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}",
key
);
return;
}
// Run all cold caching in parallel, we don't care if the removal fails
// because it just means it's already been removed.
//
// We also don't care when it happens, so just spawn tasks to do them
// later, whenever is convenient.
for to_drop in to_drop {
tokio::spawn(remove_file(to_drop));
}
let new_key_path = {
let mut root = self.disk_path.clone();
root.push(PathBuf::from(key.clone()));
root
};
match File::open(&new_key_path).await {
Ok(mut file) => {
debug!("Starting to write to file: {:?}", &new_key_path);
match file.write_all(&image.0).await {
Ok(_) => {
self.on_disk.put(key, metadata);
self.disk_cur_size += new_img_size as u64;
debug!(
"Successfully written data to disk for file {:?}",
new_key_path
);
}
Err(e) => {
warn!("Failed to write to {:?}: {}", new_key_path, e);
}
}
}
Err(e) => {
warn!("Failed to open {:?}: {}", new_key_path, e);
}
}
}
}
#[async_trait]
impl Cache for GenerationalCache {
async fn get(
&self,
key: Arc<CacheKey>,
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
todo!();
// if self.in_memory.contains(key) {
// return self
// .in_memory
// .get(key)
// // TODO: get rid of clone?
// .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone())));
// }
// if let Some(metadata) = self.on_disk.pop(key) {
// let new_key_path = {
// let mut root = self.disk_path.clone();
// root.push(PathBuf::from(key.clone()));
// root
// };
// // extract from disk, if it exists
// let file = File::open(&new_key_path).await;
// let mut buffer = metadata
// .content_length
// .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
// match file {
// Ok(mut file) => {
// match file.read_to_end(&mut buffer).await {
// Ok(_) => {
// // We don't particularly care if we fail to delete from disk since
// // if it's an error it means it's already been dropped.
// tokio::spawn(remove_file(new_key_path));
// }
// Err(e) => {
// warn!("Failed to read from {:?}: {}", new_key_path, e);
// }
// }
// }
// Err(e) => {
// warn!("Failed to open {:?}: {}", new_key_path, e);
// return None;
// }
// }
// buffer.shrink_to_fit();
// todo!();
// self.disk_cur_size -= buffer.len() as u64;
// // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
// // return Some(self.put(key.clone(), Box::new(image), metadata).await);
// }
None
}
async fn put(
&self,
key: Arc<CacheKey>,
mut image: BoxedImageStream,
metadata: ImageMetadata,
) -> Result<CacheStream, CacheError> {
todo!();
let mut hot_evicted = vec![];
let image = {
let mut resolved = vec![];
while let Some(bytes) = image.next().await {
resolved.extend(bytes?);
}
CachedImage(Bytes::from(resolved))
};
let new_img_size = image.0.len() as u64;
if self.memory_max_size >= new_img_size {
// Evict oldest entires to make space for new image.
while new_img_size + self.memory_cur_size > self.memory_max_size {
match self.in_memory.pop_lru() {
Some((key, (image, metadata))) => {
self.memory_cur_size -= image.0.len() as u64;
hot_evicted.push((key, image, metadata));
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
todo!();
// self.in_memory.put(key.clone(), (image, metadata));
self.memory_cur_size += new_img_size;
} else {
// Image was larger than memory capacity, push directly into cold
// storage.
todo!();
// self.push_into_cold(key.clone(), image, metadata).await;
};
// Push evicted hot entires into cold storage.
for (key, image, metadata) in hot_evicted {
self.push_into_cold(key, image, metadata).await;
}
todo!();
// self.get(&key).await.unwrap()
}
// noop
async fn increase_usage(&self, _amt: u64) {}
}

2
src/cache/mod.rs vendored
View file

@ -16,11 +16,9 @@ use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
pub use fs::UpstreamError;
pub use generational::GenerationalCache;
pub use low_mem::LowMemCache;
mod fs;
mod generational;
mod low_mem;
#[derive(PartialEq, Eq, Hash, Clone)]

View file

@ -12,7 +12,7 @@ use std::{num::ParseIntError, sync::atomic::Ordering};
use actix_web::rt::{spawn, time, System};
use actix_web::web::{self, Data};
use actix_web::{App, HttpServer};
use cache::{Cache, GenerationalCache, LowMemCache};
use cache::{Cache, LowMemCache};
use clap::Clap;
use config::CliArgs;
use log::{debug, error, warn, LevelFilter};
@ -123,17 +123,18 @@ async fn main() -> Result<(), std::io::Error> {
}
});
let cache: Arc<Box<dyn Cache>> = if low_mem_mode {
LowMemCache::new(disk_quota, cache_path.clone()).await
} else {
Arc::new(Box::new(GenerationalCache::new(
memory_max_size,
disk_quota,
cache_path.clone(),
)))
};
let cache = Arc::clone(&cache);
let cache1 = Arc::clone(&cache);
// let cache: Arc<Box<dyn Cache>> = if low_mem_mode {
// LowMemCache::new(disk_quota, cache_path.clone()).await
// } else {
// Arc::new(Box::new(GenerationalCache::new(
// memory_max_size,
// disk_quota,
// cache_path.clone(),
// )))
// };
let cache = LowMemCache::new(disk_quota, cache_path.clone()).await;
let cache_0 = Arc::clone(&cache);
// Start HTTPS server
HttpServer::new(move || {
@ -142,7 +143,7 @@ async fn main() -> Result<(), std::io::Error> {
.service(routes::token_data_saver)
.route("{tail:.*}", web::get().to(routes::default))
.app_data(Data::from(Arc::clone(&data_1)))
.app_data(Data::from(Arc::clone(&cache1)))
.app_data(Data::from(Arc::clone(&cache_0)))
})
.shutdown_timeout(60)
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?