Compare commits

..

2 commits

Author SHA1 Message Date
84ea4bea89
update gitignore 2021-04-22 19:44:19 -04:00
2193d74e24
Fix not skipping over metadata 2021-04-22 19:22:34 -04:00
2 changed files with 12 additions and 8 deletions

1
.gitignore vendored
View file

@ -3,3 +3,4 @@
cache cache
flamegraph*.svg flamegraph*.svg
perf.data* perf.data*
dhat.out.*

17
src/cache/fs.rs vendored
View file

@ -6,12 +6,13 @@ use once_cell::sync::Lazy;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use std::io::SeekFrom;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, ReadBuf};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch::{channel, Receiver}; use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -85,14 +86,15 @@ pub async fn write_file(
file file
}; };
let metadata = serde_json::to_string(&metadata).unwrap();
let metadata_size = metadata.len();
// need owned variant because async lifetime // need owned variant because async lifetime
let path_buf = path.to_path_buf(); let path_buf = path.to_path_buf();
tokio::spawn(async move { tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut errored = false; let mut errored = false;
let mut bytes_written: u64 = 0; let mut bytes_written: u64 = 0;
file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes()) file.write_all(&metadata.as_bytes()).await?;
.await?;
while let Some(bytes) = byte_stream.next().await { while let Some(bytes) = byte_stream.next().await {
if let Ok(mut bytes) = bytes { if let Ok(mut bytes) = bytes {
loop { loop {
@ -149,7 +151,7 @@ pub async fn write_file(
}); });
Ok(CacheStream::Concurrent( Ok(CacheStream::Concurrent(
ConcurrentFsStream::new(path, WatchStream::new(rx)).await?, ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?,
)) ))
} }
@ -163,11 +165,12 @@ pub struct ConcurrentFsStream {
impl ConcurrentFsStream { impl ConcurrentFsStream {
async fn new( async fn new(
path: &Path, path: &Path,
seek: usize,
receiver: WatchStream<WritingStatus>, receiver: WatchStream<WritingStatus>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
File::open(path) let mut file = File::open(path).await?;
.await file.seek(SeekFrom::Start(seek as u64)).await?;
.map(|file| Self::from_file(file, receiver)) Ok(Self::from_file(file, receiver))
} }
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self { fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {