Run clippy

This commit is contained in:
Edward Shen 2021-04-14 22:52:54 -04:00
parent c75b5063af
commit 8949e41bee
Signed by: edward
GPG key ID: 19182661E818369F
7 changed files with 88 additions and 54 deletions

20
src/cache/fs.rs vendored
View file

@ -28,7 +28,7 @@ use tokio::time::Sleep;
/// up to Client A's request, then Client B could receive a broken image, as it /// up to Client A's request, then Client B could receive a broken image, as it
/// thinks it's done reading the file. /// thinks it's done reading the file.
/// ///
/// We effectively use WRITING_STATUS as a status relay to ensure concurrent /// We effectively use `WRITING_STATUS` as a status relay to ensure concurrent
/// reads to the file while it's being written to will wait for writing to be /// reads to the file while it's being written to will wait for writing to be
/// completed. /// completed.
static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Arc<CacheStatus>>>> = static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Arc<CacheStatus>>>> =
@ -40,8 +40,7 @@ pub async fn read_file(path: &Path) -> Option<Result<FromFsStream, std::io::Erro
let status = WRITING_STATUS let status = WRITING_STATUS
.read() .read()
.get(path) .get(path)
.map(Arc::clone) .map_or_else(|| Arc::new(CacheStatus::done()), Arc::clone);
.unwrap_or_else(|| Arc::new(CacheStatus::done()));
Some(FromFsStream::new(path, status).await) Some(FromFsStream::new(path, status).await)
} else { } else {
@ -69,18 +68,17 @@ pub async fn transparent_file_stream(
let path_buf = path.to_path_buf(); let path_buf = path.to_path_buf();
tokio::spawn(async move { tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut was_errored = false; let mut errored = false;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
match bytes { if let Ok(bytes) = bytes {
Ok(bytes) => file.write_all(&bytes).await?, file.write_all(&bytes).await?
Err(_) => { } else {
was_errored = true; errored = true;
break; break;
} }
} }
}
if was_errored { if errored {
// It's ok if the deleting the file fails, since we truncate on // It's ok if the deleting the file fails, since we truncate on
// create anyways, but it should be best effort // create anyways, but it should be best effort
let _ = remove_file(&path_buf).await; let _ = remove_file(&path_buf).await;
@ -92,7 +90,7 @@ pub async fn transparent_file_stream(
let mut write_lock = WRITING_STATUS.write(); let mut write_lock = WRITING_STATUS.write();
// This needs to be written atomically with the write lock, else // This needs to be written atomically with the write lock, else
// it's possible we have an inconsistent state // it's possible we have an inconsistent state
if was_errored { if errored {
write_flag.store(WritingStatus::Error); write_flag.store(WritingStatus::Error);
} else { } else {
write_flag.store(WritingStatus::Done); write_flag.store(WritingStatus::Done);

View file

@ -11,17 +11,17 @@ use super::{Cache, CacheKey, CachedImage, ImageMetadata};
pub struct GenerationalCache { pub struct GenerationalCache {
in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>, in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>,
memory_max_size: usize, memory_max_size: u64,
memory_cur_size: usize, memory_cur_size: u64,
on_disk: LruCache<CacheKey, ImageMetadata>, on_disk: LruCache<CacheKey, ImageMetadata>,
disk_path: PathBuf, disk_path: PathBuf,
disk_max_size: usize, disk_max_size: u64,
disk_cur_size: usize, disk_cur_size: u64,
} }
impl GenerationalCache { impl GenerationalCache {
pub fn new(memory_max_size: usize, disk_max_size: usize, disk_path: PathBuf) -> Self { pub fn new(memory_max_size: u64, disk_max_size: u64, disk_path: PathBuf) -> Self {
Self { Self {
in_memory: LruCache::unbounded(), in_memory: LruCache::unbounded(),
memory_max_size, memory_max_size,
@ -38,9 +38,9 @@ impl GenerationalCache {
let new_img_size = image.0.len(); let new_img_size = image.0.len();
let mut to_drop = vec![]; let mut to_drop = vec![];
if self.disk_max_size >= new_img_size { if self.disk_max_size >= new_img_size as u64 {
// Add images to drop from cold cache into a queue // Add images to drop from cold cache into a queue
while new_img_size + self.disk_cur_size > self.disk_max_size { while new_img_size as u64 + self.disk_cur_size > self.disk_max_size {
match self.on_disk.pop_lru() { match self.on_disk.pop_lru() {
Some((key, _)) => { Some((key, _)) => {
let mut path = self.disk_path.clone(); let mut path = self.disk_path.clone();
@ -72,7 +72,7 @@ impl GenerationalCache {
}) })
.unwrap_or_default(); .unwrap_or_default();
self.disk_cur_size -= file_size as usize; self.disk_cur_size -= file_size;
} }
to_drop.push(path); to_drop.push(path);
@ -102,7 +102,7 @@ impl GenerationalCache {
let new_key_path = { let new_key_path = {
let mut root = self.disk_path.clone(); let mut root = self.disk_path.clone();
root.push(PathBuf::from(&key)); root.push(PathBuf::from(key.clone()));
root root
}; };
@ -112,7 +112,7 @@ impl GenerationalCache {
match file.write_all(&image.0).await { match file.write_all(&image.0).await {
Ok(_) => { Ok(_) => {
self.on_disk.put(key, metadata); self.on_disk.put(key, metadata);
self.disk_cur_size += new_img_size; self.disk_cur_size += new_img_size as u64;
debug!( debug!(
"Successfully written data to disk for file {:?}", "Successfully written data to disk for file {:?}",
new_key_path new_key_path
@ -140,18 +140,16 @@ impl Cache for GenerationalCache {
if let Some(metadata) = self.on_disk.pop(key) { if let Some(metadata) = self.on_disk.pop(key) {
let new_key_path = { let new_key_path = {
let mut root = self.disk_path.clone(); let mut root = self.disk_path.clone();
root.push(PathBuf::from(key)); root.push(PathBuf::from(key.clone()));
root root
}; };
// extract from disk, if it exists // extract from disk, if it exists
let file = File::open(new_key_path.clone()).await; let file = File::open(&new_key_path).await;
let mut buffer = if let Some(size) = metadata.content_length { let mut buffer = metadata
Vec::with_capacity(size) .content_length
} else { .map_or_else(Vec::new, Vec::with_capacity);
Vec::new()
};
match file { match file {
Ok(mut file) => { Ok(mut file) => {
@ -159,7 +157,7 @@ impl Cache for GenerationalCache {
Ok(_) => { Ok(_) => {
// We don't particularly care if we fail to delete from disk since // We don't particularly care if we fail to delete from disk since
// if it's an error it means it's already been dropped. // if it's an error it means it's already been dropped.
tokio::spawn(remove_file(new_key_path.clone())); tokio::spawn(remove_file(new_key_path));
} }
Err(e) => { Err(e) => {
warn!("Failed to read from {:?}: {}", new_key_path, e); warn!("Failed to read from {:?}: {}", new_key_path, e);
@ -174,7 +172,7 @@ impl Cache for GenerationalCache {
buffer.shrink_to_fit(); buffer.shrink_to_fit();
self.disk_cur_size -= buffer.len(); self.disk_cur_size -= buffer.len() as u64;
let image = CachedImage(Bytes::from(buffer)); let image = CachedImage(Bytes::from(buffer));
// Since we just put it in the in-memory cache it should be there // Since we just put it in the in-memory cache it should be there
@ -189,14 +187,14 @@ impl Cache for GenerationalCache {
#[inline] #[inline]
async fn put(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) { async fn put(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) {
let mut hot_evicted = vec![]; let mut hot_evicted = vec![];
let new_img_size = image.0.len(); let new_img_size = image.0.len() as u64;
if self.memory_max_size >= new_img_size { if self.memory_max_size >= new_img_size {
// Evict oldest entires to make space for new image. // Evict oldest entires to make space for new image.
while new_img_size + self.memory_cur_size > self.memory_max_size { while new_img_size + self.memory_cur_size > self.memory_max_size {
match self.in_memory.pop_lru() { match self.in_memory.pop_lru() {
Some((key, (image, metadata))) => { Some((key, (image, metadata))) => {
self.memory_cur_size -= image.0.len(); self.memory_cur_size -= image.0.len() as u64;
hot_evicted.push((key, image, metadata)); hot_evicted.push((key, image, metadata));
} }
None => unreachable!(concat!( None => unreachable!(concat!(

View file

@ -31,7 +31,7 @@ impl LowMemCache {
impl Cache for LowMemCache { impl Cache for LowMemCache {
async fn get_stream(&mut self, key: &CacheKey) -> Option<Result<FromFsStream, std::io::Error>> { async fn get_stream(&mut self, key: &CacheKey) -> Option<Result<FromFsStream, std::io::Error>> {
if self.on_disk.get(key).is_some() { if self.on_disk.get(key).is_some() {
super::fs::read_file(&Path::new(&key.to_string())).await super::fs::read_file(Path::new(&key.to_string())).await
} else { } else {
None None
} }

60
src/cache/mod.rs vendored
View file

@ -1,5 +1,5 @@
use std::fmt::Display;
use std::path::PathBuf; use std::path::PathBuf;
use std::{fmt::Display, str::FromStr};
use actix_web::http::HeaderValue; use actix_web::http::HeaderValue;
use async_trait::async_trait; use async_trait::async_trait;
@ -36,19 +36,54 @@ impl From<CacheKey> for PathBuf {
} }
} }
impl From<&CacheKey> for PathBuf { pub struct CachedImage(pub Bytes);
#[derive(Copy, Clone)]
pub struct ImageMetadata {
pub content_type: Option<ImageContentType>,
pub content_length: Option<usize>,
pub last_modified: Option<DateTime<FixedOffset>>,
}
// Note to self: If these are wrong blame Triscuit 9
#[derive(Copy, Clone)]
pub enum ImageContentType {
Png,
Jpeg,
Gif,
Bmp,
Tif,
}
pub struct InvalidContentType;
impl FromStr for ImageContentType {
type Err = InvalidContentType;
#[inline] #[inline]
fn from(key: &CacheKey) -> Self { fn from_str(s: &str) -> Result<Self, Self::Err> {
key.to_string().into() match s {
"image/png" => Ok(Self::Png),
"image/jpeg" => Ok(Self::Jpeg),
"image/gif" => Ok(Self::Gif),
"image/bmp" => Ok(Self::Bmp),
"image/tif" => Ok(Self::Tif),
_ => Err(InvalidContentType),
}
} }
} }
pub struct CachedImage(pub Bytes); impl AsRef<str> for ImageContentType {
#[inline]
pub struct ImageMetadata { fn as_ref(&self) -> &str {
pub content_type: Option<String>, match self {
pub content_length: Option<usize>, Self::Png => "image/png",
pub last_modified: Option<DateTime<FixedOffset>>, Self::Jpeg => "image/jpeg",
Self::Gif => "image/gif",
Self::Bmp => "image/bmp",
Self::Tif => "image/tif",
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -66,7 +101,10 @@ impl ImageMetadata {
) -> Result<Self, ImageRequestError> { ) -> Result<Self, ImageRequestError> {
Ok(Self { Ok(Self {
content_type: content_type content_type: content_type
.map(|v| v.to_str().map(|v| v.to_string())) .map(|v| match v.to_str() {
Ok(v) => ImageContentType::from_str(v),
Err(_) => Err(InvalidContentType),
})
.transpose() .transpose()
.map_err(|_| ImageRequestError::InvalidContentType)?, .map_err(|_| ImageRequestError::InvalidContentType)?,
content_length: content_length content_length: content_length

View file

@ -1,4 +1,4 @@
use std::num::{NonZeroU16, NonZeroUsize}; use std::num::{NonZeroU16, NonZeroU64};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -18,17 +18,17 @@ pub struct CliArgs {
/// How large, in bytes, the in-memory cache should be. Note that this does /// How large, in bytes, the in-memory cache should be. Note that this does
/// not include runtime memory usage. /// not include runtime memory usage.
#[clap(long, env = "MEM_CACHE_QUOTA_BYTES")] #[clap(long, env = "MEM_CACHE_QUOTA_BYTES")]
pub memory_quota: NonZeroUsize, pub memory_quota: NonZeroU64,
/// How large, in bytes, the on-disk cache should be. Note that actual /// How large, in bytes, the on-disk cache should be. Note that actual
/// values may be larger for metadata information. /// values may be larger for metadata information.
#[clap(long, env = "DISK_CACHE_QUOTA_BYTES")] #[clap(long, env = "DISK_CACHE_QUOTA_BYTES")]
pub disk_quota: usize, pub disk_quota: u64,
/// Sets the location of the disk cache. /// Sets the location of the disk cache.
#[clap(long, default_value = "./cache", env = "DISK_CACHE_PATH")] #[clap(long, default_value = "./cache", env = "DISK_CACHE_PATH")]
pub cache_path: PathBuf, pub cache_path: PathBuf,
/// The network speed to advertise to Mangadex@Home control server. /// The network speed to advertise to Mangadex@Home control server.
#[clap(long, env = "MAX_NETWORK_SPEED")] #[clap(long, env = "MAX_NETWORK_SPEED")]
pub network_speed: NonZeroUsize, pub network_speed: NonZeroU64,
/// Whether or not to provide the Server HTTP header to clients. This is /// Whether or not to provide the Server HTTP header to clients. This is
/// useful for debugging, but is generally not recommended for security /// useful for debugging, but is generally not recommended for security
/// reasons. /// reasons.

View file

@ -1,6 +1,6 @@
use std::{io::BufReader, sync::Arc}; use std::{io::BufReader, sync::Arc};
use std::{ use std::{
num::{NonZeroU16, NonZeroUsize}, num::{NonZeroU16, NonZeroU64},
sync::atomic::Ordering, sync::atomic::Ordering,
}; };
@ -27,9 +27,9 @@ pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
pub struct Request<'a> { pub struct Request<'a> {
secret: &'a str, secret: &'a str,
port: NonZeroU16, port: NonZeroU16,
disk_space: usize, disk_space: u64,
network_speed: NonZeroUsize, network_speed: NonZeroU64,
build_version: usize, build_version: u64,
tls_created_at: Option<String>, tls_created_at: Option<String>,
} }

View file

@ -273,7 +273,7 @@ fn construct_response(cached: &CachedImage, metadata: &ImageMetadata) -> ServerR
.collect(); .collect();
let mut resp = HttpResponse::Ok(); let mut resp = HttpResponse::Ok();
if let Some(content_type) = &metadata.content_type { if let Some(content_type) = &metadata.content_type {
resp.append_header((CONTENT_TYPE, &**content_type)); resp.append_header((CONTENT_TYPE, content_type.as_ref()));
} }
if let Some(content_length) = &metadata.content_length { if let Some(content_length) = &metadata.content_length {
resp.append_header((CONTENT_LENGTH, content_length.to_string())); resp.append_header((CONTENT_LENGTH, content_length.to_string()));