From bd306455bc795ce1c64ec3be37663f0cc347b963 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Sun, 18 Jul 2021 11:37:39 -0400 Subject: [PATCH] Use ReaderStream --- src/cache/fs.rs | 5 ++--- src/cache/mem.rs | 4 ++-- src/cache/mod.rs | 11 ++++------- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/cache/fs.rs b/src/cache/fs.rs index ec41b89..62f592f 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -35,7 +35,7 @@ use tokio::io::{ ReadBuf, }; use tokio::sync::mpsc::Sender; -use tokio_util::codec::{BytesCodec, FramedRead}; +use tokio_util::io::ReaderStream; use tracing::{debug, instrument, warn}; use super::compat::LegacyImageMetadata; @@ -122,8 +122,7 @@ pub(super) async fn read_file( // successfully decoded the data; otherwise the file is garbage. if let Some(reader) = reader { - let stream = - CacheStream::Completed(FramedRead::new(reader as Pin>, BytesCodec::new())); + let stream = CacheStream::Completed(ReaderStream::new(reader)); parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata))) } else { debug!("Reader was invalid, file is corrupt"); diff --git a/src/cache/mem.rs b/src/cache/mem.rs index 4a1f282..751de35 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -322,7 +322,7 @@ mod test_util { use parking_lot::Mutex; use tokio::io::BufReader; use tokio::sync::mpsc::Sender; - use tokio_util::codec::{BytesCodec, FramedRead}; + use tokio_util::io::ReaderStream; #[derive(Default)] pub struct TestDiskCache( @@ -347,7 +347,7 @@ mod test_util { let reader = Box::pin(BufReader::new(tokio_util::io::StreamReader::new( tokio_stream::once(Ok::<_, std::io::Error>(image)), ))); - let stream = CacheStream::Completed(FramedRead::new(reader, BytesCodec::new())); + let stream = CacheStream::Completed(ReaderStream::new(reader)); self.0.lock().get_mut().insert(key, Ok((stream, metadata))); Ok(()) } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 19bd5cd..38fa167 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use actix_web::http::HeaderValue; use async_trait::async_trait; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use chacha20::Key; use chrono::{DateTime, FixedOffset}; use futures::{Stream, StreamExt}; @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use thiserror::Error; use tokio::sync::mpsc::Sender; -use tokio_util::codec::{BytesCodec, FramedRead}; +use tokio_util::io::ReaderStream; pub use disk::DiskCache; pub use fs::UpstreamError; @@ -252,7 +252,7 @@ pub struct CacheEntry { pub enum CacheStream { Memory(MemStream), - Completed(FramedRead>, BytesCodec>), + Completed(ReaderStream>>), } impl From for CacheStream { @@ -269,10 +269,7 @@ impl Stream for CacheStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.get_mut() { Self::Memory(stream) => stream.poll_next_unpin(cx), - Self::Completed(stream) => stream - .poll_next_unpin(cx) - .map_ok(BytesMut::freeze) - .map_err(|_| UpstreamError), + Self::Completed(stream) => stream.poll_next_unpin(cx).map_err(|_| UpstreamError), } } }