diff --git a/Cargo.lock b/Cargo.lock index a32523f..c9ef544 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,9 +456,9 @@ dependencies = [ [[package]] name = "const_fn" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402da840495de3f976eaefc3485b7f5eb5b0bf9761f9a47be27fe975b3b8c2ec" +checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7" [[package]] name = "convert_case" @@ -479,9 +479,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.1.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec1028182c380cc45a2e2c5ec841134f2dfd0f8f5f0a5bcd68004f81b5efdf4" +checksum = "ed00c67cb5d0a7d64a44f6ad2668db7e7530311dd53ea79bcd4fb022c64911c8" dependencies = [ "libc", ] @@ -547,9 +547,9 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.13" +version = "0.99.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f82b1b72f1263f214c0f823371768776c4f5841b942c9883aa8e5ec584fd0ba6" +checksum = "5cc7b9cef1e351660e5443924e4f43ab25fbbed3e9a5f052df3677deb4d6b320" dependencies = [ "convert_case", "proc-macro2", @@ -1522,18 +1522,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.125" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171" +checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.125" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d" +checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" dependencies = [ "proc-macro2", "quote", @@ -1874,9 +1874,9 @@ dependencies = [ [[package]] name = "terminal_size" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ca8ced750734db02076f44132d802af0b33b09942331f4459dde8636fd2406" +checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" dependencies = [ "libc", "winapi", @@ -1977,9 +1977,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" +checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37" dependencies = [ "autocfg", "bytes", @@ -1997,9 +1997,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" dependencies = [ "proc-macro2", "quote", @@ -2019,9 +2019,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0" +checksum = "f8864d706fdb3cc0843a49647ac892720dac98a6eeb818b77190592cf4994066" dependencies = [ "futures-core", "pin-project-lite", @@ -2031,9 +2031,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" dependencies = [ "bytes", "futures-core", diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 38259f0..bec91e8 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -17,10 +17,12 @@ use actix_web::error::PayloadError; use bytes::{Buf, Bytes, BytesMut}; use futures::{Future, Stream, StreamExt}; -use log::debug; +use log::{debug, warn}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use sodiumoxide::crypto::secretstream::Header; +use sodiumoxide::crypto::secretstream::{ + Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES, +}; use std::collections::HashMap; use std::error::Error; use std::fmt::Display; @@ -30,14 +32,19 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{create_dir_all, remove_file, File}; -use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf}; +use tokio::io::{ + AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, + ReadBuf, +}; use tokio::sync::mpsc::Sender; use tokio::sync::watch::{channel, Receiver}; use tokio::sync::RwLock; use tokio_stream::wrappers::WatchStream; use tokio_util::codec::{BytesCodec, FramedRead}; -use super::{BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream}; +use super::{ + BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream, ENCRYPTION_KEY, +}; #[derive(Serialize, Deserialize)] pub enum OnDiskMetadata { @@ -71,28 +78,118 @@ pub(super) async fn read_file( path: &Path, ) -> Option, ImageMetadata), std::io::Error>> { let std_file = std::fs::File::open(path).ok()?; - let file = File::from_std(std_file.try_clone().ok()?); + let mut file = File::from_std(std_file.try_clone().ok()?); + + let mut reader = { + // If the encryption key was set, use the encrypted disk reader instead; + // else, just directly read from file. + let inner_reader: Pin> = if let Some(key) = ENCRYPTION_KEY.get() { + let mut header_bytes = [0; HEADERBYTES]; + if file.read_exact(&mut header_bytes).await.is_err() { + warn!("Found file, but encrypted header was not found. Assuming corrupted!"); + return None; + } + + Box::pin(EncryptedDiskReader::new( + file, + SecretStream::init_pull( + &Header::from_slice(&header_bytes).expect("failed to get header"), + key, + ) + .expect("Failed to initialize decryption kesy"), + )) + } else { + Box::pin(file) + }; + + BufReader::new(inner_reader) + }; let metadata = { - let mut de = serde_json::Deserializer::from_reader(std_file); - ImageMetadata::deserialize(&mut de).ok()? + let mut read = String::new(); + reader + .read_line(&mut read) + .await + .expect("failed to read metadata"); + serde_json::from_str(&read).ok()? }; + let reader = Box::pin(reader); + // False positive lint, `file` is used in both cases, which means that it's // not possible to move this into a map_or_else without cloning `file`. #[allow(clippy::option_if_let_else)] let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) { - InnerStream::Concurrent(ConcurrentFsStream::from_file( - file, + InnerStream::Concurrent(ConcurrentFsStream::from_reader( + reader, WatchStream::new(status), )) } else { - InnerStream::Completed(FramedRead::new(BufReader::new(file), BytesCodec::new())) + InnerStream::Completed(FramedRead::new(reader, BytesCodec::new())) }; Some(Ok((stream, None, metadata))) } +struct EncryptedDiskReader { + file: Pin>, + stream: SecretStream, + buf: Vec, +} + +impl EncryptedDiskReader { + fn new(file: File, stream: SecretStream) -> Self { + Self { + file: Box::pin(file), + stream, + buf: vec![], + } + } +} + +impl AsyncRead for EncryptedDiskReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let cursor_start = buf.filled().len(); + + let res = self.as_mut().file.as_mut().poll_read(cx, buf); + if res.is_pending() { + return Poll::Pending; + } + + let cursor_new = buf.filled().len(); + + // pull_to_vec internally calls vec.clear() and vec.reserve(). Generally + // speaking we should be reading about the same amount of data each time + // so we shouldn't experience too much of a slow down w.r.t resizing the + // buffer... + let new_self = Pin::into_inner(self); + new_self + .stream + .pull_to_vec( + &buf.filled()[cursor_start..cursor_new], + None, + &mut new_self.buf, + ) + .unwrap(); + + // data is strictly smaller than the encrypted stream, since you need to + // encode tags as well, so this is always safe. + + // rewrite encrypted data into decrypted data + let buffer = buf.filled_mut(); + for (old, new) in buffer[cursor_start..].iter_mut().zip(&new_self.buf) { + *old = *new; + } + buf.set_filled(cursor_start + &new_self.buf.len()); + + res + } +} + /// Writes the metadata and input stream (in that order) to a file, returning a /// stream that reads from that file. Accepts a db callback function that is /// provided the number of bytes written, and an optional on-complete callback @@ -111,7 +208,7 @@ where { let (tx, rx) = channel(WritingStatus::NotDone); - let mut file = { + let file = { let mut write_lock = WRITING_STATUS.write().await; let parent = path.parent().expect("The path to have a parent"); create_dir_all(parent).await?; @@ -122,6 +219,17 @@ where let metadata_string = serde_json::to_string(&metadata).expect("serialization to work"); let metadata_size = metadata_string.len(); + + let (mut writer, maybe_header): (Pin>, _) = + if let Some((enc, header)) = ENCRYPTION_KEY + .get() + .map(|key| SecretStream::init_push(key).expect("Failed to init enc stream")) + { + (Box::pin(EncryptedDiskWriter::new(file, enc)), Some(header)) + } else { + (Box::pin(file), None) + }; + // need owned variant because async lifetime let path_buf = path.to_path_buf(); tokio::spawn(async move { @@ -130,7 +238,8 @@ where let mut bytes_written: u32 = 0; let mut acc_bytes = BytesMut::new(); let accumulate = on_complete.is_some(); - file.write_all(metadata_string.as_bytes()).await?; + writer.write_all(metadata_string.as_bytes()).await?; + writer.write_all(b"\n").await?; while let Some(bytes) = byte_stream.next().await { if let Ok(mut bytes) = bytes { @@ -139,7 +248,7 @@ where } loop { - match file.write(&bytes).await? { + match writer.write(&bytes).await? { 0 => break, n => { bytes.advance(n); @@ -162,8 +271,8 @@ where // We don't care about the result of the call. std::mem::drop(remove_file(&path_buf).await); } else { - file.flush().await?; - file.sync_all().await?; // we need metadata + writer.flush().await?; + // writer.sync_all().await?; // we need metadata debug!("writing to file done"); } @@ -204,13 +313,81 @@ where InnerStream::Concurrent( ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?, ), - None, + maybe_header, )) } +struct EncryptedDiskWriter { + file: Pin>, + stream: Option>, + encryption_buffer: Vec, + write_buffer: Vec, +} + +impl EncryptedDiskWriter { + fn new(file: File, stream: SecretStream) -> Self { + Self { + file: Box::pin(file), + stream: Some(stream), + encryption_buffer: vec![], + write_buffer: vec![], + } + } +} + +impl AsyncWrite for EncryptedDiskWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let new_self = Pin::into_inner(self); + { + let encryption_buffer = &mut new_self.encryption_buffer; + new_self.stream.as_mut().map(|stream| { + stream + .push_to_vec(buf, None, Tag::Message, encryption_buffer) + .expect("Failed to write encrypted data to buffer"); + }); + } + + new_self.write_buffer.extend(&new_self.encryption_buffer); + + match new_self + .file + .as_mut() + .poll_write(cx, &new_self.write_buffer) + { + Poll::Ready(Ok(n)) => { + new_self.write_buffer.drain(..n); + Poll::Ready(Ok(n)) + } + poll => poll, + } + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.file.as_mut().poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.as_mut() + .stream + .take() + .map(|stream| stream.finalize(None)); + self.file.as_mut().poll_shutdown(cx) + } +} + pub struct ConcurrentFsStream { /// The File to read from - file: Pin>>, + reader: Pin>, /// The channel to get updates from. The writer must send its status, else /// this reader will never complete. receiver: Pin>>, @@ -229,12 +406,15 @@ impl ConcurrentFsStream { ) -> Result { let mut file = File::open(path).await?; file.seek(SeekFrom::Start(seek as u64)).await?; - Ok(Self::from_file(file, receiver)) + Ok(Self::from_reader(Box::pin(file), receiver)) } - fn from_file(file: File, receiver: WatchStream) -> Self { + fn from_reader( + reader: Pin>, + receiver: WatchStream, + ) -> Self { Self { - file: Box::pin(BufReader::new(file)), + reader: Box::pin(reader), receiver: Box::pin(receiver), bytes_read: 0, bytes_total: None, @@ -263,7 +443,7 @@ impl Stream for ConcurrentFsStream { // TODO: Might be more efficient to have a larger buffer let mut bytes = [0; 4 * 1024].to_vec(); let mut buffer = ReadBuf::new(&mut bytes); - match self.file.as_mut().poll_read(cx, &mut buffer) { + match self.reader.as_mut().poll_read(cx, &mut buffer) { Poll::Ready(Ok(_)) => (), Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))), Poll::Pending => return Poll::Pending, @@ -327,3 +507,9 @@ enum WritingStatus { Done(u32), Error, } + +#[cfg(test)] +mod storage { + #[test] + fn wut() {} +} diff --git a/src/cache/mem.rs b/src/cache/mem.rs index 7276ce2..0c1f640 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -5,7 +5,8 @@ use std::sync::Arc; use crate::cache::DiskCache; use super::{ - BoxedImageStream, Cache, CacheKey, CacheStream, ImageMetadata, InnerStream, MemStream, + BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata, InnerStream, + MemStream, }; use async_trait::async_trait; use bytes::Bytes; @@ -86,7 +87,11 @@ impl Cache for MemoryCache match mem_cache.get(key).map(|(bytes, metadata, _)| { Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata)) }) { - Some(v) => Some(v.map(|(inner, metadata)| todo!())), + Some(v) => Some(v.and_then(|(inner, metadata)| { + CacheStream::new(inner, None) + .map(|v| (v, metadata)) + .map_err(|_| CacheError::DecryptionFailure) + })), None => self.inner.get(key).await, }, None => self.inner.get(key).await, diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 0152516..069b0e8 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -10,13 +10,12 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, FixedOffset}; use fs::ConcurrentFsStream; use futures::{Stream, StreamExt}; -use once_cell::sync::Lazy; +use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; -use sodiumoxide::crypto::secretstream::{gen_key, Header, Key, Pull, Stream as SecretStream}; +use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream}; use thiserror::Error; -use tokio::fs::File; -use tokio::io::BufReader; +use tokio::io::AsyncRead; use tokio::sync::mpsc::Sender; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -24,7 +23,7 @@ pub use disk::DiskCache; pub use fs::UpstreamError; pub use mem::MemoryCache; -static ENCRYPTION_KEY: Lazy = Lazy::new(gen_key); +pub static ENCRYPTION_KEY: OnceCell = OnceCell::new(); mod disk; mod fs; @@ -216,7 +215,7 @@ impl CacheStream { Ok(Self { inner, decrypt: header - .map(|header| SecretStream::init_pull(&header, &ENCRYPTION_KEY)) + .map(|header| SecretStream::init_pull(&header, &ENCRYPTION_KEY.get().unwrap())) .transpose()?, }) } @@ -248,7 +247,7 @@ impl Stream for CacheStream { pub(self) enum InnerStream { Concurrent(ConcurrentFsStream), Memory(MemStream), - Completed(FramedRead, BytesCodec>), + Completed(FramedRead>, BytesCodec>), } impl From for InnerStream { diff --git a/src/main.rs b/src/main.rs index 45ae2ea..c3b9312 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,15 +18,16 @@ use actix_web::{App, HttpServer}; use cache::{Cache, DiskCache}; use clap::Clap; use config::CliArgs; -use log::{debug, error, warn, LevelFilter}; +use log::{debug, error, info, warn, LevelFilter}; use parking_lot::RwLock; use rustls::{NoClientAuth, ServerConfig}; use simple_logger::SimpleLogger; +use sodiumoxide::crypto::secretstream::gen_key; use state::{RwLockServerState, ServerState}; use stop::send_stop; use thiserror::Error; -use crate::cache::{mem, MemoryCache}; +use crate::cache::{mem, MemoryCache, ENCRYPTION_KEY}; use crate::config::UnstableOptions; use crate::state::DynamicServerCert; @@ -76,6 +77,7 @@ async fn main() -> Result<(), Box> { (0, 0) => LevelFilter::Info, (_, 1) => LevelFilter::Debug, (_, n) if n > 1 => LevelFilter::Trace, + // compiler can't figure it out _ => unsafe { unreachable_unchecked() }, }; @@ -94,6 +96,11 @@ async fn main() -> Result<(), Box> { }; let client_secret_1 = client_secret.clone(); + if cli_args.ephemeral_disk_encryption { + info!("Running with at-rest encryption!"); + ENCRYPTION_KEY.set(gen_key()).unwrap(); + } + let server = ServerState::init(&client_secret, &cli_args).await?; let data_0 = Arc::new(RwLockServerState(RwLock::new(server))); let data_1 = Arc::clone(&data_0); @@ -119,7 +126,7 @@ async fn main() -> Result<(), Box> { if running_2.load(Ordering::SeqCst) { send_stop(&client_secret).await; } else { - warn!("Got second ctrl-c, forcefully exiting"); + warn!("Got second Ctrl-C, forcefully exiting"); system.stop() } }); @@ -159,7 +166,6 @@ async fn main() -> Result<(), Box> { }) .shutdown_timeout(60) .bind_rustls(format!("0.0.0.0:{}", port), tls_config)? - // .bind(format!("0.0.0.0:{}", port))? .run() .await?;