Compare commits
2 commits
c488eccc8d
...
6daadefaf1
Author | SHA1 | Date | |
---|---|---|---|
6daadefaf1 | |||
0fcde518c5 |
2 changed files with 27 additions and 10 deletions
32
src/cache/fs.rs
vendored
32
src/cache/fs.rs
vendored
|
@ -15,10 +15,11 @@
|
||||||
//! misses are treated as closer as a cache hit.
|
//! misses are treated as closer as a cache hit.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryFrom;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::io::SeekFrom;
|
use std::io::SeekFrom;
|
||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroUsize;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
@ -26,7 +27,7 @@ use std::task::{Context, Poll};
|
||||||
use actix_web::error::PayloadError;
|
use actix_web::error::PayloadError;
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
use futures::{Future, Stream, StreamExt};
|
use futures::{Future, Stream, StreamExt};
|
||||||
use log::{debug, warn};
|
use log::{debug, error, warn};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sodiumoxide::crypto::secretstream::{
|
use sodiumoxide::crypto::secretstream::{
|
||||||
|
@ -236,7 +237,7 @@ where
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let path_buf = path_buf; // moves path buf into async
|
let path_buf = path_buf; // moves path buf into async
|
||||||
let mut errored = false;
|
let mut errored = false;
|
||||||
let mut bytes_written = 0;
|
let mut bytes_written: u32 = 0;
|
||||||
let mut acc_bytes = BytesMut::new();
|
let mut acc_bytes = BytesMut::new();
|
||||||
let accumulate = on_complete.is_some();
|
let accumulate = on_complete.is_some();
|
||||||
writer.write_all(metadata_string.as_bytes()).await?;
|
writer.write_all(metadata_string.as_bytes()).await?;
|
||||||
|
@ -253,7 +254,21 @@ where
|
||||||
0 => break,
|
0 => break,
|
||||||
n => {
|
n => {
|
||||||
bytes.advance(n);
|
bytes.advance(n);
|
||||||
bytes_written += n as u32;
|
let n = if let Ok(n) = u32::try_from(n) {
|
||||||
|
n
|
||||||
|
} else {
|
||||||
|
error!("Tried to advance larger than an u32?");
|
||||||
|
errored = true;
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
let (new_size, overflowed) = bytes_written.overflowing_add(n);
|
||||||
|
if overflowed {
|
||||||
|
error!("File size was larger than u32! Aborting!");
|
||||||
|
errored = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
bytes_written = new_size;
|
||||||
|
|
||||||
// We don't care if we don't have receivers
|
// We don't care if we don't have receivers
|
||||||
let _ = tx.send(WritingStatus::NotDone);
|
let _ = tx.send(WritingStatus::NotDone);
|
||||||
}
|
}
|
||||||
|
@ -393,10 +408,10 @@ pub struct ConcurrentFsStream {
|
||||||
/// this reader will never complete.
|
/// this reader will never complete.
|
||||||
receiver: Pin<Box<WatchStream<WritingStatus>>>,
|
receiver: Pin<Box<WatchStream<WritingStatus>>>,
|
||||||
/// The number of bytes the reader has read
|
/// The number of bytes the reader has read
|
||||||
bytes_read: u32,
|
bytes_read: usize,
|
||||||
/// The number of bytes that the writer has reported it has written. If the
|
/// The number of bytes that the writer has reported it has written. If the
|
||||||
/// writer has not reported yet, this value is None.
|
/// writer has not reported yet, this value is None.
|
||||||
bytes_total: Option<NonZeroU32>,
|
bytes_total: Option<NonZeroUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConcurrentFsStream {
|
impl ConcurrentFsStream {
|
||||||
|
@ -465,7 +480,8 @@ impl Stream for ConcurrentFsStream {
|
||||||
if let Poll::Ready(Some(WritingStatus::Done(n))) =
|
if let Poll::Ready(Some(WritingStatus::Done(n))) =
|
||||||
self.receiver.as_mut().poll_next_unpin(cx)
|
self.receiver.as_mut().poll_next_unpin(cx)
|
||||||
{
|
{
|
||||||
self.bytes_total = Some(NonZeroU32::new(n).expect("Stored a 0 byte image?"))
|
self.bytes_total =
|
||||||
|
Some(NonZeroUsize::new(n as usize).expect("Stored a 0 byte image?"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Okay, now we know if we've read enough bytes or not. If the
|
// Okay, now we know if we've read enough bytes or not. If the
|
||||||
|
@ -488,7 +504,7 @@ impl Stream for ConcurrentFsStream {
|
||||||
Poll::Ready(Some(Ok(Bytes::new())))
|
Poll::Ready(Some(Ok(Bytes::new())))
|
||||||
} else {
|
} else {
|
||||||
// We have data! Give it to the reader!
|
// We have data! Give it to the reader!
|
||||||
self.bytes_read += filled as u32;
|
self.bytes_read += filled;
|
||||||
bytes.truncate(filled);
|
bytes.truncate(filled);
|
||||||
Poll::Ready(Some(Ok(bytes.into())))
|
Poll::Ready(Some(Ok(bytes.into())))
|
||||||
}
|
}
|
||||||
|
|
5
src/cache/mod.rs
vendored
5
src/cache/mod.rs
vendored
|
@ -2,6 +2,7 @@ use std::fmt::Display;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_web::http::HeaderValue;
|
use actix_web::http::HeaderValue;
|
||||||
|
@ -175,7 +176,7 @@ pub trait Cache: Send + Sync {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: Cache> Cache for std::sync::Arc<T> {
|
impl<T: Cache> Cache for Arc<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn get(
|
async fn get(
|
||||||
&self,
|
&self,
|
||||||
|
@ -207,7 +208,7 @@ pub trait CallbackCache: Cache {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: CallbackCache> CallbackCache for std::sync::Arc<T> {
|
impl<T: CallbackCache> CallbackCache for Arc<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
async fn put_with_on_completed_callback(
|
async fn put_with_on_completed_callback(
|
||||||
&self,
|
&self,
|
||||||
|
|
Loading…
Reference in a new issue