From 6ac85821832bdbe67dca72cd63d1f886966eeb89 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Tue, 13 Jul 2021 20:38:01 -0400 Subject: [PATCH] Encrypted files work in debug mode --- Cargo.lock | 21 ++++ Cargo.toml | 1 + src/cache/disk.rs | 4 +- src/cache/fs.rs | 286 +++++++++++----------------------------------- src/cache/mem.rs | 12 +- src/cache/mod.rs | 52 +-------- src/main.rs | 7 +- 7 files changed, 101 insertions(+), 282 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b26f85b..d962233 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -418,6 +418,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fee7ad89dc1128635074c268ee661f90c3f7e83d9fd12910608c36b47d6c3412" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "chrono" version = "0.4.19" @@ -432,6 +443,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "clap" version = "3.0.0-beta.2" @@ -1177,6 +1197,7 @@ dependencies = [ "base64", "bincode", "bytes", + "chacha20", "chrono", "clap", "ctrlc", diff --git a/Cargo.toml b/Cargo.toml index 2b6ec32..71f9616 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ async-trait = "0.1" base64 = "0.13" bincode = "1" bytes = "1" +chacha20 = "0.7" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] } ctrlc = "3" diff --git a/src/cache/disk.rs b/src/cache/disk.rs index 99729bf..7aa9a54 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -336,9 +336,7 @@ impl Cache for DiskCache { tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await }); super::fs::read_file_from_path(&path).await.map(|res| { - let (inner, maybe_header, metadata) = res?; - CacheStream::new(inner, maybe_header) - .map(|stream| (stream, metadata)) + res.map(|(stream, _, metadata)| (stream, metadata)) .map_err(|_| CacheError::DecryptionFailure) }) } diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 3c9f619..a9dfabc 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -14,7 +14,6 @@ //! upstream no longer needs to process duplicate requests and sequential cache //! misses are treated as closer as a cache hit. -use std::convert::TryInto; use std::error::Error; use std::fmt::Display; use std::io::{Seek, SeekFrom}; @@ -25,11 +24,11 @@ use std::task::{Context, Poll}; use actix_web::error::PayloadError; use async_trait::async_trait; use bytes::Bytes; +use chacha20::cipher::{NewCipher, StreamCipher}; +use chacha20::{Key, XChaCha20, XNonce}; use futures::Future; -use serde::{Deserialize, Serialize}; -use sodiumoxide::crypto::secretstream::{ - Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES, -}; +use serde::Deserialize; +use sodiumoxide::crypto::stream::xchacha20::{gen_nonce, NONCEBYTES}; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{ AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, @@ -40,13 +39,7 @@ use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::{debug, instrument, warn}; use super::compat::LegacyImageMetadata; -use super::{CacheKey, ImageMetadata, InnerStream, ENCRYPTION_KEY}; - -#[derive(Serialize, Deserialize)] -pub enum OnDiskMetadata { - Encrypted(Header, ImageMetadata), - Plaintext(ImageMetadata), -} +use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY}; /// Attempts to lookup the file on disk, returning a byte stream if it exists. /// Note that this could return two types of streams, depending on if the file @@ -54,14 +47,14 @@ pub enum OnDiskMetadata { #[inline] pub(super) async fn read_file_from_path( path: &Path, -) -> Option, ImageMetadata), std::io::Error>> { +) -> Option, ImageMetadata), std::io::Error>> { read_file(std::fs::File::open(path).ok()?).await } #[instrument(level = "debug")] async fn read_file( file: std::fs::File, -) -> Option, ImageMetadata), std::io::Error>> { +) -> Option, ImageMetadata), std::io::Error>> { let mut file_0 = file.try_clone().unwrap(); let file_1 = file.try_clone().unwrap(); @@ -102,31 +95,20 @@ async fn read_file( // If the encryption key was set, use the encrypted disk reader instead; // else, just directly read from file. if let Some(key) = ENCRYPTION_KEY.get() { - let mut header_bytes = [0; HEADERBYTES]; - if let Err(e) = file.read_exact(&mut header_bytes).await { + let mut nonce_bytes = [0; NONCEBYTES]; + if let Err(e) = file.read_exact(&mut nonce_bytes).await { warn!("Found file but failed reading header: {}", e); return None; } - debug!("header bytes: {:x?}", header_bytes); + debug!("header bytes: {:x?}", nonce_bytes); - let file_header = if let Some(header) = Header::from_slice(&header_bytes) { - header - } else { - warn!("Found file, but encrypted header was invalid. Assuming corrupted!"); - return None; - }; - - let secret_stream = if let Ok(stream) = SecretStream::init_pull(&file_header, key) { - debug!("Valid header found!"); - stream - } else { - warn!("Failed to init secret stream with key and header. Assuming corrupted!"); - return None; - }; - - maybe_header = Some(file_header); - reader = Some(Box::pin(EncryptedDiskReader::new(file, secret_stream))); + maybe_header = Some(*XNonce::from_slice(&nonce_bytes)); + reader = Some(Box::pin(BufReader::new(EncryptedDiskReader::new( + file, + XNonce::from_slice(&XNonce::from_slice(&nonce_bytes)), + key, + )))); } parsed_metadata = if let Some(reader) = reader.as_mut() { @@ -151,7 +133,7 @@ async fn read_file( if let Some(reader) = reader { let stream = - InnerStream::Completed(FramedRead::new(reader as Pin>, BytesCodec::new())); + CacheStream::Completed(FramedRead::new(reader as Pin>, BytesCodec::new())); parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata))) } else { debug!("Reader was invalid, file is corrupt"); @@ -161,21 +143,14 @@ async fn read_file( struct EncryptedDiskReader { file: Pin>, - stream: SecretStream, - // Bytes we read from the secret stream - read_buffer: Box<[u8; 4096]>, - read_data: Vec, - decryption_buffer: Vec, + keystream: XChaCha20, } impl EncryptedDiskReader { - fn new(file: File, stream: SecretStream) -> Self { + fn new(file: File, nonce: &XNonce, key: &Key) -> Self { Self { file: Box::pin(file), - stream, - read_buffer: Box::new([0; 4096]), - read_data: Vec::with_capacity(4096), - decryption_buffer: Vec::with_capacity(4096), + keystream: XChaCha20::new(key, nonce), } } } @@ -199,91 +174,13 @@ impl AsyncRead for EncryptedDiskReader { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let buf_res = match self.as_mut().poll_fill_buf(cx) { - Poll::Ready(Ok(bytes)) => bytes, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Pending => return Poll::Pending, - }; - let size = buf.capacity().min(buf_res.len()); - buf.put_slice(&buf_res[..size]); - self.as_mut().consume(size); - Poll::Ready(Ok(())) - } -} - -impl AsyncBufRead for EncryptedDiskReader { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let pinned_self = Pin::into_inner(self); - let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut()); - - // // Load content from the leftovers - // read_buf.put_slice(&pinned_self.read_leftovers); - - let msg = loop { - let prev_len = read_buf.filled().len(); - // First, try and read from the underlying file. - let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf); - - // Wait until we have something to read - if read_res.is_pending() { - dbg!(line!()); - return Poll::Pending; - } - - if prev_len == read_buf.filled().len() && pinned_self.read_data.is_empty() { - dbg!(line!()); - return Poll::Ready(Ok(&[])); - } - - pinned_self.read_data.extend_from_slice(read_buf.filled()); - - if dbg!(pinned_self.read_data.len()) >= std::mem::size_of::() { - let (size, rest) = pinned_self.read_data.split_at(std::mem::size_of::()); - let size = usize::from_le_bytes(size.try_into().unwrap()); - if dbg!(size) <= dbg!(rest.len()) { - let (data, leftovers) = rest.split_at(size); - // Extract data into their own vec - let data = data.to_vec(); - - // temporarily store leftovers in decryption buffer, - // then move leftovers back into read_data. - // This is to avoid an alloc - pinned_self.decryption_buffer.clear(); - pinned_self.decryption_buffer.extend_from_slice(&leftovers); - pinned_self.read_data.clear(); - pinned_self - .read_data - .extend_from_slice(&pinned_self.decryption_buffer); - dbg!(line!()); - break data; - } - } - }; - - debug!("read message of size {:?}", msg.len()); - debug!("First 5 bytes {:x?}", &msg[..5]); - debug!("Last 5 bytes {:x?}", &msg[msg.len() - 5..]); - - if pinned_self - .stream - .pull_to_vec(&msg, None, &mut pinned_self.decryption_buffer) - .is_err() - { - dbg!(line!()); - return Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to decrypt data", - ))); - } - - // debug!("decrypted read: {:x?}", &pinned_self.decryption_buffer); - - dbg!(line!()); - Poll::Ready(Ok(&pinned_self.decryption_buffer)) - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.as_mut().decryption_buffer.drain(..amt); + let previously_read = buf.filled().len(); + let res = self.as_mut().file.as_mut().poll_read(cx, buf); + let bytes_modified = buf.filled().len() - previously_read; + self.keystream.apply_keystream( + &mut buf.filled_mut()[previously_read..previously_read + bytes_modified], + ); + res } } @@ -298,14 +195,12 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> { let buf = match self.0.as_mut().poll_fill_buf(cx) { Poll::Ready(Ok(buffer)) => buffer, Poll::Ready(Err(e)) => { - dbg!(e); return Poll::Ready(Err(())); } Poll::Pending => return Poll::Pending, }; if filled == buf.len() { - dbg!(line!()); return Poll::Ready(Err(())); } else { filled = buf.len(); @@ -318,7 +213,6 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> { continue; } Some(Err(_)) | None => { - dbg!(line!()); return Poll::Ready(Err(())); } }; @@ -354,12 +248,14 @@ where file }; - let mut writer: Pin> = if let Some((enc, header)) = ENCRYPTION_KEY - .get() - .map(|key| SecretStream::init_push(key).expect("Failed to init enc stream")) - { - file.write_all(header.as_ref()).await?; - Box::pin(EncryptedDiskWriter::new(file, enc)) + let mut writer: Pin> = if let Some(key) = ENCRYPTION_KEY.get() { + let nonce = gen_nonce(); + file.write_all(nonce.as_ref()).await?; + Box::pin(EncryptedDiskWriter::new( + file, + XNonce::from_slice(nonce.as_ref()), + key, + )) } else { Box::pin(file) }; @@ -401,126 +297,76 @@ where struct EncryptedDiskWriter { file: Pin>, - stream: Option>, - encryption_buffer: Vec, - write_buffer: Vec, + keystream: XChaCha20, + buffer: Vec, } impl EncryptedDiskWriter { - fn new(file: File, stream: SecretStream) -> Self { + fn new(file: File, nonce: &XNonce, key: &Key) -> Self { Self { file: Box::pin(file), - stream: Some(stream), - encryption_buffer: vec![], - write_buffer: vec![], + keystream: XChaCha20::new(key, nonce), + buffer: vec![], } } } impl AsyncWrite for EncryptedDiskWriter { + #[inline] fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let new_self = Pin::into_inner(self); + let pinned = Pin::into_inner(self); - if let Some(stream) = new_self.stream.as_mut() { - stream - .push_to_vec(buf, None, Tag::Message, &mut new_self.encryption_buffer) - .expect("Failed to write encrypted data to buffer"); - } - - new_self - .write_buffer - .extend_from_slice(&new_self.encryption_buffer.len().to_le_bytes()); - new_self.write_buffer.extend(&new_self.encryption_buffer); - - match new_self - .file - .as_mut() - .poll_write(cx, &new_self.write_buffer) - { + let old_buffer_size = pinned.buffer.len(); + pinned.buffer.extend_from_slice(buf); + pinned + .keystream + .apply_keystream(&mut pinned.buffer[old_buffer_size..]); + match pinned.file.as_mut().poll_write(cx, &pinned.buffer) { Poll::Ready(Ok(n)) => { - let bytes_written = new_self.write_buffer.drain(..n).collect::>(); - debug!( - "wrote message of size {:?}", - bytes_written.len() - std::mem::size_of::() - ); - debug!( - "First 5 bytes {:x?}", - &bytes_written[std::mem::size_of::()..std::mem::size_of::() + 5] - ); - debug!( - "Last 5 bytes {:x?}", - &bytes_written[bytes_written.len() - 5..] - ); - // We buffered all the bytes that were provided to use. + pinned.buffer.drain(..n); Poll::Ready(Ok(buf.len())) } - poll => poll, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, } } + #[inline] fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.as_ref().write_buffer.is_empty() { - self.file.as_mut().poll_flush(cx) + if self.buffer.is_empty() { + self.as_mut().file.as_mut().poll_flush(cx) } else { - let new_self = Pin::into_inner(self); - - // Write as many bytes that we can immediately write - loop { - match new_self - .file - .as_mut() - .poll_write(cx, &new_self.write_buffer) - { + let pinned = Pin::into_inner(self); + while !pinned.buffer.is_empty() { + match pinned.file.as_mut().poll_write(cx, &pinned.buffer) { Poll::Ready(Ok(n)) => { - let bytes_written = new_self.write_buffer.drain(..n).collect::>(); - debug!( - "wrote message of size {:?}", - bytes_written.len() - std::mem::size_of::() - ); - - // We buffered all the bytes that were provided to use. - if new_self.write_buffer.is_empty() { - return Poll::Ready(Ok(())); - } + pinned.buffer.drain(..n); } - poll => return poll.map(|res| res.map(|_| ())), + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, } } + + Poll::Ready(Ok(())) } } + #[inline] fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let maybe_bytes = self - .as_mut() - .stream - .take() - .map(|stream| stream.finalize(None)); - - // If we've yet to finalize the stream, finalize it and add the bytes to - // our writer buffer. - if let Some(Ok(bytes)) = maybe_bytes { - // We just need to push it into the buffer, we don't really care - // about the result, since we can check later - let _ = self.as_mut().write_buffer.extend_from_slice(&bytes); + match self.as_mut().poll_flush(cx) { + Poll::Ready(Ok(())) => self.as_mut().file.as_mut().poll_shutdown(cx), + poll => poll, } - - // Now wait for us to fully flush out our write buffer - if self.as_mut().poll_flush(cx).is_pending() { - return Poll::Pending; - } - - // Write buffer is empty, flush file - self.file.as_mut().poll_shutdown(cx) } } diff --git a/src/cache/mem.rs b/src/cache/mem.rs index 1a7060a..49d87c2 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -1,9 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use super::{ - Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata, InnerStream, MemStream, -}; +use super::{Cache, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream}; use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; @@ -146,13 +144,9 @@ where ) -> Option> { match self.mem_cache.lock().now_or_never() { Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| { - Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata)) + Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata)) }) { - Some(v) => Some(v.and_then(|(inner, metadata)| { - CacheStream::new(inner, None) - .map(|v| (v, metadata)) - .map_err(|_| CacheError::DecryptionFailure) - })), + Some(v) => Some(v), None => self.inner.get(key).await, }, None => self.inner.get(key).await, diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 36ee35c..0562ddc 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -8,12 +8,12 @@ use std::task::{Context, Poll}; use actix_web::http::HeaderValue; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; +use chacha20::Key; use chrono::{DateTime, FixedOffset}; use futures::{Stream, StreamExt}; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; -use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream}; use thiserror::Error; use tokio::sync::mpsc::Sender; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -231,56 +231,12 @@ impl CallbackCache for Arc { .await } } - -pub struct CacheStream { - inner: InnerStream, - decrypt: Option>, -} - -impl CacheStream { - pub(self) fn new(inner: InnerStream, header: Option
) -> Result { - Ok(Self { - inner, - decrypt: header - .and_then(|header| { - ENCRYPTION_KEY - .get() - .map(|key| SecretStream::init_pull(&header, key)) - }) - .transpose()?, - }) - } -} - -impl Stream for CacheStream { - type Item = CacheStreamItem; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_next_unpin(cx).map(|data| { - // False positive (`data`): https://link.eddie.sh/r1fXX - #[allow(clippy::option_if_let_else)] - if let Some(keystream) = self.decrypt.as_mut() { - data.map(|bytes_res| { - bytes_res.and_then(|bytes| { - keystream - .pull(&bytes, None) - .map(|(data, _tag)| Bytes::from(data)) - .map_err(|_| UpstreamError) - }) - }) - } else { - data - } - }) - } -} - -pub(self) enum InnerStream { +pub enum CacheStream { Memory(MemStream), Completed(FramedRead>, BytesCodec>), } -impl From for InnerStream { +impl From for CacheStream { fn from(image: CachedImage) -> Self { Self::Memory(MemStream(image.0)) } @@ -288,7 +244,7 @@ impl From for InnerStream { type CacheStreamItem = Result; -impl Stream for InnerStream { +impl Stream for CacheStream { type Item = CacheStreamItem; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/main.rs b/src/main.rs index b1fa3ce..b4d3485 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,10 +14,11 @@ use actix_web::rt::{spawn, time, System}; use actix_web::web::{self, Data}; use actix_web::{App, HttpResponse, HttpServer}; use cache::{Cache, DiskCache}; +use chacha20::Key; use config::Config; use parking_lot::RwLock; use rustls::{NoClientAuth, ServerConfig}; -use sodiumoxide::crypto::secretstream::gen_key; +use sodiumoxide::crypto::stream::xchacha20::gen_key; use state::{RwLockServerState, ServerState}; use stop::send_stop; use thiserror::Error; @@ -96,7 +97,9 @@ async fn main() -> Result<(), Box> { if config.ephemeral_disk_encryption { info!("Running with at-rest encryption!"); - ENCRYPTION_KEY.set(gen_key()).unwrap(); + ENCRYPTION_KEY + .set(*Key::from_slice(gen_key().as_ref())) + .unwrap(); } if config.enable_metrics {