Compare commits

..

4 commits

3 changed files with 231 additions and 23 deletions

1
.gitignore vendored
View file

@ -5,3 +5,4 @@ flamegraph*.svg
perf.data*
dhat.out.*
settings.yaml
tarpaulin-report.html

2
src/cache/disk.rs vendored
View file

@ -378,6 +378,7 @@ mod db {
use super::*;
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn get() -> Result<(), Box<dyn Error>> {
let (cache, _) = DiskCache::in_memory();
let path = PathBuf::from_str("a/b/c")?;
@ -409,6 +410,7 @@ mod db {
}
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn put() -> Result<(), Box<dyn Error>> {
let (cache, _) = DiskCache::in_memory();
let path = PathBuf::from_str("a/b/c")?;

249
src/cache/fs.rs vendored
View file

@ -104,7 +104,7 @@ async fn read_file(
debug!("header bytes: {:x?}", nonce_bytes);
maybe_header = Some(*XNonce::from_slice(&nonce_bytes));
reader = Some(Box::pin(BufReader::new(EncryptedDiskReader::new(
reader = Some(Box::pin(BufReader::new(EncryptedReader::new(
file,
XNonce::from_slice(XNonce::from_slice(&nonce_bytes)),
key,
@ -138,13 +138,13 @@ async fn read_file(
}
}
struct EncryptedDiskReader {
file: Pin<Box<File>>,
struct EncryptedReader<R> {
file: Pin<Box<R>>,
keystream: XChaCha20,
}
impl EncryptedDiskReader {
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
impl<R> EncryptedReader<R> {
fn new(file: R, nonce: &XNonce, key: &Key) -> Self {
Self {
file: Box::pin(file),
keystream: XChaCha20::new(key, nonce),
@ -152,20 +152,7 @@ impl EncryptedDiskReader {
}
}
#[async_trait]
pub trait MetadataFetch: AsyncBufRead {
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
}
#[async_trait]
impl<R: AsyncBufRead + Send> MetadataFetch for R {
#[inline]
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
MetadataFuture(&mut self).await
}
}
impl AsyncRead for EncryptedDiskReader {
impl<R: AsyncRead> AsyncRead for EncryptedReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -181,15 +168,30 @@ impl AsyncRead for EncryptedDiskReader {
}
}
struct MetadataFuture<'a, R>(&'a mut Pin<&'a mut R>);
#[async_trait]
pub trait MetadataFetch: AsyncBufRead {
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
}
#[async_trait]
impl<R: AsyncBufRead + Send> MetadataFetch for R {
#[inline]
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
MetadataFuture(self).await
}
}
struct MetadataFuture<'a, R>(Pin<&'a mut R>);
impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
type Output = Result<ImageMetadata, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut filled = 0;
let mut pinned = self.0.as_mut();
loop {
let buf = match self.0.as_mut().poll_fill_buf(cx) {
let buf = match pinned.as_mut().poll_fill_buf(cx) {
Poll::Ready(Ok(buffer)) => buffer,
Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
Poll::Pending => return Poll::Pending,
@ -208,9 +210,11 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
Some(Err(_)) | None => return Poll::Ready(Err(())),
};
assert_ne!(bytes_consumed, 0);
// This needs to be outside the loop because we need to drop the
// reader ref, since that depends on a mut self.
self.0.as_mut().consume(bytes_consumed);
pinned.as_mut().consume(dbg!(bytes_consumed));
return res;
}
}
@ -392,6 +396,7 @@ mod read_file {
use tempfile::tempfile;
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn can_read() {
let mut temp_file = tempfile().unwrap();
temp_file
@ -431,6 +436,7 @@ mod read_file_compat {
use tempfile::tempfile;
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn can_read_legacy() {
let mut temp_file = tempfile().unwrap();
temp_file
@ -457,3 +463,202 @@ mod read_file_compat {
);
}
}
#[cfg(test)]
mod metadata_future {
use std::{collections::VecDeque, io::ErrorKind};
use super::*;
use crate::cache::ImageContentType;
use chrono::DateTime;
#[derive(Default)]
struct TestReader {
fill_buf_events: VecDeque<Poll<std::io::Result<&'static [u8]>>>,
consume_events: VecDeque<usize>,
buffer: Vec<u8>,
}
impl TestReader {
fn new() -> Self {
Self::default()
}
fn push_fill_buf_event(&mut self, event: Poll<std::io::Result<&'static [u8]>>) {
self.fill_buf_events.push_back(event);
}
fn push_consume_event(&mut self, event: usize) {
self.consume_events.push_back(event);
}
}
impl AsyncRead for TestReader {
fn poll_read(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
assert!(self.consume_events.is_empty());
assert!(self
.fill_buf_events
.iter()
.all(|event| matches!(event, Poll::Pending)));
buf.put_slice(&self.as_mut().buffer.drain(..).collect::<Vec<_>>());
Poll::Ready(Ok(()))
}
}
impl AsyncBufRead for TestReader {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<&[u8]>> {
let pinned = Pin::into_inner(self);
match pinned.fill_buf_events.pop_front() {
Some(Poll::Ready(Ok(bytes))) => {
pinned.buffer.extend_from_slice(bytes);
String::from_utf8_lossy(&pinned.buffer);
return Poll::Ready(Ok(pinned.buffer.as_ref()));
}
Some(res @ Poll::Ready(_)) => res,
Some(Poll::Pending) => {
cx.waker().wake_by_ref();
Poll::Pending
}
None => panic!("poll_fill_buf was called but no events are left"),
}
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
assert_eq!(self.as_mut().consume_events.pop_front(), Some(amt));
self.as_mut().buffer.drain(..amt);
}
}
// We don't use the tokio executor here because it relies on epoll, which
// isn't supported by miri
#[test]
fn full_data_is_available() -> Result<(), Box<dyn Error>> {
let content = br#"{"content_type":0,"content_length":708370,"last_modified":"2021-04-13T04:37:41+00:00"}abc"#;
let mut reader = Box::pin(BufReader::new(&content[..]));
let metadata = futures::executor::block_on(async {
MetadataFuture(reader.as_mut())
.await
.map_err(|_| "metadata future returned error")
})?;
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
assert_eq!(metadata.content_length, Some(708370));
assert_eq!(
metadata.last_modified,
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
);
let mut buf = vec![];
futures::executor::block_on(reader.read_to_end(&mut buf))?;
assert_eq!(&buf, b"abc");
Ok(())
}
#[test]
fn data_is_immediately_available_in_chunks() -> Result<(), Box<dyn Error>> {
let mut test_reader = TestReader::new();
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
let msg_1 = br#"modified":"2021-04-13T04:37:41+00:00"}abc"#;
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_1)));
test_reader.push_consume_event(86);
let mut reader = Box::pin(test_reader);
let metadata = futures::executor::block_on(async {
MetadataFuture(reader.as_mut())
.await
.map_err(|_| "metadata future returned error")
})?;
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
assert_eq!(metadata.content_length, Some(708370));
assert_eq!(
metadata.last_modified,
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
);
let mut buf = vec![];
futures::executor::block_on(reader.read_to_end(&mut buf))?;
assert_eq!(&buf, b"abc");
Ok(())
}
#[test]
fn data_is_available_in_chunks() -> Result<(), Box<dyn Error>> {
let mut test_reader = TestReader::new();
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
let msg_1 = br#"modified":"2021-04-13T04:37:41+00:00"}abc"#;
test_reader.push_fill_buf_event(Poll::Pending);
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
test_reader.push_fill_buf_event(Poll::Pending);
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_1)));
test_reader.push_fill_buf_event(Poll::Pending);
test_reader.push_consume_event(86);
let mut reader = Box::pin(test_reader);
let metadata = futures::executor::block_on(async {
MetadataFuture(reader.as_mut())
.await
.map_err(|_| "metadata future returned error")
})?;
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
assert_eq!(metadata.content_length, Some(708370));
assert_eq!(
metadata.last_modified,
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
);
let mut buf = vec![];
futures::executor::block_on(reader.read_to_end(&mut buf))?;
assert_eq!(&buf, b"abc");
Ok(())
}
#[test]
fn underlying_reader_reports_err() -> Result<(), Box<dyn Error>> {
let mut test_reader = TestReader::new();
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
test_reader.push_fill_buf_event(Poll::Pending);
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
test_reader.push_fill_buf_event(Poll::Pending);
test_reader.push_fill_buf_event(Poll::Ready(Err(std::io::Error::new(
ErrorKind::Other,
"sup",
))));
let mut reader = Box::pin(test_reader);
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
assert!(metadata.is_err());
Ok(())
}
#[test]
fn underlying_reader_reports_early_eof() -> Result<(), Box<dyn Error>> {
let mut test_reader = TestReader::new();
test_reader.push_fill_buf_event(Poll::Ready(Ok(&[])));
let mut reader = Box::pin(test_reader);
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
assert!(metadata.is_err());
Ok(())
}
#[test]
fn invalid_metadata() -> Result<(), Box<dyn Error>> {
let mut test_reader = TestReader::new();
// content type is incorrect, should be a number
let msg_0 = br#"{"content_type":"foo","content_length":708370,"last_modified":"2021-04-13T04:37:41+00:00"}"#;
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
let mut reader = Box::pin(test_reader);
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
assert!(metadata.is_err());
Ok(())
}
}