This commit is contained in:
Edward Shen 2021-04-24 12:46:18 -04:00
parent 9abeec89bc
commit 151d0c5a83
Signed by: edward
GPG key ID: 19182661E818369F
5 changed files with 50 additions and 20 deletions

View file

@ -223,6 +223,7 @@ impl Cache for DiskCache {
(self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096 (self.disk_cur_size.load(Ordering::Acquire) + 4095) / 4096 * 4096
} }
#[inline]
fn mem_size(&self) -> u64 { fn mem_size(&self) -> u64 {
0 0
} }

44
src/cache/fs.rs vendored
View file

@ -1,3 +1,19 @@
//! This module contains two functions whose sole purpose is to allow a single
//! producer multiple consumer (SPMC) system using the filesystem as an
//! intermediate.
//!
//! Consider the scenario where two clients, A and B, request the same uncached
//! file, one after the other. In a typical caching system, both requests would
//! result in a cache miss, and both requests would then be proxied from
//! upstream. But, we can do better. We know that by the time one request
//! begins, there should be a file on disk for us to read from. Why require
//! subsequent requests to read from upstream, when we can simply fetch one and
//! read from the filesystem that we know will have the exact same data?
//! Instead, we can simply read from the filesystem and inform all readers
//! when the file is done. This is beneficial to both downstream and upstream as
//! upstream no longer needs to process duplicate requests and sequential cache
//! misses are handled almost as fast as cache hits.
use actix_web::error::PayloadError; use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt}; use futures::{Future, Stream, StreamExt};
@ -40,7 +56,9 @@ use super::{BoxedImageStream, CacheKey, CacheStream, CacheStreamItem, ImageMetad
static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> = static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
Lazy::new(|| RwLock::new(HashMap::new())); Lazy::new(|| RwLock::new(HashMap::new()));
/// Tries to read from the file, returning a byte stream if it exists /// Attempts to look up the file on disk, returning a byte stream if it exists.
/// Note that this could return one of two stream types, depending on whether
/// the file is currently in the process of being written to.
pub async fn read_file( pub async fn read_file(
path: &Path, path: &Path,
) -> Option<Result<(CacheStream, ImageMetadata), std::io::Error>> { ) -> Option<Result<(CacheStream, ImageMetadata), std::io::Error>> {
@ -52,8 +70,8 @@ pub async fn read_file(
ImageMetadata::deserialize(&mut de).ok()? ImageMetadata::deserialize(&mut de).ok()?
}; };
// False positive, `file` is used in both cases, which means that it's not // False positive lint, `file` is used in both cases, which means that it's
// possible to move this into a map_or_else without cloning `file`. // not possible to move this into a map_or_else without cloning `file`.
#[allow(clippy::option_if_let_else)] #[allow(clippy::option_if_let_else)]
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) { let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
CacheStream::Concurrent(ConcurrentFsStream::from_file( CacheStream::Concurrent(ConcurrentFsStream::from_file(
@ -67,19 +85,22 @@ pub async fn read_file(
Some(Ok((stream, metadata))) Some(Ok((stream, metadata)))
} }
/// Maps the input byte stream into one that writes to disk instead, returning /// Writes the metadata and input stream (in that order) to a file, returning a
/// a stream that reads from disk instead. /// stream that reads from that file. Accepts a db callback function that is
pub async fn write_file< /// provided the number of bytes written, and an optional on-complete callback
Fut: 'static + Send + Sync + Future<Output = ()>, /// that is called with a completed cache entry.
DbCallback: 'static + Send + Sync + FnOnce(u32) -> Fut, pub async fn write_file<Fut, DbCallback>(
>(
path: &Path, path: &Path,
cache_key: CacheKey, cache_key: CacheKey,
mut byte_stream: BoxedImageStream, mut byte_stream: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
db_callback: DbCallback, db_callback: DbCallback,
on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, usize)>>, on_complete: Option<Sender<(CacheKey, Bytes, ImageMetadata, usize)>>,
) -> Result<CacheStream, std::io::Error> { ) -> Result<CacheStream, std::io::Error>
where
Fut: 'static + Send + Sync + Future<Output = ()>,
DbCallback: 'static + Send + Sync + FnOnce(u32) -> Fut,
{
let (tx, rx) = channel(WritingStatus::NotDone); let (tx, rx) = channel(WritingStatus::NotDone);
let mut file = { let mut file = {
@ -114,8 +135,8 @@ pub async fn write_file<
0 => break, 0 => break,
n => { n => {
bytes.advance(n); bytes.advance(n);
// We don't care if we don't have receivers
bytes_written += n as u32; bytes_written += n as u32;
// We don't care if we don't have receivers
let _ = tx.send(WritingStatus::NotDone); let _ = tx.send(WritingStatus::NotDone);
} }
} }
@ -151,6 +172,7 @@ pub async fn write_file<
} }
tokio::spawn(db_callback(bytes_written)); tokio::spawn(db_callback(bytes_written));
if let Some(sender) = on_complete { if let Some(sender) = on_complete {
tokio::spawn(async move { tokio::spawn(async move {
sender sender

11
src/cache/mod.rs vendored
View file

@ -166,12 +166,20 @@ pub trait Cache: Send + Sync {
metadata: ImageMetadata, metadata: ImageMetadata,
) -> Result<CacheStream, CacheError>; ) -> Result<CacheStream, CacheError>;
/// Increases the size of the cache. This is a double-dispatch method, so
/// see specific implementations for complete detail. This only accepts a
/// u32 because all files should be smaller than a u32's maximum value, and
/// some cache implementations can only track sizes up to a u32.
fn increase_usage(&self, amt: u32); fn increase_usage(&self, amt: u32);
/// Decreases the size of the cache. This is a double-dispatch method, so
/// see specific implementations for complete detail.
fn decrease_usage(&self, amt: u64); fn decrease_usage(&self, amt: u64);
/// Reports the on-disk size of the cache.
fn on_disk_size(&self) -> u64; fn on_disk_size(&self) -> u64;
/// Reports the memory size of the cache.
fn mem_size(&self) -> u64; fn mem_size(&self) -> u64;
async fn put_with_on_completed_callback( async fn put_with_on_completed_callback(
@ -182,8 +190,11 @@ pub trait Cache: Send + Sync {
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
) -> Result<CacheStream, CacheError>; ) -> Result<CacheStream, CacheError>;
/// Double-dispatch method. Used by cache implementations that require a
/// completed entry to put items into their cache.
async fn put_internal(&self, key: CacheKey, image: Bytes, metadata: ImageMetadata, size: usize); async fn put_internal(&self, key: CacheKey, image: Bytes, metadata: ImageMetadata, size: usize);
/// Pops an entry from the memory cache, if one exists.
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>; async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>;
} }

View file

@ -29,11 +29,7 @@ pub struct Request<'a> {
} }
impl<'a> Request<'a> { impl<'a> Request<'a> {
fn from_config_and_state( fn from_config_and_state(secret: &'a str, config: &CliArgs) -> Self {
secret: &'a str,
config: &CliArgs,
state: &Arc<RwLockServerState>,
) -> Self {
Self { Self {
secret, secret,
port: config.port, port: config.port,
@ -147,7 +143,7 @@ impl std::fmt::Debug for Tls {
} }
pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwLockServerState>) { pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwLockServerState>) {
let req = Request::from_config_and_state(secret, cli, data); let req = Request::from_config_and_state(secret, cli);
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await; let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await;
match resp { match resp {

View file

@ -102,9 +102,9 @@ impl ServerState {
} }
let tls = resp.tls.unwrap(); let tls = resp.tls.unwrap();
TLS_PREVIOUSLY_CREATED.set(ArcSwap::from_pointee(tls.created_at)); let _ = TLS_PREVIOUSLY_CREATED.set(ArcSwap::from_pointee(tls.created_at));
TLS_SIGNING_KEY.set(ArcSwap::new(tls.priv_key)); let _ = TLS_SIGNING_KEY.set(ArcSwap::new(tls.priv_key));
TLS_CERTS.set(ArcSwap::from_pointee(tls.certs)); let _ = TLS_CERTS.set(ArcSwap::from_pointee(tls.certs));
Ok(Self { Ok(Self {
precomputed_key: key, precomputed_key: key,