implement at-rest encryption

feature/v32-tokens
Edward Shen 2021-05-19 21:42:55 -04:00
parent 8772df2231
commit b66cccc832
Signed by: edward
GPG Key ID: 19182661E818369F
5 changed files with 250 additions and 54 deletions

40
Cargo.lock generated
View File

@ -456,9 +456,9 @@ dependencies = [
[[package]]
name = "const_fn"
version = "0.4.7"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "402da840495de3f976eaefc3485b7f5eb5b0bf9761f9a47be27fe975b3b8c2ec"
checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7"
[[package]]
name = "convert_case"
@ -479,9 +479,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
version = "0.1.1"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec1028182c380cc45a2e2c5ec841134f2dfd0f8f5f0a5bcd68004f81b5efdf4"
checksum = "ed00c67cb5d0a7d64a44f6ad2668db7e7530311dd53ea79bcd4fb022c64911c8"
dependencies = [
"libc",
]
@ -547,9 +547,9 @@ dependencies = [
[[package]]
name = "derive_more"
version = "0.99.13"
version = "0.99.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b1b72f1263f214c0f823371768776c4f5841b942c9883aa8e5ec584fd0ba6"
checksum = "5cc7b9cef1e351660e5443924e4f43ab25fbbed3e9a5f052df3677deb4d6b320"
dependencies = [
"convert_case",
"proc-macro2",
@ -1522,18 +1522,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.125"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.125"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43"
dependencies = [
"proc-macro2",
"quote",
@ -1874,9 +1874,9 @@ dependencies = [
[[package]]
name = "terminal_size"
version = "0.1.16"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86ca8ced750734db02076f44132d802af0b33b09942331f4459dde8636fd2406"
checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df"
dependencies = [
"libc",
"winapi",
@ -1977,9 +1977,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.5.0"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37"
dependencies = [
"autocfg",
"bytes",
@ -1997,9 +1997,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37"
dependencies = [
"proc-macro2",
"quote",
@ -2019,9 +2019,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
checksum = "f8864d706fdb3cc0843a49647ac892720dac98a6eeb818b77190592cf4994066"
dependencies = [
"futures-core",
"pin-project-lite",
@ -2031,9 +2031,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.6.6"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
dependencies = [
"bytes",
"futures-core",

228
src/cache/fs.rs vendored
View File

@ -17,10 +17,12 @@
use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt};
use log::debug;
use log::{debug, warn};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::secretstream::Header;
use sodiumoxide::crypto::secretstream::{
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
@ -30,14 +32,19 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf};
use tokio::io::{
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
ReadBuf,
};
use tokio::sync::mpsc::Sender;
use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{BytesCodec, FramedRead};
use super::{BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream};
use super::{
BoxedImageStream, CacheKey, CacheStreamItem, ImageMetadata, InnerStream, ENCRYPTION_KEY,
};
#[derive(Serialize, Deserialize)]
pub enum OnDiskMetadata {
@ -71,28 +78,118 @@ pub(super) async fn read_file(
path: &Path,
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
let std_file = std::fs::File::open(path).ok()?;
let file = File::from_std(std_file.try_clone().ok()?);
let mut file = File::from_std(std_file.try_clone().ok()?);
let mut reader = {
// If the encryption key was set, use the encrypted disk reader instead;
// else, just directly read from file.
let inner_reader: Pin<Box<dyn AsyncRead + Send>> = if let Some(key) = ENCRYPTION_KEY.get() {
let mut header_bytes = [0; HEADERBYTES];
if file.read_exact(&mut header_bytes).await.is_err() {
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
return None;
}
Box::pin(EncryptedDiskReader::new(
file,
SecretStream::init_pull(
&Header::from_slice(&header_bytes).expect("failed to get header"),
key,
)
.expect("Failed to initialize decryption kesy"),
))
} else {
Box::pin(file)
};
BufReader::new(inner_reader)
};
let metadata = {
let mut de = serde_json::Deserializer::from_reader(std_file);
ImageMetadata::deserialize(&mut de).ok()?
let mut read = String::new();
reader
.read_line(&mut read)
.await
.expect("failed to read metadata");
serde_json::from_str(&read).ok()?
};
let reader = Box::pin(reader);
// False positive lint, `file` is used in both cases, which means that it's
// not possible to move this into a map_or_else without cloning `file`.
#[allow(clippy::option_if_let_else)]
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
InnerStream::Concurrent(ConcurrentFsStream::from_file(
file,
InnerStream::Concurrent(ConcurrentFsStream::from_reader(
reader,
WatchStream::new(status),
))
} else {
InnerStream::Completed(FramedRead::new(BufReader::new(file), BytesCodec::new()))
InnerStream::Completed(FramedRead::new(reader, BytesCodec::new()))
};
Some(Ok((stream, None, metadata)))
}
struct EncryptedDiskReader {
file: Pin<Box<File>>,
stream: SecretStream<Pull>,
buf: Vec<u8>,
}
impl EncryptedDiskReader {
fn new(file: File, stream: SecretStream<Pull>) -> Self {
Self {
file: Box::pin(file),
stream,
buf: vec![],
}
}
}
impl AsyncRead for EncryptedDiskReader {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let cursor_start = buf.filled().len();
let res = self.as_mut().file.as_mut().poll_read(cx, buf);
if res.is_pending() {
return Poll::Pending;
}
let cursor_new = buf.filled().len();
// pull_to_vec internally calls vec.clear() and vec.reserve(). Generally
// speaking we should be reading about the same amount of data each time
// so we shouldn't experience too much of a slow down w.r.t resizing the
// buffer...
let new_self = Pin::into_inner(self);
new_self
.stream
.pull_to_vec(
&buf.filled()[cursor_start..cursor_new],
None,
&mut new_self.buf,
)
.unwrap();
// data is strictly smaller than the encrypted stream, since you need to
// encode tags as well, so this is always safe.
// rewrite encrypted data into decrypted data
let buffer = buf.filled_mut();
for (old, new) in buffer[cursor_start..].iter_mut().zip(&new_self.buf) {
*old = *new;
}
buf.set_filled(cursor_start + &new_self.buf.len());
res
}
}
/// Writes the metadata and input stream (in that order) to a file, returning a
/// stream that reads from that file. Accepts a db callback function that is
/// provided the number of bytes written, and an optional on-complete callback
@ -111,7 +208,7 @@ where
{
let (tx, rx) = channel(WritingStatus::NotDone);
let mut file = {
let file = {
let mut write_lock = WRITING_STATUS.write().await;
let parent = path.parent().expect("The path to have a parent");
create_dir_all(parent).await?;
@ -122,6 +219,17 @@ where
let metadata_string = serde_json::to_string(&metadata).expect("serialization to work");
let metadata_size = metadata_string.len();
let (mut writer, maybe_header): (Pin<Box<dyn AsyncWrite + Send>>, _) =
if let Some((enc, header)) = ENCRYPTION_KEY
.get()
.map(|key| SecretStream::init_push(key).expect("Failed to init enc stream"))
{
(Box::pin(EncryptedDiskWriter::new(file, enc)), Some(header))
} else {
(Box::pin(file), None)
};
// need owned variant because async lifetime
let path_buf = path.to_path_buf();
tokio::spawn(async move {
@ -130,7 +238,8 @@ where
let mut bytes_written: u32 = 0;
let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some();
file.write_all(metadata_string.as_bytes()).await?;
writer.write_all(metadata_string.as_bytes()).await?;
writer.write_all(b"\n").await?;
while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes {
@ -139,7 +248,7 @@ where
}
loop {
match file.write(&bytes).await? {
match writer.write(&bytes).await? {
0 => break,
n => {
bytes.advance(n);
@ -162,8 +271,8 @@ where
// We don't care about the result of the call.
std::mem::drop(remove_file(&path_buf).await);
} else {
file.flush().await?;
file.sync_all().await?; // we need metadata
writer.flush().await?;
// writer.sync_all().await?; // we need metadata
debug!("writing to file done");
}
@ -204,13 +313,81 @@ where
InnerStream::Concurrent(
ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?,
),
None,
maybe_header,
))
}
struct EncryptedDiskWriter {
file: Pin<Box<File>>,
stream: Option<SecretStream<Push>>,
encryption_buffer: Vec<u8>,
write_buffer: Vec<u8>,
}
impl EncryptedDiskWriter {
fn new(file: File, stream: SecretStream<Push>) -> Self {
Self {
file: Box::pin(file),
stream: Some(stream),
encryption_buffer: vec![],
write_buffer: vec![],
}
}
}
impl AsyncWrite for EncryptedDiskWriter {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let new_self = Pin::into_inner(self);
{
let encryption_buffer = &mut new_self.encryption_buffer;
new_self.stream.as_mut().map(|stream| {
stream
.push_to_vec(buf, None, Tag::Message, encryption_buffer)
.expect("Failed to write encrypted data to buffer");
});
}
new_self.write_buffer.extend(&new_self.encryption_buffer);
match new_self
.file
.as_mut()
.poll_write(cx, &new_self.write_buffer)
{
Poll::Ready(Ok(n)) => {
new_self.write_buffer.drain(..n);
Poll::Ready(Ok(n))
}
poll => poll,
}
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.file.as_mut().poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.as_mut()
.stream
.take()
.map(|stream| stream.finalize(None));
self.file.as_mut().poll_shutdown(cx)
}
}
pub struct ConcurrentFsStream {
/// The File to read from
file: Pin<Box<BufReader<File>>>,
reader: Pin<Box<dyn AsyncRead + Send>>,
/// The channel to get updates from. The writer must send its status, else
/// this reader will never complete.
receiver: Pin<Box<WatchStream<WritingStatus>>>,
@ -229,12 +406,15 @@ impl ConcurrentFsStream {
) -> Result<Self, std::io::Error> {
let mut file = File::open(path).await?;
file.seek(SeekFrom::Start(seek as u64)).await?;
Ok(Self::from_file(file, receiver))
Ok(Self::from_reader(Box::pin(file), receiver))
}
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
fn from_reader(
reader: Pin<Box<dyn AsyncRead + Send>>,
receiver: WatchStream<WritingStatus>,
) -> Self {
Self {
file: Box::pin(BufReader::new(file)),
reader: Box::pin(reader),
receiver: Box::pin(receiver),
bytes_read: 0,
bytes_total: None,
@ -263,7 +443,7 @@ impl Stream for ConcurrentFsStream {
// TODO: Might be more efficient to have a larger buffer
let mut bytes = [0; 4 * 1024].to_vec();
let mut buffer = ReadBuf::new(&mut bytes);
match self.file.as_mut().poll_read(cx, &mut buffer) {
match self.reader.as_mut().poll_read(cx, &mut buffer) {
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))),
Poll::Pending => return Poll::Pending,
@ -327,3 +507,9 @@ enum WritingStatus {
Done(u32),
Error,
}
#[cfg(test)]
mod storage {
#[test]
fn wut() {}
}

9
src/cache/mem.rs vendored
View File

@ -5,7 +5,8 @@ use std::sync::Arc;
use crate::cache::DiskCache;
use super::{
BoxedImageStream, Cache, CacheKey, CacheStream, ImageMetadata, InnerStream, MemStream,
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata, InnerStream,
MemStream,
};
use async_trait::async_trait;
use bytes::Bytes;
@ -86,7 +87,11 @@ impl<InternalCacheImpl: InternalMemoryCache> Cache for MemoryCache<InternalCache
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata))
}) {
Some(v) => Some(v.map(|(inner, metadata)| todo!())),
Some(v) => Some(v.and_then(|(inner, metadata)| {
CacheStream::new(inner, None)
.map(|v| (v, metadata))
.map_err(|_| CacheError::DecryptionFailure)
})),
None => self.inner.get(key).await,
},
None => self.inner.get(key).await,

13
src/cache/mod.rs vendored
View File

@ -10,13 +10,12 @@ use bytes::{Bytes, BytesMut};
use chrono::{DateTime, FixedOffset};
use fs::ConcurrentFsStream;
use futures::{Stream, StreamExt};
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use sodiumoxide::crypto::secretstream::{gen_key, Header, Key, Pull, Stream as SecretStream};
use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream};
use thiserror::Error;
use tokio::fs::File;
use tokio::io::BufReader;
use tokio::io::AsyncRead;
use tokio::sync::mpsc::Sender;
use tokio_util::codec::{BytesCodec, FramedRead};
@ -24,7 +23,7 @@ pub use disk::DiskCache;
pub use fs::UpstreamError;
pub use mem::MemoryCache;
static ENCRYPTION_KEY: Lazy<Key> = Lazy::new(gen_key);
pub static ENCRYPTION_KEY: OnceCell<Key> = OnceCell::new();
mod disk;
mod fs;
@ -216,7 +215,7 @@ impl CacheStream {
Ok(Self {
inner,
decrypt: header
.map(|header| SecretStream::init_pull(&header, &ENCRYPTION_KEY))
.map(|header| SecretStream::init_pull(&header, &ENCRYPTION_KEY.get().unwrap()))
.transpose()?,
})
}
@ -248,7 +247,7 @@ impl Stream for CacheStream {
pub(self) enum InnerStream {
Concurrent(ConcurrentFsStream),
Memory(MemStream),
Completed(FramedRead<BufReader<File>, BytesCodec>),
Completed(FramedRead<Pin<Box<dyn AsyncRead + Send>>, BytesCodec>),
}
impl From<CachedImage> for InnerStream {

View File

@ -18,15 +18,16 @@ use actix_web::{App, HttpServer};
use cache::{Cache, DiskCache};
use clap::Clap;
use config::CliArgs;
use log::{debug, error, warn, LevelFilter};
use log::{debug, error, info, warn, LevelFilter};
use parking_lot::RwLock;
use rustls::{NoClientAuth, ServerConfig};
use simple_logger::SimpleLogger;
use sodiumoxide::crypto::secretstream::gen_key;
use state::{RwLockServerState, ServerState};
use stop::send_stop;
use thiserror::Error;
use crate::cache::{mem, MemoryCache};
use crate::cache::{mem, MemoryCache, ENCRYPTION_KEY};
use crate::config::UnstableOptions;
use crate::state::DynamicServerCert;
@ -76,6 +77,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
(0, 0) => LevelFilter::Info,
(_, 1) => LevelFilter::Debug,
(_, n) if n > 1 => LevelFilter::Trace,
// compiler can't figure it out
_ => unsafe { unreachable_unchecked() },
};
@ -94,6 +96,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
};
let client_secret_1 = client_secret.clone();
if cli_args.ephemeral_disk_encryption {
info!("Running with at-rest encryption!");
ENCRYPTION_KEY.set(gen_key()).unwrap();
}
let server = ServerState::init(&client_secret, &cli_args).await?;
let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
let data_1 = Arc::clone(&data_0);
@ -119,7 +126,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if running_2.load(Ordering::SeqCst) {
send_stop(&client_secret).await;
} else {
warn!("Got second ctrl-c, forcefully exiting");
warn!("Got second Ctrl-C, forcefully exiting");
system.stop()
}
});
@ -159,7 +166,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
})
.shutdown_timeout(60)
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
// .bind(format!("0.0.0.0:{}", port))?
.run()
.await?;