diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 0872c96..c5d22fb 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -12,7 +12,7 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{create_dir_all, remove_file, File}; -use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, ReadBuf}; +use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::{channel, Receiver}; use tokio::sync::RwLock; @@ -61,7 +61,7 @@ pub async fn read_file( WatchStream::new(status), )) } else { - CacheStream::Completed(FramedRead::new(file, BytesCodec::new())) + CacheStream::Completed(FramedRead::new(BufReader::new(file), BytesCodec::new())) }; Some(Ok((stream, metadata))) @@ -156,7 +156,7 @@ pub async fn write_file( } pub struct ConcurrentFsStream { - file: Pin>, + file: Pin>>, receiver: Pin>>, bytes_read: u64, bytes_total: Option, @@ -175,7 +175,7 @@ impl ConcurrentFsStream { fn from_file(file: File, receiver: WatchStream) -> Self { Self { - file: Box::pin(file), + file: Box::pin(BufReader::new(file)), receiver: Box::pin(receiver), bytes_read: 0, bytes_total: None, diff --git a/src/cache/mod.rs b/src/cache/mod.rs index bd778a4..ec21e53 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -12,7 +12,7 @@ use fs::ConcurrentFsStream; use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::fs::File; +use tokio::{fs::File, io::BufReader}; use tokio_util::codec::{BytesCodec, FramedRead}; pub use fs::UpstreamError; @@ -170,7 +170,7 @@ pub trait Cache: Send + Sync { pub enum CacheStream { Concurrent(ConcurrentFsStream), Memory(MemStream), - Completed(FramedRead), + Completed(FramedRead, BytesCodec>), } impl From for CacheStream {