From 8f03aa0236893f9b68307304c23079a1a20f96b0 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Tue, 20 Apr 2021 17:28:02 -0400 Subject: [PATCH] use cleaner concurrentfs implementation --- src/cache/fs.rs | 94 +++++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/src/cache/fs.rs b/src/cache/fs.rs index f5b8342..28f7a41 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -1,13 +1,13 @@ use actix_web::error::PayloadError; -use bytes::Buf; +use bytes::{Buf, Bytes}; use futures::{Stream, StreamExt}; use log::{debug, error}; use once_cell::sync::Lazy; -use std::collections::HashMap; use std::fmt::Display; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; +use std::{collections::HashMap, num::NonZeroU64}; use tokio::fs::{create_dir_all, remove_file, File}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::sync::mpsc::UnboundedSender; @@ -121,7 +121,7 @@ pub async fn write_file( if errored { let _ = tx.send(WritingStatus::Error); } else { - let _ = tx.send(WritingStatus::Done); + let _ = tx.send(WritingStatus::Done(bytes_written)); } write_lock.remove(&path_buf); @@ -145,6 +145,8 @@ pub async fn write_file( pub struct ConcurrentFsStream { file: Pin>, receiver: Pin>>, + bytes_read: u64, + bytes_total: Option, } impl ConcurrentFsStream { @@ -155,6 +157,8 @@ impl ConcurrentFsStream { Ok(Self { file: Box::pin(File::open(path).await?), receiver: Box::pin(receiver), + bytes_read: 0, + bytes_total: None, }) } } @@ -175,29 +179,55 @@ impl Stream for ConcurrentFsStream { type Item = CacheStreamItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.receiver.as_mut().poll_next_unpin(cx) { - Poll::Ready(status) => { - let mut bytes = [0; 1460].to_vec(); - let mut buffer = ReadBuf::new(&mut bytes); - let polled_result = self.file.as_mut().poll_read(cx, &mut buffer); - let filled = buffer.filled().len(); - match (status, filled) { - 
(Some(WritingStatus::NotDone), 0) => Poll::Pending, - // We got an error, abort the read. - (Some(WritingStatus::Error), _) => Poll::Ready(Some(Err(UpstreamError))), - _ => { - bytes.truncate(filled); - polled_result.map(|_| { - if bytes.is_empty() { - None - } else { - Some(Ok(bytes.into())) - } - }) - } + // First, try to read from the file... + + let mut bytes = [0; 1460].to_vec(); + let mut buffer = ReadBuf::new(&mut bytes); + match self.file.as_mut().poll_read(cx, &mut buffer) { + Poll::Ready(Ok(_)) => (), + Poll::Ready(Err(_)) => return Poll::Ready(Some(Err(UpstreamError))), + Poll::Pending => return Poll::Pending, + } + + // At this point, we know that we "successfully" read some amount of + // data. Let's see if there's actual data in there... + + let filled = buffer.filled().len(); + if filled == 0 { + // Filled is zero, which indicates two situations: + // 1. We are actually done. + // 2. We read to the EOF while the writer is still writing to it. + + // To handle the second case, we need to see the status of the writer: + + if let Poll::Ready(Some(WritingStatus::Done(n))) = + self.receiver.as_mut().poll_next_unpin(cx) + { + self.bytes_total = Some(NonZeroU64::new(n).unwrap()) + } + + // Okay, now we know if we've read enough bytes or not. If the + // writer hasn't told us that it's done yet, then we know that + // there must be more bytes to read from. + + if let Some(bytes_total) = self.bytes_total { + if bytes_total.get() == self.bytes_read { + // We matched the number of bytes the writer said it wrote, + // so we're finally done + return Poll::Ready(None); } } - Poll::Pending => Poll::Pending, + + // We haven't read enough bytes, so just return an empty bytes and + // have the executor request some bytes some time in the future. + // + // This case might be solved by io_uring, but for now this is + // the best we can do. + Poll::Ready(Some(Ok(Bytes::new()))) + } else { + // We have data! Give it to the reader! 
+ self.bytes_read += filled as u64; + Poll::Ready(Some(Ok(bytes.into()))) } } } @@ -211,19 +241,7 @@ impl From for actix_web::Error { #[derive(Debug, Clone, Copy)] enum WritingStatus { - NotDone = 0, - Done, + NotDone, + Done(u64), Error, } - -impl From for WritingStatus { - #[inline] - fn from(v: u8) -> Self { - match v { - 0 => Self::NotDone, - 1 => Self::Done, - 2 => Self::Error, - _ => unreachable!(), - } - } -}