// mangadex-home-rs/src/cache/generational.rs

use std::path::PathBuf;

use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::StreamExt, TryStreamExt};
use log::{debug, warn};
use lru::LruCache;
use tokio::fs::{remove_file, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use super::{
    BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CachedImage, ImageMetadata,
};
2021-04-14 19:11:00 -07:00
/// A two-generation LRU cache: a byte-budgeted in-memory "hot" generation and
/// a byte-budgeted on-disk "cold" generation rooted at `disk_path`.
pub struct GenerationalCache {
    /// Hot generation: image bytes plus metadata held directly in memory.
    in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>,
    /// Maximum number of bytes the in-memory generation may hold.
    memory_max_size: u64,
    /// Bytes currently accounted to the in-memory generation.
    memory_cur_size: u64,

    /// Cold generation: only metadata lives here; the image bytes are stored
    /// in files under `disk_path`, named via `PathBuf::from(key)`.
    on_disk: LruCache<CacheKey, ImageMetadata>,
    /// Root directory for cold-generation image files.
    disk_path: PathBuf,
    /// Maximum number of bytes the on-disk generation may hold.
    disk_max_size: u64,
    /// Bytes currently accounted to the on-disk generation.
    disk_cur_size: u64,
}
impl GenerationalCache {
2021-04-14 19:52:54 -07:00
pub fn new(memory_max_size: u64, disk_max_size: u64, disk_path: PathBuf) -> Self {
2021-04-14 19:11:00 -07:00
Self {
in_memory: LruCache::unbounded(),
memory_max_size,
memory_cur_size: 0,
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
}
async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) {
let new_img_size = image.0.len();
let mut to_drop = vec![];
2021-04-14 19:52:54 -07:00
if self.disk_max_size >= new_img_size as u64 {
2021-04-14 19:11:00 -07:00
// Add images to drop from cold cache into a queue
2021-04-14 19:52:54 -07:00
while new_img_size as u64 + self.disk_cur_size > self.disk_max_size {
2021-04-14 19:11:00 -07:00
match self.on_disk.pop_lru() {
Some((key, _)) => {
let mut path = self.disk_path.clone();
path.push(PathBuf::from(key));
let async_file = File::open(&path).await;
// Basically, the assumption here is that if the meta
// data was deleted, we can't assume that deleting the
// value will yield any free space back, so we assume a
// size of zero while this is the case. This also means
// that we automatically drop broken links as well.
if let Ok(file) = async_file {
let file_size = file
.into_std()
.await
.metadata()
.map(|metadata| {
#[cfg(target_os = "windows")]
{
use std::os::windows::fs::MetadataExt;
metadata.file_size()
}
#[cfg(target_os = "linux")]
{
use std::os::unix::fs::MetadataExt;
metadata.size()
}
})
.unwrap_or_default();
2021-04-14 19:52:54 -07:00
self.disk_cur_size -= file_size;
2021-04-14 19:11:00 -07:00
}
to_drop.push(path);
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
} else {
warn!(
"Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}",
key
);
return;
}
// Run all cold caching in parallel, we don't care if the removal fails
// because it just means it's already been removed.
//
// We also don't care when it happens, so just spawn tasks to do them
// later, whenever is convenient.
for to_drop in to_drop {
tokio::spawn(remove_file(to_drop));
}
let new_key_path = {
let mut root = self.disk_path.clone();
2021-04-14 19:52:54 -07:00
root.push(PathBuf::from(key.clone()));
2021-04-14 19:11:00 -07:00
root
};
match File::open(&new_key_path).await {
Ok(mut file) => {
debug!("Starting to write to file: {:?}", &new_key_path);
match file.write_all(&image.0).await {
Ok(_) => {
self.on_disk.put(key, metadata);
2021-04-14 19:52:54 -07:00
self.disk_cur_size += new_img_size as u64;
2021-04-14 19:11:00 -07:00
debug!(
"Successfully written data to disk for file {:?}",
new_key_path
);
}
Err(e) => {
warn!("Failed to write to {:?}: {}", new_key_path, e);
}
}
}
Err(e) => {
warn!("Failed to open {:?}: {}", new_key_path, e);
}
}
}
}
#[async_trait]
impl Cache for GenerationalCache {
2021-04-18 14:06:40 -07:00
async fn get(
&mut self,
key: &CacheKey,
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
2021-04-14 19:11:00 -07:00
if self.in_memory.contains(key) {
2021-04-18 14:06:40 -07:00
return self
.in_memory
.get(key)
// TODO: get rid of clone?
.map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata)));
2021-04-14 19:11:00 -07:00
}
if let Some(metadata) = self.on_disk.pop(key) {
let new_key_path = {
let mut root = self.disk_path.clone();
2021-04-14 19:52:54 -07:00
root.push(PathBuf::from(key.clone()));
2021-04-14 19:11:00 -07:00
root
};
// extract from disk, if it exists
2021-04-14 19:52:54 -07:00
let file = File::open(&new_key_path).await;
2021-04-14 19:11:00 -07:00
2021-04-14 19:52:54 -07:00
let mut buffer = metadata
.content_length
2021-04-18 14:06:40 -07:00
.map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
2021-04-14 19:11:00 -07:00
match file {
Ok(mut file) => {
match file.read_to_end(&mut buffer).await {
Ok(_) => {
// We don't particularly care if we fail to delete from disk since
// if it's an error it means it's already been dropped.
2021-04-14 19:52:54 -07:00
tokio::spawn(remove_file(new_key_path));
2021-04-14 19:11:00 -07:00
}
Err(e) => {
warn!("Failed to read from {:?}: {}", new_key_path, e);
}
}
}
Err(e) => {
warn!("Failed to open {:?}: {}", new_key_path, e);
return None;
}
}
buffer.shrink_to_fit();
2021-04-14 19:52:54 -07:00
self.disk_cur_size -= buffer.len() as u64;
2021-04-18 14:06:40 -07:00
let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
2021-04-14 19:11:00 -07:00
2021-04-18 14:06:40 -07:00
return Some(self.put(key.clone(), Box::new(image), metadata).await);
2021-04-14 19:11:00 -07:00
}
None
}
2021-04-18 14:06:40 -07:00
async fn put(
&mut self,
key: CacheKey,
mut image: BoxedImageStream,
metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError> {
2021-04-14 19:11:00 -07:00
let mut hot_evicted = vec![];
2021-04-18 14:06:40 -07:00
let image = {
let mut resolved = vec![];
while let Some(bytes) = image.next().await {
resolved.extend(bytes?);
}
CachedImage(Bytes::from(resolved))
};
2021-04-14 19:52:54 -07:00
let new_img_size = image.0.len() as u64;
2021-04-14 19:11:00 -07:00
if self.memory_max_size >= new_img_size {
// Evict oldest entires to make space for new image.
while new_img_size + self.memory_cur_size > self.memory_max_size {
match self.in_memory.pop_lru() {
Some((key, (image, metadata))) => {
2021-04-14 19:52:54 -07:00
self.memory_cur_size -= image.0.len() as u64;
2021-04-14 19:11:00 -07:00
hot_evicted.push((key, image, metadata));
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
2021-04-18 14:06:40 -07:00
self.in_memory.put(key.clone(), (image, metadata));
2021-04-14 19:11:00 -07:00
self.memory_cur_size += new_img_size;
} else {
// Image was larger than memory capacity, push directly into cold
// storage.
2021-04-18 14:06:40 -07:00
self.push_into_cold(key.clone(), image, metadata).await;
2021-04-14 19:11:00 -07:00
};
// Push evicted hot entires into cold storage.
for (key, image, metadata) in hot_evicted {
self.push_into_cold(key, image, metadata).await;
}
2021-04-18 14:06:40 -07:00
self.get(&key).await.unwrap()
2021-04-14 19:11:00 -07:00
}
2021-04-19 19:01:32 -07:00
// noop
async fn increase_usage(&mut self, _amt: u64) {}
2021-04-14 19:11:00 -07:00
}