Compare commits

...

4 commits

Author SHA1 Message Date
de17c738d2
remove todos 2021-04-18 17:11:30 -04:00
6717fbe20b
fix caching 2021-04-18 17:06:40 -04:00
63a2e0beb1
initial work into lowmem 2021-04-17 23:19:27 -04:00
6181486827
fix conflicts_with for low mem mode 2021-04-17 22:13:36 -04:00
7 changed files with 224 additions and 118 deletions

56
src/cache/fs.rs vendored
View file

@ -1,20 +1,21 @@
use std::collections::HashMap; use actix_web::HttpResponse;
use bytes::BytesMut;
use futures::{Future, Stream, StreamExt};
use once_cell::sync::Lazy;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use std::{collections::HashMap, fmt::Display};
use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, StreamExt};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use reqwest::Error;
use tokio::fs::{remove_file, File}; use tokio::fs::{remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::sync::RwLock;
use tokio::time::Sleep; use tokio::time::Sleep;
use super::{BoxedImageStream, CacheStreamItem};
/// Keeps track of files that are currently being written to. /// Keeps track of files that are currently being written to.
/// ///
/// Why is this necessary? Consider the following situation: /// Why is this necessary? Consider the following situation:
@ -35,14 +36,15 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Arc<CacheStatus>>>> =
Lazy::new(|| RwLock::new(HashMap::new())); Lazy::new(|| RwLock::new(HashMap::new()));
/// Tries to read from the file, returning a byte stream if it exists /// Tries to read from the file, returning a byte stream if it exists
pub async fn read_file(path: &Path) -> Option<Result<FromFsStream, std::io::Error>> { pub async fn read_file(path: &Path) -> Option<Result<FsStream, std::io::Error>> {
if path.exists() { if path.exists() {
let status = WRITING_STATUS let status = WRITING_STATUS
.read() .read()
.await
.get(path) .get(path)
.map_or_else(|| Arc::new(CacheStatus::done()), Arc::clone); .map_or_else(|| Arc::new(CacheStatus::done()), Arc::clone);
Some(FromFsStream::new(path, status).await) Some(FsStream::new(path, status).await)
} else { } else {
None None
} }
@ -50,14 +52,14 @@ pub async fn read_file(path: &Path) -> Option<Result<FromFsStream, std::io::Erro
/// Maps the input byte stream into one that writes to disk instead, returning /// Maps the input byte stream into one that writes to disk instead, returning
/// a stream that reads from disk instead. /// a stream that reads from disk instead.
pub async fn transparent_file_stream( pub async fn write_file(
path: &Path, path: &Path,
mut byte_stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + Send + 'static, mut byte_stream: BoxedImageStream,
) -> Result<FromFsStream, std::io::Error> { ) -> Result<FsStream, std::io::Error> {
let done_writing_flag = Arc::new(CacheStatus::new()); let done_writing_flag = Arc::new(CacheStatus::new());
let mut file = { let mut file = {
let mut write_lock = WRITING_STATUS.write(); let mut write_lock = WRITING_STATUS.write().await;
let file = File::create(path).await?; // we need to make sure the file exists and is truncated. let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
write_lock.insert(path.to_path_buf(), Arc::clone(&done_writing_flag)); write_lock.insert(path.to_path_buf(), Arc::clone(&done_writing_flag));
file file
@ -87,7 +89,7 @@ pub async fn transparent_file_stream(
file.sync_all().await?; // we need metadata file.sync_all().await?; // we need metadata
} }
let mut write_lock = WRITING_STATUS.write(); let mut write_lock = WRITING_STATUS.write().await;
// This needs to be written atomically with the write lock, else // This needs to be written atomically with the write lock, else
// it's possible we have an inconsistent state // it's possible we have an inconsistent state
if errored { if errored {
@ -101,16 +103,16 @@ pub async fn transparent_file_stream(
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
}); });
Ok(FromFsStream::new(path, done_writing_flag).await?) Ok(FsStream::new(path, done_writing_flag).await?)
} }
pub struct FromFsStream { pub struct FsStream {
file: Pin<Box<File>>, file: Pin<Box<File>>,
sleep: Pin<Box<Sleep>>, sleep: Pin<Box<Sleep>>,
is_file_done_writing: Arc<CacheStatus>, is_file_done_writing: Arc<CacheStatus>,
} }
impl FromFsStream { impl FsStream {
async fn new(path: &Path, is_done: Arc<CacheStatus>) -> Result<Self, std::io::Error> { async fn new(path: &Path, is_done: Arc<CacheStatus>) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
file: Box::pin(File::open(path).await?), file: Box::pin(File::open(path).await?),
@ -122,10 +124,19 @@ impl FromFsStream {
} }
/// Represents some upstream error. /// Represents some upstream error.
#[derive(Debug)]
pub struct UpstreamError; pub struct UpstreamError;
impl Stream for FromFsStream { impl std::error::Error for UpstreamError {}
type Item = Result<Bytes, UpstreamError>;
impl Display for UpstreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "An upstream error occurred")
}
}
impl Stream for FsStream {
type Item = CacheStreamItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let status = self.is_file_done_writing.load(); let status = self.is_file_done_writing.load();
@ -147,6 +158,13 @@ impl Stream for FromFsStream {
} }
} }
impl From<UpstreamError> for actix_web::Error {
#[inline]
fn from(_: UpstreamError) -> Self {
HttpResponse::BadGateway().finish().into()
}
}
struct CacheStatus(AtomicU8); struct CacheStatus(AtomicU8);
impl CacheStatus { impl CacheStatus {

View file

@ -2,12 +2,15 @@ use std::path::PathBuf;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures::{stream::StreamExt, TryStreamExt};
use log::{debug, warn}; use log::{debug, warn};
use lru::LruCache; use lru::LruCache;
use tokio::fs::{remove_file, File}; use tokio::fs::{remove_file, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{Cache, CacheKey, CachedImage, ImageMetadata}; use super::{
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CachedImage, ImageMetadata,
};
pub struct GenerationalCache { pub struct GenerationalCache {
in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>, in_memory: LruCache<CacheKey, (CachedImage, ImageMetadata)>,
@ -132,9 +135,16 @@ impl GenerationalCache {
#[async_trait] #[async_trait]
impl Cache for GenerationalCache { impl Cache for GenerationalCache {
async fn get(&mut self, key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> { async fn get(
&mut self,
key: &CacheKey,
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
if self.in_memory.contains(key) { if self.in_memory.contains(key) {
return self.in_memory.get(key); return self
.in_memory
.get(key)
// TODO: get rid of clone?
.map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata)));
} }
if let Some(metadata) = self.on_disk.pop(key) { if let Some(metadata) = self.on_disk.pop(key) {
@ -149,7 +159,7 @@ impl Cache for GenerationalCache {
let mut buffer = metadata let mut buffer = metadata
.content_length .content_length
.map_or_else(Vec::new, Vec::with_capacity); .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
match file { match file {
Ok(mut file) => { Ok(mut file) => {
@ -173,20 +183,30 @@ impl Cache for GenerationalCache {
buffer.shrink_to_fit(); buffer.shrink_to_fit();
self.disk_cur_size -= buffer.len() as u64; self.disk_cur_size -= buffer.len() as u64;
let image = CachedImage(Bytes::from(buffer)); let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
// Since we just put it in the in-memory cache it should be there return Some(self.put(key.clone(), Box::new(image), metadata).await);
// when we retrieve it
self.put(key.clone(), image, metadata).await;
return self.get(key).await;
} }
None None
} }
#[inline] async fn put(
async fn put(&mut self, key: CacheKey, image: CachedImage, metadata: ImageMetadata) { &mut self,
key: CacheKey,
mut image: BoxedImageStream,
metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError> {
let mut hot_evicted = vec![]; let mut hot_evicted = vec![];
let image = {
let mut resolved = vec![];
while let Some(bytes) = image.next().await {
resolved.extend(bytes?);
}
CachedImage(Bytes::from(resolved))
};
let new_img_size = image.0.len() as u64; let new_img_size = image.0.len() as u64;
if self.memory_max_size >= new_img_size { if self.memory_max_size >= new_img_size {
@ -204,17 +224,19 @@ impl Cache for GenerationalCache {
} }
} }
self.in_memory.put(key, (image, metadata)); self.in_memory.put(key.clone(), (image, metadata));
self.memory_cur_size += new_img_size; self.memory_cur_size += new_img_size;
} else { } else {
// Image was larger than memory capacity, push directly into cold // Image was larger than memory capacity, push directly into cold
// storage. // storage.
self.push_into_cold(key, image, metadata).await; self.push_into_cold(key.clone(), image, metadata).await;
}; };
// Push evicted hot entires into cold storage. // Push evicted hot entires into cold storage.
for (key, image, metadata) in hot_evicted { for (key, image, metadata) in hot_evicted {
self.push_into_cold(key, image, metadata).await; self.push_into_cold(key, image, metadata).await;
} }
self.get(&key).await.unwrap()
} }
} }

39
src/cache/low_mem.rs vendored
View file

@ -1,16 +1,14 @@
//! Low memory caching stuff //! Low memory caching stuff
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use lru::LruCache; use lru::LruCache;
use super::{fs::FromFsStream, ByteStream, Cache, CacheKey}; use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
pub struct LowMemCache { pub struct LowMemCache {
on_disk: LruCache<CacheKey, ()>, on_disk: LruCache<CacheKey, ImageMetadata>,
disk_path: PathBuf, disk_path: PathBuf,
disk_max_size: u64, disk_max_size: u64,
disk_cur_size: u64, disk_cur_size: u64,
@ -27,18 +25,37 @@ impl LowMemCache {
} }
} }
// todo: schedule eviction
#[async_trait] #[async_trait]
impl Cache for LowMemCache { impl Cache for LowMemCache {
async fn get_stream(&mut self, key: &CacheKey) -> Option<Result<FromFsStream, std::io::Error>> { async fn get(
if self.on_disk.get(key).is_some() { &mut self,
super::fs::read_file(Path::new(&key.to_string())).await key: &CacheKey,
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
if let Some(metadata) = self.on_disk.get(key) {
let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
super::fs::read_file(&path).await.map(|res| {
res.map(|stream| (CacheStream::Fs(stream), metadata))
.map_err(Into::into)
})
} else { } else {
None None
} }
} }
async fn put_stream(&mut self, key: CacheKey, image: ByteStream) { async fn put(
// this call has a side effect and the returned future is for reading &mut self,
let _ = super::fs::transparent_file_stream(&PathBuf::from(key), image); key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError> {
let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
self.on_disk.put(key.clone(), metadata);
super::fs::write_file(&path, image)
.await
.map(CacheStream::Fs)
.map(move |stream| (stream, self.on_disk.get(&key).unwrap()))
.map_err(Into::into)
} }
} }

111
src/cache/mod.rs vendored
View file

@ -1,17 +1,21 @@
use std::fmt::Display;
use std::path::PathBuf; use std::path::PathBuf;
use std::{fmt::Display, str::FromStr}; use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use actix_web::http::HeaderValue; use actix_web::http::HeaderValue;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use futures::Stream; use fs::FsStream;
use futures::{Stream, StreamExt};
use thiserror::Error;
pub use fs::UpstreamError;
pub use generational::GenerationalCache; pub use generational::GenerationalCache;
pub use low_mem::LowMemCache; pub use low_mem::LowMemCache;
use self::fs::FromFsStream;
mod fs; mod fs;
mod generational; mod generational;
mod low_mem; mod low_mem;
@ -36,23 +40,23 @@ impl From<CacheKey> for PathBuf {
} }
} }
#[derive(Clone)]
pub struct CachedImage(pub Bytes); pub struct CachedImage(pub Bytes);
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct ImageMetadata { pub struct ImageMetadata {
pub content_type: Option<ImageContentType>, pub content_type: Option<ImageContentType>,
pub content_length: Option<usize>, // If we can guarantee a non-zero u32 here we can save 4 bytes
pub content_length: Option<u32>,
pub last_modified: Option<DateTime<FixedOffset>>, pub last_modified: Option<DateTime<FixedOffset>>,
} }
// Note to self: If these are wrong blame Triscuit 9 // Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub enum ImageContentType { pub enum ImageContentType {
Png, Png,
Jpeg, Jpeg,
Gif, Gif,
Bmp,
Tif,
} }
pub struct InvalidContentType; pub struct InvalidContentType;
@ -66,8 +70,6 @@ impl FromStr for ImageContentType {
"image/png" => Ok(Self::Png), "image/png" => Ok(Self::Png),
"image/jpeg" => Ok(Self::Jpeg), "image/jpeg" => Ok(Self::Jpeg),
"image/gif" => Ok(Self::Gif), "image/gif" => Ok(Self::Gif),
"image/bmp" => Ok(Self::Bmp),
"image/tif" => Ok(Self::Tif),
_ => Err(InvalidContentType), _ => Err(InvalidContentType),
} }
} }
@ -80,8 +82,6 @@ impl AsRef<str> for ImageContentType {
Self::Png => "image/png", Self::Png => "image/png",
Self::Jpeg => "image/jpeg", Self::Jpeg => "image/jpeg",
Self::Gif => "image/gif", Self::Gif => "image/gif",
Self::Bmp => "image/bmp",
Self::Tif => "image/tif",
} }
} }
} }
@ -130,37 +130,78 @@ impl ImageMetadata {
} }
} }
type BoxedImageStream = Box<dyn Stream<Item = Result<Bytes, CacheError>> + Unpin + Send>;
#[derive(Error, Debug)]
pub enum CacheError {
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
#[error(transparent)]
Upstream(#[from] UpstreamError),
}
#[async_trait] #[async_trait]
pub trait Cache: Send + Sync { pub trait Cache: Send + Sync {
async fn get(&mut self, _key: &CacheKey) -> Option<&(CachedImage, ImageMetadata)> { async fn get(
unimplemented!()
}
async fn put(&mut self, _key: CacheKey, _image: CachedImage, _metadata: ImageMetadata) {
unimplemented!()
}
async fn get_stream(
&mut self, &mut self,
_key: &CacheKey, key: &CacheKey,
) -> Option<Result<FromFsStream, std::io::Error>> { ) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>>;
unimplemented!() async fn put(
} &mut self,
key: CacheKey,
image: BoxedImageStream,
metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError>;
}
async fn put_stream(&mut self, _key: CacheKey, _image: ByteStream) { pub enum CacheStream {
unimplemented!() Fs(FsStream),
Memory(MemStream),
}
impl From<CachedImage> for CacheStream {
fn from(image: CachedImage) -> Self {
Self::Memory(MemStream(image.0))
} }
} }
pub enum ByteStream {} type CacheStreamItem = Result<Bytes, UpstreamError>;
impl Stream for ByteStream { impl Stream for CacheStream {
type Item = Result<Bytes, reqwest::Error>; type Item = CacheStreamItem;
fn poll_next( fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self: std::pin::Pin<&mut Self>, match self.get_mut() {
cx: &mut std::task::Context<'_>, Self::Fs(stream) => stream.poll_next_unpin(cx),
) -> std::task::Poll<Option<Self::Item>> { Self::Memory(stream) => stream.poll_next_unpin(cx),
todo!() }
}
}
pub struct MemStream(Bytes);
impl Stream for MemStream {
type Item = CacheStreamItem;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut new_bytes = Bytes::new();
std::mem::swap(&mut self.0, &mut new_bytes);
if new_bytes.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(new_bytes)))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn metadata_size() {
assert_eq!(std::mem::size_of::<ImageMetadata>(), 32);
} }
} }

