Encrypted files work in debug mode
This commit is contained in:
parent
656543b539
commit
6ac8582183
7 changed files with 101 additions and 282 deletions
21
Cargo.lock
generated
21
Cargo.lock
generated
|
@ -418,6 +418,17 @@ version = "1.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "chacha20"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fee7ad89dc1128635074c268ee661f90c3f7e83d9fd12910608c36b47d6c3412"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"cipher",
|
||||||
|
"cpufeatures",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "chrono"
|
name = "chrono"
|
||||||
version = "0.4.19"
|
version = "0.4.19"
|
||||||
|
@ -432,6 +443,15 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cipher"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
|
||||||
|
dependencies = [
|
||||||
|
"generic-array",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "3.0.0-beta.2"
|
version = "3.0.0-beta.2"
|
||||||
|
@ -1177,6 +1197,7 @@ dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"bincode",
|
"bincode",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"chacha20",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"ctrlc",
|
"ctrlc",
|
||||||
|
|
|
@ -20,6 +20,7 @@ async-trait = "0.1"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
bincode = "1"
|
bincode = "1"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
|
chacha20 = "0.7"
|
||||||
chrono = { version = "0.4", features = [ "serde" ] }
|
chrono = { version = "0.4", features = [ "serde" ] }
|
||||||
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
|
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
|
||||||
ctrlc = "3"
|
ctrlc = "3"
|
||||||
|
|
4
src/cache/disk.rs
vendored
4
src/cache/disk.rs
vendored
|
@ -336,9 +336,7 @@ impl Cache for DiskCache {
|
||||||
tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await });
|
tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await });
|
||||||
|
|
||||||
super::fs::read_file_from_path(&path).await.map(|res| {
|
super::fs::read_file_from_path(&path).await.map(|res| {
|
||||||
let (inner, maybe_header, metadata) = res?;
|
res.map(|(stream, _, metadata)| (stream, metadata))
|
||||||
CacheStream::new(inner, maybe_header)
|
|
||||||
.map(|stream| (stream, metadata))
|
|
||||||
.map_err(|_| CacheError::DecryptionFailure)
|
.map_err(|_| CacheError::DecryptionFailure)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
286
src/cache/fs.rs
vendored
286
src/cache/fs.rs
vendored
|
@ -14,7 +14,6 @@
|
||||||
//! upstream no longer needs to process duplicate requests and sequential cache
|
//! upstream no longer needs to process duplicate requests and sequential cache
|
||||||
//! misses are treated as closer as a cache hit.
|
//! misses are treated as closer as a cache hit.
|
||||||
|
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::io::{Seek, SeekFrom};
|
use std::io::{Seek, SeekFrom};
|
||||||
|
@ -25,11 +24,11 @@ use std::task::{Context, Poll};
|
||||||
use actix_web::error::PayloadError;
|
use actix_web::error::PayloadError;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use chacha20::cipher::{NewCipher, StreamCipher};
|
||||||
|
use chacha20::{Key, XChaCha20, XNonce};
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::Deserialize;
|
||||||
use sodiumoxide::crypto::secretstream::{
|
use sodiumoxide::crypto::stream::xchacha20::{gen_nonce, NONCEBYTES};
|
||||||
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
|
|
||||||
};
|
|
||||||
use tokio::fs::{create_dir_all, remove_file, File};
|
use tokio::fs::{create_dir_all, remove_file, File};
|
||||||
use tokio::io::{
|
use tokio::io::{
|
||||||
AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
|
AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
|
||||||
|
@ -40,13 +39,7 @@ use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
use tracing::{debug, instrument, warn};
|
use tracing::{debug, instrument, warn};
|
||||||
|
|
||||||
use super::compat::LegacyImageMetadata;
|
use super::compat::LegacyImageMetadata;
|
||||||
use super::{CacheKey, ImageMetadata, InnerStream, ENCRYPTION_KEY};
|
use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub enum OnDiskMetadata {
|
|
||||||
Encrypted(Header, ImageMetadata),
|
|
||||||
Plaintext(ImageMetadata),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempts to lookup the file on disk, returning a byte stream if it exists.
|
/// Attempts to lookup the file on disk, returning a byte stream if it exists.
|
||||||
/// Note that this could return two types of streams, depending on if the file
|
/// Note that this could return two types of streams, depending on if the file
|
||||||
|
@ -54,14 +47,14 @@ pub enum OnDiskMetadata {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(super) async fn read_file_from_path(
|
pub(super) async fn read_file_from_path(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
) -> Option<Result<(CacheStream, Option<XNonce>, ImageMetadata), std::io::Error>> {
|
||||||
read_file(std::fs::File::open(path).ok()?).await
|
read_file(std::fs::File::open(path).ok()?).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "debug")]
|
#[instrument(level = "debug")]
|
||||||
async fn read_file(
|
async fn read_file(
|
||||||
file: std::fs::File,
|
file: std::fs::File,
|
||||||
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
) -> Option<Result<(CacheStream, Option<XNonce>, ImageMetadata), std::io::Error>> {
|
||||||
let mut file_0 = file.try_clone().unwrap();
|
let mut file_0 = file.try_clone().unwrap();
|
||||||
let file_1 = file.try_clone().unwrap();
|
let file_1 = file.try_clone().unwrap();
|
||||||
|
|
||||||
|
@ -102,31 +95,20 @@ async fn read_file(
|
||||||
// If the encryption key was set, use the encrypted disk reader instead;
|
// If the encryption key was set, use the encrypted disk reader instead;
|
||||||
// else, just directly read from file.
|
// else, just directly read from file.
|
||||||
if let Some(key) = ENCRYPTION_KEY.get() {
|
if let Some(key) = ENCRYPTION_KEY.get() {
|
||||||
let mut header_bytes = [0; HEADERBYTES];
|
let mut nonce_bytes = [0; NONCEBYTES];
|
||||||
if let Err(e) = file.read_exact(&mut header_bytes).await {
|
if let Err(e) = file.read_exact(&mut nonce_bytes).await {
|
||||||
warn!("Found file but failed reading header: {}", e);
|
warn!("Found file but failed reading header: {}", e);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("header bytes: {:x?}", header_bytes);
|
debug!("header bytes: {:x?}", nonce_bytes);
|
||||||
|
|
||||||
let file_header = if let Some(header) = Header::from_slice(&header_bytes) {
|
maybe_header = Some(*XNonce::from_slice(&nonce_bytes));
|
||||||
header
|
reader = Some(Box::pin(BufReader::new(EncryptedDiskReader::new(
|
||||||
} else {
|
file,
|
||||||
warn!("Found file, but encrypted header was invalid. Assuming corrupted!");
|
XNonce::from_slice(&XNonce::from_slice(&nonce_bytes)),
|
||||||
return None;
|
key,
|
||||||
};
|
))));
|
||||||
|
|
||||||
let secret_stream = if let Ok(stream) = SecretStream::init_pull(&file_header, key) {
|
|
||||||
debug!("Valid header found!");
|
|
||||||
stream
|
|
||||||
} else {
|
|
||||||
warn!("Failed to init secret stream with key and header. Assuming corrupted!");
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
|
|
||||||
maybe_header = Some(file_header);
|
|
||||||
reader = Some(Box::pin(EncryptedDiskReader::new(file, secret_stream)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
parsed_metadata = if let Some(reader) = reader.as_mut() {
|
parsed_metadata = if let Some(reader) = reader.as_mut() {
|
||||||
|
@ -151,7 +133,7 @@ async fn read_file(
|
||||||
|
|
||||||
if let Some(reader) = reader {
|
if let Some(reader) = reader {
|
||||||
let stream =
|
let stream =
|
||||||
InnerStream::Completed(FramedRead::new(reader as Pin<Box<_>>, BytesCodec::new()));
|
CacheStream::Completed(FramedRead::new(reader as Pin<Box<_>>, BytesCodec::new()));
|
||||||
parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata)))
|
parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata)))
|
||||||
} else {
|
} else {
|
||||||
debug!("Reader was invalid, file is corrupt");
|
debug!("Reader was invalid, file is corrupt");
|
||||||
|
@ -161,21 +143,14 @@ async fn read_file(
|
||||||
|
|
||||||
struct EncryptedDiskReader {
|
struct EncryptedDiskReader {
|
||||||
file: Pin<Box<File>>,
|
file: Pin<Box<File>>,
|
||||||
stream: SecretStream<Pull>,
|
keystream: XChaCha20,
|
||||||
// Bytes we read from the secret stream
|
|
||||||
read_buffer: Box<[u8; 4096]>,
|
|
||||||
read_data: Vec<u8>,
|
|
||||||
decryption_buffer: Vec<u8>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedDiskReader {
|
impl EncryptedDiskReader {
|
||||||
fn new(file: File, stream: SecretStream<Pull>) -> Self {
|
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file: Box::pin(file),
|
file: Box::pin(file),
|
||||||
stream,
|
keystream: XChaCha20::new(key, nonce),
|
||||||
read_buffer: Box::new([0; 4096]),
|
|
||||||
read_data: Vec::with_capacity(4096),
|
|
||||||
decryption_buffer: Vec::with_capacity(4096),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -199,91 +174,13 @@ impl AsyncRead for EncryptedDiskReader {
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut ReadBuf<'_>,
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<std::io::Result<()>> {
|
) -> Poll<std::io::Result<()>> {
|
||||||
let buf_res = match self.as_mut().poll_fill_buf(cx) {
|
let previously_read = buf.filled().len();
|
||||||
Poll::Ready(Ok(bytes)) => bytes,
|
let res = self.as_mut().file.as_mut().poll_read(cx, buf);
|
||||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
let bytes_modified = buf.filled().len() - previously_read;
|
||||||
Poll::Pending => return Poll::Pending,
|
self.keystream.apply_keystream(
|
||||||
};
|
&mut buf.filled_mut()[previously_read..previously_read + bytes_modified],
|
||||||
let size = buf.capacity().min(buf_res.len());
|
);
|
||||||
buf.put_slice(&buf_res[..size]);
|
res
|
||||||
self.as_mut().consume(size);
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncBufRead for EncryptedDiskReader {
|
|
||||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
|
|
||||||
let pinned_self = Pin::into_inner(self);
|
|
||||||
let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut());
|
|
||||||
|
|
||||||
// // Load content from the leftovers
|
|
||||||
// read_buf.put_slice(&pinned_self.read_leftovers);
|
|
||||||
|
|
||||||
let msg = loop {
|
|
||||||
let prev_len = read_buf.filled().len();
|
|
||||||
// First, try and read from the underlying file.
|
|
||||||
let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf);
|
|
||||||
|
|
||||||
// Wait until we have something to read
|
|
||||||
if read_res.is_pending() {
|
|
||||||
dbg!(line!());
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
if prev_len == read_buf.filled().len() && pinned_self.read_data.is_empty() {
|
|
||||||
dbg!(line!());
|
|
||||||
return Poll::Ready(Ok(&[]));
|
|
||||||
}
|
|
||||||
|
|
||||||
pinned_self.read_data.extend_from_slice(read_buf.filled());
|
|
||||||
|
|
||||||
if dbg!(pinned_self.read_data.len()) >= std::mem::size_of::<usize>() {
|
|
||||||
let (size, rest) = pinned_self.read_data.split_at(std::mem::size_of::<usize>());
|
|
||||||
let size = usize::from_le_bytes(size.try_into().unwrap());
|
|
||||||
if dbg!(size) <= dbg!(rest.len()) {
|
|
||||||
let (data, leftovers) = rest.split_at(size);
|
|
||||||
// Extract data into their own vec
|
|
||||||
let data = data.to_vec();
|
|
||||||
|
|
||||||
// temporarily store leftovers in decryption buffer,
|
|
||||||
// then move leftovers back into read_data.
|
|
||||||
// This is to avoid an alloc
|
|
||||||
pinned_self.decryption_buffer.clear();
|
|
||||||
pinned_self.decryption_buffer.extend_from_slice(&leftovers);
|
|
||||||
pinned_self.read_data.clear();
|
|
||||||
pinned_self
|
|
||||||
.read_data
|
|
||||||
.extend_from_slice(&pinned_self.decryption_buffer);
|
|
||||||
dbg!(line!());
|
|
||||||
break data;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("read message of size {:?}", msg.len());
|
|
||||||
debug!("First 5 bytes {:x?}", &msg[..5]);
|
|
||||||
debug!("Last 5 bytes {:x?}", &msg[msg.len() - 5..]);
|
|
||||||
|
|
||||||
if pinned_self
|
|
||||||
.stream
|
|
||||||
.pull_to_vec(&msg, None, &mut pinned_self.decryption_buffer)
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
dbg!(line!());
|
|
||||||
return Poll::Ready(Err(std::io::Error::new(
|
|
||||||
std::io::ErrorKind::Other,
|
|
||||||
"Failed to decrypt data",
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// debug!("decrypted read: {:x?}", &pinned_self.decryption_buffer);
|
|
||||||
|
|
||||||
dbg!(line!());
|
|
||||||
Poll::Ready(Ok(&pinned_self.decryption_buffer))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn consume(mut self: Pin<&mut Self>, amt: usize) {
|
|
||||||
self.as_mut().decryption_buffer.drain(..amt);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -298,14 +195,12 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
||||||
let buf = match self.0.as_mut().poll_fill_buf(cx) {
|
let buf = match self.0.as_mut().poll_fill_buf(cx) {
|
||||||
Poll::Ready(Ok(buffer)) => buffer,
|
Poll::Ready(Ok(buffer)) => buffer,
|
||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
dbg!(e);
|
|
||||||
return Poll::Ready(Err(()));
|
return Poll::Ready(Err(()));
|
||||||
}
|
}
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
};
|
};
|
||||||
|
|
||||||
if filled == buf.len() {
|
if filled == buf.len() {
|
||||||
dbg!(line!());
|
|
||||||
return Poll::Ready(Err(()));
|
return Poll::Ready(Err(()));
|
||||||
} else {
|
} else {
|
||||||
filled = buf.len();
|
filled = buf.len();
|
||||||
|
@ -318,7 +213,6 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Some(Err(_)) | None => {
|
Some(Err(_)) | None => {
|
||||||
dbg!(line!());
|
|
||||||
return Poll::Ready(Err(()));
|
return Poll::Ready(Err(()));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -354,12 +248,14 @@ where
|
||||||
file
|
file
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut writer: Pin<Box<dyn AsyncWrite + Send>> = if let Some((enc, header)) = ENCRYPTION_KEY
|
let mut writer: Pin<Box<dyn AsyncWrite + Send>> = if let Some(key) = ENCRYPTION_KEY.get() {
|
||||||
.get()
|
let nonce = gen_nonce();
|
||||||
.map(|key| SecretStream::init_push(key).expect("Failed to init enc stream"))
|
file.write_all(nonce.as_ref()).await?;
|
||||||
{
|
Box::pin(EncryptedDiskWriter::new(
|
||||||
file.write_all(header.as_ref()).await?;
|
file,
|
||||||
Box::pin(EncryptedDiskWriter::new(file, enc))
|
XNonce::from_slice(nonce.as_ref()),
|
||||||
|
key,
|
||||||
|
))
|
||||||
} else {
|
} else {
|
||||||
Box::pin(file)
|
Box::pin(file)
|
||||||
};
|
};
|
||||||
|
@ -401,126 +297,76 @@ where
|
||||||
|
|
||||||
struct EncryptedDiskWriter {
|
struct EncryptedDiskWriter {
|
||||||
file: Pin<Box<File>>,
|
file: Pin<Box<File>>,
|
||||||
stream: Option<SecretStream<Push>>,
|
keystream: XChaCha20,
|
||||||
encryption_buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
write_buffer: Vec<u8>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedDiskWriter {
|
impl EncryptedDiskWriter {
|
||||||
fn new(file: File, stream: SecretStream<Push>) -> Self {
|
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file: Box::pin(file),
|
file: Box::pin(file),
|
||||||
stream: Some(stream),
|
keystream: XChaCha20::new(key, nonce),
|
||||||
encryption_buffer: vec![],
|
buffer: vec![],
|
||||||
write_buffer: vec![],
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncWrite for EncryptedDiskWriter {
|
impl AsyncWrite for EncryptedDiskWriter {
|
||||||
|
#[inline]
|
||||||
fn poll_write(
|
fn poll_write(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &[u8],
|
buf: &[u8],
|
||||||
) -> Poll<Result<usize, std::io::Error>> {
|
) -> Poll<Result<usize, std::io::Error>> {
|
||||||
let new_self = Pin::into_inner(self);
|
let pinned = Pin::into_inner(self);
|
||||||
|
|
||||||
if let Some(stream) = new_self.stream.as_mut() {
|
let old_buffer_size = pinned.buffer.len();
|
||||||
stream
|
pinned.buffer.extend_from_slice(buf);
|
||||||
.push_to_vec(buf, None, Tag::Message, &mut new_self.encryption_buffer)
|
pinned
|
||||||
.expect("Failed to write encrypted data to buffer");
|
.keystream
|
||||||
}
|
.apply_keystream(&mut pinned.buffer[old_buffer_size..]);
|
||||||
|
match pinned.file.as_mut().poll_write(cx, &pinned.buffer) {
|
||||||
new_self
|
|
||||||
.write_buffer
|
|
||||||
.extend_from_slice(&new_self.encryption_buffer.len().to_le_bytes());
|
|
||||||
new_self.write_buffer.extend(&new_self.encryption_buffer);
|
|
||||||
|
|
||||||
match new_self
|
|
||||||
.file
|
|
||||||
.as_mut()
|
|
||||||
.poll_write(cx, &new_self.write_buffer)
|
|
||||||
{
|
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(Ok(n)) => {
|
||||||
let bytes_written = new_self.write_buffer.drain(..n).collect::<Vec<_>>();
|
pinned.buffer.drain(..n);
|
||||||
debug!(
|
|
||||||
"wrote message of size {:?}",
|
|
||||||
bytes_written.len() - std::mem::size_of::<usize>()
|
|
||||||
);
|
|
||||||
debug!(
|
|
||||||
"First 5 bytes {:x?}",
|
|
||||||
&bytes_written[std::mem::size_of::<usize>()..std::mem::size_of::<usize>() + 5]
|
|
||||||
);
|
|
||||||
debug!(
|
|
||||||
"Last 5 bytes {:x?}",
|
|
||||||
&bytes_written[bytes_written.len() - 5..]
|
|
||||||
);
|
|
||||||
// We buffered all the bytes that were provided to use.
|
|
||||||
Poll::Ready(Ok(buf.len()))
|
Poll::Ready(Ok(buf.len()))
|
||||||
}
|
}
|
||||||
poll => poll,
|
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
fn poll_flush(
|
fn poll_flush(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), std::io::Error>> {
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
if self.as_ref().write_buffer.is_empty() {
|
if self.buffer.is_empty() {
|
||||||
self.file.as_mut().poll_flush(cx)
|
self.as_mut().file.as_mut().poll_flush(cx)
|
||||||
} else {
|
} else {
|
||||||
let new_self = Pin::into_inner(self);
|
let pinned = Pin::into_inner(self);
|
||||||
|
while !pinned.buffer.is_empty() {
|
||||||
// Write as many bytes that we can immediately write
|
match pinned.file.as_mut().poll_write(cx, &pinned.buffer) {
|
||||||
loop {
|
|
||||||
match new_self
|
|
||||||
.file
|
|
||||||
.as_mut()
|
|
||||||
.poll_write(cx, &new_self.write_buffer)
|
|
||||||
{
|
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(Ok(n)) => {
|
||||||
let bytes_written = new_self.write_buffer.drain(..n).collect::<Vec<_>>();
|
pinned.buffer.drain(..n);
|
||||||
debug!(
|
|
||||||
"wrote message of size {:?}",
|
|
||||||
bytes_written.len() - std::mem::size_of::<usize>()
|
|
||||||
);
|
|
||||||
|
|
||||||
// We buffered all the bytes that were provided to use.
|
|
||||||
if new_self.write_buffer.is_empty() {
|
|
||||||
return Poll::Ready(Ok(()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
poll => return poll.map(|res| res.map(|_| ())),
|
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
||||||
|
Poll::Pending => return Poll::Pending,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
fn poll_shutdown(
|
fn poll_shutdown(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), std::io::Error>> {
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
let maybe_bytes = self
|
match self.as_mut().poll_flush(cx) {
|
||||||
.as_mut()
|
Poll::Ready(Ok(())) => self.as_mut().file.as_mut().poll_shutdown(cx),
|
||||||
.stream
|
poll => poll,
|
||||||
.take()
|
|
||||||
.map(|stream| stream.finalize(None));
|
|
||||||
|
|
||||||
// If we've yet to finalize the stream, finalize it and add the bytes to
|
|
||||||
// our writer buffer.
|
|
||||||
if let Some(Ok(bytes)) = maybe_bytes {
|
|
||||||
// We just need to push it into the buffer, we don't really care
|
|
||||||
// about the result, since we can check later
|
|
||||||
let _ = self.as_mut().write_buffer.extend_from_slice(&bytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now wait for us to fully flush out our write buffer
|
|
||||||
if self.as_mut().poll_flush(cx).is_pending() {
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write buffer is empty, flush file
|
|
||||||
self.file.as_mut().poll_shutdown(cx)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
12
src/cache/mem.rs
vendored
12
src/cache/mem.rs
vendored
|
@ -1,9 +1,7 @@
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::{
|
use super::{Cache, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream};
|
||||||
Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata, InnerStream, MemStream,
|
|
||||||
};
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
|
@ -146,13 +144,9 @@ where
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
|
) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
|
||||||
match self.mem_cache.lock().now_or_never() {
|
match self.mem_cache.lock().now_or_never() {
|
||||||
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
||||||
Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata))
|
Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata))
|
||||||
}) {
|
}) {
|
||||||
Some(v) => Some(v.and_then(|(inner, metadata)| {
|
Some(v) => Some(v),
|
||||||
CacheStream::new(inner, None)
|
|
||||||
.map(|v| (v, metadata))
|
|
||||||
.map_err(|_| CacheError::DecryptionFailure)
|
|
||||||
})),
|
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
},
|
},
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
|
|
52
src/cache/mod.rs
vendored
52
src/cache/mod.rs
vendored
|
@ -8,12 +8,12 @@ use std::task::{Context, Poll};
|
||||||
use actix_web::http::HeaderValue;
|
use actix_web::http::HeaderValue;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
|
use chacha20::Key;
|
||||||
use chrono::{DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||||
use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream};
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
|
@ -231,56 +231,12 @@ impl<T: CallbackCache> CallbackCache for Arc<T> {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub enum CacheStream {
|
||||||
pub struct CacheStream {
|
|
||||||
inner: InnerStream,
|
|
||||||
decrypt: Option<SecretStream<Pull>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CacheStream {
|
|
||||||
pub(self) fn new(inner: InnerStream, header: Option<Header>) -> Result<Self, ()> {
|
|
||||||
Ok(Self {
|
|
||||||
inner,
|
|
||||||
decrypt: header
|
|
||||||
.and_then(|header| {
|
|
||||||
ENCRYPTION_KEY
|
|
||||||
.get()
|
|
||||||
.map(|key| SecretStream::init_pull(&header, key))
|
|
||||||
})
|
|
||||||
.transpose()?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Stream for CacheStream {
|
|
||||||
type Item = CacheStreamItem;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
self.inner.poll_next_unpin(cx).map(|data| {
|
|
||||||
// False positive (`data`): https://link.eddie.sh/r1fXX
|
|
||||||
#[allow(clippy::option_if_let_else)]
|
|
||||||
if let Some(keystream) = self.decrypt.as_mut() {
|
|
||||||
data.map(|bytes_res| {
|
|
||||||
bytes_res.and_then(|bytes| {
|
|
||||||
keystream
|
|
||||||
.pull(&bytes, None)
|
|
||||||
.map(|(data, _tag)| Bytes::from(data))
|
|
||||||
.map_err(|_| UpstreamError)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
data
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(self) enum InnerStream {
|
|
||||||
Memory(MemStream),
|
Memory(MemStream),
|
||||||
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),
|
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<CachedImage> for InnerStream {
|
impl From<CachedImage> for CacheStream {
|
||||||
fn from(image: CachedImage) -> Self {
|
fn from(image: CachedImage) -> Self {
|
||||||
Self::Memory(MemStream(image.0))
|
Self::Memory(MemStream(image.0))
|
||||||
}
|
}
|
||||||
|
@ -288,7 +244,7 @@ impl From<CachedImage> for InnerStream {
|
||||||
|
|
||||||
type CacheStreamItem = Result<Bytes, UpstreamError>;
|
type CacheStreamItem = Result<Bytes, UpstreamError>;
|
||||||
|
|
||||||
impl Stream for InnerStream {
|
impl Stream for CacheStream {
|
||||||
type Item = CacheStreamItem;
|
type Item = CacheStreamItem;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
|
|
@ -14,10 +14,11 @@ use actix_web::rt::{spawn, time, System};
|
||||||
use actix_web::web::{self, Data};
|
use actix_web::web::{self, Data};
|
||||||
use actix_web::{App, HttpResponse, HttpServer};
|
use actix_web::{App, HttpResponse, HttpServer};
|
||||||
use cache::{Cache, DiskCache};
|
use cache::{Cache, DiskCache};
|
||||||
|
use chacha20::Key;
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rustls::{NoClientAuth, ServerConfig};
|
use rustls::{NoClientAuth, ServerConfig};
|
||||||
use sodiumoxide::crypto::secretstream::gen_key;
|
use sodiumoxide::crypto::stream::xchacha20::gen_key;
|
||||||
use state::{RwLockServerState, ServerState};
|
use state::{RwLockServerState, ServerState};
|
||||||
use stop::send_stop;
|
use stop::send_stop;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
@ -96,7 +97,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
|
||||||
if config.ephemeral_disk_encryption {
|
if config.ephemeral_disk_encryption {
|
||||||
info!("Running with at-rest encryption!");
|
info!("Running with at-rest encryption!");
|
||||||
ENCRYPTION_KEY.set(gen_key()).unwrap();
|
ENCRYPTION_KEY
|
||||||
|
.set(*Key::from_slice(gen_key().as_ref()))
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.enable_metrics {
|
if config.enable_metrics {
|
||||||
|
|
Loading…
Reference in a new issue