use bufreaders

This commit is contained in:
Edward Shen 2021-04-22 17:52:16 -04:00
parent 3c1388fced
commit f335b99024
Signed by: edward
GPG key ID: 19182661E818369F
5 changed files with 32317 additions and 9 deletions

32305
dhat.out.3975303 Normal file

File diff suppressed because it is too large Load diff

8
src/cache/fs.rs vendored
View file

@ -11,7 +11,7 @@ use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::fs::{create_dir_all, remove_file, File}; use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch::{channel, Receiver}; use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -60,7 +60,7 @@ pub async fn read_file(
WatchStream::new(status), WatchStream::new(status),
)) ))
} else { } else {
CacheStream::Completed(FramedRead::new(file, BytesCodec::new())) CacheStream::Completed(FramedRead::new(BufReader::new(file), BytesCodec::new()))
}; };
Some(Ok((stream, metadata))) Some(Ok((stream, metadata)))
@ -154,7 +154,7 @@ pub async fn write_file(
} }
pub struct ConcurrentFsStream { pub struct ConcurrentFsStream {
file: Pin<Box<File>>, file: Pin<Box<BufReader<File>>>,
receiver: Pin<Box<WatchStream<WritingStatus>>>, receiver: Pin<Box<WatchStream<WritingStatus>>>,
bytes_read: u64, bytes_read: u64,
bytes_total: Option<NonZeroU64>, bytes_total: Option<NonZeroU64>,
@ -172,7 +172,7 @@ impl ConcurrentFsStream {
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self { fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
Self { Self {
file: Box::pin(file), file: Box::pin(BufReader::new(file)),
receiver: Box::pin(receiver), receiver: Box::pin(receiver),
bytes_read: 0, bytes_read: 0,
bytes_total: None, bytes_total: None,

View file

@ -1,13 +1,15 @@
//! Low memory caching stuff //! Low memory caching stuff
use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{atomic::Ordering, Arc}; use std::sync::atomic::{AtomicU64, Ordering};
use std::{path::PathBuf, sync::atomic::AtomicU64}; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use log::LevelFilter; use log::LevelFilter;
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool}; use sqlx::sqlite::SqliteConnectOptions;
use sqlx::{ConnectOptions, SqlitePool};
use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender}; use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;

3
src/cache/mod.rs vendored
View file

@ -13,6 +13,7 @@ use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::BufReader;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
pub use fs::UpstreamError; pub use fs::UpstreamError;
@ -170,7 +171,7 @@ pub trait Cache: Send + Sync {
pub enum CacheStream { pub enum CacheStream {
Concurrent(ConcurrentFsStream), Concurrent(ConcurrentFsStream),
Memory(MemStream), Memory(MemStream),
Completed(FramedRead<File, BytesCodec>), Completed(FramedRead<BufReader<File>, BytesCodec>),
} }
impl From<CachedImage> for CacheStream { impl From<CachedImage> for CacheStream {

View file

@ -1,4 +1,4 @@
#![warn(clippy::pedantic, clippy::nursery)] #![warn(clippy::pedantic, clippy::nursery, clippy::perf)]
// We're end users, so these is ok // We're end users, so these is ok
#![allow(clippy::module_name_repetitions)] #![allow(clippy::module_name_repetitions)]