Compare commits

...

3 commits

Author SHA1 Message Date
491d0c9fda
Fix reading from encrypted files 2021-05-27 16:22:32 -04:00
f038452263
Fix logging messages 2021-05-27 16:22:15 -04:00
c32e534300
Revamp reading potentially encrypted files
Previously, we'd assume that if the header was successfully parsed, then
we'd have a valid encrypted header. This isn't the case, and now instead
we try to read metadata, first unencrypted, then encrypted, to make sure
we're reading with the correct abstraction.
2021-05-27 16:19:26 -04:00
4 changed files with 78 additions and 43 deletions

110
src/cache/fs.rs vendored
View file

@ -33,10 +33,7 @@ use sodiumoxide::crypto::secretstream::{
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES, Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
}; };
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ReadBuf};
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
ReadBuf,
};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::watch::{channel, Receiver}; use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -78,12 +75,35 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
pub(super) async fn read_file( pub(super) async fn read_file(
path: &Path, path: &Path,
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> { ) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
let mut file = File::open(path).await.ok()?; let file = std::fs::File::open(path).ok()?;
let file_0 = file.try_clone().unwrap();
// Try reading decrypted header first...
let mut deserializer = serde_json::Deserializer::from_reader(file);
let maybe_metadata = ImageMetadata::deserialize(&mut deserializer);
let parsed_metadata;
let mut maybe_header = None;
let mut reader: Option<Pin<Box<dyn AsyncRead + Send>>> = None;
if let Ok(metadata) = maybe_metadata {
// image is decrypted
if ENCRYPTION_KEY.get().is_some() {
// invalidate cache since we're running in at-rest encryption and
// the file wasn't encrypted.
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
return None;
}
reader = Some(Box::pin(File::from_std(file_0)));
parsed_metadata = Some(metadata);
} else {
let mut file = File::from_std(file_0);
let file_0 = file.try_clone().await.unwrap();
// image is decrypted or corrupt
let mut reader = {
// If the encryption key was set, use the encrypted disk reader instead; // If the encryption key was set, use the encrypted disk reader instead;
// else, just directly read from file. // else, just directly read from file.
let inner_reader: Pin<Box<dyn AsyncRead + Send>> = if let Some(key) = ENCRYPTION_KEY.get() { if let Some(key) = ENCRYPTION_KEY.get() {
let mut header_bytes = [0; HEADERBYTES]; let mut header_bytes = [0; HEADERBYTES];
if file.read_exact(&mut header_bytes).await.is_err() { if file.read_exact(&mut header_bytes).await.is_err() {
warn!("Found file, but encrypted header was not found. Assuming corrupted!"); warn!("Found file, but encrypted header was not found. Assuming corrupted!");
@ -104,38 +124,35 @@ pub(super) async fn read_file(
return None; return None;
}; };
Box::pin(EncryptedDiskReader::new(file, secret_stream)) maybe_header = Some(header);
reader = Some(Box::pin(EncryptedDiskReader::new(file, secret_stream)));
}
let mut deserializer = serde_json::Deserializer::from_reader(file_0.into_std().await);
parsed_metadata = ImageMetadata::deserialize(&mut deserializer).ok();
}
// parsed_metadata is either set or unset here. If it's set then we
// successfully decoded the data; otherwise the file is garbage.
if let Some(reader) = reader {
// False positive lint, `file` is used in both cases, which means that it's
// not possible to move this into a map_or_else without cloning `file`.
#[allow(clippy::option_if_let_else)]
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
InnerStream::Concurrent(ConcurrentFsStream::from_reader(
reader,
WatchStream::new(status),
))
} else { } else {
Box::pin(file) InnerStream::Completed(FramedRead::new(reader, BytesCodec::new()))
}; };
BufReader::new(inner_reader) parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata)))
};
let metadata = {
let mut read = String::new();
reader
.read_line(&mut read)
.await
.expect("failed to read metadata");
serde_json::from_str(&read).ok()?
};
let reader = Box::pin(reader);
// False positive lint, `file` is used in both cases, which means that it's
// not possible to move this into a map_or_else without cloning `file`.
#[allow(clippy::option_if_let_else)]
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
InnerStream::Concurrent(ConcurrentFsStream::from_reader(
reader,
WatchStream::new(status),
))
} else { } else {
InnerStream::Completed(FramedRead::new(reader, BytesCodec::new())) None
}; }
Some(Ok((stream, None, metadata)))
} }
struct EncryptedDiskReader { struct EncryptedDiskReader {
@ -246,7 +263,6 @@ where
let mut acc_bytes = BytesMut::new(); let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some(); let accumulate = on_complete.is_some();
writer.write_all(metadata_string.as_bytes()).await?; writer.write_all(metadata_string.as_bytes()).await?;
writer.write_all(b"\n").await?;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes { if let Ok(mut bytes) = bytes {
@ -363,7 +379,8 @@ impl AsyncWrite for EncryptedDiskWriter {
{ {
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
new_self.write_buffer.drain(..n); new_self.write_buffer.drain(..n);
Poll::Ready(Ok(n)) // We buffered all the bytes that were provided to use.
Poll::Ready(Ok(buf.len()))
} }
poll => poll, poll => poll,
} }
@ -373,7 +390,24 @@ impl AsyncWrite for EncryptedDiskWriter {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> { ) -> Poll<Result<(), std::io::Error>> {
self.file.as_mut().poll_flush(cx) if !self.as_ref().write_buffer.is_empty() {
let new_self = Pin::into_inner(self);
let buffer = new_self.write_buffer.as_ref();
match new_self.file.as_mut().poll_write(cx, buffer) {
Poll::Ready(res) => {
let n = res?;
new_self.write_buffer.drain(..n);
// We're immediately ready to do some more flushing!
cx.waker().wake_by_ref();
// Return pending here because we still need to flush the
// file
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
} else {
self.file.as_mut().poll_flush(cx)
}
} }
fn poll_shutdown( fn poll_shutdown(

View file

@ -176,9 +176,10 @@ pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwL
debug!("got write guard for server state"); debug!("got write guard for server state");
let mut write_guard = data.0.write(); let mut write_guard = data.0.write();
if !write_guard.url_overridden && write_guard.image_server != resp.image_server { let image_server_changed = write_guard.image_server != resp.image_server;
if !write_guard.url_overridden && image_server_changed {
write_guard.image_server = resp.image_server; write_guard.image_server = resp.image_server;
} else { } else if image_server_changed {
warn!("Ignoring new upstream url!"); warn!("Ignoring new upstream url!");
} }
@ -220,7 +221,7 @@ pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwL
PREVIOUSLY_COMPROMISED.store(resp.compromised, Ordering::Release); PREVIOUSLY_COMPROMISED.store(resp.compromised, Ordering::Release);
if resp.compromised { if resp.compromised {
error!("Got compromised response from control center!"); error!("Got compromised response from control center!");
} else { } else if previously_compromised {
info!("No longer compromised!"); info!("No longer compromised!");
} }
} }

View file

@ -11,7 +11,7 @@ use actix_web::{get, web::Data, HttpRequest, HttpResponse, Responder};
use base64::DecodeError; use base64::DecodeError;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::{AsyncReadExt, Stream, TryStreamExt}; use futures::{Stream, TryStreamExt};
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use prometheus::{Encoder, TextEncoder}; use prometheus::{Encoder, TextEncoder};

View file

@ -75,7 +75,7 @@ impl ServerState {
} }
})?; })?;
PREVIOUSLY_COMPROMISED.store(resp.paused, Ordering::Release); PREVIOUSLY_COMPROMISED.store(resp.compromised, Ordering::Release);
if resp.compromised { if resp.compromised {
error!("Got compromised response from control center!"); error!("Got compromised response from control center!");
} }