diff --git a/Cargo.lock b/Cargo.lock index 0519afd..4d852f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -499,16 +499,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "dashmap" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" -dependencies = [ - "cfg-if", - "num_cpus", -] - [[package]] name = "derive_more" version = "0.99.13" @@ -984,7 +974,6 @@ dependencies = [ "chrono", "clap", "ctrlc", - "dashmap", "dotenv", "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index a3f8f79..024f8b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ bytes = "1" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] } ctrlc = "3" -dashmap = "4" dotenv = "0.15" futures = "0.3" once_cell = "1" diff --git a/src/cache/fs.rs b/src/cache/fs.rs index b8b8156..f845950 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -132,7 +132,7 @@ impl ConcurrentFsStream { Ok(Self { file: Box::pin(File::open(path).await?), // 0.5ms - sleep: Box::pin(tokio::time::interval(Duration::from_micros(500))), + sleep: Box::pin(tokio::time::interval(Duration::from_micros(250))), is_file_done_writing: is_done, }) } @@ -172,7 +172,6 @@ impl Stream for ConcurrentFsStream { bytes.truncate(filled); polled_result.map(|_| { if bytes.is_empty() { - dbg!(line!()); None } else { Some(Ok(bytes.into())) diff --git a/src/cache/mod.rs b/src/cache/mod.rs index b224460..8d41945 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -199,7 +199,8 @@ impl Stream for MemStream { type Item = CacheStreamItem; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - let new_bytes = self.0.split_to(1460); + let mut new_bytes = Bytes::new(); + std::mem::swap(&mut self.0, &mut new_bytes); if new_bytes.is_empty() { Poll::Ready(None) } else { diff --git a/src/main.rs b/src/main.rs index 98f94d3..16c95b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,12 +16,13 @@ use cache::{Cache, GenerationalCache, LowMemCache}; use clap::Clap; use config::CliArgs; use log::{debug, error, warn, LevelFilter}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use rustls::{NoClientAuth, ServerConfig}; use simple_logger::SimpleLogger; use state::{RwLockServerState, ServerState}; use stop::send_stop; use thiserror::Error; +use tokio::sync::RwLock as TokioRwLock; mod cache; mod config; @@ -128,7 +129,7 @@ async fn main() -> Result<(), std::io::Error> { cache_path.clone(), )) }; - let cache = Arc::new(Mutex::new(cache)); + let cache = Arc::new(TokioRwLock::new(cache)); let cache1 = Arc::clone(&cache); // Spawn periodic cache trimming @@ -136,7 +137,7 @@ async fn main() -> Result<(), std::io::Error> { let mut interval = time::interval(Duration::from_secs(3 * 60)); loop { interval.tick().await; - cache.lock().prune().await; + cache.write().await.prune().await; } }); diff --git a/src/ping.rs b/src/ping.rs index 61e2c67..4101011 100644 --- a/src/ping.rs +++ b/src/ping.rs @@ -1,19 +1,13 @@ +use std::num::{NonZeroU16, NonZeroU64}; +use std::sync::atomic::Ordering; use std::{io::BufReader, sync::Arc}; -use std::{ - num::{NonZeroU16, NonZeroU64}, - sync::atomic::Ordering, -}; -use log::{error, info, warn}; -use rustls::{ - internal::pemfile::{certs, rsa_private_keys}, - sign::RSASigningKey, -}; -use rustls::{sign::SigningKey, Certificate}; -use serde::{ - de::{MapAccess, Visitor}, - Deserialize, Serialize, -}; +use log::{debug, error, info, warn}; +use rustls::internal::pemfile::{certs, rsa_private_keys}; +use rustls::sign::{RSASigningKey, SigningKey}; +use rustls::Certificate; +use serde::de::{MapAccess, Visitor}; +use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::box_::PrecomputedKey; use url::Url; @@ -152,6 +146,7 @@ pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc match resp.json::().await { Ok(resp) => { + debug!("got write guard for server state"); let mut write_guard = data.0.write(); if !write_guard.url_overridden && write_guard.image_server != resp.image_server { @@ -201,6 +196,8 @@ pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc warn!("Got malformed response: {}", e), }, diff --git a/src/routes.rs b/src/routes.rs index 1eb7225..d2013c4 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -12,10 +12,10 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{Stream, TryStreamExt}; use log::{debug, error, info, warn}; -use parking_lot::Mutex; use serde::Deserialize; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use thiserror::Error; +use tokio::sync::RwLock; use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError}; use crate::client_api_version; @@ -51,7 +51,7 @@ impl Responder for ServerResponse { #[get("/{token}/data/{chapter_hash}/{file_name}")] async fn token_data( state: Data, - cache: Data>>, + cache: Data>>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -67,7 +67,7 @@ async fn token_data( #[get("/{token}/data-saver/{chapter_hash}/{file_name}")] async fn token_data_saver( state: Data, - cache: Data>>, + cache: Data>>, path: Path<(String, String, String)>, ) -> impl Responder { let (token, chapter_hash, file_name) = path.into_inner(); @@ -183,16 +183,20 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder { async fn fetch_image( state: Data, - cache: Data>>, + cache: Data>>, chapter_hash: String, file_name: String, is_data_saver: bool, ) -> ServerResponse { let key = CacheKey(chapter_hash, file_name, is_data_saver); - match cache.lock().get(&key).await { - Some(Ok((image, metadata))) => return construct_response(image, metadata), - Some(Err(_)) => return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish()), + match cache.write().await.get(&key).await { + Some(Ok((image, metadata))) => { + return construct_response(image, metadata); + } + Some(Err(_)) => { + return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish()); + } _ => (), } @@ -256,7 +260,7 @@ async fn fetch_image( let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap(); let (stream, metadata) = { - match cache.lock().put(key, Box::new(body), metadata).await { + match cache.write().await.put(key, Box::new(body), metadata).await { Ok((stream, metadata)) => (stream, *metadata), Err(e) => { warn!("Failed to insert into cache: {}", e);