Compare commits
No commits in common. "491d0c9fda1e7bf91ce9a916aa70f2017fc1598e" and "34c716bfceb737072acbc0d1ffd42359021c9549" have entirely different histories.
491d0c9fda
...
34c716bfce
4 changed files with 43 additions and 78 deletions
110
src/cache/fs.rs
vendored
110
src/cache/fs.rs
vendored
|
@ -33,7 +33,10 @@ use sodiumoxide::crypto::secretstream::{
|
||||||
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
|
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
|
||||||
};
|
};
|
||||||
use tokio::fs::{create_dir_all, remove_file, File};
|
use tokio::fs::{create_dir_all, remove_file, File};
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
use tokio::io::{
|
||||||
|
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
|
||||||
|
ReadBuf,
|
||||||
|
};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::watch::{channel, Receiver};
|
use tokio::sync::watch::{channel, Receiver};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
@ -75,35 +78,12 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
|
||||||
pub(super) async fn read_file(
|
pub(super) async fn read_file(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
||||||
let file = std::fs::File::open(path).ok()?;
|
let mut file = File::open(path).await.ok()?;
|
||||||
let file_0 = file.try_clone().unwrap();
|
|
||||||
// Try reading decrypted header first...
|
|
||||||
let mut deserializer = serde_json::Deserializer::from_reader(file);
|
|
||||||
let maybe_metadata = ImageMetadata::deserialize(&mut deserializer);
|
|
||||||
|
|
||||||
let parsed_metadata;
|
|
||||||
let mut maybe_header = None;
|
|
||||||
let mut reader: Option<Pin<Box<dyn AsyncRead + Send>>> = None;
|
|
||||||
if let Ok(metadata) = maybe_metadata {
|
|
||||||
// image is decrypted
|
|
||||||
if ENCRYPTION_KEY.get().is_some() {
|
|
||||||
// invalidate cache since we're running in at-rest encryption and
|
|
||||||
// the file wasn't encrypted.
|
|
||||||
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
reader = Some(Box::pin(File::from_std(file_0)));
|
|
||||||
parsed_metadata = Some(metadata);
|
|
||||||
} else {
|
|
||||||
let mut file = File::from_std(file_0);
|
|
||||||
let file_0 = file.try_clone().await.unwrap();
|
|
||||||
|
|
||||||
// image is decrypted or corrupt
|
|
||||||
|
|
||||||
|
let mut reader = {
|
||||||
// If the encryption key was set, use the encrypted disk reader instead;
|
// If the encryption key was set, use the encrypted disk reader instead;
|
||||||
// else, just directly read from file.
|
// else, just directly read from file.
|
||||||
if let Some(key) = ENCRYPTION_KEY.get() {
|
let inner_reader: Pin<Box<dyn AsyncRead + Send>> = if let Some(key) = ENCRYPTION_KEY.get() {
|
||||||
let mut header_bytes = [0; HEADERBYTES];
|
let mut header_bytes = [0; HEADERBYTES];
|
||||||
if file.read_exact(&mut header_bytes).await.is_err() {
|
if file.read_exact(&mut header_bytes).await.is_err() {
|
||||||
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
|
warn!("Found file, but encrypted header was not found. Assuming corrupted!");
|
||||||
|
@ -124,35 +104,38 @@ pub(super) async fn read_file(
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
|
|
||||||
maybe_header = Some(header);
|
Box::pin(EncryptedDiskReader::new(file, secret_stream))
|
||||||
|
|
||||||
reader = Some(Box::pin(EncryptedDiskReader::new(file, secret_stream)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut deserializer = serde_json::Deserializer::from_reader(file_0.into_std().await);
|
|
||||||
parsed_metadata = ImageMetadata::deserialize(&mut deserializer).ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
// parsed_metadata is either set or unset here. If it's set then we
|
|
||||||
// successfully decoded the data; otherwise the file is garbage.
|
|
||||||
|
|
||||||
if let Some(reader) = reader {
|
|
||||||
// False positive lint, `file` is used in both cases, which means that it's
|
|
||||||
// not possible to move this into a map_or_else without cloning `file`.
|
|
||||||
#[allow(clippy::option_if_let_else)]
|
|
||||||
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
|
|
||||||
InnerStream::Concurrent(ConcurrentFsStream::from_reader(
|
|
||||||
reader,
|
|
||||||
WatchStream::new(status),
|
|
||||||
))
|
|
||||||
} else {
|
} else {
|
||||||
InnerStream::Completed(FramedRead::new(reader, BytesCodec::new()))
|
Box::pin(file)
|
||||||
};
|
};
|
||||||
|
|
||||||
parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata)))
|
BufReader::new(inner_reader)
|
||||||
|
};
|
||||||
|
|
||||||
|
let metadata = {
|
||||||
|
let mut read = String::new();
|
||||||
|
reader
|
||||||
|
.read_line(&mut read)
|
||||||
|
.await
|
||||||
|
.expect("failed to read metadata");
|
||||||
|
serde_json::from_str(&read).ok()?
|
||||||
|
};
|
||||||
|
|
||||||
|
let reader = Box::pin(reader);
|
||||||
|
|
||||||
|
// False positive lint, `file` is used in both cases, which means that it's
|
||||||
|
// not possible to move this into a map_or_else without cloning `file`.
|
||||||
|
#[allow(clippy::option_if_let_else)]
|
||||||
|
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
|
||||||
|
InnerStream::Concurrent(ConcurrentFsStream::from_reader(
|
||||||
|
reader,
|
||||||
|
WatchStream::new(status),
|
||||||
|
))
|
||||||
} else {
|
} else {
|
||||||
None
|
InnerStream::Completed(FramedRead::new(reader, BytesCodec::new()))
|
||||||
}
|
};
|
||||||
|
|
||||||
|
Some(Ok((stream, None, metadata)))
|
||||||
}
|
}
|
||||||
|
|
||||||
struct EncryptedDiskReader {
|
struct EncryptedDiskReader {
|
||||||
|
@ -263,6 +246,7 @@ where
|
||||||
let mut acc_bytes = BytesMut::new();
|
let mut acc_bytes = BytesMut::new();
|
||||||
let accumulate = on_complete.is_some();
|
let accumulate = on_complete.is_some();
|
||||||
writer.write_all(metadata_string.as_bytes()).await?;
|
writer.write_all(metadata_string.as_bytes()).await?;
|
||||||
|
writer.write_all(b"\n").await?;
|
||||||
|
|
||||||
while let Some(bytes) = byte_stream.next().await {
|
while let Some(bytes) = byte_stream.next().await {
|
||||||
if let Ok(mut bytes) = bytes {
|
if let Ok(mut bytes) = bytes {
|
||||||
|
@ -379,8 +363,7 @@ impl AsyncWrite for EncryptedDiskWriter {
|
||||||
{
|
{
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(Ok(n)) => {
|
||||||
new_self.write_buffer.drain(..n);
|
new_self.write_buffer.drain(..n);
|
||||||
// We buffered all the bytes that were provided to use.
|
Poll::Ready(Ok(n))
|
||||||
Poll::Ready(Ok(buf.len()))
|
|
||||||
}
|
}
|
||||||
poll => poll,
|
poll => poll,
|
||||||
}
|
}
|
||||||
|
@ -390,24 +373,7 @@ impl AsyncWrite for EncryptedDiskWriter {
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), std::io::Error>> {
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
if !self.as_ref().write_buffer.is_empty() {
|
self.file.as_mut().poll_flush(cx)
|
||||||
let new_self = Pin::into_inner(self);
|
|
||||||
let buffer = new_self.write_buffer.as_ref();
|
|
||||||
match new_self.file.as_mut().poll_write(cx, buffer) {
|
|
||||||
Poll::Ready(res) => {
|
|
||||||
let n = res?;
|
|
||||||
new_self.write_buffer.drain(..n);
|
|
||||||
// We're immediately ready to do some more flushing!
|
|
||||||
cx.waker().wake_by_ref();
|
|
||||||
// Return pending here because we still need to flush the
|
|
||||||
// file
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.file.as_mut().poll_flush(cx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_shutdown(
|
fn poll_shutdown(
|
||||||
|
|
|
@ -176,10 +176,9 @@ pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwL
|
||||||
debug!("got write guard for server state");
|
debug!("got write guard for server state");
|
||||||
let mut write_guard = data.0.write();
|
let mut write_guard = data.0.write();
|
||||||
|
|
||||||
let image_server_changed = write_guard.image_server != resp.image_server;
|
if !write_guard.url_overridden && write_guard.image_server != resp.image_server {
|
||||||
if !write_guard.url_overridden && image_server_changed {
|
|
||||||
write_guard.image_server = resp.image_server;
|
write_guard.image_server = resp.image_server;
|
||||||
} else if image_server_changed {
|
} else {
|
||||||
warn!("Ignoring new upstream url!");
|
warn!("Ignoring new upstream url!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,7 +220,7 @@ pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwL
|
||||||
PREVIOUSLY_COMPROMISED.store(resp.compromised, Ordering::Release);
|
PREVIOUSLY_COMPROMISED.store(resp.compromised, Ordering::Release);
|
||||||
if resp.compromised {
|
if resp.compromised {
|
||||||
error!("Got compromised response from control center!");
|
error!("Got compromised response from control center!");
|
||||||
} else if previously_compromised {
|
} else {
|
||||||
info!("No longer compromised!");
|
info!("No longer compromised!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ use actix_web::{get, web::Data, HttpRequest, HttpResponse, Responder};
|
||||||
use base64::DecodeError;
|
use base64::DecodeError;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use futures::{Stream, TryStreamExt};
|
use futures::{AsyncReadExt, Stream, TryStreamExt};
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use prometheus::{Encoder, TextEncoder};
|
use prometheus::{Encoder, TextEncoder};
|
||||||
|
|
|
@ -75,7 +75,7 @@ impl ServerState {
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
PREVIOUSLY_COMPROMISED.store(resp.compromised, Ordering::Release);
|
PREVIOUSLY_COMPROMISED.store(resp.paused, Ordering::Release);
|
||||||
if resp.compromised {
|
if resp.compromised {
|
||||||
error!("Got compromised response from control center!");
|
error!("Got compromised response from control center!");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue