Compare commits

..

2 commits

Author SHA1 Message Date
6daadefaf1
Resolve clippy lints 2021-05-19 23:52:15 -04:00
0fcde518c5
Use qualified instead of absolute 2021-05-19 23:29:18 -04:00
2 changed files with 27 additions and 10 deletions

32
src/cache/fs.rs vendored
View file

@ -15,10 +15,11 @@
//! misses are treated as closer as a cache hit. //! misses are treated as closer as a cache hit.
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom;
use std::error::Error; use std::error::Error;
use std::fmt::Display; use std::fmt::Display;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::num::NonZeroU32; use std::num::NonZeroUsize;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -26,7 +27,7 @@ use std::task::{Context, Poll};
use actix_web::error::PayloadError; use actix_web::error::PayloadError;
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use futures::{Future, Stream, StreamExt}; use futures::{Future, Stream, StreamExt};
use log::{debug, warn}; use log::{debug, error, warn};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::secretstream::{ use sodiumoxide::crypto::secretstream::{
@ -236,7 +237,7 @@ where
tokio::spawn(async move { tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async let path_buf = path_buf; // moves path buf into async
let mut errored = false; let mut errored = false;
let mut bytes_written = 0; let mut bytes_written: u32 = 0;
let mut acc_bytes = BytesMut::new(); let mut acc_bytes = BytesMut::new();
let accumulate = on_complete.is_some(); let accumulate = on_complete.is_some();
writer.write_all(metadata_string.as_bytes()).await?; writer.write_all(metadata_string.as_bytes()).await?;
@ -253,7 +254,21 @@ where
0 => break, 0 => break,
n => { n => {
bytes.advance(n); bytes.advance(n);
bytes_written += n as u32; let n = if let Ok(n) = u32::try_from(n) {
n
} else {
error!("Tried to advance larger than an u32?");
errored = true;
break;
};
let (new_size, overflowed) = bytes_written.overflowing_add(n);
if overflowed {
error!("File size was larger than u32! Aborting!");
errored = true;
break;
}
bytes_written = new_size;
// We don't care if we don't have receivers // We don't care if we don't have receivers
let _ = tx.send(WritingStatus::NotDone); let _ = tx.send(WritingStatus::NotDone);
} }
@ -393,10 +408,10 @@ pub struct ConcurrentFsStream {
/// this reader will never complete. /// this reader will never complete.
receiver: Pin<Box<WatchStream<WritingStatus>>>, receiver: Pin<Box<WatchStream<WritingStatus>>>,
/// The number of bytes the reader has read /// The number of bytes the reader has read
bytes_read: u32, bytes_read: usize,
/// The number of bytes that the writer has reported it has written. If the /// The number of bytes that the writer has reported it has written. If the
/// writer has not reported yet, this value is None. /// writer has not reported yet, this value is None.
bytes_total: Option<NonZeroU32>, bytes_total: Option<NonZeroUsize>,
} }
impl ConcurrentFsStream { impl ConcurrentFsStream {
@ -465,7 +480,8 @@ impl Stream for ConcurrentFsStream {
if let Poll::Ready(Some(WritingStatus::Done(n))) = if let Poll::Ready(Some(WritingStatus::Done(n))) =
self.receiver.as_mut().poll_next_unpin(cx) self.receiver.as_mut().poll_next_unpin(cx)
{ {
self.bytes_total = Some(NonZeroU32::new(n).expect("Stored a 0 byte image?")) self.bytes_total =
Some(NonZeroUsize::new(n as usize).expect("Stored a 0 byte image?"))
} }
// Okay, now we know if we've read enough bytes or not. If the // Okay, now we know if we've read enough bytes or not. If the
@ -488,7 +504,7 @@ impl Stream for ConcurrentFsStream {
Poll::Ready(Some(Ok(Bytes::new()))) Poll::Ready(Some(Ok(Bytes::new())))
} else { } else {
// We have data! Give it to the reader! // We have data! Give it to the reader!
self.bytes_read += filled as u32; self.bytes_read += filled;
bytes.truncate(filled); bytes.truncate(filled);
Poll::Ready(Some(Ok(bytes.into()))) Poll::Ready(Some(Ok(bytes.into())))
} }

5
src/cache/mod.rs vendored
View file

@ -2,6 +2,7 @@ use std::fmt::Display;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_web::http::HeaderValue; use actix_web::http::HeaderValue;
@ -175,7 +176,7 @@ pub trait Cache: Send + Sync {
} }
#[async_trait] #[async_trait]
impl<T: Cache> Cache for std::sync::Arc<T> { impl<T: Cache> Cache for Arc<T> {
#[inline] #[inline]
async fn get( async fn get(
&self, &self,
@ -207,7 +208,7 @@ pub trait CallbackCache: Cache {
} }
#[async_trait] #[async_trait]
impl<T: CallbackCache> CallbackCache for std::sync::Arc<T> { impl<T: CallbackCache> CallbackCache for Arc<T> {
#[inline] #[inline]
async fn put_with_on_completed_callback( async fn put_with_on_completed_callback(
&self, &self,