View file

@ -2,7 +2,7 @@ use std::num::{NonZeroU16, NonZeroU64};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use clap::Clap; use clap::{crate_authors, crate_description, crate_version, Clap};
// Validate tokens is an atomic because it's faster than locking on rwlock. // Validate tokens is an atomic because it's faster than locking on rwlock.
pub static VALIDATE_TOKENS: AtomicBool = AtomicBool::new(false); pub static VALIDATE_TOKENS: AtomicBool = AtomicBool::new(false);
@ -11,6 +11,7 @@ pub static VALIDATE_TOKENS: AtomicBool = AtomicBool::new(false);
pub static SEND_SERVER_VERSION: AtomicBool = AtomicBool::new(false); pub static SEND_SERVER_VERSION: AtomicBool = AtomicBool::new(false);
#[derive(Clap, Clone)] #[derive(Clap, Clone)]
#[clap(version = crate_version!(), author = crate_authors!(), about = crate_description!())]
pub struct CliArgs { pub struct CliArgs {
/// The port to listen on. /// The port to listen on.
#[clap(short, long, default_value = "42069", env = "PORT")] #[clap(short, long, default_value = "42069", env = "PORT")]
@ -34,6 +35,8 @@ pub struct CliArgs {
/// reasons. /// reasons.
#[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)] #[clap(long, env = "ENABLE_SERVER_STRING", takes_value = false)]
pub enable_server_string: bool, pub enable_server_string: bool,
#[clap(short, long, conflicts_with("memory_quota"), env = "LOW_MEMORY_MODE")] #[clap(short, long, conflicts_with("memory-quota"), env = "LOW_MEMORY_MODE")]
pub low_memory: bool, pub low_memory: bool,
#[clap(short, long, parse(from_occurrences))]
pub verbose: usize,
} }

View file

@ -1,6 +1,6 @@
#![warn(clippy::pedantic, clippy::nursery)] #![warn(clippy::pedantic, clippy::nursery)]
// We're end users, so these is ok // We're end users, so these is ok
#![allow(clippy::future_not_send, clippy::module_name_repetitions)] #![allow(clippy::module_name_repetitions)]
use std::env::{self, VarError}; use std::env::{self, VarError};
use std::process; use std::process;
@ -53,7 +53,9 @@ async fn main() -> Result<(), std::io::Error> {
println!(concat!( println!(concat!(
env!("CARGO_PKG_NAME"), env!("CARGO_PKG_NAME"),
" Copyright (C) 2021 Edward Shen\n\n", " Copyright (C) 2021 ",
env!("CARGO_PKG_AUTHORS"),
"\n\n",
env!("CARGO_PKG_NAME"), env!("CARGO_PKG_NAME"),
" is free software: you can redistribute it and/or modify\n\ " is free software: you can redistribute it and/or modify\n\
it under the terms of the GNU General Public License as published by\n\ it under the terms of the GNU General Public License as published by\n\
@ -76,8 +78,11 @@ async fn main() -> Result<(), std::io::Error> {
let cache_path = cli_args.cache_path.clone(); let cache_path = cli_args.cache_path.clone();
let low_mem_mode = cli_args.low_memory; let low_mem_mode = cli_args.low_memory;
SimpleLogger::new() match cli_args.verbose {
.with_level(LevelFilter::Info) 0 => SimpleLogger::new().with_level(LevelFilter::Info),
1 => SimpleLogger::new().with_level(LevelFilter::Debug),
_ => SimpleLogger::new().with_level(LevelFilter::Trace),
}
.init() .init()
.unwrap(); .unwrap();

View file

@ -1,4 +1,3 @@
use std::convert::Infallible;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use actix_web::dev::HttpResponseBuilder; use actix_web::dev::HttpResponseBuilder;
@ -11,14 +10,14 @@ use actix_web::{get, web::Data, HttpRequest, HttpResponse, Responder};
use base64::DecodeError; use base64::DecodeError;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::stream; use futures::{Stream, TryStreamExt};
use log::{error, info, warn}; use log::{error, info, warn};
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::Deserialize; use serde::Deserialize;
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES}; use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
use thiserror::Error; use thiserror::Error;
use crate::cache::{Cache, CacheKey, CachedImage, ImageMetadata}; use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
use crate::client_api_version; use crate::client_api_version;
use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS}; use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS};
use crate::state::RwLockServerState; use crate::state::RwLockServerState;
@ -182,8 +181,10 @@ async fn fetch_image(
) -> ServerResponse { ) -> ServerResponse {
let key = CacheKey(chapter_hash, file_name, is_data_saver); let key = CacheKey(chapter_hash, file_name, is_data_saver);
if let Some((image, metadata)) = cache.lock().get(&key).await { match cache.lock().get(&key).await {
return construct_response(image, metadata); Some(Ok((image, metadata))) => return construct_response(image, metadata),
Some(Err(_)) => return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish()),
_ => (),
} }
// It's important to not get a write lock before this request, else we're // It's important to not get a write lock before this request, else we're
@ -238,22 +239,22 @@ async fn fetch_image(
headers.remove(LAST_MODIFIED), headers.remove(LAST_MODIFIED),
) )
}; };
let body = resp.bytes().await;
match body { let body = resp.bytes_stream().map_err(|e| e.into());
Ok(bytes) => { let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
let cached = ImageMetadata::new(content_type, length, last_mod).unwrap(); let (stream, metadata) = {
let image = CachedImage(bytes); match cache.lock().put(key, Box::new(body), metadata).await {
let resp = construct_response(&image, &cached); Ok((stream, metadata)) => (stream, *metadata),
cache.lock().put(key, image, cached).await;
return resp;
}
Err(e) => { Err(e) => {
warn!("Got payload error from image server: {}", e); warn!("Failed to insert into cache: {}", e);
ServerResponse::HttpResponse( return ServerResponse::HttpResponse(
push_headers(&mut HttpResponse::ServiceUnavailable()).finish(), HttpResponse::InternalServerError().finish(),
) );
} }
} }
};
return construct_response(stream, &metadata);
} }
Err(e) => { Err(e) => {
error!("Failed to fetch image from server: {}", e); error!("Failed to fetch image from server: {}", e);
@ -264,23 +265,22 @@ async fn fetch_image(
} }
} }
fn construct_response(cached: &CachedImage, metadata: &ImageMetadata) -> ServerResponse { fn construct_response(
let data: Vec<Result<Bytes, Infallible>> = cached data: impl Stream<Item = Result<Bytes, UpstreamError>> + Unpin + 'static,
.0 metadata: &ImageMetadata,
.to_vec() ) -> ServerResponse {
.chunks(1460) // TCP MSS default size
.map(|v| Ok(Bytes::from(v.to_vec())))
.collect();
let mut resp = HttpResponse::Ok(); let mut resp = HttpResponse::Ok();
if let Some(content_type) = &metadata.content_type { if let Some(content_type) = metadata.content_type {
resp.append_header((CONTENT_TYPE, content_type.as_ref())); resp.append_header((CONTENT_TYPE, content_type.as_ref()));
} }
if let Some(content_length) = &metadata.content_length {
resp.append_header((CONTENT_LENGTH, content_length.to_string())); if let Some(content_length) = metadata.content_length {
resp.append_header((CONTENT_LENGTH, content_length));
} }
if let Some(last_modified) = &metadata.last_modified {
if let Some(last_modified) = metadata.last_modified {
resp.append_header((LAST_MODIFIED, last_modified.to_rfc2822())); resp.append_header((LAST_MODIFIED, last_modified.to_rfc2822()));
} }
ServerResponse::HttpResponse(push_headers(&mut resp).streaming(stream::iter(data))) ServerResponse::HttpResponse(push_headers(&mut resp).streaming(data))
} }