Compare commits
2 commits
8772df2231
...
4f55380d23
Author | SHA1 | Date | |
---|---|---|---|
4f55380d23 | |||
b66cccc832 |
6 changed files with 399 additions and 132 deletions
40
Cargo.lock
generated
40
Cargo.lock
generated
|
@ -456,9 +456,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "const_fn"
|
name = "const_fn"
|
||||||
version = "0.4.7"
|
version = "0.4.8"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "402da840495de3f976eaefc3485b7f5eb5b0bf9761f9a47be27fe975b3b8c2ec"
|
checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "convert_case"
|
name = "convert_case"
|
||||||
|
@ -479,9 +479,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cpufeatures"
|
name = "cpufeatures"
|
||||||
version = "0.1.1"
|
version = "0.1.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dec1028182c380cc45a2e2c5ec841134f2dfd0f8f5f0a5bcd68004f81b5efdf4"
|
checksum = "ed00c67cb5d0a7d64a44f6ad2668db7e7530311dd53ea79bcd4fb022c64911c8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
@ -547,9 +547,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "derive_more"
|
name = "derive_more"
|
||||||
version = "0.99.13"
|
version = "0.99.14"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f82b1b72f1263f214c0f823371768776c4f5841b942c9883aa8e5ec584fd0ba6"
|
checksum = "5cc7b9cef1e351660e5443924e4f43ab25fbbed3e9a5f052df3677deb4d6b320"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"convert_case",
|
"convert_case",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
|
@ -1522,18 +1522,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.125"
|
version = "1.0.126"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
|
checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.125"
|
version = "1.0.126"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
|
checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -1874,9 +1874,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "terminal_size"
|
name = "terminal_size"
|
||||||
version = "0.1.16"
|
version = "0.1.17"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "86ca8ced750734db02076f44132d802af0b33b09942331f4459dde8636fd2406"
|
checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"winapi",
|
"winapi",
|
||||||
|
@ -1977,9 +1977,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.5.0"
|
version = "1.6.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
|
checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -1997,9 +1997,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-macros"
|
name = "tokio-macros"
|
||||||
version = "1.1.0"
|
version = "1.2.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
|
checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -2019,9 +2019,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-stream"
|
name = "tokio-stream"
|
||||||
version = "0.1.5"
|
version = "0.1.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
|
checksum = "f8864d706fdb3cc0843a49647ac892720dac98a6eeb818b77190592cf4994066"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
|
@ -2031,9 +2031,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.6.6"
|
version = "0.6.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e"
|
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
|
11
src/cache/disk.rs
vendored
11
src/cache/disk.rs
vendored
|
@ -32,8 +32,7 @@ impl DiskCache {
|
||||||
/// Constructs a new low memory cache at the provided path and capacity.
|
/// Constructs a new low memory cache at the provided path and capacity.
|
||||||
/// This internally spawns a task that will wait for filesystem
|
/// This internally spawns a task that will wait for filesystem
|
||||||
/// notifications when a file has been written.
|
/// notifications when a file has been written.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Self> {
|
||||||
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> {
|
|
||||||
let (db_tx, db_rx) = channel(128);
|
let (db_tx, db_rx) = channel(128);
|
||||||
let db_pool = {
|
let db_pool = {
|
||||||
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy());
|
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy());
|
||||||
|
@ -52,11 +51,11 @@ impl DiskCache {
|
||||||
db
|
db
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
|
let new_self = Arc::new(Self {
|
||||||
disk_path,
|
disk_path,
|
||||||
disk_cur_size: AtomicU64::new(0),
|
disk_cur_size: AtomicU64::new(0),
|
||||||
db_update_channel_sender: db_tx,
|
db_update_channel_sender: db_tx,
|
||||||
}));
|
});
|
||||||
|
|
||||||
tokio::spawn(db_listener(
|
tokio::spawn(db_listener(
|
||||||
Arc::clone(&new_self),
|
Arc::clone(&new_self),
|
||||||
|
@ -72,7 +71,7 @@ impl DiskCache {
|
||||||
/// Spawn a new task that will listen for updates to the db, pruning if the size
|
/// Spawn a new task that will listen for updates to the db, pruning if the size
|
||||||
/// becomes too large.
|
/// becomes too large.
|
||||||
async fn db_listener(
|
async fn db_listener(
|
||||||
cache: Arc<Box<dyn Cache>>,
|
cache: Arc<DiskCache>,
|
||||||
db_rx: Receiver<DbMessage>,
|
db_rx: Receiver<DbMessage>,
|
||||||
db_pool: SqlitePool,
|
db_pool: SqlitePool,
|
||||||
max_on_disk_size: u64,
|
max_on_disk_size: u64,
|
||||||
|
@ -202,7 +201,7 @@ impl Cache for DiskCache {
|
||||||
let path_0 = Arc::clone(&path);
|
let path_0 = Arc::clone(&path);
|
||||||
|
|
||||||
let db_callback = |size: u32| async move {
|
let db_callback = |size: u32| async move {
|
||||||
let _ = channel.send(DbMessage::Put(path_0, size)).await;
|
std::mem::drop(channel.send(DbMessage::Put(path_0, size)).await);
|
||||||
};
|
};
|
||||||
|
|
||||||
super::fs::write_file(&path, key, image, metadata, db_callback, None)
|
super::fs::write_file(&path, key, image, metadata, db_callback, None)
|
||||||
|
|
239
src/cache/fs.rs
vendored
239
src/cache/fs.rs
vendored
|
@ -14,13 +14,6 @@
|
||||||
//! upstream no longer needs to process duplicate requests and sequential cache
|
//! upstream no longer needs to process duplicate requests and sequential cache
|
||||||
//! misses are treated as closer as a cache hit.
|
//! misses are treated as closer as a cache hit.
|
||||||
|
|
||||||
use actix_web::error::PayloadError;
|
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
|
||||||
use futures::{Future, Stream, StreamExt};
|
|
||||||
use log::debug;
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use sodiumoxide::crypto::secretstream::Header;
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
|
@ -29,15 +22,30 @@ use std::num::NonZeroU32;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use actix_web::error::PayloadError;
|
||||||
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
|
use futures::{Future, Stream, StreamExt};
|
||||||
|
use log::{debug, warn};
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use sodiumoxide::crypto::secretstream::{
|
||||||
|
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
|
||||||
|
};
|
||||||
use tokio::fs::{create_dir_all, remove_file, File};
|
use tokio::fs::{create_dir_all, remove_file, File};
|
||||||
use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf};
|
use tokio::io::{
|
||||||
|
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
|
||||||
|
ReadBuf,
|
||||||
|
};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::watch::{channel, Receiver};
|
use tokio::sync::watch::{channel, Receiver};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio_stream::wrappers::WatchStream;
|
use tokio_stream::wrappers::WatchStream;
|
||||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
|
|
||||||
use super::{BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream};
|
use super::{
|
||||||
|
BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream, ENCRYPTION_KEY,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub enum OnDiskMetadata {
|
pub enum OnDiskMetadata {
|
||||||
|
@ -71,28 +79,118 @@ pub(super) async fn read_file(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
||||||
let std_file = std::fs::File::open(path).ok()?;
|
let std_file = std::fs::File::open(path).ok()?;
|
||||||
let file = File::from_std(std_file.try_clone().ok()?);
|
let mut file = File::from_std(std_file.try_clone().ok()?);
|
||||||
|
|
||||||
|
let mut reader = {
|
||||||
|
// If the encryption key was set, use the encrypted disk reader instead;
|
||||||
|
// else, just directly read from file.
|
||||||
|
let inner_reader: Pin<Box<dyn AsyncRead + Send>> = if let Some(key) = ENCRYPTION_KEY.get() {
|
||||||
|
let mut header_bytes = [0; HEADERBYTES];
|
||||||
|
if file.read_exact(&mut header_bytes).await.is_err() {
|
||||||
|
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Box::pin(EncryptedDiskReader::new(
|
||||||
|
file,
|
||||||
|
SecretStream::init_pull(
|
||||||
|
&Header::from_slice(&header_bytes).expect("failed to get header"),
|
||||||
|
key,
|
||||||
|
)
|
||||||
|
.expect("Failed to initialize decryption kesy"),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Box::pin(file)
|
||||||
|
};
|
||||||
|
|
||||||
|
BufReader::new(inner_reader)
|
||||||
|
};
|
||||||
|
|
||||||
let metadata = {
|
let metadata = {
|
||||||
let mut de = serde_json::Deserializer::from_reader(std_file);
|
let mut read = String::new();
|
||||||
ImageMetadata::deserialize(&mut de).ok()?
|
reader
|
||||||
|
.read_line(&mut read)
|
||||||
|
.await
|
||||||
|
.expect("failed to read metadata");
|
||||||
|
serde_json::from_str(&read).ok()?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let reader = Box::pin(reader);
|
||||||
|
|
||||||
// False positive lint, `file` is used in both cases, which means that it's
|
// False positive lint, `file` is used in both cases, which means that it's
|
||||||
// not possible to move this into a map_or_else without cloning `file`.
|
// not possible to move this into a map_or_else without cloning `file`.
|
||||||
#[allow(clippy::option_if_let_else)]
|
#[allow(clippy::option_if_let_else)]
|
||||||
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
|
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
|
||||||
InnerStream::Concurrent(ConcurrentFsStream::from_file(
|
InnerStream::Concurrent(ConcurrentFsStream::from_reader(
|
||||||
file,
|
reader,
|
||||||
WatchStream::new(status),
|
WatchStream::new(status),
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
InnerStream::Completed(FramedRead::new(BufReader::new(file), BytesCodec::new()))
|
InnerStream::Completed(FramedRead::new(reader, BytesCodec::new()))
|
||||||
};
|
};
|
||||||
|
|
||||||
Some(Ok((stream, None, metadata)))
|
Some(Ok((stream, None, metadata)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct EncryptedDiskReader {
|
||||||
|
file: Pin<Box<File>>,
|
||||||
|
stream: SecretStream<Pull>,
|
||||||
|
buf: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EncryptedDiskReader {
|
||||||
|
fn new(file: File, stream: SecretStream<Pull>) -> Self {
|
||||||
|
Self {
|
||||||
|
file: Box::pin(file),
|
||||||
|
stream,
|
||||||
|
buf: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for EncryptedDiskReader {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<std::io::Result<()>> {
|
||||||
|
let cursor_start = buf.filled().len();
|
||||||
|
|
||||||
|
let res = self.as_mut().file.as_mut().poll_read(cx, buf);
|
||||||
|
if res.is_pending() {
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
|
|
||||||
|
let cursor_new = buf.filled().len();
|
||||||
|
|
||||||
|
// pull_to_vec internally calls vec.clear() and vec.reserve(). Generally
|
||||||
|
// speaking we should be reading about the same amount of data each time
|
||||||
|
// so we shouldn't experience too much of a slow down w.r.t resizing the
|
||||||
|
// buffer...
|
||||||
|
let new_self = Pin::into_inner(self);
|
||||||
|
new_self
|
||||||
|
.stream
|
||||||
|
.pull_to_vec(
|
||||||
|
&buf.filled()[cursor_start..cursor_new],
|
||||||
|
None,
|
||||||
|
&mut new_self.buf,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// data is strictly smaller than the encrypted stream, since you need to
|
||||||
|
// encode tags as well, so this is always safe.
|
||||||
|
|
||||||
|
// rewrite encrypted data into decrypted data
|
||||||
|
let buffer = buf.filled_mut();
|
||||||
|
for (old, new) in buffer[cursor_start..].iter_mut().zip(&new_self.buf) {
|
||||||
|
*old = *new;
|
||||||
|
}
|
||||||
|
buf.set_filled(cursor_start + new_self.buf.len());
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Writes the metadata and input stream (in that order) to a file, returning a
|
/// Writes the metadata and input stream (in that order) to a file, returning a
|
||||||
/// stream that reads from that file. Accepts a db callback function that is
|
/// stream that reads from that file. Accepts a db callback function that is
|
||||||
/// provided the number of bytes written, and an optional on-complete callback
|
/// provided the number of bytes written, and an optional on-complete callback
|
||||||
|
@ -111,7 +209,7 @@ where
|
||||||
{
|
{
|
||||||
let (tx, rx) = channel(WritingStatus::NotDone);
|
let (tx, rx) = channel(WritingStatus::NotDone);
|
||||||
|
|
||||||
let mut file = {
|
let file = {
|
||||||
let mut write_lock = WRITING_STATUS.write().await;
|
let mut write_lock = WRITING_STATUS.write().await;
|
||||||
let parent = path.parent().expect("The path to have a parent");
|
let parent = path.parent().expect("The path to have a parent");
|
||||||
create_dir_all(parent).await?;
|
create_dir_all(parent).await?;
|
||||||
|
@ -122,6 +220,17 @@ where
|
||||||
|
|
||||||
let metadata_string = serde_json::to_string(&metadata).expect("serialization to work");
|
let metadata_string = serde_json::to_string(&metadata).expect("serialization to work");
|
||||||
let metadata_size = metadata_string.len();
|
let metadata_size = metadata_string.len();
|
||||||
|
|
||||||
|
let (mut writer, maybe_header): (Pin<Box<dyn AsyncWrite + Send>>, _) =
|
||||||
|
if let Some((enc, header)) = ENCRYPTION_KEY
|
||||||
|
.get()
|
||||||
|
.map(|key| SecretStream::init_push(key).expect("Failed to init enc stream"))
|
||||||
|
{
|
||||||
|
(Box::pin(EncryptedDiskWriter::new(file, enc)), Some(header))
|
||||||
|
} else {
|
||||||
|
(Box::pin(file), None)
|
||||||
|
};
|
||||||
|
|
||||||
// need owned variant because async lifetime
|
// need owned variant because async lifetime
|
||||||
let path_buf = path.to_path_buf();
|
let path_buf = path.to_path_buf();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
@ -130,7 +239,8 @@ where
|
||||||
let mut bytes_written: u32 = 0;
|
let mut bytes_written: u32 = 0;
|
||||||
let mut acc_bytes = BytesMut::new();
|
let mut acc_bytes = BytesMut::new();
|
||||||
let accumulate = on_complete.is_some();
|
let accumulate = on_complete.is_some();
|
||||||
file.write_all(metadata_string.as_bytes()).await?;
|
writer.write_all(metadata_string.as_bytes()).await?;
|
||||||
|
writer.write_all(b"\n").await?;
|
||||||
|
|
||||||
while let Some(bytes) = byte_stream.next().await {
|
while let Some(bytes) = byte_stream.next().await {
|
||||||
if let Ok(mut bytes) = bytes {
|
if let Ok(mut bytes) = bytes {
|
||||||
|
@ -139,7 +249,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match file.write(&bytes).await? {
|
match writer.write(&bytes).await? {
|
||||||
0 => break,
|
0 => break,
|
||||||
n => {
|
n => {
|
||||||
bytes.advance(n);
|
bytes.advance(n);
|
||||||
|
@ -162,8 +272,8 @@ where
|
||||||
// We don't care about the result of the call.
|
// We don't care about the result of the call.
|
||||||
std::mem::drop(remove_file(&path_buf).await);
|
std::mem::drop(remove_file(&path_buf).await);
|
||||||
} else {
|
} else {
|
||||||
file.flush().await?;
|
writer.flush().await?;
|
||||||
file.sync_all().await?; // we need metadata
|
// writer.sync_all().await?; // we need metadata
|
||||||
debug!("writing to file done");
|
debug!("writing to file done");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,13 +314,81 @@ where
|
||||||
InnerStream::Concurrent(
|
InnerStream::Concurrent(
|
||||||
ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?,
|
ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?,
|
||||||
),
|
),
|
||||||
None,
|
maybe_header,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct EncryptedDiskWriter {
|
||||||
|
file: Pin<Box<File>>,
|
||||||
|
stream: Option<SecretStream<Push>>,
|
||||||
|
encryption_buffer: Vec<u8>,
|
||||||
|
write_buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EncryptedDiskWriter {
|
||||||
|
fn new(file: File, stream: SecretStream<Push>) -> Self {
|
||||||
|
Self {
|
||||||
|
file: Box::pin(file),
|
||||||
|
stream: Some(stream),
|
||||||
|
encryption_buffer: vec![],
|
||||||
|
write_buffer: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncWrite for EncryptedDiskWriter {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<Result<usize, std::io::Error>> {
|
||||||
|
let new_self = Pin::into_inner(self);
|
||||||
|
{
|
||||||
|
let encryption_buffer = &mut new_self.encryption_buffer;
|
||||||
|
if let Some(stream) = new_self.stream.as_mut() {
|
||||||
|
stream
|
||||||
|
.push_to_vec(buf, None, Tag::Message, encryption_buffer)
|
||||||
|
.expect("Failed to write encrypted data to buffer");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
new_self.write_buffer.extend(&new_self.encryption_buffer);
|
||||||
|
|
||||||
|
match new_self
|
||||||
|
.file
|
||||||
|
.as_mut()
|
||||||
|
.poll_write(cx, &new_self.write_buffer)
|
||||||
|
{
|
||||||
|
Poll::Ready(Ok(n)) => {
|
||||||
|
new_self.write_buffer.drain(..n);
|
||||||
|
Poll::Ready(Ok(n))
|
||||||
|
}
|
||||||
|
poll => poll,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
|
self.file.as_mut().poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
|
self.as_mut()
|
||||||
|
.stream
|
||||||
|
.take()
|
||||||
|
.map(|stream| stream.finalize(None));
|
||||||
|
self.file.as_mut().poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct ConcurrentFsStream {
|
pub struct ConcurrentFsStream {
|
||||||
/// The File to read from
|
/// The File to read from
|
||||||
file: Pin<Box<BufReader<File>>>,
|
reader: Pin<Box<dyn AsyncRead + Send>>,
|
||||||
/// The channel to get updates from. The writer must send its status, else
|
/// The channel to get updates from. The writer must send its status, else
|
||||||
/// this reader will never complete.
|
/// this reader will never complete.
|
||||||
receiver: Pin<Box<WatchStream<WritingStatus>>>,
|
receiver: Pin<Box<WatchStream<WritingStatus>>>,
|
||||||
|
@ -229,12 +407,15 @@ impl ConcurrentFsStream {
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
let mut file = File::open(path).await?;
|
let mut file = File::open(path).await?;
|
||||||
file.seek(SeekFrom::Start(seek as u64)).await?;
|
file.seek(SeekFrom::Start(seek as u64)).await?;
|
||||||
Ok(Self::from_file(file, receiver))
|
Ok(Self::from_reader(Box::pin(file), receiver))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
|
fn from_reader(
|
||||||
|
reader: Pin<Box<dyn AsyncRead + Send>>,
|
||||||
|
receiver: WatchStream<WritingStatus>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file: Box::pin(BufReader::new(file)),
|
reader: Box::pin(reader),
|
||||||
receiver: Box::pin(receiver),
|
receiver: Box::pin(receiver),
|
||||||
bytes_read: 0,
|
bytes_read: 0,
|
||||||
bytes_total: None,
|
bytes_total: None,
|
||||||
|
@ -263,7 +444,7 @@ impl Stream for ConcurrentFsStream {
|
||||||
// TODO: Might be more efficient to have a larger buffer
|
// TODO: Might be more efficient to have a larger buffer
|
||||||
let mut bytes = [0; 4 * 1024].to_vec();
|
let mut bytes = [0; 4 * 1024].to_vec();
|
||||||
let mut buffer = ReadBuf::new(&mut bytes);
|
let mut buffer = ReadBuf::new(&mut bytes);
|
||||||
match self.file.as_mut().poll_read(cx, &mut buffer) {
|
match self.reader.as_mut().poll_read(cx, &mut buffer) {
|
||||||
Poll::Ready(Ok(_)) => (),
|
Poll::Ready(Ok(_)) => (),
|
||||||
Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))),
|
Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))),
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
|
@ -327,3 +508,9 @@ enum WritingStatus {
|
||||||
Done(u32),
|
Done(u32),
|
||||||
Error,
|
Error,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod storage {
|
||||||
|
#[test]
|
||||||
|
fn wut() {}
|
||||||
|
}
|
||||||
|
|
132
src/cache/mem.rs
vendored
132
src/cache/mem.rs
vendored
|
@ -1,11 +1,9 @@
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::cache::DiskCache;
|
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
BoxedImageStream, Cache, CacheKey, CacheStream, ImageMetadata, InnerStream, MemStream,
|
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata, InnerStream,
|
||||||
|
MemStream,
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
@ -30,37 +28,81 @@ pub trait InternalMemoryCache: Sync + Send {
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
|
fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl InternalMemoryCache for Lfu {
|
||||||
|
#[inline]
|
||||||
|
fn unbounded() -> Self {
|
||||||
|
Self::unbounded()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
||||||
|
self.get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
||||||
|
self.insert(key, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
||||||
|
self.pop_lfu_key_value()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InternalMemoryCache for Lru {
|
||||||
|
#[inline]
|
||||||
|
fn unbounded() -> Self {
|
||||||
|
Self::unbounded()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
||||||
|
self.get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
||||||
|
self.put(key, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
||||||
|
self.pop_lru()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Memory accelerated disk cache. Uses the internal cache implementation in
|
/// Memory accelerated disk cache. Uses the internal cache implementation in
|
||||||
/// memory to speed up reads.
|
/// memory to speed up reads.
|
||||||
pub struct MemoryCache<InternalCacheImpl> {
|
pub struct MemoryCache<InternalCacheImpl, InnerCache> {
|
||||||
inner: Arc<Box<dyn Cache>>,
|
inner: InnerCache,
|
||||||
cur_mem_size: AtomicU64,
|
cur_mem_size: AtomicU64,
|
||||||
mem_cache: Mutex<InternalCacheImpl>,
|
mem_cache: Mutex<InternalCacheImpl>,
|
||||||
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
|
master_sender: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<InternalCacheImpl: 'static + InternalMemoryCache> MemoryCache<InternalCacheImpl> {
|
impl<InternalCacheImpl: 'static + InternalMemoryCache, InnerCache: 'static + Cache>
|
||||||
#[allow(clippy::new_ret_no_self)]
|
MemoryCache<InternalCacheImpl, InnerCache>
|
||||||
pub async fn new(
|
{
|
||||||
disk_max_size: u64,
|
pub async fn new(inner: InnerCache, max_mem_size: u64) -> Arc<Self> {
|
||||||
disk_path: PathBuf,
|
|
||||||
max_mem_size: u64,
|
|
||||||
) -> Arc<Box<dyn Cache>> {
|
|
||||||
let (tx, mut rx) = channel(100);
|
let (tx, mut rx) = channel(100);
|
||||||
let new_self = Arc::new(Box::new(Self {
|
let new_self = Arc::new(Self {
|
||||||
inner: DiskCache::new(disk_max_size, disk_path).await,
|
inner,
|
||||||
cur_mem_size: AtomicU64::new(0),
|
cur_mem_size: AtomicU64::new(0),
|
||||||
mem_cache: Mutex::new(InternalCacheImpl::unbounded()),
|
mem_cache: Mutex::new(InternalCacheImpl::unbounded()),
|
||||||
master_sender: tx,
|
master_sender: tx,
|
||||||
}) as Box<dyn Cache>);
|
});
|
||||||
|
|
||||||
let new_self_0 = Arc::clone(&new_self);
|
let new_self_0 = Arc::clone(&new_self);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let new_self = new_self_0;
|
let new_self = new_self_0;
|
||||||
let max_mem_size = max_mem_size / 20 * 19;
|
let max_mem_size = max_mem_size / 20 * 19;
|
||||||
while let Some((key, bytes, metadata, size)) = rx.recv().await {
|
while let Some((key, bytes, metadata, size)) = rx.recv().await {
|
||||||
new_self.increase_usage(size as u32);
|
new_self.inner.increase_usage(size as u32);
|
||||||
new_self.put_internal(key, bytes, metadata, size).await;
|
new_self
|
||||||
|
.inner
|
||||||
|
.put_internal(key, bytes, metadata, size)
|
||||||
|
.await;
|
||||||
while new_self.mem_size() >= max_mem_size {
|
while new_self.mem_size() >= max_mem_size {
|
||||||
if let Some((_, _, _, size)) = new_self.pop_memory().await {
|
if let Some((_, _, _, size)) = new_self.pop_memory().await {
|
||||||
new_self.decrease_usage(size as u64);
|
new_self.decrease_usage(size as u64);
|
||||||
|
@ -76,7 +118,9 @@ impl<InternalCacheImpl: 'static + InternalMemoryCache> MemoryCache<InternalCache
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<InternalCacheImpl: InternalMemoryCache> Cache for MemoryCache<InternalCacheImpl> {
|
impl<InternalCacheImpl: InternalMemoryCache, InnerCache: Cache> Cache
|
||||||
|
for MemoryCache<InternalCacheImpl, InnerCache>
|
||||||
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn get(
|
async fn get(
|
||||||
&self,
|
&self,
|
||||||
|
@ -86,7 +130,11 @@ impl<InternalCacheImpl: InternalMemoryCache> Cache for MemoryCache<InternalCache
|
||||||
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
||||||
Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata))
|
Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata))
|
||||||
}) {
|
}) {
|
||||||
Some(v) => Some(v.map(|(inner, metadata)| todo!())),
|
Some(v) => Some(v.and_then(|(inner, metadata)| {
|
||||||
|
CacheStream::new(inner, None)
|
||||||
|
.map(|v| (v, metadata))
|
||||||
|
.map_err(|_| CacheError::DecryptionFailure)
|
||||||
|
})),
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
},
|
},
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
|
@ -162,47 +210,3 @@ impl<InternalCacheImpl: InternalMemoryCache> Cache for MemoryCache<InternalCache
|
||||||
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size))
|
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InternalMemoryCache for Lfu {
|
|
||||||
#[inline]
|
|
||||||
fn unbounded() -> Self {
|
|
||||||
Self::unbounded()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
|
||||||
self.get(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
|
||||||
self.insert(key, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
|
||||||
self.pop_lfu_key_value()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InternalMemoryCache for Lru {
|
|
||||||
#[inline]
|
|
||||||
fn unbounded() -> Self {
|
|
||||||
Self::unbounded()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
|
||||||
self.get(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
|
||||||
self.put(key, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
|
||||||
self.pop_lru()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
83
src/cache/mod.rs
vendored
83
src/cache/mod.rs
vendored
|
@ -10,13 +10,12 @@ use bytes::{Bytes, BytesMut};
|
||||||
use chrono::{DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset};
|
||||||
use fs::ConcurrentFsStream;
|
use fs::ConcurrentFsStream;
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::OnceCell;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||||
use sodiumoxide::crypto::secretstream::{gen_key, Header, Key, Pull, Stream as SecretStream};
|
use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::fs::File;
|
use tokio::io::AsyncRead;
|
||||||
use tokio::io::BufReader;
|
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
|
|
||||||
|
@ -24,7 +23,7 @@ pub use disk::DiskCache;
|
||||||
pub use fs::UpstreamError;
|
pub use fs::UpstreamError;
|
||||||
pub use mem::MemoryCache;
|
pub use mem::MemoryCache;
|
||||||
|
|
||||||
static ENCRYPTION_KEY: Lazy<Key> = Lazy::new(gen_key);
|
pub static ENCRYPTION_KEY: OnceCell<Key> = OnceCell::new();
|
||||||
|
|
||||||
mod disk;
|
mod disk;
|
||||||
mod fs;
|
mod fs;
|
||||||
|
@ -206,6 +205,76 @@ pub trait Cache: Send + Sync {
|
||||||
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>;
|
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Cache> Cache for std::sync::Arc<T> {
|
||||||
|
#[inline]
|
||||||
|
async fn get(
|
||||||
|
&self,
|
||||||
|
key: &CacheKey,
|
||||||
|
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
||||||
|
self.as_ref().get(key).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn put(
|
||||||
|
&self,
|
||||||
|
key: CacheKey,
|
||||||
|
image: BoxedImageStream,
|
||||||
|
metadata: ImageMetadata,
|
||||||
|
) -> Result<CacheStream, CacheError> {
|
||||||
|
self.as_ref().put(key, image, metadata).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn increase_usage(&self, amt: u32) {
|
||||||
|
self.as_ref().increase_usage(amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn decrease_usage(&self, amt: u64) {
|
||||||
|
self.as_ref().decrease_usage(amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn on_disk_size(&self) -> u64 {
|
||||||
|
self.as_ref().on_disk_size()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn mem_size(&self) -> u64 {
|
||||||
|
self.as_ref().mem_size()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn put_with_on_completed_callback(
|
||||||
|
&self,
|
||||||
|
key: CacheKey,
|
||||||
|
image: BoxedImageStream,
|
||||||
|
metadata: ImageMetadata,
|
||||||
|
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, usize)>,
|
||||||
|
) -> Result<CacheStream, CacheError> {
|
||||||
|
self.as_ref()
|
||||||
|
.put_with_on_completed_callback(key, image, metadata, on_complete)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn put_internal(
|
||||||
|
&self,
|
||||||
|
key: CacheKey,
|
||||||
|
image: Bytes,
|
||||||
|
metadata: ImageMetadata,
|
||||||
|
size: usize,
|
||||||
|
) {
|
||||||
|
self.as_ref().put_internal(key, image, metadata, size).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn pop_memory(&self) -> Option<(CacheKey, Bytes, ImageMetadata, usize)> {
|
||||||
|
self.as_ref().pop_memory().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct CacheStream {
|
pub struct CacheStream {
|
||||||
inner: InnerStream,
|
inner: InnerStream,
|
||||||
decrypt: Option<SecretStream<Pull>>,
|
decrypt: Option<SecretStream<Pull>>,
|
||||||
|
@ -216,7 +285,7 @@ impl CacheStream {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner,
|
inner,
|
||||||
decrypt: header
|
decrypt: header
|
||||||
.map(|header| SecretStream::init_pull(&header, &ENCRYPTION_KEY))
|
.map(|header| SecretStream::init_pull(&header, ENCRYPTION_KEY.get().unwrap()))
|
||||||
.transpose()?,
|
.transpose()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -248,7 +317,7 @@ impl Stream for CacheStream {
|
||||||
pub(self) enum InnerStream {
|
pub(self) enum InnerStream {
|
||||||
Concurrent(ConcurrentFsStream),
|
Concurrent(ConcurrentFsStream),
|
||||||
Memory(MemStream),
|
Memory(MemStream),
|
||||||
Completed(FramedRead<BufReader<File>, BytesCodec>),
|
Completed(FramedRead<Pin<Box<dyn AsyncRead + Send>>, BytesCodec>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<CachedImage> for InnerStream {
|
impl From<CachedImage> for InnerStream {
|
||||||
|
|
24
src/main.rs
24
src/main.rs
|
@ -18,15 +18,17 @@ use actix_web::{App, HttpServer};
|
||||||
use cache::{Cache, DiskCache};
|
use cache::{Cache, DiskCache};
|
||||||
use clap::Clap;
|
use clap::Clap;
|
||||||
use config::CliArgs;
|
use config::CliArgs;
|
||||||
use log::{debug, error, warn, LevelFilter};
|
use log::{debug, error, info, warn, LevelFilter};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rustls::{NoClientAuth, ServerConfig};
|
use rustls::{NoClientAuth, ServerConfig};
|
||||||
use simple_logger::SimpleLogger;
|
use simple_logger::SimpleLogger;
|
||||||
|
use sodiumoxide::crypto::secretstream::gen_key;
|
||||||
use state::{RwLockServerState, ServerState};
|
use state::{RwLockServerState, ServerState};
|
||||||
use stop::send_stop;
|
use stop::send_stop;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
use crate::cache::{mem, MemoryCache};
|
use crate::cache::mem::{Lfu, Lru};
|
||||||
|
use crate::cache::{MemoryCache, ENCRYPTION_KEY};
|
||||||
use crate::config::UnstableOptions;
|
use crate::config::UnstableOptions;
|
||||||
use crate::state::DynamicServerCert;
|
use crate::state::DynamicServerCert;
|
||||||
|
|
||||||
|
@ -76,6 +78,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
(0, 0) => LevelFilter::Info,
|
(0, 0) => LevelFilter::Info,
|
||||||
(_, 1) => LevelFilter::Debug,
|
(_, 1) => LevelFilter::Debug,
|
||||||
(_, n) if n > 1 => LevelFilter::Trace,
|
(_, n) if n > 1 => LevelFilter::Trace,
|
||||||
|
// compiler can't figure it out
|
||||||
_ => unsafe { unreachable_unchecked() },
|
_ => unsafe { unreachable_unchecked() },
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -94,6 +97,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
};
|
};
|
||||||
let client_secret_1 = client_secret.clone();
|
let client_secret_1 = client_secret.clone();
|
||||||
|
|
||||||
|
if cli_args.ephemeral_disk_encryption {
|
||||||
|
info!("Running with at-rest encryption!");
|
||||||
|
ENCRYPTION_KEY.set(gen_key()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
let server = ServerState::init(&client_secret, &cli_args).await?;
|
let server = ServerState::init(&client_secret, &cli_args).await?;
|
||||||
let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
|
let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
|
||||||
let data_1 = Arc::clone(&data_0);
|
let data_1 = Arc::clone(&data_0);
|
||||||
|
@ -119,7 +127,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
if running_2.load(Ordering::SeqCst) {
|
if running_2.load(Ordering::SeqCst) {
|
||||||
send_stop(&client_secret).await;
|
send_stop(&client_secret).await;
|
||||||
} else {
|
} else {
|
||||||
warn!("Got second ctrl-c, forcefully exiting");
|
warn!("Got second Ctrl-C, forcefully exiting");
|
||||||
system.stop()
|
system.stop()
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -138,12 +146,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let cache: Arc<Box<dyn Cache>> = if low_mem_mode {
|
let cache = DiskCache::new(disk_quota, cache_path.clone()).await;
|
||||||
DiskCache::new(disk_quota, cache_path.clone()).await
|
let cache: Arc<dyn Cache> = if low_mem_mode {
|
||||||
|
cache
|
||||||
} else if use_lfu {
|
} else if use_lfu {
|
||||||
MemoryCache::<mem::Lfu>::new(disk_quota, cache_path.clone(), memory_max_size).await
|
MemoryCache::<Lfu, _>::new(cache, memory_max_size).await
|
||||||
} else {
|
} else {
|
||||||
MemoryCache::<mem::Lru>::new(disk_quota, cache_path.clone(), memory_max_size).await
|
MemoryCache::<Lru, _>::new(cache, memory_max_size).await
|
||||||
};
|
};
|
||||||
|
|
||||||
let cache_0 = Arc::clone(&cache);
|
let cache_0 = Arc::clone(&cache);
|
||||||
|
@ -159,7 +168,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
})
|
})
|
||||||
.shutdown_timeout(60)
|
.shutdown_timeout(60)
|
||||||
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
|
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
|
||||||
// .bind(format!("0.0.0.0:{}", port))?
|
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue