more partial work into encryption

This commit is contained in:
Edward Shen 2021-07-13 16:39:32 -04:00
parent 2ace8d3d66
commit 656543b539
Signed by: edward
GPG key ID: 19182661E818369F

181
src/cache/fs.rs vendored
View file

@ -14,6 +14,7 @@
//! upstream no longer needs to process duplicate requests and sequential cache //! upstream no longer needs to process duplicate requests and sequential cache
//! misses are treated as closer as a cache hit. //! misses are treated as closer as a cache hit.
use std::convert::TryInto;
use std::error::Error; use std::error::Error;
use std::fmt::Display; use std::fmt::Display;
use std::io::{Seek, SeekFrom}; use std::io::{Seek, SeekFrom};
@ -107,8 +108,6 @@ async fn read_file(
return None; return None;
} }
dbg!(&header_bytes);
debug!("header bytes: {:x?}", header_bytes); debug!("header bytes: {:x?}", header_bytes);
let file_header = if let Some(header) = Header::from_slice(&header_bytes) { let file_header = if let Some(header) = Header::from_slice(&header_bytes) {
@ -131,8 +130,16 @@ async fn read_file(
} }
parsed_metadata = if let Some(reader) = reader.as_mut() { parsed_metadata = if let Some(reader) = reader.as_mut() {
debug!("trying to read metadata"); match reader.as_mut().metadata().await {
dbg!(reader.as_mut().metadata().await.ok()) Ok(metadata) => {
debug!("Successfully parsed encrypted metadata");
Some(metadata)
}
Err(_) => {
debug!("Failed to parse encrypted metadata");
None
}
}
} else { } else {
debug!("Failed to read encrypted data"); debug!("Failed to read encrypted data");
None None
@ -157,9 +164,8 @@ struct EncryptedDiskReader {
stream: SecretStream<Pull>, stream: SecretStream<Pull>,
// Bytes we read from the secret stream // Bytes we read from the secret stream
read_buffer: Box<[u8; 4096]>, read_buffer: Box<[u8; 4096]>,
read_data: Vec<u8>,
decryption_buffer: Vec<u8>, decryption_buffer: Vec<u8>,
// Bytes we write out to the read buf
write_buffer: Vec<u8>,
} }
impl EncryptedDiskReader { impl EncryptedDiskReader {
@ -168,8 +174,8 @@ impl EncryptedDiskReader {
file: Box::pin(file), file: Box::pin(file),
stream, stream,
read_buffer: Box::new([0; 4096]), read_buffer: Box::new([0; 4096]),
read_data: Vec::with_capacity(4096),
decryption_buffer: Vec::with_capacity(4096), decryption_buffer: Vec::with_capacity(4096),
write_buffer: Vec::with_capacity(4096),
} }
} }
} }
@ -189,64 +195,78 @@ impl<R: AsyncBufRead + Send> MetadataFetch for R {
impl AsyncRead for EncryptedDiskReader { impl AsyncRead for EncryptedDiskReader {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
// First, try and read from the underlying file. let buf_res = match self.as_mut().poll_fill_buf(cx) {
let pinned_self = Pin::into_inner(self); Poll::Ready(Ok(bytes)) => bytes,
let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut()); Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf); Poll::Pending => return Poll::Pending,
};
// If the file let size = buf.capacity().min(buf_res.len());
if read_res.is_pending() { buf.put_slice(&buf_res[..size]);
return Poll::Pending; self.as_mut().consume(size);
}
if pinned_self
.stream
.pull_to_vec(read_buf.filled(), None, &mut pinned_self.decryption_buffer)
.is_err()
{
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to decrypt data",
)));
}
pinned_self
.write_buffer
.extend_from_slice(&pinned_self.decryption_buffer);
// find the amount of bytes we can put into the output buffer.
let bytes_to_write = buf.remaining().min(pinned_self.write_buffer.len());
buf.put_slice(
&pinned_self
.write_buffer
.drain(..bytes_to_write)
.collect::<Vec<_>>(),
);
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }
impl AsyncBufRead for EncryptedDiskReader { impl AsyncBufRead for EncryptedDiskReader {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
// First, try and read from the underlying file.
let pinned_self = Pin::into_inner(self); let pinned_self = Pin::into_inner(self);
let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut()); let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut());
// // Load content from the leftovers
// read_buf.put_slice(&pinned_self.read_leftovers);
let msg = loop {
let prev_len = read_buf.filled().len();
// First, try and read from the underlying file.
let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf); let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf);
// If the file // Wait until we have something to read
if read_res.is_pending() { if read_res.is_pending() {
dbg!(line!());
return Poll::Pending; return Poll::Pending;
} }
dbg!(&read_buf.filled().len()); if prev_len == read_buf.filled().len() && pinned_self.read_data.is_empty() {
dbg!(line!());
return Poll::Ready(Ok(&[]));
}
pinned_self.read_data.extend_from_slice(read_buf.filled());
if dbg!(pinned_self.read_data.len()) >= std::mem::size_of::<usize>() {
let (size, rest) = pinned_self.read_data.split_at(std::mem::size_of::<usize>());
let size = usize::from_le_bytes(size.try_into().unwrap());
if dbg!(size) <= dbg!(rest.len()) {
let (data, leftovers) = rest.split_at(size);
// Extract data into their own vec
let data = data.to_vec();
// temporarily store leftovers in decryption buffer,
// then move leftovers back into read_data.
// This is to avoid an alloc
pinned_self.decryption_buffer.clear();
pinned_self.decryption_buffer.extend_from_slice(&leftovers);
pinned_self.read_data.clear();
pinned_self
.read_data
.extend_from_slice(&pinned_self.decryption_buffer);
dbg!(line!());
break data;
}
}
};
debug!("read message of size {:?}", msg.len());
debug!("First 5 bytes {:x?}", &msg[..5]);
debug!("Last 5 bytes {:x?}", &msg[msg.len() - 5..]);
if pinned_self if pinned_self
.stream .stream
.pull_to_vec(read_buf.filled(), None, &mut pinned_self.decryption_buffer) .pull_to_vec(&msg, None, &mut pinned_self.decryption_buffer)
.is_err() .is_err()
{ {
dbg!(line!()); dbg!(line!());
@ -256,15 +276,14 @@ impl AsyncBufRead for EncryptedDiskReader {
))); )));
} }
pinned_self // debug!("decrypted read: {:x?}", &pinned_self.decryption_buffer);
.write_buffer
.extend_from_slice(&pinned_self.decryption_buffer);
Poll::Ready(Ok(&pinned_self.write_buffer)) dbg!(line!());
Poll::Ready(Ok(&pinned_self.decryption_buffer))
} }
fn consume(mut self: Pin<&mut Self>, amt: usize) { fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.as_mut().write_buffer.drain(..amt); self.as_mut().decryption_buffer.drain(..amt);
} }
} }
@ -306,7 +325,7 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
// This needs to be outside the loop because we need to drop the // This needs to be outside the loop because we need to drop the
// reader ref, since that depends on a mut self. // reader ref, since that depends on a mut self.
self.0.as_mut().consume(bytes_consumed); self.0.as_mut().consume(dbg!(bytes_consumed));
return res; return res;
} }
} }
@ -339,7 +358,7 @@ where
.get() .get()
.map(|key| SecretStream::init_push(key).expect("Failed to init enc stream")) .map(|key| SecretStream::init_push(key).expect("Failed to init enc stream"))
{ {
file.write_all(dbg!(header.as_ref())).await?; file.write_all(header.as_ref()).await?;
Box::pin(EncryptedDiskWriter::new(file, enc)) Box::pin(EncryptedDiskWriter::new(file, enc))
} else { } else {
Box::pin(file) Box::pin(file)
@ -350,6 +369,7 @@ where
let mut error = writer.write_all(metadata_string.as_bytes()).await.err(); let mut error = writer.write_all(metadata_string.as_bytes()).await.err();
if error.is_none() { if error.is_none() {
debug!("decrypted write {:x?}", &bytes[..40]);
error = writer.write_all(&bytes).await.err(); error = writer.write_all(&bytes).await.err();
} }
@ -411,6 +431,9 @@ impl AsyncWrite for EncryptedDiskWriter {
.expect("Failed to write encrypted data to buffer"); .expect("Failed to write encrypted data to buffer");
} }
new_self
.write_buffer
.extend_from_slice(&new_self.encryption_buffer.len().to_le_bytes());
new_self.write_buffer.extend(&new_self.encryption_buffer); new_self.write_buffer.extend(&new_self.encryption_buffer);
match new_self match new_self
@ -419,7 +442,19 @@ impl AsyncWrite for EncryptedDiskWriter {
.poll_write(cx, &new_self.write_buffer) .poll_write(cx, &new_self.write_buffer)
{ {
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
new_self.write_buffer.drain(..n); let bytes_written = new_self.write_buffer.drain(..n).collect::<Vec<_>>();
debug!(
"wrote message of size {:?}",
bytes_written.len() - std::mem::size_of::<usize>()
);
debug!(
"First 5 bytes {:x?}",
&bytes_written[std::mem::size_of::<usize>()..std::mem::size_of::<usize>() + 5]
);
debug!(
"Last 5 bytes {:x?}",
&bytes_written[bytes_written.len() - 5..]
);
// We buffered all the bytes that were provided to use. // We buffered all the bytes that were provided to use.
Poll::Ready(Ok(buf.len())) Poll::Ready(Ok(buf.len()))
} }
@ -435,18 +470,28 @@ impl AsyncWrite for EncryptedDiskWriter {
self.file.as_mut().poll_flush(cx) self.file.as_mut().poll_flush(cx)
} else { } else {
let new_self = Pin::into_inner(self); let new_self = Pin::into_inner(self);
let buffer = new_self.write_buffer.as_ref();
match new_self.file.as_mut().poll_write(cx, buffer) { // Write as many bytes that we can immediately write
Poll::Ready(res) => { loop {
let n = res?; match new_self
new_self.write_buffer.drain(..n); .file
// We're immediately ready to do some more flushing! .as_mut()
cx.waker().wake_by_ref(); .poll_write(cx, &new_self.write_buffer)
// Return pending here because we still need to flush the {
// file Poll::Ready(Ok(n)) => {
Poll::Pending let bytes_written = new_self.write_buffer.drain(..n).collect::<Vec<_>>();
debug!(
"wrote message of size {:?}",
bytes_written.len() - std::mem::size_of::<usize>()
);
// We buffered all the bytes that were provided to use.
if new_self.write_buffer.is_empty() {
return Poll::Ready(Ok(()));
}
}
poll => return poll.map(|res| res.map(|_| ())),
} }
Poll::Pending => Poll::Pending,
} }
} }
} }
@ -466,12 +511,12 @@ impl AsyncWrite for EncryptedDiskWriter {
if let Some(Ok(bytes)) = maybe_bytes { if let Some(Ok(bytes)) = maybe_bytes {
// We just need to push it into the buffer, we don't really care // We just need to push it into the buffer, we don't really care
// about the result, since we can check later // about the result, since we can check later
let _ = self.as_mut().poll_write(cx, &bytes); let _ = self.as_mut().write_buffer.extend_from_slice(&bytes);
} }
// Now wait for us to fully flush out our write buffer // Now wait for us to fully flush out our write buffer
if !self.write_buffer.is_empty() { if self.as_mut().poll_flush(cx).is_pending() {
return self.poll_flush(cx); return Poll::Pending;
} }
// Write buffer is empty, flush file // Write buffer is empty, flush file