Compare commits
4 commits
0c78b379f1
...
5338ff81a5
Author | SHA1 | Date | |
---|---|---|---|
5338ff81a5 | |||
53015e116f | |||
7ce974b4f9 | |||
973ece3604 |
3 changed files with 231 additions and 23 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -4,4 +4,5 @@
|
||||||
flamegraph*.svg
|
flamegraph*.svg
|
||||||
perf.data*
|
perf.data*
|
||||||
dhat.out.*
|
dhat.out.*
|
||||||
settings.yaml
|
settings.yaml
|
||||||
|
tarpaulin-report.html
|
||||||
|
|
2
src/cache/disk.rs
vendored
2
src/cache/disk.rs
vendored
|
@ -378,6 +378,7 @@ mod db {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
#[cfg_attr(miri, ignore)]
|
||||||
async fn get() -> Result<(), Box<dyn Error>> {
|
async fn get() -> Result<(), Box<dyn Error>> {
|
||||||
let (cache, _) = DiskCache::in_memory();
|
let (cache, _) = DiskCache::in_memory();
|
||||||
let path = PathBuf::from_str("a/b/c")?;
|
let path = PathBuf::from_str("a/b/c")?;
|
||||||
|
@ -409,6 +410,7 @@ mod db {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
#[cfg_attr(miri, ignore)]
|
||||||
async fn put() -> Result<(), Box<dyn Error>> {
|
async fn put() -> Result<(), Box<dyn Error>> {
|
||||||
let (cache, _) = DiskCache::in_memory();
|
let (cache, _) = DiskCache::in_memory();
|
||||||
let path = PathBuf::from_str("a/b/c")?;
|
let path = PathBuf::from_str("a/b/c")?;
|
||||||
|
|
249
src/cache/fs.rs
vendored
249
src/cache/fs.rs
vendored
|
@ -104,7 +104,7 @@ async fn read_file(
|
||||||
debug!("header bytes: {:x?}", nonce_bytes);
|
debug!("header bytes: {:x?}", nonce_bytes);
|
||||||
|
|
||||||
maybe_header = Some(*XNonce::from_slice(&nonce_bytes));
|
maybe_header = Some(*XNonce::from_slice(&nonce_bytes));
|
||||||
reader = Some(Box::pin(BufReader::new(EncryptedDiskReader::new(
|
reader = Some(Box::pin(BufReader::new(EncryptedReader::new(
|
||||||
file,
|
file,
|
||||||
XNonce::from_slice(XNonce::from_slice(&nonce_bytes)),
|
XNonce::from_slice(XNonce::from_slice(&nonce_bytes)),
|
||||||
key,
|
key,
|
||||||
|
@ -138,13 +138,13 @@ async fn read_file(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct EncryptedDiskReader {
|
struct EncryptedReader<R> {
|
||||||
file: Pin<Box<File>>,
|
file: Pin<Box<R>>,
|
||||||
keystream: XChaCha20,
|
keystream: XChaCha20,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedDiskReader {
|
impl<R> EncryptedReader<R> {
|
||||||
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
|
fn new(file: R, nonce: &XNonce, key: &Key) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file: Box::pin(file),
|
file: Box::pin(file),
|
||||||
keystream: XChaCha20::new(key, nonce),
|
keystream: XChaCha20::new(key, nonce),
|
||||||
|
@ -152,20 +152,7 @@ impl EncryptedDiskReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
impl<R: AsyncRead> AsyncRead for EncryptedReader<R> {
|
||||||
pub trait MetadataFetch: AsyncBufRead {
|
|
||||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<R: AsyncBufRead + Send> MetadataFetch for R {
|
|
||||||
#[inline]
|
|
||||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
|
|
||||||
MetadataFuture(&mut self).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for EncryptedDiskReader {
|
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
|
@ -181,15 +168,30 @@ impl AsyncRead for EncryptedDiskReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct MetadataFuture<'a, R>(&'a mut Pin<&'a mut R>);
|
#[async_trait]
|
||||||
|
pub trait MetadataFetch: AsyncBufRead {
|
||||||
|
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<R: AsyncBufRead + Send> MetadataFetch for R {
|
||||||
|
#[inline]
|
||||||
|
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
|
||||||
|
MetadataFuture(self).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MetadataFuture<'a, R>(Pin<&'a mut R>);
|
||||||
|
|
||||||
impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
||||||
type Output = Result<ImageMetadata, ()>;
|
type Output = Result<ImageMetadata, ()>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let mut filled = 0;
|
let mut filled = 0;
|
||||||
|
let mut pinned = self.0.as_mut();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let buf = match self.0.as_mut().poll_fill_buf(cx) {
|
let buf = match pinned.as_mut().poll_fill_buf(cx) {
|
||||||
Poll::Ready(Ok(buffer)) => buffer,
|
Poll::Ready(Ok(buffer)) => buffer,
|
||||||
Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
|
Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
|
@ -208,9 +210,11 @@ impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
||||||
Some(Err(_)) | None => return Poll::Ready(Err(())),
|
Some(Err(_)) | None => return Poll::Ready(Err(())),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
assert_ne!(bytes_consumed, 0);
|
||||||
|
|
||||||
// This needs to be outside the loop because we need to drop the
|
// This needs to be outside the loop because we need to drop the
|
||||||
// reader ref, since that depends on a mut self.
|
// reader ref, since that depends on a mut self.
|
||||||
self.0.as_mut().consume(bytes_consumed);
|
pinned.as_mut().consume(dbg!(bytes_consumed));
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -392,6 +396,7 @@ mod read_file {
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
#[cfg_attr(miri, ignore)]
|
||||||
async fn can_read() {
|
async fn can_read() {
|
||||||
let mut temp_file = tempfile().unwrap();
|
let mut temp_file = tempfile().unwrap();
|
||||||
temp_file
|
temp_file
|
||||||
|
@ -431,6 +436,7 @@ mod read_file_compat {
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
#[cfg_attr(miri, ignore)]
|
||||||
async fn can_read_legacy() {
|
async fn can_read_legacy() {
|
||||||
let mut temp_file = tempfile().unwrap();
|
let mut temp_file = tempfile().unwrap();
|
||||||
temp_file
|
temp_file
|
||||||
|
@ -457,3 +463,202 @@ mod read_file_compat {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod metadata_future {
|
||||||
|
use std::{collections::VecDeque, io::ErrorKind};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::cache::ImageContentType;
|
||||||
|
use chrono::DateTime;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TestReader {
|
||||||
|
fill_buf_events: VecDeque<Poll<std::io::Result<&'static [u8]>>>,
|
||||||
|
consume_events: VecDeque<usize>,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestReader {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push_fill_buf_event(&mut self, event: Poll<std::io::Result<&'static [u8]>>) {
|
||||||
|
self.fill_buf_events.push_back(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push_consume_event(&mut self, event: usize) {
|
||||||
|
self.consume_events.push_back(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for TestReader {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<std::io::Result<()>> {
|
||||||
|
assert!(self.consume_events.is_empty());
|
||||||
|
assert!(self
|
||||||
|
.fill_buf_events
|
||||||
|
.iter()
|
||||||
|
.all(|event| matches!(event, Poll::Pending)));
|
||||||
|
buf.put_slice(&self.as_mut().buffer.drain(..).collect::<Vec<_>>());
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncBufRead for TestReader {
|
||||||
|
fn poll_fill_buf(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<std::io::Result<&[u8]>> {
|
||||||
|
let pinned = Pin::into_inner(self);
|
||||||
|
match pinned.fill_buf_events.pop_front() {
|
||||||
|
Some(Poll::Ready(Ok(bytes))) => {
|
||||||
|
pinned.buffer.extend_from_slice(bytes);
|
||||||
|
String::from_utf8_lossy(&pinned.buffer);
|
||||||
|
return Poll::Ready(Ok(pinned.buffer.as_ref()));
|
||||||
|
}
|
||||||
|
Some(res @ Poll::Ready(_)) => res,
|
||||||
|
Some(Poll::Pending) => {
|
||||||
|
cx.waker().wake_by_ref();
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
None => panic!("poll_fill_buf was called but no events are left"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(mut self: Pin<&mut Self>, amt: usize) {
|
||||||
|
assert_eq!(self.as_mut().consume_events.pop_front(), Some(amt));
|
||||||
|
self.as_mut().buffer.drain(..amt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't use the tokio executor here because it relies on epoll, which
|
||||||
|
// isn't supported by miri
|
||||||
|
#[test]
|
||||||
|
fn full_data_is_available() -> Result<(), Box<dyn Error>> {
|
||||||
|
let content = br#"{"content_type":0,"content_length":708370,"last_modified":"2021-04-13T04:37:41+00:00"}abc"#;
|
||||||
|
let mut reader = Box::pin(BufReader::new(&content[..]));
|
||||||
|
let metadata = futures::executor::block_on(async {
|
||||||
|
MetadataFuture(reader.as_mut())
|
||||||
|
.await
|
||||||
|
.map_err(|_| "metadata future returned error")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
|
||||||
|
assert_eq!(metadata.content_length, Some(708370));
|
||||||
|
assert_eq!(
|
||||||
|
metadata.last_modified,
|
||||||
|
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut buf = vec![];
|
||||||
|
futures::executor::block_on(reader.read_to_end(&mut buf))?;
|
||||||
|
|
||||||
|
assert_eq!(&buf, b"abc");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn data_is_immediately_available_in_chunks() -> Result<(), Box<dyn Error>> {
|
||||||
|
let mut test_reader = TestReader::new();
|
||||||
|
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
|
||||||
|
let msg_1 = br#"modified":"2021-04-13T04:37:41+00:00"}abc"#;
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_1)));
|
||||||
|
test_reader.push_consume_event(86);
|
||||||
|
let mut reader = Box::pin(test_reader);
|
||||||
|
let metadata = futures::executor::block_on(async {
|
||||||
|
MetadataFuture(reader.as_mut())
|
||||||
|
.await
|
||||||
|
.map_err(|_| "metadata future returned error")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
|
||||||
|
assert_eq!(metadata.content_length, Some(708370));
|
||||||
|
assert_eq!(
|
||||||
|
metadata.last_modified,
|
||||||
|
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut buf = vec![];
|
||||||
|
futures::executor::block_on(reader.read_to_end(&mut buf))?;
|
||||||
|
|
||||||
|
assert_eq!(&buf, b"abc");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn data_is_available_in_chunks() -> Result<(), Box<dyn Error>> {
|
||||||
|
let mut test_reader = TestReader::new();
|
||||||
|
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
|
||||||
|
let msg_1 = br#"modified":"2021-04-13T04:37:41+00:00"}abc"#;
|
||||||
|
test_reader.push_fill_buf_event(Poll::Pending);
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||||
|
test_reader.push_fill_buf_event(Poll::Pending);
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_1)));
|
||||||
|
test_reader.push_fill_buf_event(Poll::Pending);
|
||||||
|
test_reader.push_consume_event(86);
|
||||||
|
let mut reader = Box::pin(test_reader);
|
||||||
|
let metadata = futures::executor::block_on(async {
|
||||||
|
MetadataFuture(reader.as_mut())
|
||||||
|
.await
|
||||||
|
.map_err(|_| "metadata future returned error")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
assert_eq!(metadata.content_type, Some(ImageContentType::Png));
|
||||||
|
assert_eq!(metadata.content_length, Some(708370));
|
||||||
|
assert_eq!(
|
||||||
|
metadata.last_modified,
|
||||||
|
Some(DateTime::parse_from_rfc3339("2021-04-13T04:37:41+00:00")?)
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut buf = vec![];
|
||||||
|
futures::executor::block_on(reader.read_to_end(&mut buf))?;
|
||||||
|
|
||||||
|
assert_eq!(&buf, b"abc");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn underlying_reader_reports_err() -> Result<(), Box<dyn Error>> {
|
||||||
|
let mut test_reader = TestReader::new();
|
||||||
|
let msg_0 = br#"{"content_type":0,"content_length":708370,"last_"#;
|
||||||
|
test_reader.push_fill_buf_event(Poll::Pending);
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||||
|
test_reader.push_fill_buf_event(Poll::Pending);
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Err(std::io::Error::new(
|
||||||
|
ErrorKind::Other,
|
||||||
|
"sup",
|
||||||
|
))));
|
||||||
|
let mut reader = Box::pin(test_reader);
|
||||||
|
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
|
||||||
|
assert!(metadata.is_err());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn underlying_reader_reports_early_eof() -> Result<(), Box<dyn Error>> {
|
||||||
|
let mut test_reader = TestReader::new();
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(&[])));
|
||||||
|
let mut reader = Box::pin(test_reader);
|
||||||
|
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
|
||||||
|
assert!(metadata.is_err());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn invalid_metadata() -> Result<(), Box<dyn Error>> {
|
||||||
|
let mut test_reader = TestReader::new();
|
||||||
|
// content type is incorrect, should be a number
|
||||||
|
let msg_0 = br#"{"content_type":"foo","content_length":708370,"last_modified":"2021-04-13T04:37:41+00:00"}"#;
|
||||||
|
test_reader.push_fill_buf_event(Poll::Ready(Ok(msg_0)));
|
||||||
|
let mut reader = Box::pin(test_reader);
|
||||||
|
let metadata = futures::executor::block_on(MetadataFuture(reader.as_mut()));
|
||||||
|
assert!(metadata.is_err());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue