Compare commits

..

No commits in common. "6daadefaf159864d11e55a8036ec9f830b57401b" and "c488eccc8d3001aff960962c76443f0b2885dcd2" have entirely different histories.

2 changed files with 10 additions and 27 deletions

32
src/cache/fs.rs vendored
View file

@ -15,11 +15,10 @@
//! misses are treated as closer as a cache hit.
use std::collections::HashMap;
use std::convert::TryFrom;
use std::error::Error;
use std::fmt::Display;
use std::io::SeekFrom;
use std::num::NonZeroUsize;
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
@ -27,7 +26,7 @@ use std::task::{Context, Poll};
use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt};
use log::{debug, error, warn};
use log::{debug, warn};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::secretstream::{
@ -237,7 +236,7 @@ where
tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async
let mut errored = false;
let mut bytes_written: u32 = 0;
let mut bytes_written = 0;
let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some();
writer.write_all(metadata_string.as_bytes()).await?;
@ -254,21 +253,7 @@ where
0 => break,
n => {
bytes.advance(n);
let n = if let Ok(n) = u32::try_from(n) {
n
} else {
error!("Tried to advance larger than an u32?");
errored = true;
break;
};
let (new_size, overflowed) = bytes_written.overflowing_add(n);
if overflowed {
error!("File size was larger than u32! Aborting!");
errored = true;
break;
}
bytes_written = new_size;
bytes_written += n as u32;
// We don't care if we don't have receivers
let _ = tx.send(WritingStatus::NotDone);
}
@ -408,10 +393,10 @@ pub struct ConcurrentFsStream {
/// this reader will never complete.
receiver: Pin<Box<WatchStream<WritingStatus>>>,
/// The number of bytes the reader has read
bytes_read: usize,
bytes_read: u32,
/// The number of bytes that the writer has reported it has written. If the
/// writer has not reported yet, this value is None.
bytes_total: Option<NonZeroUsize>,
bytes_total: Option<NonZeroU32>,
}
impl ConcurrentFsStream {
@ -480,8 +465,7 @@ impl Stream for ConcurrentFsStream {
if let Poll::Ready(Some(WritingStatus::Done(n))) =
self.receiver.as_mut().poll_next_unpin(cx)
{
self.bytes_total =
Some(NonZeroUsize::new(n as usize).expect("Stored a 0 byte image?"))
self.bytes_total = Some(NonZeroU32::new(n).expect("Stored a 0 byte image?"))
}
// Okay, now we know if we've read enough bytes or not. If the
@ -504,7 +488,7 @@ impl Stream for ConcurrentFsStream {
Poll::Ready(Some(Ok(Bytes::new())))
} else {
// We have data! Give it to the reader!
self.bytes_read += filled;
self.bytes_read += filled as u32;
bytes.truncate(filled);
Poll::Ready(Some(Ok(bytes.into())))
}

5
src/cache/mod.rs vendored
View file

@ -2,7 +2,6 @@ use std::fmt::Display;
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use actix_web::http::HeaderValue;
@ -176,7 +175,7 @@ pub trait Cache: Send + Sync {
}
#[async_trait]
impl<T: Cache> Cache for Arc<T> {
impl<T: Cache> Cache for std::sync::Arc<T> {
#[inline]
async fn get(
&self,
@ -208,7 +207,7 @@ pub trait CallbackCache: Cache {
}
#[async_trait]
impl<T: CallbackCache> CallbackCache for Arc<T> {
impl<T: CallbackCache> CallbackCache for std::sync::Arc<T> {
#[inline]
async fn put_with_on_completed_callback(
&self,