diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 03acb52..3c9f619 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -14,6 +14,7 @@ //! upstream no longer needs to process duplicate requests and sequential cache //! misses are treated as closer as a cache hit. +use std::convert::TryInto; use std::error::Error; use std::fmt::Display; use std::io::{Seek, SeekFrom}; @@ -107,8 +108,6 @@ async fn read_file( return None; } - dbg!(&header_bytes); - debug!("header bytes: {:x?}", header_bytes); let file_header = if let Some(header) = Header::from_slice(&header_bytes) { @@ -131,8 +130,16 @@ async fn read_file( } parsed_metadata = if let Some(reader) = reader.as_mut() { - debug!("trying to read metadata"); - dbg!(reader.as_mut().metadata().await.ok()) + match reader.as_mut().metadata().await { + Ok(metadata) => { + debug!("Successfully parsed encrypted metadata"); + Some(metadata) + } + Err(_) => { + debug!("Failed to parse encrypted metadata"); + None + } + } } else { debug!("Failed to read encrypted data"); None @@ -157,9 +164,8 @@ struct EncryptedDiskReader { stream: SecretStream, // Bytes we read from the secret stream read_buffer: Box<[u8; 4096]>, + read_data: Vec, decryption_buffer: Vec, - // Bytes we write out to the read buf - write_buffer: Vec, } impl EncryptedDiskReader { @@ -168,8 +174,8 @@ impl EncryptedDiskReader { file: Box::pin(file), stream, read_buffer: Box::new([0; 4096]), + read_data: Vec::with_capacity(4096), decryption_buffer: Vec::with_capacity(4096), - write_buffer: Vec::with_capacity(4096), } } } @@ -189,64 +195,78 @@ impl MetadataFetch for R { impl AsyncRead for EncryptedDiskReader { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - // First, try and read from the underlying file. - let pinned_self = Pin::into_inner(self); - let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut()); - let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf); - - // If the file - if read_res.is_pending() { - return Poll::Pending; - } - - if pinned_self - .stream - .pull_to_vec(read_buf.filled(), None, &mut pinned_self.decryption_buffer) - .is_err() - { - return Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to decrypt data", - ))); - } - - pinned_self - .write_buffer - .extend_from_slice(&pinned_self.decryption_buffer); - - // find the amount of bytes we can put into the output buffer. - let bytes_to_write = buf.remaining().min(pinned_self.write_buffer.len()); - buf.put_slice( - &pinned_self - .write_buffer - .drain(..bytes_to_write) - .collect::>(), - ); - + let buf_res = match self.as_mut().poll_fill_buf(cx) { + Poll::Ready(Ok(bytes)) => bytes, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + }; + let size = buf.capacity().min(buf_res.len()); + buf.put_slice(&buf_res[..size]); + self.as_mut().consume(size); Poll::Ready(Ok(())) } } impl AsyncBufRead for EncryptedDiskReader { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // First, try and read from the underlying file. let pinned_self = Pin::into_inner(self); let mut read_buf = ReadBuf::new(pinned_self.read_buffer.as_mut()); - let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf); - // If the file - if read_res.is_pending() { - return Poll::Pending; - } + // // Load content from the leftovers + // read_buf.put_slice(&pinned_self.read_leftovers); + + let msg = loop { + let prev_len = read_buf.filled().len(); + // First, try and read from the underlying file. + let read_res = pinned_self.file.as_mut().poll_read(cx, &mut read_buf); + + // Wait until we have something to read + if read_res.is_pending() { + dbg!(line!()); + return Poll::Pending; + } + + if prev_len == read_buf.filled().len() && pinned_self.read_data.is_empty() { + dbg!(line!()); + return Poll::Ready(Ok(&[])); + } + + pinned_self.read_data.extend_from_slice(read_buf.filled()); + + if dbg!(pinned_self.read_data.len()) >= std::mem::size_of::() { + let (size, rest) = pinned_self.read_data.split_at(std::mem::size_of::()); + let size = usize::from_le_bytes(size.try_into().unwrap()); + if dbg!(size) <= dbg!(rest.len()) { + let (data, leftovers) = rest.split_at(size); + // Extract data into their own vec + let data = data.to_vec(); + + // temporarily store leftovers in decryption buffer, + // then move leftovers back into read_data. + // This is to avoid an alloc + pinned_self.decryption_buffer.clear(); + pinned_self.decryption_buffer.extend_from_slice(&leftovers); + pinned_self.read_data.clear(); + pinned_self + .read_data + .extend_from_slice(&pinned_self.decryption_buffer); + dbg!(line!()); + break data; + } + } + }; + + debug!("read message of size {:?}", msg.len()); + debug!("First 5 bytes {:x?}", &msg[..5]); + debug!("Last 5 bytes {:x?}", &msg[msg.len() - 5..]); - dbg!(&read_buf.filled().len()); if pinned_self .stream - .pull_to_vec(read_buf.filled(), None, &mut pinned_self.decryption_buffer) + .pull_to_vec(&msg, None, &mut pinned_self.decryption_buffer) .is_err() { dbg!(line!()); @@ -256,15 +276,14 @@ impl AsyncBufRead for EncryptedDiskReader { ))); } - pinned_self - .write_buffer - .extend_from_slice(&pinned_self.decryption_buffer); + // debug!("decrypted read: {:x?}", &pinned_self.decryption_buffer); - Poll::Ready(Ok(&pinned_self.write_buffer)) + dbg!(line!()); + Poll::Ready(Ok(&pinned_self.decryption_buffer)) } fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.as_mut().write_buffer.drain(..amt); + self.as_mut().decryption_buffer.drain(..amt); } } @@ -306,7 +325,7 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> { // This needs to be outside the loop because we need to drop the // reader ref, since that depends on a mut self. - self.0.as_mut().consume(bytes_consumed); + self.0.as_mut().consume(dbg!(bytes_consumed)); return res; } } @@ -339,7 +358,7 @@ where .get() .map(|key| SecretStream::init_push(key).expect("Failed to init enc stream")) { - file.write_all(dbg!(header.as_ref())).await?; + file.write_all(header.as_ref()).await?; Box::pin(EncryptedDiskWriter::new(file, enc)) } else { Box::pin(file) @@ -350,6 +369,7 @@ where let mut error = writer.write_all(metadata_string.as_bytes()).await.err(); if error.is_none() { + debug!("decrypted write {:x?}", &bytes[..40]); error = writer.write_all(&bytes).await.err(); } @@ -411,6 +431,9 @@ impl AsyncWrite for EncryptedDiskWriter { .expect("Failed to write encrypted data to buffer"); } + new_self + .write_buffer + .extend_from_slice(&new_self.encryption_buffer.len().to_le_bytes()); new_self.write_buffer.extend(&new_self.encryption_buffer); match new_self @@ -419,7 +442,19 @@ impl AsyncWrite for EncryptedDiskWriter { .poll_write(cx, &new_self.write_buffer) { Poll::Ready(Ok(n)) => { - new_self.write_buffer.drain(..n); + let bytes_written = new_self.write_buffer.drain(..n).collect::>(); + debug!( + "wrote message of size {:?}", + bytes_written.len() - std::mem::size_of::() + ); + debug!( + "First 5 bytes {:x?}", + &bytes_written[std::mem::size_of::()..std::mem::size_of::() + 5] + ); + debug!( + "Last 5 bytes {:x?}", + &bytes_written[bytes_written.len() - 5..] + ); // We buffered all the bytes that were provided to use. Poll::Ready(Ok(buf.len())) } @@ -435,18 +470,28 @@ impl AsyncWrite for EncryptedDiskWriter { self.file.as_mut().poll_flush(cx) } else { let new_self = Pin::into_inner(self); - let buffer = new_self.write_buffer.as_ref(); - match new_self.file.as_mut().poll_write(cx, buffer) { - Poll::Ready(res) => { - let n = res?; - new_self.write_buffer.drain(..n); - // We're immediately ready to do some more flushing! - cx.waker().wake_by_ref(); - // Return pending here because we still need to flush the - // file - Poll::Pending + + // Write as many bytes that we can immediately write + loop { + match new_self + .file + .as_mut() + .poll_write(cx, &new_self.write_buffer) + { + Poll::Ready(Ok(n)) => { + let bytes_written = new_self.write_buffer.drain(..n).collect::>(); + debug!( + "wrote message of size {:?}", + bytes_written.len() - std::mem::size_of::() + ); + + // We buffered all the bytes that were provided to use. + if new_self.write_buffer.is_empty() { + return Poll::Ready(Ok(())); + } + } + poll => return poll.map(|res| res.map(|_| ())), } - Poll::Pending => Poll::Pending, } } } @@ -466,12 +511,12 @@ impl AsyncWrite for EncryptedDiskWriter { if let Some(Ok(bytes)) = maybe_bytes { // We just need to push it into the buffer, we don't really care // about the result, since we can check later - let _ = self.as_mut().poll_write(cx, &bytes); + let _ = self.as_mut().write_buffer.extend_from_slice(&bytes); } // Now wait for us to fully flush out our write buffer - if !self.write_buffer.is_empty() { - return self.poll_flush(cx); + if self.as_mut().poll_flush(cx).is_pending() { + return Poll::Pending; } // Write buffer is empty, flush file