use callback based pruning

feature/v32-tokens
Edward Shen 2021-04-19 22:01:32 -04:00
parent 74966678bd
commit 9904ba2cfc
Signed by: edward
GPG Key ID: 19182661E818369F
8 changed files with 136 additions and 223 deletions

4
.gitignore vendored
View File

@ -1,3 +1,5 @@
/target
.env
cache
cache
flamegraph*.svg
perf.data*

152
Cargo.lock generated
View File

@ -28,7 +28,7 @@ dependencies = [
"actix-tls",
"actix-utils",
"ahash 0.7.2",
"base64 0.13.0",
"base64",
"bitflags",
"brotli2",
"bytes",
@ -54,7 +54,7 @@ dependencies = [
"rand",
"regex",
"serde",
"sha-1 0.9.4",
"sha-1",
"smallvec",
"time 0.2.26",
"tokio",
@ -268,15 +268,6 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b"
[[package]]
name = "base64"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"
dependencies = [
"byteorder",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -298,34 +289,13 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "block-buffer"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
dependencies = [
"block-padding",
"byte-tools",
"byteorder",
"generic-array 0.12.4",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array 0.14.4",
]
[[package]]
name = "block-padding"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5"
dependencies = [
"byte-tools",
"generic-array",
]
[[package]]
@ -354,18 +324,6 @@ version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "byte-tools"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.0.1"
@ -511,22 +469,13 @@ dependencies = [
"syn",
]
[[package]]
name = "digest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
dependencies = [
"generic-array 0.12.4",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.4",
"generic-array",
]
[[package]]
@ -556,12 +505,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "flate2"
version = "1.0.20"
@ -682,15 +625,6 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd"
dependencies = [
"typenum",
]
[[package]]
name = "generic-array"
version = "0.14.4"
@ -758,12 +692,6 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
[[package]]
name = "http"
version = "0.2.4"
@ -968,7 +896,7 @@ version = "0.2.1"
dependencies = [
"actix-web",
"async-trait",
"base64 0.13.0",
"base64",
"bincode",
"bytes",
"chrono",
@ -986,9 +914,9 @@ dependencies = [
"serde_json",
"simple_logger",
"sodiumoxide",
"ssri",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"url",
]
@ -1099,12 +1027,6 @@ version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]]
name = "opaque-debug"
version = "0.3.0"
@ -1324,7 +1246,7 @@ version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2296f2fac53979e8ccbc4a1136b25dcefd37be9ed7e4a1f6b05a6029c84ff124"
dependencies = [
"base64 0.13.0",
"base64",
"bytes",
"encoding_rs",
"futures-core",
@ -1384,7 +1306,7 @@ version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64 0.13.0",
"base64",
"log",
"ring",
"sct",
@ -1471,29 +1393,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha-1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df"
dependencies = [
"block-buffer 0.7.3",
"digest 0.8.1",
"fake-simd",
"opaque-debug 0.2.3",
]
[[package]]
name = "sha-1"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f"
dependencies = [
"block-buffer 0.9.0",
"block-buffer",
"cfg-if",
"cpuid-bool",
"digest 0.9.0",
"opaque-debug 0.3.0",
"digest",
"opaque-debug",
]
[[package]]
@ -1502,18 +1412,6 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
[[package]]
name = "sha2"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69"
dependencies = [
"block-buffer 0.7.3",
"digest 0.8.1",
"fake-simd",
"opaque-debug 0.2.3",
]
[[package]]
name = "signal-hook-registry"
version = "1.3.0"
@ -1575,22 +1473,6 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "ssri"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e875e58f12c33f832d09cff94b079aee84a48d8188d7a7132b3434358afb70c9"
dependencies = [
"base64 0.10.1",
"digest 0.8.1",
"hex",
"serde",
"serde_derive",
"sha-1 0.8.2",
"sha2",
"thiserror",
]
[[package]]
name = "standback"
version = "0.2.17"
@ -1820,6 +1702,18 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.6.6"

View File

@ -29,12 +29,12 @@ serde = "1"
serde_json = "1"
simple_logger = "1"
sodiumoxide = "0.2"
ssri = "5"
thiserror = "1"
tokio = { version = "1", features = [ "full", "parking_lot" ] }
tokio-stream = { version = "0.1", features = [ "sync" ] }
tokio-util = { version = "0.6", features = [ "codec" ] }
url = { version = "2", features = [ "serde" ] }
[profile.release]
lto = true
codegen-units = 1
codegen-units = 1

131
src/cache/fs.rs vendored
View File

@ -1,19 +1,19 @@
use actix_web::error::PayloadError;
use bytes::Buf;
use futures::{Stream, StreamExt};
use log::debug;
use log::{debug, error};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fmt::Display;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::fs::{create_dir_all, remove_file, File};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch::{channel, Receiver};
use tokio::sync::RwLock;
use tokio::time::Interval;
use tokio_stream::wrappers::WatchStream;
use tokio_util::codec::{BytesCodec, FramedRead};
use super::{BoxedImageStream, CacheStream, CacheStreamItem};
@ -34,17 +34,17 @@ use super::{BoxedImageStream, CacheStream, CacheStreamItem};
/// We effectively use `WRITING_STATUS` as a status relay to ensure concurrent
/// reads to the file while it's being written to will wait for writing to be
/// completed.
static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Arc<CacheStatus>>>> =
static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
/// Tries to read from the file, returning a byte stream if it exists
pub async fn read_file(path: &Path) -> Option<Result<CacheStream, std::io::Error>> {
if path.exists() {
let status = WRITING_STATUS.read().await.get(path).map(Arc::clone);
let status = WRITING_STATUS.read().await.get(path).map(Clone::clone);
if let Some(status) = status {
Some(
ConcurrentFsStream::new(path, status)
ConcurrentFsStream::new(path, WatchStream::new(status))
.await
.map(CacheStream::Concurrent),
)
@ -65,27 +65,38 @@ pub async fn read_file(path: &Path) -> Option<Result<CacheStream, std::io::Error
pub async fn write_file(
path: &Path,
mut byte_stream: BoxedImageStream,
notifier: UnboundedSender<u64>,
) -> Result<CacheStream, std::io::Error> {
let done_writing_flag = Arc::new(CacheStatus::new());
let (tx, rx) = channel(WritingStatus::NotDone);
let mut file = {
let mut write_lock = WRITING_STATUS.write().await;
let parent = path.parent().unwrap();
create_dir_all(parent).await?;
let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
write_lock.insert(path.to_path_buf(), Arc::clone(&done_writing_flag));
let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
write_lock.insert(path.to_path_buf(), rx.clone());
file
};
let write_flag = Arc::clone(&done_writing_flag);
// need owned variant because async lifetime
let path_buf = path.to_path_buf();
tokio::spawn(async move {
let path_buf = path_buf; // moves path buf into async
let mut errored = false;
let mut bytes_written: u64 = 0;
while let Some(bytes) = byte_stream.next().await {
if let Ok(bytes) = bytes {
file.write_all(&bytes).await?
if let Ok(mut bytes) = bytes {
loop {
match file.write(&bytes).await? {
0 => break,
n => {
bytes.advance(n);
// We don't care if we don't have receivers
bytes_written += n as u64;
let _ = tx.send(WritingStatus::NotDone);
}
}
}
} else {
errored = true;
break;
@ -105,35 +116,45 @@ pub async fn write_file(
let mut write_lock = WRITING_STATUS.write().await;
// This needs to be written atomically with the write lock, else
// it's possible we have an inconsistent state
//
// We don't really care if we have no receivers
if errored {
write_flag.store(WritingStatus::Error);
let _ = tx.send(WritingStatus::Error);
} else {
write_flag.store(WritingStatus::Done);
let _ = tx.send(WritingStatus::Done);
}
write_lock.remove(&path_buf);
// notify
if let Err(e) = notifier.send(bytes_written) {
error!(
"Failed to notify cache of new entry size: {}. Cache no longer can prune FS!",
e
);
}
// We don't ever check this, so the return value doesn't matter
Ok::<_, std::io::Error>(())
});
Ok(CacheStream::Concurrent(
ConcurrentFsStream::new(path, done_writing_flag).await?,
ConcurrentFsStream::new(path, WatchStream::new(rx)).await?,
))
}
pub struct ConcurrentFsStream {
file: Pin<Box<File>>,
sleep: Pin<Box<Interval>>,
is_file_done_writing: Arc<CacheStatus>,
receiver: Pin<Box<WatchStream<WritingStatus>>>,
}
impl ConcurrentFsStream {
async fn new(path: &Path, is_done: Arc<CacheStatus>) -> Result<Self, std::io::Error> {
async fn new(
path: &Path,
receiver: WatchStream<WritingStatus>,
) -> Result<Self, std::io::Error> {
Ok(Self {
file: Box::pin(File::open(path).await?),
// 0.5ms
sleep: Box::pin(tokio::time::interval(Duration::from_micros(250))),
is_file_done_writing: is_done,
receiver: Box::pin(receiver),
})
}
}
@ -154,30 +175,29 @@ impl Stream for ConcurrentFsStream {
type Item = CacheStreamItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let status = self.is_file_done_writing.load();
let mut bytes = [0; 1460].to_vec();
let mut buffer = ReadBuf::new(&mut bytes);
let polled_result = self.file.as_mut().poll_read(cx, &mut buffer);
let filled = buffer.filled().len();
match (status, filled) {
// Prematurely reached EOF, schedule a poll in the future
(WritingStatus::NotDone, 0) => {
let _ = self.sleep.as_mut().poll_tick(cx);
Poll::Pending
}
// We got an error, abort the read.
(WritingStatus::Error, _) => Poll::Ready(Some(Err(UpstreamError))),
_ => {
bytes.truncate(filled);
polled_result.map(|_| {
if bytes.is_empty() {
None
} else {
Some(Ok(bytes.into()))
match self.receiver.as_mut().poll_next_unpin(cx) {
Poll::Ready(status) => {
let mut bytes = [0; 1460].to_vec();
let mut buffer = ReadBuf::new(&mut bytes);
let polled_result = self.file.as_mut().poll_read(cx, &mut buffer);
let filled = buffer.filled().len();
match (status, filled) {
(Some(WritingStatus::NotDone), 0) => Poll::Pending,
// We got an error, abort the read.
(Some(WritingStatus::Error), _) => Poll::Ready(Some(Err(UpstreamError))),
_ => {
bytes.truncate(filled);
polled_result.map(|_| {
if bytes.is_empty() {
None
} else {
Some(Ok(bytes.into()))
}
})
}
})
}
}
Poll::Pending => Poll::Pending,
}
}
}
@ -189,26 +209,7 @@ impl From<UpstreamError> for actix_web::Error {
}
}
struct CacheStatus(AtomicU8);
impl CacheStatus {
#[inline]
const fn new() -> Self {
Self(AtomicU8::new(WritingStatus::NotDone as u8))
}
#[inline]
fn store(&self, status: WritingStatus) {
self.0.store(status as u8, Ordering::Release);
}
#[inline]
fn load(&self) -> WritingStatus {
self.0.load(Ordering::Acquire).into()
}
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
enum WritingStatus {
NotDone = 0,
Done,

View File

@ -239,4 +239,7 @@ impl Cache for GenerationalCache {
self.get(&key).await.unwrap()
}
// noop
async fn increase_usage(&mut self, _amt: u64) {}
}

41
src/cache/low_mem.rs vendored
View File

@ -1,10 +1,11 @@
//! Low memory caching stuff
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
use async_trait::async_trait;
use log::warn;
use lru::LruCache;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::RwLock;
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
@ -13,16 +14,37 @@ pub struct LowMemCache {
disk_path: PathBuf,
disk_max_size: u64,
disk_cur_size: u64,
master_sender: UnboundedSender<u64>,
}
impl LowMemCache {
pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Self {
Self {
pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<RwLock<Box<dyn Cache>>> {
let (tx, mut rx) = unbounded_channel();
let new_self: Arc<RwLock<Box<dyn Cache>>> = Arc::new(RwLock::new(Box::new(Self {
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
master_sender: tx,
})));
let new_self_0 = Arc::clone(&new_self);
tokio::spawn(async move {
loop {
let new_size = match rx.recv().await {
Some(v) => v,
None => break,
};
new_self_0.write().await.increase_usage(new_size).await;
}
});
new_self.clone()
}
async fn prune(&mut self) {
todo!()
}
}
@ -47,13 +69,16 @@ impl Cache for LowMemCache {
) -> Result<(CacheStream, &ImageMetadata), CacheError> {
let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
self.on_disk.put(key.clone(), metadata);
super::fs::write_file(&path, image)
super::fs::write_file(&path, image, self.master_sender.clone())
.await
.map(move |stream| (stream, self.on_disk.get(&key).unwrap()))
.map_err(Into::into)
}
async fn prune(&mut self) {
warn!("Trimming has not been implemented yet. Cache is unbounded!");
async fn increase_usage(&mut self, amt: u64) {
self.disk_cur_size += amt;
if self.disk_cur_size > self.disk_max_size {
self.prune().await;
}
}
}

5
src/cache/mod.rs vendored
View File

@ -10,7 +10,6 @@ use bytes::Bytes;
use chrono::{DateTime, FixedOffset};
use fs::ConcurrentFsStream;
use futures::{Stream, StreamExt};
use log::debug;
use thiserror::Error;
pub use fs::UpstreamError;
@ -159,9 +158,7 @@ pub trait Cache: Send + Sync {
metadata: ImageMetadata,
) -> Result<(CacheStream, &ImageMetadata), CacheError>;
async fn prune(&mut self) {
debug!("Would trim but cache does not implement trimming!");
}
async fn increase_usage(&mut self, amt: u64);
}
pub enum CacheStream {

View File

@ -120,27 +120,18 @@ async fn main() -> Result<(), std::io::Error> {
}
});
let cache: Box<dyn Cache> = if low_mem_mode {
Box::new(LowMemCache::new(disk_quota, cache_path.clone()))
let cache: Arc<TokioRwLock<Box<dyn Cache>>> = if low_mem_mode {
LowMemCache::new(disk_quota, cache_path.clone())
} else {
Box::new(GenerationalCache::new(
Arc::new(TokioRwLock::new(Box::new(GenerationalCache::new(
memory_max_size,
disk_quota,
cache_path.clone(),
))
))))
};
let cache = Arc::new(TokioRwLock::new(cache));
let cache = Arc::clone(&cache);
let cache1 = Arc::clone(&cache);
// Spawn periodic cache trimming
spawn(async move {
let mut interval = time::interval(Duration::from_secs(3 * 60));
loop {
interval.tick().await;
cache.write().await.prune().await;
}
});
// Start HTTPS server
HttpServer::new(move || {
App::new()