Compare commits

..

No commits in common. "4f55380d23eab981e6e7234860b0fb0eb0ffd467" and "8772df22317be33d97a443ee5d83053e5256118b" have entirely different histories.

6 changed files with 132 additions and 399 deletions

40
Cargo.lock generated
View file

@ -456,9 +456,9 @@ dependencies = [
[[package]] [[package]]
name = "const_fn" name = "const_fn"
version = "0.4.8" version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7" checksum = "402da840495de3f976eaefc3485b7f5eb5b0bf9761f9a47be27fe975b3b8c2ec"
[[package]] [[package]]
name = "convert_case" name = "convert_case"
@ -479,9 +479,9 @@ dependencies = [
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.1.4" version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed00c67cb5d0a7d64a44f6ad2668db7e7530311dd53ea79bcd4fb022c64911c8" checksum = "dec1028182c380cc45a2e2c5ec841134f2dfd0f8f5f0a5bcd68004f81b5efdf4"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -547,9 +547,9 @@ dependencies = [
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.14" version = "0.99.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc7b9cef1e351660e5443924e4f43ab25fbbed3e9a5f052df3677deb4d6b320" checksum = "f82b1b72f1263f214c0f823371768776c4f5841b942c9883aa8e5ec584fd0ba6"
dependencies = [ dependencies = [
"convert_case", "convert_case",
"proc-macro2", "proc-macro2",
@ -1522,18 +1522,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.126" version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.126" version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1874,9 +1874,9 @@ dependencies = [
[[package]] [[package]]
name = "terminal_size" name = "terminal_size"
version = "0.1.17" version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" checksum = "86ca8ced750734db02076f44132d802af0b33b09942331f4459dde8636fd2406"
dependencies = [ dependencies = [
"libc", "libc",
"winapi", "winapi",
@ -1977,9 +1977,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.6.0" version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37" checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@ -1997,9 +1997,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.2.0" version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2019,9 +2019,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.6" version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8864d706fdb3cc0843a49647ac892720dac98a6eeb818b77190592cf4994066" checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
@ -2031,9 +2031,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.6.7" version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",

11
src/cache/disk.rs vendored
View file

@ -32,7 +32,8 @@ impl DiskCache {
/// Constructs a new low memory cache at the provided path and capaci ty. /// Constructs a new low memory cache at the provided path and capaci ty.
/// This internally spawns a task that will wait for filesystem /// This internally spawns a task that will wait for filesystem
/// notifications when a file has been written. /// notifications when a file has been written.
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Self> { #[allow(clippy::new_ret_no_self)]
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> {
let (db_tx, db_rx) = channel(128); let (db_tx, db_rx) = channel(128);
let db_pool = { let db_pool = {
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy()); let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy());
@ -51,11 +52,11 @@ impl DiskCache {
db db
}; };
let new_self = Arc::new(Self { let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
disk_path, disk_path,
disk_cur_size: AtomicU64::new(0), disk_cur_size: AtomicU64::new(0),
db_update_channel_sender: db_tx, db_update_channel_sender: db_tx,
}); }));
tokio::spawn(db_listener( tokio::spawn(db_listener(
Arc::clone(&new_self), Arc::clone(&new_self),
@ -71,7 +72,7 @@ impl DiskCache {
/// Spawn a new task that will listen for updates to the db, pruning if the size /// Spawn a new task that will listen for updates to the db, pruning if the size
/// becomes too large. /// becomes too large.
async fn db_listener( async fn db_listener(
cache: Arc<DiskCache>, cache: Arc<Box<dyn Cache>>,
db_rx: Receiver<DbMessage>, db_rx: Receiver<DbMessage>,
db_pool: SqlitePool, db_pool: SqlitePool,
max_on_disk_size: u64, max_on_disk_size: u64,
@ -201,7 +202,7 @@ impl Cache for DiskCache {
let path_0 = Arc::clone(&path); let path_0 = Arc::clone(&path);
let db_callback = |size: u32| async move { let db_callback = |size: u32| async move {
std::mem::drop(channel.send(DbMessage::Put(path_0, size)).await); let _ = channel.send(DbMessage::Put(path_0, size)).await;
}; };
super::fs::write_file(&path, key, image, metadata, db_callback, None) super::fs::write_file(&path, key, image, metadata, db_callback, None)

239
src/cache/fs.rs vendored
View file

@ -14,6 +14,13 @@
//! upstream no longer needs to process duplicate requests and sequential cache //! upstream no longer needs to process duplicate requests and sequential cache
//! misses are treated as closer as a cache hit. //! misses are treated as closer as a cache hit.
use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt};
use log::debug;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::secretstream::Header;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::fmt::Display; use std::fmt::Display;
@ -22,30 +29,15 @@ use std::num::NonZeroU32;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt};
use log::{debug, warn};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::secretstream::{
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
};
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{ use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf};
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
ReadBuf,
};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::watch::{channel, Receiver}; use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
use super::{ use super::{BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream};
BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream, ENCRYPTION_KEY,
};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub enum OnDiskMetadata { pub enum OnDiskMetadata {
@ -79,118 +71,28 @@ pub(super) async fn read_file(
path: &Path, path: &Path,
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> { ) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
let std_file = std::fs::File::open(path).ok()?; let std_file = std::fs::File::open(path).ok()?;
let mut file = File::from_std(std_file.try_clone().ok()?); let file = File::from_std(std_file.try_clone().ok()?);
let mut reader = {
// If the encryption key was set, use the encrypted disk reader instead;
// else, just directly read from file.
let inner_reader: Pin<Box<dyn AsyncRead + Send>> = if let Some(key) = ENCRYPTION_KEY.get() {
let mut header_bytes = [0; HEADERBYTES];
if file.read_exact(&mut header_bytes).await.is_err() {
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
return None;
}
Box::pin(EncryptedDiskReader::new(
file,
SecretStream::init_pull(
&Header::from_slice(&header_bytes).expect("failed to get header"),
key,
)
.expect("Failed to initialize decryption kesy"),
))
} else {
Box::pin(file)
};
BufReader::new(inner_reader)
};
let metadata = { let metadata = {
let mut read = String::new(); let mut de = serde_json::Deserializer::from_reader(std_file);
reader ImageMetadata::deserialize(&mut de).ok()?
.read_line(&mut read)
.await
.expect("failed to read metadata");
serde_json::from_str(&read).ok()?
}; };
let reader = Box::pin(reader);
// False positive lint, `file` is used in both cases, which means that it's // False positive lint, `file` is used in both cases, which means that it's
// not possible to move this into a map_or_else without cloning `file`. // not possible to move this into a map_or_else without cloning `file`.
#[allow(clippy::option_if_let_else)] #[allow(clippy::option_if_let_else)]
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) { let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
InnerStream::Concurrent(ConcurrentFsStream::from_reader( InnerStream::Concurrent(ConcurrentFsStream::from_file(
reader, file,
WatchStream::new(status), WatchStream::new(status),
)) ))
} else { } else {
InnerStream::Completed(FramedRead::new(reader, BytesCodec::new())) InnerStream::Completed(FramedRead::new(BufReader::new(file), BytesCodec::new()))
}; };
Some(Ok((stream, None, metadata))) Some(Ok((stream, None, metadata)))
} }
struct EncryptedDiskReader {
file: Pin<Box<File>>,
stream: SecretStream<Pull>,
buf: Vec<u8>,
}
impl EncryptedDiskReader {
fn new(file: File, stream: SecretStream<Pull>) -> Self {
Self {
file: Box::pin(file),
stream,
buf: vec![],
}
}
}
impl AsyncRead for EncryptedDiskReader {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let cursor_start = buf.filled().len();
let res = self.as_mut().file.as_mut().poll_read(cx, buf);
if res.is_pending() {
return Poll::Pending;
}
let cursor_new = buf.filled().len();
// pull_to_vec internally calls vec.clear() and vec.reserve(). Generally
// speaking we should be reading about the same amount of data each time
// so we shouldn't experience too much of a slow down w.r.t resizing the
// buffer...
let new_self = Pin::into_inner(self);
new_self
.stream
.pull_to_vec(
&buf.filled()[cursor_start..cursor_new],
None,
&mut new_self.buf,
)
.unwrap();
// data is strictly smaller than the encrypted stream, since you need to
// encode tags as well, so this is always safe.
// rewrite encrypted data into decrypted data
let buffer = buf.filled_mut();
for (old, new) in buffer[cursor_start..].iter_mut().zip(&new_self.buf) {
*old = *new;
}
buf.set_filled(cursor_start + new_self.buf.len());
res
}
}
/// Writes the metadata and input stream (in that order) to a file, returning a /// Writes the metadata and input stream (in that order) to a file, returning a
/// stream that reads from that file. Accepts a db callback function that is /// stream that reads from that file. Accepts a db callback function that is
/// provided the number of bytes written, and an optional on-complete callback /// provided the number of bytes written, and an optional on-complete callback
@ -209,7 +111,7 @@ where
{ {
let (tx, rx) = channel(WritingStatus::NotDone); let (tx, rx) = channel(WritingStatus::NotDone);
let file = { let mut file = {
let mut write_lock = WRITING_STATUS.write().await; let mut write_lock = WRITING_STATUS.write().await;
let parent = path.parent().expect("The path to have a parent"); let parent = path.parent().expect("The path to have a parent");
create_dir_all(parent).await?; create_dir_all(parent).await?;
@ -220,17 +122,6 @@ where
let metadata_string = serde_json::to_string(&metadata).expect("serialization to work"); let metadata_string = serde_json::to_string(&metadata).expect("serialization to work");
let metadata_size = metadata_string.len(); let metadata_size = metadata_string.len();
let (mut writer, maybe_header): (Pin<Box<dyn AsyncWrite + Send>>, _) =
if let Some((enc, header)) = ENCRYPTION_KEY
.get()
.map(|key| SecretStream::init_push(key).expect("Failed to init enc stream"))
{
(Box::pin(EncryptedDiskWriter::new(file, enc)), Some(header))
} else {
(Box::pin(file), None)
};
// need owned variant because async lifetime // need owned variant because async lifetime
let path_buf = path.to_path_buf(); let path_buf = path.to_path_buf();
tokio::spawn(async move { tokio::spawn(async move {
@ -239,8 +130,7 @@ where
let mut bytes_written: u32 = 0; let mut bytes_written: u32 = 0;
let mut acc_bytes = BytesMut::new(); let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some(); let accumulate = on_complete.is_some();
writer.write_all(metadata_string.as_bytes()).await?; file.write_all(metadata_string.as_bytes()).await?;
writer.write_all(b"\n").await?;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes { if let Ok(mut bytes) = bytes {
@ -249,7 +139,7 @@ where
} }
loop { loop {
match writer.write(&bytes).await? { match file.write(&bytes).await? {
0 => break, 0 => break,
n => { n => {
bytes.advance(n); bytes.advance(n);
@ -272,8 +162,8 @@ where
// We don't care about the result of the call. // We don't care about the result of the call.
std::mem::drop(remove_file(&path_buf).await); std::mem::drop(remove_file(&path_buf).await);
} else { } else {
writer.flush().await?; file.flush().await?;
// writer.sync_all().await?; // we need metadata file.sync_all().await?; // we need metadata
debug!("writing to file done"); debug!("writing to file done");
} }
@ -314,81 +204,13 @@ where
InnerStream::Concurrent( InnerStream::Concurrent(
ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?, ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?,
), ),
maybe_header, None,
)) ))
} }
struct EncryptedDiskWriter {
file: Pin<Box<File>>,
stream: Option<SecretStream<Push>>,
encryption_buffer: Vec<u8>,
write_buffer: Vec<u8>,
}
impl EncryptedDiskWriter {
fn new(file: File, stream: SecretStream<Push>) -> Self {
Self {
file: Box::pin(file),
stream: Some(stream),
encryption_buffer: vec![],
write_buffer: vec![],
}
}
}
impl AsyncWrite for EncryptedDiskWriter {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let new_self = Pin::into_inner(self);
{
let encryption_buffer = &mut new_self.encryption_buffer;
if let Some(stream) = new_self.stream.as_mut() {
stream
.push_to_vec(buf, None, Tag::Message, encryption_buffer)
.expect("Failed to write encrypted data to buffer");
}
}
new_self.write_buffer.extend(&new_self.encryption_buffer);
match new_self
.file
.as_mut()
.poll_write(cx, &new_self.write_buffer)
{
Poll::Ready(Ok(n)) => {
new_self.write_buffer.drain(..n);
Poll::Ready(Ok(n))
}
poll => poll,
}
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.file.as_mut().poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.as_mut()
.stream
.take()
.map(|stream| stream.finalize(None));
self.file.as_mut().poll_shutdown(cx)
}
}
pub struct ConcurrentFsStream { pub struct ConcurrentFsStream {
/// The File to read from /// The File to read from
reader: Pin<Box<dyn AsyncRead + Send>>, file: Pin<Box<BufReader<File>>>,
/// The channel to get updates from. The writer must send its status, else /// The channel to get updates from. The writer must send its status, else
/// this reader will never complete. /// this reader will never complete.
receiver: Pin<Box<WatchStream<WritingStatus>>>, receiver: Pin<Box<WatchStream<WritingStatus>>>,
@ -407,15 +229,12 @@ impl ConcurrentFsStream {
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
let mut file = File::open(path).await?; let mut file = File::open(path).await?;
file.seek(SeekFrom::Start(seek as u64)).await?; file.seek(SeekFrom::Start(seek as u64)).await?;
Ok(Self::from_reader(Box::pin(file), receiver)) Ok(Self::from_file(file, receiver))
} }
fn from_reader( fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
reader: Pin<Box<dyn AsyncRead + Send>>,
receiver: WatchStream<WritingStatus>,
) -> Self {
Self { Self {
reader: Box::pin(reader), file: Box::pin(BufReader::new(file)),
receiver: Box::pin(receiver), receiver: Box::pin(receiver),
bytes_read: 0, bytes_read: 0,
bytes_total: None, bytes_total: None,
@ -444,7 +263,7 @@ impl Stream for ConcurrentFsStream {
// TODO: Might be more efficient to have a larger buffer // TODO: Might be more efficient to have a larger buffer
let mut bytes = [0; 4 * 1024].to_vec(); let mut bytes = [0; 4 * 1024].to_vec();
let mut buffer = ReadBuf::new(&mut bytes); let mut buffer = ReadBuf::new(&mut bytes);
match self.reader.as_mut().poll_read(cx, &mut buffer) { match self.file.as_mut().poll_read(cx, &mut buffer) {
Poll::Ready(Ok(_)) => (), Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))), Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))),
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
@ -508,9 +327,3 @@ enum WritingStatus {
Done(u32), Done(u32),
Error, Error,
} }
#[cfg(test)]
mod storage {
#[test]
fn wut() {}
}

132
src/cache/mem.rs vendored
View file

@ -1,9 +1,11 @@
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use crate::cache::DiskCache;
use super::{ use super::{
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata, InnerStream, BoxedImageStream, Cache, CacheKey, CacheStream, ImageMetadata, InnerStream, MemStream,
MemStream,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@ -28,81 +30,37 @@ pub trait InternalMemoryCache: Sync + Send {
fn pop(&mut self) -> Option<(CacheKey, CacheValue)>; fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
} }
impl InternalMemoryCache for Lfu {
#[inline]
fn unbounded() -> Self {
Self::unbounded()
}
#[inline]
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
self.get(key)
}
#[inline]
fn push(&mut self, key: CacheKey, data: CacheValue) {
self.insert(key, data);
}
#[inline]
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
self.pop_lfu_key_value()
}
}
impl InternalMemoryCache for Lru {
#[inline]
fn unbounded() -> Self {
Self::unbounded()
}
#[inline]
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
self.get(key)
}
#[inline]
fn push(&mut self, key: CacheKey, data: CacheValue) {
self.put(key, data);
}
#[inline]
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
self.pop_lru()
}
}
/// Memory accelerated disk cache. Uses the internal cache implementation in /// Memory accelerated disk cache. Uses the internal cache implementation in
/// memory to speed up reads. /// memory to speed up reads.
pub struct MemoryCache<InternalCacheImpl, InnerCache> { pub struct MemoryCache<InternalCacheImpl> {
inner: InnerCache, inner: Arc<Box<dyn Cache>>,
cur_mem_size: AtomicU64, cur_mem_size: AtomicU64,
mem_cache: Mutex<InternalCacheImpl>, mem_cache: Mutex<InternalCacheImpl>,
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>, master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
} }
impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cache> impl<InternalCacheImpl: 'static + InternalMemoryCache> MemoryCache<InternalCacheImpl> {
MemoryCache<InternalCacheImpl, InnerCache> #[allow(clippy::new_ret_no_self)]
{ pub async fn new(
pub async fn new(inner: InnerCache, max_mem_size: u64) -> Arc<Self> { disk_max_size: u64,
disk_path: PathBuf,
max_mem_size: u64,
) -> Arc<Box<dyn Cache>> {
let (tx, mut rx) = channel(100); let (tx, mut rx) = channel(100);
let new_self = Arc::new(Self { let new_self = Arc::new(Box::new(Self {
inner, inner: DiskCache::new(disk_max_size, disk_path).await,
cur_mem_size: AtomicU64::new(0), cur_mem_size: AtomicU64::new(0),
mem_cache: Mutex::new(InternalCacheImpl::unbounded()), mem_cache: Mutex::new(InternalCacheImpl::unbounded()),
master_sender: tx, master_sender: tx,
}); }) as Box<dyn Cache>);
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
tokio::spawn(async move { tokio::spawn(async move {
let new_self = new_self_0; let new_self = new_self_0;
let max_mem_size = max_mem_size / 20 * 19; let max_mem_size = max_mem_size / 20 * 19;
while let Some((key, bytes, metadata, size)) = rx.recv().await { while let Some((key, bytes, metadata, size)) = rx.recv().await {
new_self.inner.increase_usage(size as u32); new_self.increase_usage(size as u32);
new_self new_self.put_internal(key, bytes, metadata, size).await;
.inner
.put_internal(key, bytes, metadata, size)
.await;
while new_self.mem_size() >= max_mem_size { while new_self.mem_size() >= max_mem_size {
if let Some((_, _, _, size)) = new_self.pop_memory().await { if let Some((_, _, _, size)) = new_self.pop_memory().await {
new_self.decrease_usage(size as u64); new_self.decrease_usage(size as u64);
@ -118,9 +76,7 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cac
} }
#[async_trait] #[async_trait]
impl<InternalCacheImpl: InternalMemoryCache, InnerCache: Cache> Cache impl<InternalCacheImpl: InternalMemoryCache> Cache for MemoryCache<InternalCacheImpl> {
for MemoryCache<InternalCacheImpl, InnerCache>
{
#[inline] #[inline]
async fn get( async fn get(
&self, &self,
@ -130,11 +86,7 @@ impl<InternalCacheImpl: InternalMemoryCache, InnerCache: Cache> Cache
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| { Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata)) Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata))
}) { }) {
Some(v) => Some(v.and_then(|(inner, metadata)| { Some(v) => Some(v.map(|(inner, metadata)| todo!())),
CacheStream::new(inner, None)
.map(|v| (v, metadata))
.map_err(|_| CacheError::DecryptionFailure)
})),
None => self.inner.get(key).await, None => self.inner.get(key).await,
}, },
None => self.inner.get(key).await, None => self.inner.get(key).await,
@ -210,3 +162,47 @@ impl<InternalCacheImpl: InternalMemoryCache, InnerCache: Cache> Cache
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size)) .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size))
} }
} }
impl InternalMemoryCache for Lfu {
#[inline]
fn unbounded() -> Self {
Self::unbounded()
}
#[inline]
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
self.get(key)
}
#[inline]
fn push(&mut self, key: CacheKey, data: CacheValue) {
self.insert(key, data);
}
#[inline]
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
self.pop_lfu_key_value()
}
}
impl InternalMemoryCache for Lru {
#[inline]
fn unbounded() -> Self {
Self::unbounded()
}
#[inline]
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
self.get(key)
}
#[inline]
fn push(&mut self, key: CacheKey, data: CacheValue) {
self.put(key, data);
}
#[inline]
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
self.pop_lru()
}
}

83
src/cache/mod.rs vendored
View file

@ -10,12 +10,13 @@ use bytes::{Bytes, BytesMut};
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use fs::ConcurrentFsStream; use fs::ConcurrentFsStream;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use once_cell::sync::OnceCell; use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr}; use serde_repr::{Deserialize_repr, Serialize_repr};
use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream}; use sodiumoxide::crypto::secretstream::{gen_key, Header, Key, Pull, Stream as SecretStream};
use thiserror::Error; use thiserror::Error;
use tokio::io::AsyncRead; use tokio::fs::File;
use tokio::io::BufReader;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
@ -23,7 +24,7 @@ pub use disk::DiskCache;
pub use fs::UpstreamError; pub use fs::UpstreamError;
pub use mem::MemoryCache; pub use mem::MemoryCache;
pub static ENCRYPTION_KEY: OnceCell<Key> = OnceCell::new(); static ENCRYPTION_KEY: Lazy<Key> = Lazy::new(gen_key);
mod disk; mod disk;
mod fs; mod fs;
@ -205,76 +206,6 @@ pub trait Cache: Send + Sync {
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>; async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>;
} }
#[async_trait]
impl<T: Cache> Cache for std::sync::Arc<T> {
#[inline]
async fn get(
&self,
key: &CacheKey,
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
self.as_ref().get(key).await
}
#[inline]
async fn put(
&self,
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
) -> Result<CacheStream, CacheError> {
self.as_ref().put(key, image, metadata).await
}
#[inline]
fn increase_usage(&self, amt: u32) {
self.as_ref().increase_usage(amt)
}
#[inline]
fn decrease_usage(&self, amt: u64) {
self.as_ref().decrease_usage(amt)
}
#[inline]
fn on_disk_size(&self) -> u64 {
self.as_ref().on_disk_size()
}
#[inline]
fn mem_size(&self) -> u64 {
self.as_ref().mem_size()
}
#[inline]
async fn put_with_on_completed_callback(
&self,
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
) -> Result<CacheStream, CacheError> {
self.as_ref()
.put_with_on_completed_callback(key, image, metadata, on_complete)
.await
}
#[inline]
async fn put_internal(
&self,
key: CacheKey,
image: Bytes,
metadata: ImageMetadata,
size: usize,
) {
self.as_ref().put_internal(key, image, metadata, size).await
}
#[inline]
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> {
self.as_ref().pop_memory().await
}
}
pub struct CacheStream { pub struct CacheStream {
inner: InnerStream, inner: InnerStream,
decrypt: Option<SecretStream<Pull>>, decrypt: Option<SecretStream<Pull>>,
@ -285,7 +216,7 @@ impl CacheStream {
Ok(Self { Ok(Self {
inner, inner,
decrypt: header decrypt: header
.map(|header| SecretStream::init_pull(&header, ENCRYPTION_KEY.get().unwrap())) .map(|header| SecretStream::init_pull(&header, &ENCRYPTION_KEY))
.transpose()?, .transpose()?,
}) })
} }
@ -317,7 +248,7 @@ impl Stream for CacheStream {
pub(self) enum InnerStream { pub(self) enum InnerStream {
Concurrent(ConcurrentFsStream), Concurrent(ConcurrentFsStream),
Memory(MemStream), Memory(MemStream),
Completed(FramedRead<Pin<Box<dyn AsyncRead + Send>>, BytesCodec>), Completed(FramedRead<BufReader<File>, BytesCodec>),
} }
impl From<CachedImage> for InnerStream { impl From<CachedImage> for InnerStream {

View file

@ -18,17 +18,15 @@ use actix_web::{App, HttpServer};
use cache::{Cache, DiskCache}; use cache::{Cache, DiskCache};
use clap::Clap; use clap::Clap;
use config::CliArgs; use config::CliArgs;
use log::{debug, error, info, warn, LevelFilter}; use log::{debug, error, warn, LevelFilter};
use parking_lot::RwLock; use parking_lot::RwLock;
use rustls::{NoClientAuth, ServerConfig}; use rustls::{NoClientAuth, ServerConfig};
use simple_logger::SimpleLogger; use simple_logger::SimpleLogger;
use sodiumoxide::crypto::secretstream::gen_key;
use state::{RwLockServerState, ServerState}; use state::{RwLockServerState, ServerState};
use stop::send_stop; use stop::send_stop;
use thiserror::Error; use thiserror::Error;
use crate::cache::mem::{Lfu, Lru}; use crate::cache::{mem, MemoryCache};
use crate::cache::{MemoryCache, ENCRYPTION_KEY};
use crate::config::UnstableOptions; use crate::config::UnstableOptions;
use crate::state::DynamicServerCert; use crate::state::DynamicServerCert;
@ -78,7 +76,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
(0, 0) => LevelFilter::Info, (0, 0) => LevelFilter::Info,
(_, 1) => LevelFilter::Debug, (_, 1) => LevelFilter::Debug,
(_, n) if n > 1 => LevelFilter::Trace, (_, n) if n > 1 => LevelFilter::Trace,
// compiler can't figure it out
_ => unsafe { unreachable_unchecked() }, _ => unsafe { unreachable_unchecked() },
}; };
@ -97,11 +94,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
}; };
let client_secret_1 = client_secret.clone(); let client_secret_1 = client_secret.clone();
if cli_args.ephemeral_disk_encryption {
info!("Running with at-rest encryption!");
ENCRYPTION_KEY.set(gen_key()).unwrap();
}
let server = ServerState::init(&client_secret, &cli_args).await?; let server = ServerState::init(&client_secret, &cli_args).await?;
let data_0 = Arc::new(RwLockServerState(RwLock::new(server))); let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
let data_1 = Arc::clone(&data_0); let data_1 = Arc::clone(&data_0);
@ -127,7 +119,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if running_2.load(Ordering::SeqCst) { if running_2.load(Ordering::SeqCst) {
send_stop(&client_secret).await; send_stop(&client_secret).await;
} else { } else {
warn!("Got second Ctrl-C, forcefully exiting"); warn!("Got second ctrl-c, forcefully exiting");
system.stop() system.stop()
} }
}); });
@ -146,13 +138,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
}); });
let cache = DiskCache::new(disk_quota, cache_path.clone()).await; let cache: Arc<Box<dyn Cache>> = if low_mem_mode {
let cache: Arc<dyn Cache> = if low_mem_mode { DiskCache::new(disk_quota, cache_path.clone()).await
cache
} else if use_lfu { } else if use_lfu {
MemoryCache::<Lfu, _>::new(cache, memory_max_size).await MemoryCache::<mem::Lfu>::new(disk_quota, cache_path.clone(), memory_max_size).await
} else { } else {
MemoryCache::<Lru, _>::new(cache, memory_max_size).await MemoryCache::<mem::Lru>::new(disk_quota, cache_path.clone(), memory_max_size).await
}; };
let cache_0 = Arc::clone(&cache); let cache_0 = Arc::clone(&cache);
@ -168,6 +159,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}) })
.shutdown_timeout(60) .shutdown_timeout(60)
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)? .bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
// .bind(format!("0.0.0.0:{}", port))?
.run() .run()
.await?; .await?;