Compare commits
4 commits
0c78b379f1
...
5338ff81a5
Author | SHA1 | Date | |
---|---|---|---|
5338ff81a5 | |||
53015e116f | |||
7ce974b4f9 | |||
973ece3604 |
3 changed files with 231 additions and 23 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -4,4 +4,5 @@
|
|||
flamegraph*.svg
|
||||
perf.data*
|
||||
dhat.out.*
|
||||
settings.yaml
|
||||
settings.yaml
|
||||
tarpaulin-report.html
|
||||
|
|
2
src/cache/disk.rs
vendored
2
src/cache/disk.rs
vendored
|
@ -378,6 +378,7 @@ mod db {
|
|||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg_attr(miri, ignore)]
|
||||
async fn get() -> Result<(), Box<dyn Error>> {
|
||||
let (cache, _) = DiskCache::in_memory();
|
||||
let path = PathBuf::from_str("a/b/c")?;
|
||||
|
@ -409,6 +410,7 @@ mod db {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg_attr(miri, ignore)]
|
||||
async fn put() -> Result<(), Box<dyn Error>> {
|
||||
let (cache, _) = DiskCache::in_memory();
|
||||
let path = PathBuf::from_str("a/b/c")?;
|
||||
|
|
249
src/cache/fs.rs
vendored
249
src/cache/fs.rs
vendored
|
@ -104,7 +104,7 @@ async fn read_file(
|
|||
debug!("header bytes: {:x?}", nonce_bytes);
|
||||
|
||||
maybe_header = Some(*XNonce::from_slice(&nonce_bytes));
|
||||
reader = Some(Box::pin(BufReader::new(EncryptedDiskReader::new(
|
||||
reader = Some(Box::pin(BufReader::new(EncryptedReader::new(
|
||||
file,
|
||||
XNonce::from_slice(XNonce::from_slice(&nonce_bytes)),
|
||||
key,
|
||||
|
@ -138,13 +138,13 @@ async fn read_file(
|
|||
}
|
||||
}
|
||||
|
||||
struct EncryptedDiskReader {
|
||||
file: Pin<Box<File>>,
|
||||
struct EncryptedReader<R> {
|
||||
file: Pin<Box<R>>,
|
||||
keystream: XChaCha20,
|
||||
}
|
||||
|
||||
impl EncryptedDiskReader {
|
||||
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
|
||||
impl<R> EncryptedReader<R> {
|
||||
fn new(file: R, nonce: &XNonce, key: &Key) -> Self {
|
||||
Self {
|
||||
file: Box::pin(file),
|
||||
keystream: XChaCha20::new(key, nonce),
|
||||
|
@ -152,20 +152,7 @@ impl EncryptedDiskReader {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait MetadataFetch: AsyncBufRead {
|
||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<R: AsyncBufRead + Send> MetadataFetch for R {
|
||||
#[inline]
|
||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
|
||||
MetadataFuture(&mut self).await
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for EncryptedDiskReader {
|
||||
impl<R: AsyncRead> AsyncRead for EncryptedReader<R> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
|
@ -181,15 +168,30 @@ impl AsyncRead for EncryptedDiskReader {
|
|||
}
|
||||
}
|
||||
|
||||
struct MetadataFuture<'a, R>(&'a mut Pin<&'a mut R>);
|
||||
#[async_trait]
|
||||
pub trait MetadataFetch: AsyncBufRead {
|
||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<R: AsyncBufRead + Send> MetadataFetch for R {
|
||||
#[inline]
|
||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
|
||||
MetadataFuture(self).await
|
||||
}
|
||||
}
|
||||
|
||||
struct MetadataFuture<'a, R>(Pin<&'a mut R>);
|
||||
|
||||
impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
||||
type Output = Result<ImageMetadata, ()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut filled = 0;
|
||||
let mut pinned = self.0.as_mut();
|
||||
|
||||
loop {
|
||||
let buf = match self.0.as_mut().poll_fill_buf(cx) {
|
||||
let buf = match pinned.as_mut().poll_fill_buf(cx) {
|
||||
Poll::Ready(Ok(buffer)) => buffer,
|
||||
Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
|
@ -208,9 +210,11 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
|||
Some(Err(_)) | None => return Poll::Ready(Err(())),
|
||||
};
|
||||
|
||||
assert_ne!(bytes_consumed, 0);
|
||||
|
||||
// This needs to be outside the loop because we need to drop the
|
||||
// reader ref, since that depends on a mut self.
|
||||
self.0.as_mut().consume(bytes_consumed);
|
||||
pinned.as_mut().consume(dbg!(bytes_consumed));
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
@ -392,6 +396,7 @@ mod read_file {
|
|||
use tempfile::tempfile;
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg_attr(miri, ignore)]
|
||||
async fn can_read() {
|
||||
let mut temp_file = tempfile().unwrap();
|
||||
temp_file
|
||||
|
@ -431,6 +436,7 @@ mod read_file_compat {
|
|||
use tempfile::tempfile;
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg_attr(miri, ignore)]
|
||||
async fn can_read_legacy() {
|
||||
let mut temp_file = tempfile().unwrap();
|
||||
temp_file
|
||||
|
@ -457,3 +463,202 @@ mod read_file_compat {
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod metadata_future {
|
||||
use std::{collections::VecDeque, io::ErrorKind};
|
||||
|
||||
use super::*;
|
||||
use crate::cache::ImageContentType;
|
||||
use chrono::DateTime;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestReader {
|
||||
fill_buf_events: VecDeque<Poll<std::io::Result<&'static [u8]>>>,
|
||||
consume_events: VecDeque<usize>,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl TestReader {
|
||||
fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn push_fill_buf_event(&mut self, event: Poll<std::io::Result<&'static [u8]>>) {
|
||||
self.fill_buf_events.push_back(event);
|
||||
}
|
||||
|
||||
fn push_consume_event(&mut self, event: usize) {
|
||||
self.consume_events.push_back(event);
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for TestReader {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
assert!(self.consume_events.is_empty());
|
||||
assert!(self
|
||||
.fill_buf_events
|
||||
.iter()
|
||||
.all(|event| matches!(event, Poll::Pending)));
|
||||
buf.put_slice(&self.as_mut().buffer.drain(..).collect::<Vec<_>>());
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncBufRead for TestReader {
|
||||
fn poll_fill_buf(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<std::io::Result<&[u8]>> {
|
||||
let pinned = Pin::into_inner(self);
|
||||
match pinned.fill_buf_events.pop_front() {
|
||||
Some(Poll::Ready(Ok(bytes))) => {
|
||||
pinned.buffer.extend_from_slice(bytes);
|
||||
String::from_utf8_lossy(&pinned.buffer);
|
||||
return Poll::Ready(Ok(pinned.buffer.as_ref()));
|
||||
}
|
||||
Some(res @ Poll::Ready(_)) => res,
|
||||
Some(Poll::Pending) => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
None => panic!("poll_fill_buf was called but no events are left"),
|
||||
}
|
||||
}
|
||||
|
||||
fn consume(mut self: Pin<&mut Self>, amt: usize) {
|
||||
assert_eq!(self.as_mut().consume_events.pop_front(), Some(amt));
|
||||
self.as_mut().buffer.drain(..amt);
|
||||
}
|
||||
}
|
||||
|
||||
// We don't use the tokio executor here because it relies on epoll, which
|
||||
// isn't supported by miri
|
||||
#[test]
|
||||
fn full_data_is_available() -> Result<(), Box<dyn Error>> {
|
||||
let content = br#"{"content_type":0,"content_length":708370,"last_modified":"2021-04-13T04:37:41+00:00"}abc"#;
|
||||
let mut reader = Box::pin(BufReader::new(&content[..]));
|
||||
let metadata = futures::executor::block_on(async {
|
||||
MetadataFuture(reader.as_mut())
|
||||
.await
|
||||
.map_err(|_| "metadata future returned error")
|
||||
})?;
|
||||
|
||||
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
|
||||
assert_eq!(metadata.content_length, Some(708370));
|
||||
assert_eq!(
|
||||
metadata.last_modified,
|
||||
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
|
||||
);
|
||||
|
||||
let mut buf = vec![];
|
||||
futures::executor::block_on(reader.read_to_end(&mut buf))?;
|
||||
|
||||
assert_eq!(&buf, b"abc");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn data_is_immediately_available_in_chunks() -> Result<(), Box<dyn Error>> {
|
||||
let mut test_reader = TestReader::new();
|
||||
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
|
||||
let msg_1 = br#"modified":"2021-04-13T04:37:41+00:00"}abc"#;
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_1)));
|
||||
test_reader.push_consume_event(86);
|
||||
let mut reader = Box::pin(test_reader);
|
||||
let metadata = futures::executor::block_on(async {
|
||||
MetadataFuture(reader.as_mut())
|
||||
.await
|
||||
.map_err(|_| "metadata future returned error")
|
||||
})?;
|
||||
|
||||
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
|
||||
assert_eq!(metadata.content_length, Some(708370));
|
||||
assert_eq!(
|
||||
metadata.last_modified,
|
||||
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
|
||||
);
|
||||
|
||||
let mut buf = vec![];
|
||||
futures::executor::block_on(reader.read_to_end(&mut buf))?;
|
||||
|
||||
assert_eq!(&buf, b"abc");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn data_is_available_in_chunks() -> Result<(), Box<dyn Error>> {
|
||||
let mut test_reader = TestReader::new();
|
||||
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
|
||||
let msg_1 = br#"modified":"2021-04-13T04:37:41+00:00"}abc"#;
|
||||
test_reader.push_fill_buf_event(Poll::Pending);
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||
test_reader.push_fill_buf_event(Poll::Pending);
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_1)));
|
||||
test_reader.push_fill_buf_event(Poll::Pending);
|
||||
test_reader.push_consume_event(86);
|
||||
let mut reader = Box::pin(test_reader);
|
||||
let metadata = futures::executor::block_on(async {
|
||||
MetadataFuture(reader.as_mut())
|
||||
.await
|
||||
.map_err(|_| "metadata future returned error")
|
||||
})?;
|
||||
|
||||
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
|
||||
assert_eq!(metadata.content_length, Some(708370));
|
||||
assert_eq!(
|
||||
metadata.last_modified,
|
||||
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
|
||||
);
|
||||
|
||||
let mut buf = vec![];
|
||||
futures::executor::block_on(reader.read_to_end(&mut buf))?;
|
||||
|
||||
assert_eq!(&buf, b"abc");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn underlying_reader_reports_err() -> Result<(), Box<dyn Error>> {
|
||||
let mut test_reader = TestReader::new();
|
||||
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
|
||||
test_reader.push_fill_buf_event(Poll::Pending);
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||
test_reader.push_fill_buf_event(Poll::Pending);
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Err(std::io::Error::new(
|
||||
ErrorKind::Other,
|
||||
"sup",
|
||||
))));
|
||||
let mut reader = Box::pin(test_reader);
|
||||
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
|
||||
assert!(metadata.is_err());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn underlying_reader_reports_early_eof() -> Result<(), Box<dyn Error>> {
|
||||
let mut test_reader = TestReader::new();
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(&[])));
|
||||
let mut reader = Box::pin(test_reader);
|
||||
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
|
||||
assert!(metadata.is_err());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_metadata() -> Result<(), Box<dyn Error>> {
|
||||
let mut test_reader = TestReader::new();
|
||||
// content type is incorrect, should be a number
|
||||
let msg_0 = br#"{"content_type":"foo","content_length":708370,"last_modified":"2021-04-13T04:37:41+00:00"}"#;
|
||||
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||
let mut reader = Box::pin(test_reader);
|
||||
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
|
||||
assert!(metadata.is_err());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue