Compare commits
2 commits
c488eccc8d
...
6daadefaf1
Author | SHA1 | Date | |
---|---|---|---|
6daadefaf1 | |||
0fcde518c5 |
2 changed files with 27 additions and 10 deletions
32
src/cache/fs.rs
vendored
32
src/cache/fs.rs
vendored
|
@ -15,10 +15,11 @@
|
|||
//! misses are treated as closer as a cache hit.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::error::Error;
|
||||
use std::fmt::Display;
|
||||
use std::io::SeekFrom;
|
||||
use std::num::NonZeroU32;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
@ -26,7 +27,7 @@ use std::task::{Context, Poll};
|
|||
use actix_web::error::PayloadError;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
use log::{debug, warn};
|
||||
use log::{debug, error, warn};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sodiumoxide::crypto::secretstream::{
|
||||
|
@ -236,7 +237,7 @@ where
|
|||
tokio::spawn(async move {
|
||||
let path_buf = path_buf; // moves path buf into async
|
||||
let mut errored = false;
|
||||
let mut bytes_written = 0;
|
||||
let mut bytes_written: u32 = 0;
|
||||
let mut acc_bytes = BytesMut::new();
|
||||
let accumulate = on_complete.is_some();
|
||||
writer.write_all(metadata_string.as_bytes()).await?;
|
||||
|
@ -253,7 +254,21 @@ where
|
|||
0 => break,
|
||||
n => {
|
||||
bytes.advance(n);
|
||||
bytes_written += n as u32;
|
||||
let n = if let Ok(n) = u32::try_from(n) {
|
||||
n
|
||||
} else {
|
||||
error!("Tried to advance larger than an u32?");
|
||||
errored = true;
|
||||
break;
|
||||
};
|
||||
let (new_size, overflowed) = bytes_written.overflowing_add(n);
|
||||
if overflowed {
|
||||
error!("File size was larger than u32! Aborting!");
|
||||
errored = true;
|
||||
break;
|
||||
}
|
||||
bytes_written = new_size;
|
||||
|
||||
// We don't care if we don't have receivers
|
||||
let _ = tx.send(WritingStatus::NotDone);
|
||||
}
|
||||
|
@ -393,10 +408,10 @@ pub struct ConcurrentFsStream {
|
|||
/// this reader will never complete.
|
||||
receiver: Pin<Box<WatchStream<WritingStatus>>>,
|
||||
/// The number of bytes the reader has read
|
||||
bytes_read: u32,
|
||||
bytes_read: usize,
|
||||
/// The number of bytes that the writer has reported it has written. If the
|
||||
/// writer has not reported yet, this value is None.
|
||||
bytes_total: Option<NonZeroU32>,
|
||||
bytes_total: Option<NonZeroUsize>,
|
||||
}
|
||||
|
||||
impl ConcurrentFsStream {
|
||||
|
@ -465,7 +480,8 @@ impl Stream for ConcurrentFsStream {
|
|||
if let Poll::Ready(Some(WritingStatus::Done(n))) =
|
||||
self.receiver.as_mut().poll_next_unpin(cx)
|
||||
{
|
||||
self.bytes_total = Some(NonZeroU32::new(n).expect("Stored a 0 byte image?"))
|
||||
self.bytes_total =
|
||||
Some(NonZeroUsize::new(n as usize).expect("Stored a 0 byte image?"))
|
||||
}
|
||||
|
||||
// Okay, now we know if we've read enough bytes or not. If the
|
||||
|
@ -488,7 +504,7 @@ impl Stream for ConcurrentFsStream {
|
|||
Poll::Ready(Some(Ok(Bytes::new())))
|
||||
} else {
|
||||
// We have data! Give it to the reader!
|
||||
self.bytes_read += filled as u32;
|
||||
self.bytes_read += filled;
|
||||
bytes.truncate(filled);
|
||||
Poll::Ready(Some(Ok(bytes.into())))
|
||||
}
|
||||
|
|
5
src/cache/mod.rs
vendored
5
src/cache/mod.rs
vendored
|
@ -2,6 +2,7 @@ use std::fmt::Display;
|
|||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_web::http::HeaderValue;
|
||||
|
@ -175,7 +176,7 @@ pub trait Cache: Send + Sync {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Cache> Cache for std::sync::Arc<T> {
|
||||
impl<T: Cache> Cache for Arc<T> {
|
||||
#[inline]
|
||||
async fn get(
|
||||
&self,
|
||||
|
@ -207,7 +208,7 @@ pub trait CallbackCache: Cache {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: CallbackCache> CallbackCache for std::sync::Arc<T> {
|
||||
impl<T: CallbackCache> CallbackCache for Arc<T> {
|
||||
#[inline]
|
||||
async fn put_with_on_completed_callback(
|
||||
&self,
|
||||
|
|
Loading…
Reference in a new issue