use Arc for cache keys

Edward Shen 2021-04-22 13:03:33 -04:00
parent efd6b5c60c
commit eab0449a02
Signed by: edward
GPG key ID: 19182661E818369F
6 changed files with 77 additions and 65 deletions
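
This commit threads `Arc<CacheKey>` through the `Cache` trait in place of owned and borrowed `CacheKey` values. A minimal sketch of the ownership win, assuming `CacheKey` is roughly the `(String, String, bool)` tuple struct that the `CacheKey(chapter_hash, file_name, is_data_saver)` constructor in src/routes.rs suggests: cloning the `Arc` handle only bumps a reference count, where cloning the key itself would copy both heap-allocated strings every time the key crosses a task boundary.

```rust
use std::sync::Arc;

// Assumed shape of the key, per the fetch_image call in src/routes.rs.
struct CacheKey(String, String, bool);

fn main() {
    let key = Arc::new(CacheKey("abc123".into(), "1.png".into(), false));

    // Cheap clone: one pointer copy plus a refcount bump; the Strings
    // inside are shared rather than duplicated.
    let for_task = Arc::clone(&key);
    std::thread::spawn(move || {
        println!("task sees {}/{}", for_task.0, for_task.1);
    })
    .join()
    .unwrap();

    // The original handle remains usable after the task took its copy.
    println!("caller still has {}", key.0);
}
```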

src/cache/fs.rs vendored

@@ -4,12 +4,12 @@ use futures::{Stream, StreamExt};
 use log::{debug, error};
 use once_cell::sync::Lazy;
 use serde::Deserialize;
-use std::collections::HashMap;
 use std::fmt::Display;
 use std::num::NonZeroU64;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::task::{Context, Poll};
+use std::{collections::HashMap, sync::Arc};
 use tokio::fs::{create_dir_all, remove_file, File};
 use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
 use tokio::sync::mpsc::UnboundedSender;

src/cache/generational.rs vendored

@@ -1,4 +1,4 @@
-use std::path::PathBuf;
+use std::{path::PathBuf, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;

@@ -137,65 +137,65 @@ impl GenerationalCache {
 impl Cache for GenerationalCache {
     async fn get(
         &self,
-        key: &CacheKey,
+        key: Arc<CacheKey>,
     ) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
         todo!();
-        if self.in_memory.contains(key) {
-            return self
-                .in_memory
-                .get(key)
-                // TODO: get rid of clone?
-                .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone())));
-        }
-        if let Some(metadata) = self.on_disk.pop(key) {
-            let new_key_path = {
-                let mut root = self.disk_path.clone();
-                root.push(PathBuf::from(key.clone()));
-                root
-            };
-            // extract from disk, if it exists
-            let file = File::open(&new_key_path).await;
-            let mut buffer = metadata
-                .content_length
-                .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
-            match file {
-                Ok(mut file) => {
-                    match file.read_to_end(&mut buffer).await {
-                        Ok(_) => {
-                            // We don't particularly care if we fail to delete from disk since
-                            // if it's an error it means it's already been dropped.
-                            tokio::spawn(remove_file(new_key_path));
-                        }
-                        Err(e) => {
-                            warn!("Failed to read from {:?}: {}", new_key_path, e);
-                        }
-                    }
-                }
-                Err(e) => {
-                    warn!("Failed to open {:?}: {}", new_key_path, e);
-                    return None;
-                }
-            }
-            buffer.shrink_to_fit();
-            todo!();
-            self.disk_cur_size -= buffer.len() as u64;
-            // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
-            // return Some(self.put(key.clone(), Box::new(image), metadata).await);
-        }
+        // if self.in_memory.contains(key) {
+        //     return self
+        //         .in_memory
+        //         .get(key)
+        //         // TODO: get rid of clone?
+        //         .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone())));
+        // }
+        // if let Some(metadata) = self.on_disk.pop(key) {
+        //     let new_key_path = {
+        //         let mut root = self.disk_path.clone();
+        //         root.push(PathBuf::from(key.clone()));
+        //         root
+        //     };
+        //     // extract from disk, if it exists
+        //     let file = File::open(&new_key_path).await;
+        //     let mut buffer = metadata
+        //         .content_length
+        //         .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
+        //     match file {
+        //         Ok(mut file) => {
+        //             match file.read_to_end(&mut buffer).await {
+        //                 Ok(_) => {
+        //                     // We don't particularly care if we fail to delete from disk since
+        //                     // if it's an error it means it's already been dropped.
+        //                     tokio::spawn(remove_file(new_key_path));
+        //                 }
+        //                 Err(e) => {
+        //                     warn!("Failed to read from {:?}: {}", new_key_path, e);
+        //                 }
+        //             }
+        //         }
+        //         Err(e) => {
+        //             warn!("Failed to open {:?}: {}", new_key_path, e);
+        //             return None;
+        //         }
+        //     }
+        //     buffer.shrink_to_fit();
+        //     todo!();
+        //     self.disk_cur_size -= buffer.len() as u64;
+        //     // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
+        //     // return Some(self.put(key.clone(), Box::new(image), metadata).await);
+        // }
         None
     }
 
     async fn put(
         &self,
-        key: CacheKey,
+        key: Arc<CacheKey>,
         mut image: BoxedImageStream,
         metadata: ImageMetadata,
     ) -> Result<CacheStream, CacheError> {

@@ -227,12 +227,14 @@ impl Cache for GenerationalCache {
                 }
             }
 
-            self.in_memory.put(key.clone(), (image, metadata));
+            todo!();
+            // self.in_memory.put(key.clone(), (image, metadata));
             self.memory_cur_size += new_img_size;
         } else {
             // Image was larger than memory capacity, push directly into cold
             // storage.
-            self.push_into_cold(key.clone(), image, metadata).await;
+            todo!();
+            // self.push_into_cold(key.clone(), image, metadata).await;
         };
 
         // Push evicted hot entires into cold storage.
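
The `GenerationalCache` body is stubbed with `todo!()` here rather than ported. If the in-memory lookups return with `Arc<CacheKey>` keys, they need not change shape much: the standard library provides `impl Borrow<CacheKey> for Arc<CacheKey>`, so a map keyed by `Arc<CacheKey>` can still be probed with a plain `&CacheKey`. A sketch under that assumption, with a `HashMap` standing in for the real in-memory LRU:

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Hash, PartialEq, Eq)]
struct CacheKey(String, String, bool);

fn main() {
    // Stand-in for the in_memory store.
    let mut in_memory: HashMap<Arc<CacheKey>, Vec<u8>> = HashMap::new();

    let key = Arc::new(CacheKey("abc123".into(), "1.png".into(), false));
    in_memory.insert(Arc::clone(&key), vec![0xFF, 0xD8]);

    // Arc<CacheKey>: Borrow<CacheKey>, so lookups by &CacheKey still
    // work; no fresh Arc allocation is needed just to probe the map.
    let probe = CacheKey("abc123".into(), "1.png".into(), false);
    assert!(in_memory.contains_key(&probe));
}
```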

src/cache/low_mem.rs vendored

@@ -20,8 +20,8 @@ pub struct LowMemCache {
 }
 
 enum DbMessage {
-    Get(CacheKey),
-    Put(CacheKey, ImageMetadata),
+    Get(Arc<CacheKey>),
+    Put(Arc<CacheKey>, ImageMetadata),
 }
 
 impl LowMemCache {

@@ -87,14 +87,17 @@ impl LowMemCache {
 impl Cache for LowMemCache {
     async fn get(
         &self,
-        key: &CacheKey,
+        key: Arc<CacheKey>,
    ) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
         let channel = self.db_update_channel_sender.clone();
 
-        let key_0 = key.clone();
+        let key_0 = Arc::clone(&key);
         tokio::spawn(async move { channel.send(DbMessage::Get(key_0)).await });
 
-        let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
+        let path = self
+            .disk_path
+            .clone()
+            .join(PathBuf::from(Arc::clone(&key).as_ref()));
         super::fs::read_file(&path)
             .await
             .map(|res| res.map_err(Into::into))

@@ -102,15 +105,15 @@ impl Cache for LowMemCache {
     async fn put(
         &self,
-        key: CacheKey,
+        key: Arc<CacheKey>,
         image: BoxedImageStream,
         metadata: ImageMetadata,
     ) -> Result<CacheStream, CacheError> {
         let channel = self.db_update_channel_sender.clone();
 
-        let key_0 = key.clone();
+        let key_0 = Arc::clone(&key);
         tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await });
 
-        let path = self.disk_path.clone().join(PathBuf::from(key));
+        let path = self.disk_path.clone().join(PathBuf::from(key.as_ref()));
         super::fs::write_file(
             &path,
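
Both methods follow the same pattern: `Arc::clone(&key)` hands the background DB-notify task its own handle, and `key` stays alive for building the on-disk path afterwards. A self-contained sketch of that flow; the bounded channel and the simplified types are stand-ins for `LowMemCache`'s internals, not the project's actual definitions:

```rust
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;

#[derive(Debug)]
struct CacheKey(String, String, bool);

#[derive(Debug)]
enum DbMessage {
    Get(Arc<CacheKey>),
}

#[tokio::main]
async fn main() {
    // Stand-in for db_update_channel_sender.
    let (tx, mut rx) = mpsc::channel::<DbMessage>(32);

    let key = Arc::new(CacheKey("abc123".into(), "1.png".into(), false));

    // Clone the handle, not the key: the spawned task owns its Arc
    // while `key` remains usable below.
    let key_0 = Arc::clone(&key);
    tokio::spawn(async move { tx.send(DbMessage::Get(key_0)).await });

    let path = PathBuf::from(format!("cache/{}/{}", key.0, key.1));
    println!("would read {}", path.display());

    if let Some(msg) = rx.recv().await {
        println!("db task recorded {:?}", msg);
    }
}
```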

src/cache/mod.rs vendored

@@ -1,8 +1,8 @@
-use std::fmt::Display;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::task::{Context, Poll};
+use std::{fmt::Display, sync::Arc};
 
 use actix_web::http::HeaderValue;
 use async_trait::async_trait;

@@ -43,6 +43,13 @@ impl From<CacheKey> for PathBuf {
     }
 }
 
+impl From<&CacheKey> for PathBuf {
+    #[inline]
+    fn from(key: &CacheKey) -> Self {
+        key.to_string().into()
+    }
+}
+
 #[derive(Clone)]
 pub struct CachedImage(pub Bytes);

@@ -147,12 +154,14 @@ pub enum CacheError {
 #[async_trait]
 pub trait Cache: Send + Sync {
-    async fn get(&self, key: &CacheKey)
-        -> Option<Result<(CacheStream, ImageMetadata), CacheError>>;
+    async fn get(
+        &self,
+        key: Arc<CacheKey>,
+    ) -> Option<Result<(CacheStream, ImageMetadata), CacheError>>;
 
     async fn put(
         &self,
-        key: CacheKey,
+        key: Arc<CacheKey>,
         image: BoxedImageStream,
         metadata: ImageMetadata,
     ) -> Result<CacheStream, CacheError>;
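
The new `From<&CacheKey> for PathBuf` impl is what the `PathBuf::from(key.as_ref())` calls in src/cache/low_mem.rs lean on: `Arc<T>` implements `AsRef<T>`, so `key.as_ref()` produces exactly the `&CacheKey` this impl expects, with no unwrapping or cloning. A sketch with a hypothetical `Display` layout for the key (the real format string is not shown in this diff):

```rust
use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;

struct CacheKey(String, String, bool);

impl fmt::Display for CacheKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Hypothetical layout: saver flag / chapter hash / file name.
        write!(f, "{}/{}/{}", self.2, self.0, self.1)
    }
}

// Mirrors the impl added in this hunk.
impl From<&CacheKey> for PathBuf {
    #[inline]
    fn from(key: &CacheKey) -> Self {
        key.to_string().into()
    }
}

fn main() {
    let key = Arc::new(CacheKey("abc123".into(), "1.png".into(), false));
    // Arc<CacheKey>: AsRef<CacheKey>, so as_ref() yields &CacheKey.
    let path = PathBuf::from(key.as_ref());
    println!("{}", path.display());
}
```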

src/main.rs

@@ -22,7 +22,6 @@ use simple_logger::SimpleLogger;
 use state::{RwLockServerState, ServerState};
 use stop::send_stop;
 use thiserror::Error;
-use tokio::sync::RwLock as TokioRwLock;
 
 mod cache;
 mod config;

src/routes.rs

@@ -1,4 +1,4 @@
-use std::sync::atomic::Ordering;
+use std::sync::{atomic::Ordering, Arc};
 
 use actix_web::http::header::{
     ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH,

@@ -15,7 +15,6 @@ use log::{debug, error, info, warn};
 use serde::Deserialize;
 use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
 use thiserror::Error;
-use tokio::sync::RwLock;
 
 use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
 use crate::client_api_version;

@@ -192,9 +191,9 @@ async fn fetch_image(
     file_name: String,
     is_data_saver: bool,
 ) -> ServerResponse {
-    let key = CacheKey(chapter_hash, file_name, is_data_saver);
-    match cache.get(&key).await {
+    let key = Arc::new(CacheKey(chapter_hash, file_name, is_data_saver));
+    match cache.get(Arc::clone(&key)).await {
         Some(Ok((image, metadata))) => {
             return construct_response(image, &metadata);
         }
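
Taking `Arc<CacheKey>` by value in both trait methods means the cache no longer borrows the key from the handler: each implementation can move a cheap clone of the handle into its background DB task, while `fetch_image` keeps its own handle alive for any follow-up call, and the strings inside the key are allocated exactly once per request.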