From eab0449a02b220ea276daa55c557dbe1e20a53c6 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Thu, 22 Apr 2021 13:03:33 -0400 Subject: [PATCH] use Arc for cache keys --- src/cache/fs.rs | 2 +- src/cache/generational.rs | 96 ++++++++++++++++++++------------------- src/cache/low_mem.rs | 19 ++++---- src/cache/mod.rs | 17 +++++-- src/main.rs | 1 - src/routes.rs | 7 ++- 6 files changed, 77 insertions(+), 65 deletions(-) diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 9b5c91c..cc9b844 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -4,12 +4,12 @@ use futures::{Stream, StreamExt}; use log::{debug, error}; use once_cell::sync::Lazy; use serde::Deserialize; -use std::collections::HashMap; use std::fmt::Display; use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; +use std::{collections::HashMap, sync::Arc}; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::sync::mpsc::UnboundedSender; diff --git a/src/cache/generational.rs b/src/cache/generational.rs index 9f63c31..250b953 100644 --- a/src/cache/generational.rs +++ b/src/cache/generational.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; use async_trait::async_trait; use bytes::Bytes; @@ -137,65 +137,65 @@ impl GenerationalCache { impl Cache for GenerationalCache { async fn get( &self, - key: &CacheKey, + key: Arc, ) -> Option> { todo!(); - if self.in_memory.contains(key) { - return self - .in_memory - .get(key) - // TODO: get rid of clone? - .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone()))); - } + // if self.in_memory.contains(key) { + // return self + // .in_memory + // .get(key) + // // TODO: get rid of clone? + // .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone()))); + // } - if let Some(metadata) = self.on_disk.pop(key) { - let new_key_path = { - let mut root = self.disk_path.clone(); - root.push(PathBuf::from(key.clone())); - root - }; + // if let Some(metadata) = self.on_disk.pop(key) { + // let new_key_path = { + // let mut root = self.disk_path.clone(); + // root.push(PathBuf::from(key.clone())); + // root + // }; - // extract from disk, if it exists - let file = File::open(&new_key_path).await; + // // extract from disk, if it exists + // let file = File::open(&new_key_path).await; - let mut buffer = metadata - .content_length - .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize)); + // let mut buffer = metadata + // .content_length + // .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize)); - match file { - Ok(mut file) => { - match file.read_to_end(&mut buffer).await { - Ok(_) => { - // We don't particularly care if we fail to delete from disk since - // if it's an error it means it's already been dropped. - tokio::spawn(remove_file(new_key_path)); - } - Err(e) => { - warn!("Failed to read from {:?}: {}", new_key_path, e); - } - } - } - Err(e) => { - warn!("Failed to open {:?}: {}", new_key_path, e); - return None; - } - } + // match file { + // Ok(mut file) => { + // match file.read_to_end(&mut buffer).await { + // Ok(_) => { + // // We don't particularly care if we fail to delete from disk since + // // if it's an error it means it's already been dropped. + // tokio::spawn(remove_file(new_key_path)); + // } + // Err(e) => { + // warn!("Failed to read from {:?}: {}", new_key_path, e); + // } + // } + // } + // Err(e) => { + // warn!("Failed to open {:?}: {}", new_key_path, e); + // return None; + // } + // } - buffer.shrink_to_fit(); + // buffer.shrink_to_fit(); - todo!(); - self.disk_cur_size -= buffer.len() as u64; - // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into()); + // todo!(); + // self.disk_cur_size -= buffer.len() as u64; + // // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into()); - // return Some(self.put(key.clone(), Box::new(image), metadata).await); - } + // // return Some(self.put(key.clone(), Box::new(image), metadata).await); + // } None } async fn put( &self, - key: CacheKey, + key: Arc, mut image: BoxedImageStream, metadata: ImageMetadata, ) -> Result { @@ -227,12 +227,14 @@ impl Cache for GenerationalCache { } } - self.in_memory.put(key.clone(), (image, metadata)); + todo!(); + // self.in_memory.put(key.clone(), (image, metadata)); self.memory_cur_size += new_img_size; } else { // Image was larger than memory capacity, push directly into cold // storage. - self.push_into_cold(key.clone(), image, metadata).await; + todo!(); + // self.push_into_cold(key.clone(), image, metadata).await; }; // Push evicted hot entires into cold storage. diff --git a/src/cache/low_mem.rs b/src/cache/low_mem.rs index 2b1c127..c84414a 100644 --- a/src/cache/low_mem.rs +++ b/src/cache/low_mem.rs @@ -20,8 +20,8 @@ pub struct LowMemCache { } enum DbMessage { - Get(CacheKey), - Put(CacheKey, ImageMetadata), + Get(Arc), + Put(Arc, ImageMetadata), } impl LowMemCache { @@ -87,14 +87,17 @@ impl LowMemCache { impl Cache for LowMemCache { async fn get( &self, - key: &CacheKey, + key: Arc, ) -> Option> { let channel = self.db_update_channel_sender.clone(); - let key_0 = key.clone(); + let key_0 = Arc::clone(&key); tokio::spawn(async move { channel.send(DbMessage::Get(key_0)).await }); - let path = self.disk_path.clone().join(PathBuf::from(key.clone())); + let path = self + .disk_path + .clone() + .join(PathBuf::from(Arc::clone(&key).as_ref())); super::fs::read_file(&path) .await .map(|res| res.map_err(Into::into)) @@ -102,15 +105,15 @@ impl Cache for LowMemCache { async fn put( &self, - key: CacheKey, + key: Arc, image: BoxedImageStream, metadata: ImageMetadata, ) -> Result { let channel = self.db_update_channel_sender.clone(); - let key_0 = key.clone(); + let key_0 = Arc::clone(&key); tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await }); - let path = self.disk_path.clone().join(PathBuf::from(key)); + let path = self.disk_path.clone().join(PathBuf::from(key.as_ref())); super::fs::write_file( &path, diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 4371453..9d3b112 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -1,8 +1,8 @@ -use std::fmt::Display; use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc}; use actix_web::http::HeaderValue; use async_trait::async_trait; @@ -43,6 +43,13 @@ impl From for PathBuf { } } +impl From<&CacheKey> for PathBuf { + #[inline] + fn from(key: &CacheKey) -> Self { + key.to_string().into() + } +} + #[derive(Clone)] pub struct CachedImage(pub Bytes); @@ -147,12 +154,14 @@ pub enum CacheError { #[async_trait] pub trait Cache: Send + Sync { - async fn get(&self, key: &CacheKey) - -> Option>; + async fn get( + &self, + key: Arc, + ) -> Option>; async fn put( &self, - key: CacheKey, + key: Arc, image: BoxedImageStream, metadata: ImageMetadata, ) -> Result; diff --git a/src/main.rs b/src/main.rs index 5698671..c8d245a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,6 @@ use simple_logger::SimpleLogger; use state::{RwLockServerState, ServerState}; use stop::send_stop; use thiserror::Error; -use tokio::sync::RwLock as TokioRwLock; mod cache; mod config; diff --git a/src/routes.rs b/src/routes.rs index bc6283d..a2ebcfe 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::Ordering; +use std::sync::{atomic::Ordering, Arc}; use actix_web::http::header::{ ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH, @@ -15,7 +15,6 @@ use log::{debug, error, info, warn}; use serde::Deserialize; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use thiserror::Error; -use tokio::sync::RwLock; use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError}; use crate::client_api_version; @@ -192,9 +191,9 @@ async fn fetch_image( file_name: String, is_data_saver: bool, ) -> ServerResponse { - let key = CacheKey(chapter_hash, file_name, is_data_saver); + let key = Arc::new(CacheKey(chapter_hash, file_name, is_data_saver)); - match cache.get(&key).await { + match cache.get(Arc::clone(&key)).await { Some(Ok((image, metadata))) => { return construct_response(image, &metadata); }