Compare commits
2 commits
8cc21f4803
...
84ea4bea89
Author | SHA1 | Date | |
---|---|---|---|
84ea4bea89 | |||
2193d74e24 |
2 changed files with 12 additions and 8 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -3,3 +3,4 @@
|
||||||
cache
|
cache
|
||||||
flamegraph*.svg
|
flamegraph*.svg
|
||||||
perf.data*
|
perf.data*
|
||||||
|
dhat.out.*
|
17
src/cache/fs.rs
vendored
17
src/cache/fs.rs
vendored
|
@ -6,12 +6,13 @@ use once_cell::sync::Lazy;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
|
use std::io::SeekFrom;
|
||||||
use std::num::NonZeroU64;
|
use std::num::NonZeroU64;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::fs::{create_dir_all, remove_file, File};
|
use tokio::fs::{create_dir_all, remove_file, File};
|
||||||
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
|
use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt, ReadBuf};
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio::sync::watch::{channel, Receiver};
|
use tokio::sync::watch::{channel, Receiver};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
@ -85,14 +86,15 @@ pub async fn write_file(
|
||||||
file
|
file
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let metadata = serde_json::to_string(&metadata).unwrap();
|
||||||
|
let metadata_size = metadata.len();
|
||||||
// need owned variant because async lifetime
|
// need owned variant because async lifetime
|
||||||
let path_buf = path.to_path_buf();
|
let path_buf = path.to_path_buf();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let path_buf = path_buf; // moves path buf into async
|
let path_buf = path_buf; // moves path buf into async
|
||||||
let mut errored = false;
|
let mut errored = false;
|
||||||
let mut bytes_written: u64 = 0;
|
let mut bytes_written: u64 = 0;
|
||||||
file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes())
|
file.write_all(&metadata.as_bytes()).await?;
|
||||||
.await?;
|
|
||||||
while let Some(bytes) = byte_stream.next().await {
|
while let Some(bytes) = byte_stream.next().await {
|
||||||
if let Ok(mut bytes) = bytes {
|
if let Ok(mut bytes) = bytes {
|
||||||
loop {
|
loop {
|
||||||
|
@ -149,7 +151,7 @@ pub async fn write_file(
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(CacheStream::Concurrent(
|
Ok(CacheStream::Concurrent(
|
||||||
ConcurrentFsStream::new(path, WatchStream::new(rx)).await?,
|
ConcurrentFsStream::new(path, metadata_size, WatchStream::new(rx)).await?,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,11 +165,12 @@ pub struct ConcurrentFsStream {
|
||||||
impl ConcurrentFsStream {
|
impl ConcurrentFsStream {
|
||||||
async fn new(
|
async fn new(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
|
seek: usize,
|
||||||
receiver: WatchStream<WritingStatus>,
|
receiver: WatchStream<WritingStatus>,
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
File::open(path)
|
let mut file = File::open(path).await?;
|
||||||
.await
|
file.seek(SeekFrom::Start(seek as u64)).await?;
|
||||||
.map(|file| Self::from_file(file, receiver))
|
Ok(Self::from_file(file, receiver))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
|
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
|
||||||
|
|
Loading…
Reference in a new issue