diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 8c86467..0872c96 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -6,12 +6,13 @@ use once_cell::sync::Lazy; use serde::Deserialize; use std::collections::HashMap; use std::fmt::Display; +use std::io::SeekFrom; use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{create_dir_all, remove_file, File}; -use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; +use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, ReadBuf}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::{channel, Receiver}; use tokio::sync::RwLock; @@ -85,14 +86,15 @@ pub async fn write_file( file }; + let metadata = serde_json::to_string(&metadata).unwrap(); + let metadata_size = metadata.len(); // need owned variant because async lifetime let path_buf = path.to_path_buf(); tokio::spawn(async move { let path_buf = path_buf; // moves path buf into async let mut errored = false; let mut bytes_written: u64 = 0; - file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes()) - .await?; + file.write_all(&metadata.as_bytes()).await?; while let Some(bytes) = byte_stream.next().await { if let Ok(mut bytes) = bytes { loop { @@ -149,7 +151,7 @@ pub async fn write_file( }); Ok(CacheStream::Concurrent( - ConcurrentFsStream::new(path, WatchStream::new(rx)).await?, + ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?, )) } @@ -163,11 +165,12 @@ pub struct ConcurrentFsStream { impl ConcurrentFsStream { async fn new( path: &Path, + seek: usize, receiver: WatchStream, ) -> Result { - File::open(path) - .await - .map(|file| Self::from_file(file, receiver)) + let mut file = File::open(path).await?; + file.seek(SeekFrom::Start(seek as u64)).await?; + Ok(Self::from_file(file, receiver)) } fn from_file(file: File, receiver: WatchStream) -> Self {