From 151d0c5a8392cdce860a49323231f21f2bfbb7d5 Mon Sep 17 00:00:00 2001
From: Edward Shen
Date: Sat, 24 Apr 2021 12:46:18 -0400
Subject: [PATCH] Add docs

---
 src/cache/disk_cache.rs |  1 +
 src/cache/fs.rs         | 44 ++++++++++++++++++++++++++++++-----------
 src/cache/mod.rs        | 11 +++++++++++
 src/ping.rs             |  8 ++------
 src/state.rs            |  6 +++---
 5 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/src/cache/disk_cache.rs b/src/cache/disk_cache.rs
index e86aef2..7962bd9 100644
--- a/src/cache/disk_cache.rs
+++ b/src/cache/disk_cache.rs
@@ -223,6 +223,7 @@ impl Cache for DiskCache {
         (self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096
     }
 
+    #[inline]
     fn mem_size(&self) -> u64 {
         0
     }
diff --git a/src/cache/fs.rs b/src/cache/fs.rs
index 9154c28..854150f 100644
--- a/src/cache/fs.rs
+++ b/src/cache/fs.rs
@@ -1,3 +1,19 @@
+//! This module contains two functions whose sole purpose is to allow a single
+//! producer, multiple consumer (SPMC) system that uses the filesystem as an
+//! intermediary.
+//!
+//! Consider the scenario where two clients, A and B, request the same uncached
+//! file, one after the other. In a typical caching system, both requests would
+//! result in a cache miss, and both requests would then be proxied from
+//! upstream. But we can do better: by the time the second request arrives, the
+//! first request has already started writing the file to disk. Why force later
+//! requests to fetch from upstream again, when we only need to fetch once and
+//! everyone else can read a file that we know will contain the exact same data?
+//! Instead, subsequent requests simply read from the filesystem and are informed
+//! once the file has finished downloading. This benefits both sides: upstream no
+//! longer needs to serve duplicate requests, and for downstream, sequential
+//! cache misses behave nearly as well as cache hits.
+
 use actix_web::error::PayloadError;
 use bytes::{Buf, Bytes, BytesMut};
 use futures::{Future, Stream, StreamExt};
@@ -40,7 +56,9 @@ use super::{BoxedImageStream, CacheKey, CacheStream, CacheStreamItem, ImageMetad
 static WRITING_STATUS: Lazy>>> =
     Lazy::new(|| RwLock::new(HashMap::new()));
 
-/// Tries to read from the file, returning a byte stream if it exists
+/// Attempts to look up the file on disk, returning a byte stream if it exists.
+/// Note that this can return one of two types of streams, depending on whether
+/// the file is still being written to.
 pub async fn read_file(
     path: &Path,
 ) -> Option> {
@@ -52,8 +70,8 @@
         ImageMetadata::deserialize(&mut de).ok()?
     };
 
-    // False positive, `file` is used in both cases, which means that it's not
-    // possible to move this into a map_or_else without cloning `file`.
+    // False positive lint, `file` is used in both cases, which means that it's
+    // not possible to move this into a map_or_else without cloning `file`.
     #[allow(clippy::option_if_let_else)]
     let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
         CacheStream::Concurrent(ConcurrentFsStream::from_file(
@@ -67,19 +85,22 @@
     Some(Ok((stream, metadata)))
 }
 
-/// Maps the input byte stream into one that writes to disk instead, returning
-/// a stream that reads from disk instead.
-pub async fn write_file<
-    Fut: 'static + Send + Sync + Future,
-    DbCallback: 'static + Send + Sync + FnOnce(u32) -> Fut,
->(
+/// Writes the metadata and input stream (in that order) to a file, returning a
+/// stream that reads from that file. Accepts a db callback function that is
+/// provided the number of bytes written, and an optional on-complete callback
+/// that is called with a completed cache entry.
+pub async fn write_file(
     path: &Path,
     cache_key: CacheKey,
     mut byte_stream: BoxedImageStream,
     metadata: ImageMetadata,
     db_callback: DbCallback,
     on_complete: Option>,
-) -> Result {
+) -> Result
+where
+    Fut: 'static + Send + Sync + Future,
+    DbCallback: 'static + Send + Sync + FnOnce(u32) -> Fut,
+{
     let (tx, rx) = channel(WritingStatus::NotDone);
 
     let mut file = {
@@ -114,8 +135,8 @@ pub async fn write_file<
                 0 => break,
                 n => {
                     bytes.advance(n);
-                    // We don't care if we don't have receivers
                     bytes_written += n as u32;
+                    // We don't care if we don't have receivers
                     let _ = tx.send(WritingStatus::NotDone);
                 }
             }
@@ -151,6 +172,7 @@ pub async fn write_file<
         }
 
         tokio::spawn(db_callback(bytes_written));
+
         if let Some(sender) = on_complete {
             tokio::spawn(async move {
                 sender
diff --git a/src/cache/mod.rs b/src/cache/mod.rs
index 43006c5..2fd9989 100644
--- a/src/cache/mod.rs
+++ b/src/cache/mod.rs
@@ -166,12 +166,20 @@ pub trait Cache: Send + Sync {
         metadata: ImageMetadata,
     ) -> Result;
 
+    /// Increases the reported usage of the cache. This is a double-dispatch
+    /// method, so see the specific implementation for complete details. This
+    /// accepts a u32 because all files should be smaller than a u32 in size,
+    /// and some cache implementations can only handle sizes up to a u32.
     fn increase_usage(&self, amt: u32);
 
+    /// Decreases the reported usage of the cache. This is a double-dispatch
+    /// method, so see the specific implementation for complete details.
     fn decrease_usage(&self, amt: u64);
 
+    /// Reports the on-disk size of the cache.
     fn on_disk_size(&self) -> u64;
 
+    /// Reports the in-memory size of the cache.
     fn mem_size(&self) -> u64;
 
     async fn put_with_on_completed_callback(
         &self,
         key: CacheKey,
         image: BoxedImageStream,
         metadata: ImageMetadata,
         on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
     ) -> Result;
 
+    /// Double-dispatch method. Used by cache implementations that require a
+    /// completed entry to put items into their cache.
     async fn put_internal(&self, key: CacheKey, image: Bytes, metadata: ImageMetadata, size: usize);
 
+    /// Pops an entry from the memory cache, if one exists.
     async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>;
 }
diff --git a/src/ping.rs b/src/ping.rs
index 476040f..21689a9 100644
--- a/src/ping.rs
+++ b/src/ping.rs
@@ -29,11 +29,7 @@ pub struct Request<'a> {
 }
 
 impl<'a> Request<'a> {
-    fn from_config_and_state(
-        secret: &'a str,
-        config: &CliArgs,
-        state: &Arc,
-    ) -> Self {
+    fn from_config_and_state(secret: &'a str, config: &CliArgs) -> Self {
         Self {
             secret,
             port: config.port,
@@ -147,7 +143,7 @@ impl std::fmt::Debug for Tls {
 }
 
 pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc) {
-    let req = Request::from_config_and_state(secret, cli, data);
+    let req = Request::from_config_and_state(secret, cli);
     let client = reqwest::Client::new();
     let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await;
     match resp {
diff --git a/src/state.rs b/src/state.rs
index 3b184c2..ba8b8bc 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -102,9 +102,9 @@ impl ServerState {
         }
 
         let tls = resp.tls.unwrap();
-        TLS_PREVIOUSLY_CREATED.set(ArcSwap::from_pointee(tls.created_at));
-        TLS_SIGNING_KEY.set(ArcSwap::new(tls.priv_key));
-        TLS_CERTS.set(ArcSwap::from_pointee(tls.certs));
+        let _ = TLS_PREVIOUSLY_CREATED.set(ArcSwap::from_pointee(tls.created_at));
+        let _ = TLS_SIGNING_KEY.set(ArcSwap::new(tls.priv_key));
+        let _ = TLS_CERTS.set(ArcSwap::from_pointee(tls.certs));
 
         Ok(Self {
             precomputed_key: key,
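
Note on the design documented in src/cache/fs.rs above: the module docs describe a single producer, multiple consumer flow where the file on disk is the shared buffer and a channel only signals progress. The sketch below is illustrative only and is not part of this patch or this repository. It uses a tokio watch channel and plain file I/O to show the general shape of that flow; the names write_chunks, read_from_disk, and the demo path are invented for the example, and the real types in fs.rs (CacheStream, ConcurrentFsStream, the WritingStatus map) differ.

// Requires: tokio = { version = "1", features = ["full"] }
use std::io::SeekFrom;

use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::watch;

#[derive(Clone, Copy, PartialEq, Eq)]
enum WritingStatus {
    NotDone,
    Done,
}

/// The single producer: streams chunks into the file, notifying readers after
/// each write and once when everything has been written.
async fn write_chunks(
    path: &'static str,
    chunks: Vec<&'static [u8]>,
    tx: watch::Sender<WritingStatus>,
) -> std::io::Result<()> {
    let mut file = OpenOptions::new().write(true).open(path).await?;
    for chunk in chunks {
        file.write_all(chunk).await?;
        file.flush().await?;
        // We don't care if nobody is listening yet.
        let _ = tx.send(WritingStatus::NotDone);
    }
    let _ = tx.send(WritingStatus::Done);
    Ok(())
}

/// A consumer: reads whatever is on disk so far, then waits on the watch
/// channel for more data until the producer reports that it is done.
async fn read_from_disk(
    path: &'static str,
    mut rx: watch::Receiver<WritingStatus>,
) -> std::io::Result<Vec<u8>> {
    let mut file = File::open(path).await?;
    let mut out = Vec::new();
    let mut offset = 0u64;
    loop {
        file.seek(SeekFrom::Start(offset)).await?;
        let read = file.read_to_end(&mut out).await?;
        offset += read as u64;
        if *rx.borrow() == WritingStatus::Done && read == 0 {
            return Ok(out);
        }
        // Wait for the producer to signal more data (or completion). An error
        // means the producer is gone; whatever is on disk is all we will get.
        if rx.changed().await.is_err() {
            file.seek(SeekFrom::Start(offset)).await?;
            file.read_to_end(&mut out).await?;
            return Ok(out);
        }
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let path = "/tmp/spmc-demo.bin";
    // Create the file up front so consumers can open it immediately.
    File::create(path).await?;

    let (tx, rx) = watch::channel(WritingStatus::NotDone);
    let producer = tokio::spawn(write_chunks(
        path,
        vec!["hello ".as_bytes(), "world".as_bytes()],
        tx,
    ));
    // The receiver is Clone, so any number of consumers could tail the file.
    let consumer = tokio::spawn(read_from_disk(path, rx.clone()));

    producer.await.unwrap()?;
    let bytes = consumer.await.unwrap()?;
    println!("consumer read: {}", String::from_utf8_lossy(&bytes));
    Ok(())
}

A watch channel fits this kind of design because readers only ever need to know "more data may be available" or "the writer is finished"; the data itself always comes from the file, so a reader that joins late simply catches up by reading from its current offset.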