Compare commits
No commits in common. "94375b185fa4d920a1f76dc6cd9aefdab49a555e" and "9f76a7a1b37f9ce19dd0d4fd0c6a5b6b3007dfe3" have entirely different histories.
94375b185f
...
9f76a7a1b3
14 changed files with 258 additions and 375 deletions
160
Cargo.lock
generated
160
Cargo.lock
generated
|
@ -238,15 +238,6 @@ dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ansi_term"
|
|
||||||
version = "0.12.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
|
|
||||||
dependencies = [
|
|
||||||
"winapi",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anyhow"
|
name = "anyhow"
|
||||||
version = "1.0.42"
|
version = "1.0.42"
|
||||||
|
@ -418,17 +409,6 @@ version = "1.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "chacha20"
|
|
||||||
version = "0.7.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "fee7ad89dc1128635074c268ee661f90c3f7e83d9fd12910608c36b47d6c3412"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"cipher",
|
|
||||||
"cpufeatures",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "chrono"
|
name = "chrono"
|
||||||
version = "0.4.19"
|
version = "0.4.19"
|
||||||
|
@ -443,15 +423,6 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "cipher"
|
|
||||||
version = "0.3.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
|
|
||||||
dependencies = [
|
|
||||||
"generic-array",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "3.0.0-beta.2"
|
version = "3.0.0-beta.2"
|
||||||
|
@ -485,6 +456,17 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "colored"
|
||||||
|
version = "1.9.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f4ffc801dacf156c5854b9df4f425a626539c3a6ef7893cc0c5084a23f0b6c59"
|
||||||
|
dependencies = [
|
||||||
|
"atty",
|
||||||
|
"lazy_static",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "const_fn"
|
name = "const_fn"
|
||||||
version = "0.4.8"
|
version = "0.4.8"
|
||||||
|
@ -1197,7 +1179,6 @@ dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"bincode",
|
"bincode",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chacha20",
|
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"ctrlc",
|
"ctrlc",
|
||||||
|
@ -1216,6 +1197,7 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_repr",
|
"serde_repr",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
|
"simple_logger",
|
||||||
"sodiumoxide",
|
"sodiumoxide",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
@ -1223,9 +1205,6 @@ dependencies = [
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
|
||||||
"tracing-futures",
|
|
||||||
"tracing-subscriber",
|
|
||||||
"url",
|
"url",
|
||||||
"vergen",
|
"vergen",
|
||||||
]
|
]
|
||||||
|
@ -1236,15 +1215,6 @@ version = "1.0.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
|
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "matchers"
|
|
||||||
version = "0.0.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
|
|
||||||
dependencies = [
|
|
||||||
"regex-automata",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "matches"
|
name = "matches"
|
||||||
version = "0.1.8"
|
version = "0.1.8"
|
||||||
|
@ -1634,15 +1604,6 @@ dependencies = [
|
||||||
"regex-syntax",
|
"regex-syntax",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "regex-automata"
|
|
||||||
version = "0.1.10"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
|
||||||
dependencies = [
|
|
||||||
"regex-syntax",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex-syntax"
|
name = "regex-syntax"
|
||||||
version = "0.6.25"
|
version = "0.6.25"
|
||||||
|
@ -1924,15 +1885,6 @@ dependencies = [
|
||||||
"opaque-debug",
|
"opaque-debug",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "sharded-slab"
|
|
||||||
version = "0.1.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "79c719719ee05df97490f80a45acfc99e5a30ce98a1e4fb67aee422745ae14e3"
|
|
||||||
dependencies = [
|
|
||||||
"lazy_static",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
@ -1948,6 +1900,19 @@ version = "1.3.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c19772be3c4dd2ceaacf03cb41d5885f2a02c4d8804884918e3a258480803335"
|
checksum = "c19772be3c4dd2ceaacf03cb41d5885f2a02c4d8804884918e3a258480803335"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "simple_logger"
|
||||||
|
version = "1.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cd57f17c093ead1d4a1499dc9acaafdd71240908d64775465543b8d9a9f1d198"
|
||||||
|
dependencies = [
|
||||||
|
"atty",
|
||||||
|
"chrono",
|
||||||
|
"colored",
|
||||||
|
"log",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "slab"
|
name = "slab"
|
||||||
version = "0.4.3"
|
version = "0.4.3"
|
||||||
|
@ -2267,15 +2232,6 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thread_local"
|
|
||||||
version = "1.1.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd"
|
|
||||||
dependencies = [
|
|
||||||
"once_cell",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.1.43"
|
version = "0.1.43"
|
||||||
|
@ -2421,21 +2377,9 @@ checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tracing-attributes",
|
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-attributes"
|
|
||||||
version = "0.1.15"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing-core"
|
name = "tracing-core"
|
||||||
version = "0.1.18"
|
version = "0.1.18"
|
||||||
|
@ -2445,60 +2389,6 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-futures"
|
|
||||||
version = "0.2.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
|
|
||||||
dependencies = [
|
|
||||||
"pin-project",
|
|
||||||
"tracing",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-log"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3"
|
|
||||||
dependencies = [
|
|
||||||
"lazy_static",
|
|
||||||
"log",
|
|
||||||
"tracing-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-serde"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b"
|
|
||||||
dependencies = [
|
|
||||||
"serde",
|
|
||||||
"tracing-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-subscriber"
|
|
||||||
version = "0.2.19"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ab69019741fca4d98be3c62d2b75254528b5432233fd8a4d2739fec20278de48"
|
|
||||||
dependencies = [
|
|
||||||
"ansi_term",
|
|
||||||
"chrono",
|
|
||||||
"lazy_static",
|
|
||||||
"matchers",
|
|
||||||
"parking_lot",
|
|
||||||
"regex",
|
|
||||||
"serde",
|
|
||||||
"serde_json",
|
|
||||||
"sharded-slab",
|
|
||||||
"smallvec",
|
|
||||||
"thread_local",
|
|
||||||
"tracing",
|
|
||||||
"tracing-core",
|
|
||||||
"tracing-log",
|
|
||||||
"tracing-serde",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "try-lock"
|
name = "try-lock"
|
||||||
version = "0.2.3"
|
version = "0.2.3"
|
||||||
|
|
11
Cargo.toml
11
Cargo.toml
|
@ -9,9 +9,9 @@ description = "A MangaDex@Home implementation in Rust."
|
||||||
repository = "https://github.com/edward-shen/mangadex-home-rs"
|
repository = "https://github.com/edward-shen/mangadex-home-rs"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
# lto = true
|
lto = true
|
||||||
# codegen-units = 1
|
codegen-units = 1
|
||||||
debug = 1
|
# debug = 1
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = { version = "4.0.0-beta.4", features = [ "rustls" ] }
|
actix-web = { version = "4.0.0-beta.4", features = [ "rustls" ] }
|
||||||
|
@ -20,7 +20,6 @@ async-trait = "0.1"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
bincode = "1"
|
bincode = "1"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
chacha20 = "0.7"
|
|
||||||
chrono = { version = "0.4", features = [ "serde" ] }
|
chrono = { version = "0.4", features = [ "serde" ] }
|
||||||
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
|
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
|
||||||
ctrlc = "3"
|
ctrlc = "3"
|
||||||
|
@ -39,15 +38,13 @@ serde = "1"
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
serde_repr = "0.1"
|
serde_repr = "0.1"
|
||||||
serde_yaml = "0.8"
|
serde_yaml = "0.8"
|
||||||
|
simple_logger = "1"
|
||||||
sodiumoxide = "0.2"
|
sodiumoxide = "0.2"
|
||||||
sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros", "offline" ] }
|
sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros", "offline" ] }
|
||||||
thiserror = "1"
|
thiserror = "1"
|
||||||
tokio = { version = "1", features = [ "full", "parking_lot" ] }
|
tokio = { version = "1", features = [ "full", "parking_lot" ] }
|
||||||
tokio-stream = { version = "0.1", features = [ "sync" ] }
|
tokio-stream = { version = "0.1", features = [ "sync" ] }
|
||||||
tokio-util = { version = "0.6", features = [ "codec" ] }
|
tokio-util = { version = "0.6", features = [ "codec" ] }
|
||||||
tracing = "0.1"
|
|
||||||
tracing-futures = "0.2"
|
|
||||||
tracing-subscriber = { version = "0.2", features = [ "parking_lot" ] }
|
|
||||||
url = { version = "2", features = [ "serde" ] }
|
url = { version = "2", features = [ "serde" ] }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
|
|
|
@ -54,38 +54,3 @@ server_settings:
|
||||||
# the backend will infer it from where it was sent from, which may fail in the
|
# the backend will infer it from where it was sent from, which may fail in the
|
||||||
# presence of multiple IPs.
|
# presence of multiple IPs.
|
||||||
# external_ip: ~
|
# external_ip: ~
|
||||||
|
|
||||||
# These settings are unique to the Rust client, and may be ignored or behave
|
|
||||||
# differently from the official client.
|
|
||||||
extended_options:
|
|
||||||
# Which cache type to use. By default, this is `on_disk`, but one can select
|
|
||||||
# `lfu` or `lru` to use a LFU or LRU in addition to the on-disk cache to
|
|
||||||
# improve lookup times. Generally speaking, using one is almost always better,
|
|
||||||
# but by how much depends on how much memory you let the node use, how large
|
|
||||||
# is your node, and which caching implementation you use.
|
|
||||||
# cache_type: on_disk
|
|
||||||
|
|
||||||
# The amount of memory the client should use when using an in-memory cache.
|
|
||||||
# This does nothing if only the on-disk cache is used.
|
|
||||||
# memory_quota: 0
|
|
||||||
|
|
||||||
# Whether or not to expose a prometheus endpoint at /metrics. This is a
|
|
||||||
# completely open endpoint, so best practice is to make sure this is only
|
|
||||||
# readable from the internal network.
|
|
||||||
# enable_metrics: false
|
|
||||||
|
|
||||||
# If you'd like to specify a different path location for the cache, you can do
|
|
||||||
# so here.
|
|
||||||
# cache_path: "./cache"
|
|
||||||
|
|
||||||
# What logging level to use. Valid options are "error", "warn", "info",
|
|
||||||
# "debug", "trace", and "off", which disables logging.
|
|
||||||
# logging_level: info
|
|
||||||
|
|
||||||
# Warning: Experimental. Will cause problems.
|
|
||||||
#
|
|
||||||
# Enables disk encryption where the key is stored in memory. In other words,
|
|
||||||
# when the MD@H program is stopped, all cached files are irrecoverable.
|
|
||||||
# Practically speaking, this isn't all too useful (and definitely hurts
|
|
||||||
# performance), but for peace of mind, this may be useful.
|
|
||||||
# ephemeral_disk_encryption: false
|
|
14
src/cache/disk.rs
vendored
14
src/cache/disk.rs
vendored
|
@ -10,7 +10,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::LevelFilter;
|
use log::{debug, error, warn, LevelFilter};
|
||||||
use md5::digest::generic_array::GenericArray;
|
use md5::digest::generic_array::GenericArray;
|
||||||
use md5::{Digest, Md5};
|
use md5::{Digest, Md5};
|
||||||
use sqlx::sqlite::SqliteConnectOptions;
|
use sqlx::sqlite::SqliteConnectOptions;
|
||||||
|
@ -18,13 +18,11 @@ use sqlx::{ConnectOptions, Sqlite, SqlitePool, Transaction};
|
||||||
use tokio::fs::remove_file;
|
use tokio::fs::remove_file;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tracing::{debug, error, instrument, warn};
|
|
||||||
|
|
||||||
use crate::units::Bytes;
|
use crate::units::Bytes;
|
||||||
|
|
||||||
use super::{Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata};
|
use super::{Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata};
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct DiskCache {
|
pub struct DiskCache {
|
||||||
disk_path: PathBuf,
|
disk_path: PathBuf,
|
||||||
disk_cur_size: AtomicU64,
|
disk_cur_size: AtomicU64,
|
||||||
|
@ -216,7 +214,6 @@ async fn db_listener(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "debug", skip(transaction))]
|
|
||||||
async fn handle_db_get(entry: &Path, transaction: &mut Transaction<'_, Sqlite>) {
|
async fn handle_db_get(entry: &Path, transaction: &mut Transaction<'_, Sqlite>) {
|
||||||
let hash = if let Ok(hash) = Md5Hash::try_from(entry) {
|
let hash = if let Ok(hash) = Md5Hash::try_from(entry) {
|
||||||
hash
|
hash
|
||||||
|
@ -244,7 +241,6 @@ async fn handle_db_get(entry: &Path, transaction: &mut Transaction<'_, Sqlite>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "debug", skip(transaction, cache))]
|
|
||||||
async fn handle_db_put(
|
async fn handle_db_put(
|
||||||
entry: &Path,
|
entry: &Path,
|
||||||
size: u64,
|
size: u64,
|
||||||
|
@ -267,7 +263,7 @@ async fn handle_db_put(
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Err(e) = query {
|
if let Err(e) = query {
|
||||||
warn!("Failed to add to db: {}", e);
|
warn!("Failed to add {:?} to db: {}", key, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.disk_cur_size.fetch_add(size, Ordering::Release);
|
cache.disk_cur_size.fetch_add(size, Ordering::Release);
|
||||||
|
@ -328,15 +324,15 @@ impl Cache for DiskCache {
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
||||||
let channel = self.db_update_channel_sender.clone();
|
let channel = self.db_update_channel_sender.clone();
|
||||||
|
|
||||||
// TODO: Check legacy path as well
|
|
||||||
|
|
||||||
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(key)));
|
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(key)));
|
||||||
let path_0 = Arc::clone(&path);
|
let path_0 = Arc::clone(&path);
|
||||||
|
|
||||||
tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await });
|
tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await });
|
||||||
|
|
||||||
super::fs::read_file_from_path(&path).await.map(|res| {
|
super::fs::read_file_from_path(&path).await.map(|res| {
|
||||||
res.map(|(stream, _, metadata)| (stream, metadata))
|
let (inner, maybe_header, metadata) = res?;
|
||||||
|
CacheStream::new(inner, maybe_header)
|
||||||
|
.map(|stream| (stream, metadata))
|
||||||
.map_err(|_| CacheError::DecryptionFailure)
|
.map_err(|_| CacheError::DecryptionFailure)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
290
src/cache/fs.rs
vendored
290
src/cache/fs.rs
vendored
|
@ -22,24 +22,26 @@ use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_web::error::PayloadError;
|
use actix_web::error::PayloadError;
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use chacha20::cipher::{NewCipher, StreamCipher};
|
|
||||||
use chacha20::{Key, XChaCha20, XNonce};
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use serde::Deserialize;
|
use log::{debug, warn};
|
||||||
use sodiumoxide::crypto::stream::xchacha20::{gen_nonce, NONCEBYTES};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::fs::{create_dir_all, remove_file, File};
|
use sodiumoxide::crypto::secretstream::{
|
||||||
use tokio::io::{
|
Header, Pull, Push, Stream as SecretStream, Tag, HEADERBYTES,
|
||||||
AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader,
|
|
||||||
ReadBuf,
|
|
||||||
};
|
};
|
||||||
|
use tokio::fs::{create_dir_all, remove_file, File};
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
use tracing::{debug, instrument, warn};
|
|
||||||
|
|
||||||
use super::compat::LegacyImageMetadata;
|
use super::compat::LegacyImageMetadata;
|
||||||
use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
|
use super::{CacheKey, ImageMetadata, InnerStream, ENCRYPTION_KEY};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub enum OnDiskMetadata {
|
||||||
|
Encrypted(Header, ImageMetadata),
|
||||||
|
Plaintext(ImageMetadata),
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempts to lookup the file on disk, returning a byte stream if it exists.
|
/// Attempts to lookup the file on disk, returning a byte stream if it exists.
|
||||||
/// Note that this could return two types of streams, depending on if the file
|
/// Note that this could return two types of streams, depending on if the file
|
||||||
|
@ -47,14 +49,13 @@ use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY};
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(super) async fn read_file_from_path(
|
pub(super) async fn read_file_from_path(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
) -> Option<Result<(CacheStream, Option<XNonce>, ImageMetadata), std::io::Error>> {
|
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
||||||
read_file(std::fs::File::open(path).ok()?).await
|
read_file(std::fs::File::open(path).ok()?).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "debug")]
|
|
||||||
async fn read_file(
|
async fn read_file(
|
||||||
file: std::fs::File,
|
file: std::fs::File,
|
||||||
) -> Option<Result<(CacheStream, Option<XNonce>, ImageMetadata), std::io::Error>> {
|
) -> Option<Result<(InnerStream, Option<Header>, ImageMetadata), std::io::Error>> {
|
||||||
let mut file_0 = file.try_clone().unwrap();
|
let mut file_0 = file.try_clone().unwrap();
|
||||||
let file_1 = file.try_clone().unwrap();
|
let file_1 = file.try_clone().unwrap();
|
||||||
|
|
||||||
|
@ -72,7 +73,7 @@ async fn read_file(
|
||||||
|
|
||||||
let parsed_metadata;
|
let parsed_metadata;
|
||||||
let mut maybe_header = None;
|
let mut maybe_header = None;
|
||||||
let mut reader: Option<Pin<Box<dyn MetadataFetch + Send>>> = None;
|
let mut reader: Option<Pin<Box<dyn AsyncRead + Send>>> = None;
|
||||||
if let Ok(metadata) = maybe_metadata {
|
if let Ok(metadata) = maybe_metadata {
|
||||||
// image is decrypted
|
// image is decrypted
|
||||||
if ENCRYPTION_KEY.get().is_some() {
|
if ENCRYPTION_KEY.get().is_some() {
|
||||||
|
@ -82,55 +83,57 @@ async fn read_file(
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
reader = Some(Box::pin(BufReader::new(File::from_std(file_1))));
|
reader = Some(Box::pin(File::from_std(file_1)));
|
||||||
parsed_metadata = Some(metadata);
|
parsed_metadata = Some(metadata);
|
||||||
debug!("Found not encrypted file");
|
debug!("Found not encrypted file");
|
||||||
} else {
|
} else {
|
||||||
debug!("metadata read failed, trying to see if it's encrypted");
|
|
||||||
let mut file = File::from_std(file_1);
|
let mut file = File::from_std(file_1);
|
||||||
file.seek(SeekFrom::Start(0)).await.ok()?;
|
file.seek(SeekFrom::Start(0)).await.ok()?;
|
||||||
|
let file_0 = file.try_clone().await.unwrap();
|
||||||
|
|
||||||
// image is encrypted or corrupt
|
// image is encrypted or corrupt
|
||||||
|
|
||||||
// If the encryption key was set, use the encrypted disk reader instead;
|
// If the encryption key was set, use the encrypted disk reader instead;
|
||||||
// else, just directly read from file.
|
// else, just directly read from file.
|
||||||
if let Some(key) = ENCRYPTION_KEY.get() {
|
if let Some(key) = ENCRYPTION_KEY.get() {
|
||||||
let mut nonce_bytes = [0; NONCEBYTES];
|
let mut header_bytes = [0; HEADERBYTES];
|
||||||
if let Err(e) = file.read_exact(&mut nonce_bytes).await {
|
if let Err(e) = file.read_exact(&mut header_bytes).await {
|
||||||
warn!("Found file but failed reading header: {}", e);
|
warn!("Found file but failed reading header: {}", e);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("header bytes: {:x?}", nonce_bytes);
|
let file_header = if let Some(header) = Header::from_slice(&header_bytes) {
|
||||||
|
header
|
||||||
|
} else {
|
||||||
|
warn!("Found file, but encrypted header was invalid. Assuming corrupted!");
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
maybe_header = Some(*XNonce::from_slice(&nonce_bytes));
|
let secret_stream = if let Ok(stream) = SecretStream::init_pull(&file_header, key) {
|
||||||
reader = Some(Box::pin(BufReader::new(EncryptedDiskReader::new(
|
stream
|
||||||
file,
|
} else {
|
||||||
XNonce::from_slice(XNonce::from_slice(&nonce_bytes)),
|
warn!("Failed to init secret stream with key and header. Assuming corrupted!");
|
||||||
key,
|
return None;
|
||||||
))));
|
};
|
||||||
|
|
||||||
|
maybe_header = Some(file_header);
|
||||||
|
|
||||||
|
reader = Some(Box::pin(EncryptedDiskReader::new(file, secret_stream)));
|
||||||
}
|
}
|
||||||
|
|
||||||
parsed_metadata = if let Some(reader) = reader.as_mut() {
|
let mut deserializer = serde_json::Deserializer::from_reader(file_0.into_std().await);
|
||||||
if let Ok(metadata) = reader.as_mut().metadata().await {
|
parsed_metadata = ImageMetadata::deserialize(&mut deserializer).ok();
|
||||||
debug!("Successfully parsed encrypted metadata");
|
|
||||||
Some(metadata)
|
if parsed_metadata.is_some() {
|
||||||
} else {
|
debug!("Found encrypted file");
|
||||||
debug!("Failed to parse encrypted metadata");
|
}
|
||||||
None
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
debug!("Failed to read encrypted data");
|
|
||||||
None
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// parsed_metadata is either set or unset here. If it's set then we
|
// parsed_metadata is either set or unset here. If it's set then we
|
||||||
// successfully decoded the data; otherwise the file is garbage.
|
// successfully decoded the data; otherwise the file is garbage.
|
||||||
|
|
||||||
if let Some(reader) = reader {
|
if let Some(reader) = reader {
|
||||||
let stream =
|
let stream = InnerStream::Completed(FramedRead::new(reader, BytesCodec::new()));
|
||||||
CacheStream::Completed(FramedRead::new(reader as Pin<Box<_>>, BytesCodec::new()));
|
|
||||||
parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata)))
|
parsed_metadata.map(|metadata| Ok((stream, maybe_header, metadata)))
|
||||||
} else {
|
} else {
|
||||||
debug!("Reader was invalid, file is corrupt");
|
debug!("Reader was invalid, file is corrupt");
|
||||||
|
@ -140,79 +143,60 @@ async fn read_file(
|
||||||
|
|
||||||
struct EncryptedDiskReader {
|
struct EncryptedDiskReader {
|
||||||
file: Pin<Box<File>>,
|
file: Pin<Box<File>>,
|
||||||
keystream: XChaCha20,
|
stream: SecretStream<Pull>,
|
||||||
|
buf: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedDiskReader {
|
impl EncryptedDiskReader {
|
||||||
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
|
fn new(file: File, stream: SecretStream<Pull>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file: Box::pin(file),
|
file: Box::pin(file),
|
||||||
keystream: XChaCha20::new(key, nonce),
|
stream,
|
||||||
|
buf: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait MetadataFetch: AsyncBufRead {
|
|
||||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<R: AsyncBufRead + Send> MetadataFetch for R {
|
|
||||||
#[inline]
|
|
||||||
async fn metadata(mut self: Pin<&mut Self>) -> Result<ImageMetadata, ()> {
|
|
||||||
MetadataFuture(&mut self).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for EncryptedDiskReader {
|
impl AsyncRead for EncryptedDiskReader {
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut ReadBuf<'_>,
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<std::io::Result<()>> {
|
) -> Poll<std::io::Result<()>> {
|
||||||
let previously_read = buf.filled().len();
|
let cursor_start = buf.filled().len();
|
||||||
|
|
||||||
let res = self.as_mut().file.as_mut().poll_read(cx, buf);
|
let res = self.as_mut().file.as_mut().poll_read(cx, buf);
|
||||||
let bytes_modified = buf.filled().len() - previously_read;
|
if res.is_pending() {
|
||||||
self.keystream.apply_keystream(
|
return Poll::Pending;
|
||||||
&mut buf.filled_mut()[previously_read..previously_read + bytes_modified],
|
|
||||||
);
|
|
||||||
res
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct MetadataFuture<'a, R>(&'a mut Pin<&'a mut R>);
|
|
||||||
|
|
||||||
impl<'a, R: AsyncBufRead> Future for MetadataFuture<'a, R> {
|
|
||||||
type Output = Result<ImageMetadata, ()>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let mut filled = 0;
|
|
||||||
loop {
|
|
||||||
let buf = match self.0.as_mut().poll_fill_buf(cx) {
|
|
||||||
Poll::Ready(Ok(buffer)) => buffer,
|
|
||||||
Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
|
|
||||||
Poll::Pending => return Poll::Pending,
|
|
||||||
};
|
|
||||||
|
|
||||||
if filled == buf.len() {
|
|
||||||
return Poll::Ready(Err(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
filled = buf.len();
|
|
||||||
|
|
||||||
let mut reader = serde_json::Deserializer::from_slice(buf).into_iter();
|
|
||||||
let (res, bytes_consumed) = match reader.next() {
|
|
||||||
Some(Ok(metadata)) => (Poll::Ready(Ok(metadata)), reader.byte_offset()),
|
|
||||||
Some(Err(e)) if e.is_eof() => continue,
|
|
||||||
Some(Err(_)) | None => return Poll::Ready(Err(())),
|
|
||||||
};
|
|
||||||
|
|
||||||
// This needs to be outside the loop because we need to drop the
|
|
||||||
// reader ref, since that depends on a mut self.
|
|
||||||
self.0.as_mut().consume(bytes_consumed);
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let cursor_new = buf.filled().len();
|
||||||
|
|
||||||
|
// pull_to_vec internally calls vec.clear() and vec.reserve(). Generally
|
||||||
|
// speaking we should be reading about the same amount of data each time
|
||||||
|
// so we shouldn't experience too much of a slow down w.r.t resizing the
|
||||||
|
// buffer...
|
||||||
|
let new_self = Pin::into_inner(self);
|
||||||
|
new_self
|
||||||
|
.stream
|
||||||
|
.pull_to_vec(
|
||||||
|
&buf.filled()[cursor_start..cursor_new],
|
||||||
|
None,
|
||||||
|
&mut new_self.buf,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// data is strictly smaller than the encrypted stream, since you need to
|
||||||
|
// encode tags as well, so this is always safe.
|
||||||
|
|
||||||
|
// rewrite encrypted data into decrypted data
|
||||||
|
let buffer = buf.filled_mut();
|
||||||
|
for (old, new) in buffer[cursor_start..].iter_mut().zip(&new_self.buf) {
|
||||||
|
*old = *new;
|
||||||
|
}
|
||||||
|
buf.set_filled(cursor_start + new_self.buf.len());
|
||||||
|
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,31 +216,36 @@ where
|
||||||
Fut: 'static + Send + Sync + Future<Output = ()>,
|
Fut: 'static + Send + Sync + Future<Output = ()>,
|
||||||
DbCallback: 'static + Send + Sync + FnOnce(u64) -> Fut,
|
DbCallback: 'static + Send + Sync + FnOnce(u64) -> Fut,
|
||||||
{
|
{
|
||||||
let mut file = {
|
let file = {
|
||||||
let parent = path.parent().expect("The path to have a parent");
|
let parent = path.parent().expect("The path to have a parent");
|
||||||
create_dir_all(parent).await?;
|
create_dir_all(parent).await?;
|
||||||
let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
|
let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
|
||||||
file
|
file
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut writer: Pin<Box<dyn AsyncWrite + Send>> = if let Some(key) = ENCRYPTION_KEY.get() {
|
|
||||||
let nonce = gen_nonce();
|
|
||||||
file.write_all(nonce.as_ref()).await?;
|
|
||||||
Box::pin(EncryptedDiskWriter::new(
|
|
||||||
file,
|
|
||||||
XNonce::from_slice(nonce.as_ref()),
|
|
||||||
key,
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
Box::pin(file)
|
|
||||||
};
|
|
||||||
|
|
||||||
let metadata_string = serde_json::to_string(&metadata).expect("serialization to work");
|
let metadata_string = serde_json::to_string(&metadata).expect("serialization to work");
|
||||||
let metadata_size = metadata_string.len();
|
let metadata_size = metadata_string.len();
|
||||||
|
|
||||||
let mut error = writer.write_all(metadata_string.as_bytes()).await.err();
|
let (mut writer, maybe_header): (Pin<Box<dyn AsyncWrite + Send>>, _) =
|
||||||
|
if let Some((enc, header)) = ENCRYPTION_KEY
|
||||||
|
.get()
|
||||||
|
.map(|key| SecretStream::init_push(key).expect("Failed to init enc stream"))
|
||||||
|
{
|
||||||
|
(Box::pin(EncryptedDiskWriter::new(file, enc)), Some(header))
|
||||||
|
} else {
|
||||||
|
(Box::pin(file), None)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut error = if let Some(header) = maybe_header {
|
||||||
|
writer.write_all(header.as_ref()).await.err()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
if error.is_none() {
|
||||||
|
error = writer.write_all(metadata_string.as_bytes()).await.err();
|
||||||
|
}
|
||||||
if error.is_none() {
|
if error.is_none() {
|
||||||
debug!("decrypted write {:x?}", &bytes[..40]);
|
|
||||||
error = writer.write_all(&bytes).await.err();
|
error = writer.write_all(&bytes).await.err();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,76 +277,87 @@ where
|
||||||
|
|
||||||
struct EncryptedDiskWriter {
|
struct EncryptedDiskWriter {
|
||||||
file: Pin<Box<File>>,
|
file: Pin<Box<File>>,
|
||||||
keystream: XChaCha20,
|
stream: Option<SecretStream<Push>>,
|
||||||
buffer: Vec<u8>,
|
encryption_buffer: Vec<u8>,
|
||||||
|
write_buffer: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedDiskWriter {
|
impl EncryptedDiskWriter {
|
||||||
fn new(file: File, nonce: &XNonce, key: &Key) -> Self {
|
fn new(file: File, stream: SecretStream<Push>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file: Box::pin(file),
|
file: Box::pin(file),
|
||||||
keystream: XChaCha20::new(key, nonce),
|
stream: Some(stream),
|
||||||
buffer: vec![],
|
encryption_buffer: vec![],
|
||||||
|
write_buffer: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncWrite for EncryptedDiskWriter {
|
impl AsyncWrite for EncryptedDiskWriter {
|
||||||
#[inline]
|
|
||||||
fn poll_write(
|
fn poll_write(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &[u8],
|
buf: &[u8],
|
||||||
) -> Poll<Result<usize, std::io::Error>> {
|
) -> Poll<Result<usize, std::io::Error>> {
|
||||||
let pinned = Pin::into_inner(self);
|
let new_self = Pin::into_inner(self);
|
||||||
|
{
|
||||||
|
let encryption_buffer = &mut new_self.encryption_buffer;
|
||||||
|
if let Some(stream) = new_self.stream.as_mut() {
|
||||||
|
stream
|
||||||
|
.push_to_vec(buf, None, Tag::Message, encryption_buffer)
|
||||||
|
.expect("Failed to write encrypted data to buffer");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let old_buffer_size = pinned.buffer.len();
|
new_self.write_buffer.extend(&new_self.encryption_buffer);
|
||||||
pinned.buffer.extend_from_slice(buf);
|
|
||||||
pinned
|
match new_self
|
||||||
.keystream
|
.file
|
||||||
.apply_keystream(&mut pinned.buffer[old_buffer_size..]);
|
.as_mut()
|
||||||
match pinned.file.as_mut().poll_write(cx, &pinned.buffer) {
|
.poll_write(cx, &new_self.write_buffer)
|
||||||
|
{
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(Ok(n)) => {
|
||||||
pinned.buffer.drain(..n);
|
new_self.write_buffer.drain(..n);
|
||||||
|
// We buffered all the bytes that were provided to use.
|
||||||
Poll::Ready(Ok(buf.len()))
|
Poll::Ready(Ok(buf.len()))
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
poll => poll,
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_flush(
|
fn poll_flush(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), std::io::Error>> {
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
if self.buffer.is_empty() {
|
if self.as_ref().write_buffer.is_empty() {
|
||||||
self.as_mut().file.as_mut().poll_flush(cx)
|
self.file.as_mut().poll_flush(cx)
|
||||||
} else {
|
} else {
|
||||||
let pinned = Pin::into_inner(self);
|
let new_self = Pin::into_inner(self);
|
||||||
while !pinned.buffer.is_empty() {
|
let buffer = new_self.write_buffer.as_ref();
|
||||||
match pinned.file.as_mut().poll_write(cx, &pinned.buffer) {
|
match new_self.file.as_mut().poll_write(cx, buffer) {
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(res) => {
|
||||||
pinned.buffer.drain(..n);
|
let n = res?;
|
||||||
}
|
new_self.write_buffer.drain(..n);
|
||||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
// We're immediately ready to do some more flushing!
|
||||||
Poll::Pending => return Poll::Pending,
|
cx.waker().wake_by_ref();
|
||||||
|
// Return pending here because we still need to flush the
|
||||||
|
// file
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn poll_shutdown(
|
fn poll_shutdown(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), std::io::Error>> {
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
match self.as_mut().poll_flush(cx) {
|
self.as_mut()
|
||||||
Poll::Ready(Ok(())) => self.as_mut().file.as_mut().poll_shutdown(cx),
|
.stream
|
||||||
poll => poll,
|
.take()
|
||||||
}
|
.map(|stream| stream.finalize(None));
|
||||||
|
self.file.as_mut().poll_shutdown(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
12
src/cache/mem.rs
vendored
12
src/cache/mem.rs
vendored
|
@ -1,7 +1,9 @@
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::{Cache, CacheKey, CacheStream, CallbackCache, ImageMetadata, MemStream};
|
use super::{
|
||||||
|
Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata, InnerStream, MemStream,
|
||||||
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
|
@ -144,9 +146,13 @@ where
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
|
) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
|
||||||
match self.mem_cache.lock().now_or_never() {
|
match self.mem_cache.lock().now_or_never() {
|
||||||
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
||||||
Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata))
|
Ok((InnerStream::Memory(MemStream(bytes.clone())), *metadata))
|
||||||
}) {
|
}) {
|
||||||
Some(v) => Some(v),
|
Some(v) => Some(v.and_then(|(inner, metadata)| {
|
||||||
|
CacheStream::new(inner, None)
|
||||||
|
.map(|v| (v, metadata))
|
||||||
|
.map_err(|_| CacheError::DecryptionFailure)
|
||||||
|
})),
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
},
|
},
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
|
|
58
src/cache/mod.rs
vendored
58
src/cache/mod.rs
vendored
|
@ -8,13 +8,14 @@ use std::task::{Context, Poll};
|
||||||
use actix_web::http::HeaderValue;
|
use actix_web::http::HeaderValue;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use chacha20::Key;
|
|
||||||
use chrono::{DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||||
|
use sodiumoxide::crypto::secretstream::{Header, Key, Pull, Stream as SecretStream};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
|
|
||||||
|
@ -23,7 +24,6 @@ pub use fs::UpstreamError;
|
||||||
pub use mem::MemoryCache;
|
pub use mem::MemoryCache;
|
||||||
|
|
||||||
use self::compat::LegacyImageMetadata;
|
use self::compat::LegacyImageMetadata;
|
||||||
use self::fs::MetadataFetch;
|
|
||||||
|
|
||||||
pub static ENCRYPTION_KEY: OnceCell<Key> = OnceCell::new();
|
pub static ENCRYPTION_KEY: OnceCell<Key> = OnceCell::new();
|
||||||
|
|
||||||
|
@ -231,12 +231,56 @@ impl<T: CallbackCache> CallbackCache for Arc<T> {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub enum CacheStream {
|
|
||||||
Memory(MemStream),
|
pub struct CacheStream {
|
||||||
Completed(FramedRead<Pin<Box<dyn MetadataFetch + Send>>, BytesCodec>),
|
inner: InnerStream,
|
||||||
|
decrypt: Option<SecretStream<Pull>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<CachedImage> for CacheStream {
|
impl CacheStream {
|
||||||
|
pub(self) fn new(inner: InnerStream, header: Option<Header>) -> Result<Self, ()> {
|
||||||
|
Ok(Self {
|
||||||
|
inner,
|
||||||
|
decrypt: header
|
||||||
|
.and_then(|header| {
|
||||||
|
ENCRYPTION_KEY
|
||||||
|
.get()
|
||||||
|
.map(|key| SecretStream::init_pull(&header, key))
|
||||||
|
})
|
||||||
|
.transpose()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for CacheStream {
|
||||||
|
type Item = CacheStreamItem;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
self.inner.poll_next_unpin(cx).map(|data| {
|
||||||
|
// False positive (`data`): https://link.eddie.sh/r1fXX
|
||||||
|
#[allow(clippy::option_if_let_else)]
|
||||||
|
if let Some(keystream) = self.decrypt.as_mut() {
|
||||||
|
data.map(|bytes_res| {
|
||||||
|
bytes_res.and_then(|bytes| {
|
||||||
|
keystream
|
||||||
|
.pull(&bytes, None)
|
||||||
|
.map(|(data, _tag)| Bytes::from(data))
|
||||||
|
.map_err(|_| UpstreamError)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
data
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(self) enum InnerStream {
|
||||||
|
Memory(MemStream),
|
||||||
|
Completed(FramedRead<Pin<Box<dyn AsyncRead + Send>>, BytesCodec>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<CachedImage> for InnerStream {
|
||||||
fn from(image: CachedImage) -> Self {
|
fn from(image: CachedImage) -> Self {
|
||||||
Self::Memory(MemStream(image.0))
|
Self::Memory(MemStream(image.0))
|
||||||
}
|
}
|
||||||
|
@ -244,7 +288,7 @@ impl From<CachedImage> for CacheStream {
|
||||||
|
|
||||||
type CacheStreamItem = Result<Bytes, UpstreamError>;
|
type CacheStreamItem = Result<Bytes, UpstreamError>;
|
||||||
|
|
||||||
impl Stream for CacheStream {
|
impl Stream for InnerStream {
|
||||||
type Item = CacheStreamItem;
|
type Item = CacheStreamItem;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
|
|
@ -5,6 +5,7 @@ use std::time::Duration;
|
||||||
use actix_web::http::{HeaderMap, HeaderName, HeaderValue};
|
use actix_web::http::{HeaderMap, HeaderName, HeaderValue};
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use log::{debug, error, warn};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use reqwest::header::{
|
use reqwest::header::{
|
||||||
|
@ -14,7 +15,6 @@ use reqwest::header::{
|
||||||
use reqwest::{Client, StatusCode};
|
use reqwest::{Client, StatusCode};
|
||||||
use tokio::sync::watch::{channel, Receiver};
|
use tokio::sync::watch::{channel, Receiver};
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
use tracing::{debug, error, warn};
|
|
||||||
|
|
||||||
use crate::cache::{Cache, CacheKey, ImageMetadata};
|
use crate::cache::{Cache, CacheKey, ImageMetadata};
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,6 @@ use clap::{crate_authors, crate_description, crate_version, Clap};
|
||||||
use log::LevelFilter;
|
use log::LevelFilter;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::level_filters::LevelFilter as TracingLevelFilter;
|
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::units::{KilobitsPerSecond, Mebibytes, Port};
|
use crate::units::{KilobitsPerSecond, Mebibytes, Port};
|
||||||
|
@ -79,7 +78,7 @@ pub struct Config {
|
||||||
pub cache_type: CacheType,
|
pub cache_type: CacheType,
|
||||||
pub cache_path: PathBuf,
|
pub cache_path: PathBuf,
|
||||||
pub shutdown_timeout: NonZeroU16,
|
pub shutdown_timeout: NonZeroU16,
|
||||||
pub log_level: TracingLevelFilter,
|
pub log_level: LevelFilter,
|
||||||
pub client_secret: ClientSecret,
|
pub client_secret: ClientSecret,
|
||||||
pub port: Port,
|
pub port: Port,
|
||||||
pub bind_address: SocketAddr,
|
pub bind_address: SocketAddr,
|
||||||
|
@ -98,24 +97,15 @@ impl Config {
|
||||||
let file_extended_options = file_args.extended_options.unwrap_or_default();
|
let file_extended_options = file_args.extended_options.unwrap_or_default();
|
||||||
|
|
||||||
let log_level = match (cli_args.quiet, cli_args.verbose) {
|
let log_level = match (cli_args.quiet, cli_args.verbose) {
|
||||||
(n, _) if n > 2 => TracingLevelFilter::OFF,
|
(n, _) if n > 2 => LevelFilter::Off,
|
||||||
(2, _) => TracingLevelFilter::ERROR,
|
(2, _) => LevelFilter::Error,
|
||||||
(1, _) => TracingLevelFilter::WARN,
|
(1, _) => LevelFilter::Warn,
|
||||||
// Use log level from file if no flags were provided to CLI
|
// Use log level from file if no flags were provided to CLI
|
||||||
(0, 0) => {
|
(0, 0) => file_extended_options
|
||||||
file_extended_options
|
.logging_level
|
||||||
.logging_level
|
.unwrap_or(LevelFilter::Info),
|
||||||
.map_or(TracingLevelFilter::INFO, |filter| match filter {
|
(_, 1) => LevelFilter::Debug,
|
||||||
LevelFilter::Off => TracingLevelFilter::OFF,
|
(_, n) if n > 1 => LevelFilter::Trace,
|
||||||
LevelFilter::Error => TracingLevelFilter::ERROR,
|
|
||||||
LevelFilter::Warn => TracingLevelFilter::WARN,
|
|
||||||
LevelFilter::Info => TracingLevelFilter::INFO,
|
|
||||||
LevelFilter::Debug => TracingLevelFilter::DEBUG,
|
|
||||||
LevelFilter::Trace => TracingLevelFilter::TRACE,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
(_, 1) => TracingLevelFilter::DEBUG,
|
|
||||||
(_, n) if n > 1 => TracingLevelFilter::TRACE,
|
|
||||||
// compiler can't figure it out
|
// compiler can't figure it out
|
||||||
_ => unsafe { unreachable_unchecked() },
|
_ => unsafe { unreachable_unchecked() },
|
||||||
};
|
};
|
||||||
|
|
15
src/main.rs
15
src/main.rs
|
@ -14,15 +14,15 @@ use actix_web::rt::{spawn, time, System};
|
||||||
use actix_web::web::{self, Data};
|
use actix_web::web::{self, Data};
|
||||||
use actix_web::{App, HttpResponse, HttpServer};
|
use actix_web::{App, HttpResponse, HttpServer};
|
||||||
use cache::{Cache, DiskCache};
|
use cache::{Cache, DiskCache};
|
||||||
use chacha20::Key;
|
|
||||||
use config::Config;
|
use config::Config;
|
||||||
|
use log::{debug, error, info, warn};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rustls::{NoClientAuth, ServerConfig};
|
use rustls::{NoClientAuth, ServerConfig};
|
||||||
use sodiumoxide::crypto::stream::xchacha20::gen_key;
|
use simple_logger::SimpleLogger;
|
||||||
|
use sodiumoxide::crypto::secretstream::gen_key;
|
||||||
use state::{RwLockServerState, ServerState};
|
use state::{RwLockServerState, ServerState};
|
||||||
use stop::send_stop;
|
use stop::send_stop;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{debug, error, info, warn};
|
|
||||||
|
|
||||||
use crate::cache::mem::{Lfu, Lru};
|
use crate::cache::mem::{Lfu, Lru};
|
||||||
use crate::cache::{MemoryCache, ENCRYPTION_KEY};
|
use crate::cache::{MemoryCache, ENCRYPTION_KEY};
|
||||||
|
@ -80,10 +80,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
// Logging and warnings
|
// Logging and warnings
|
||||||
//
|
//
|
||||||
|
|
||||||
// SimpleLogger::new().with_level(config.log_level).init()?;
|
SimpleLogger::new().with_level(config.log_level).init()?;
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_max_level(config.log_level)
|
|
||||||
.init();
|
|
||||||
|
|
||||||
if let Err(e) = print_preamble_and_warnings(&config) {
|
if let Err(e) = print_preamble_and_warnings(&config) {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
|
@ -97,9 +94,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
|
||||||
if config.ephemeral_disk_encryption {
|
if config.ephemeral_disk_encryption {
|
||||||
info!("Running with at-rest encryption!");
|
info!("Running with at-rest encryption!");
|
||||||
ENCRYPTION_KEY
|
ENCRYPTION_KEY.set(gen_key()).unwrap();
|
||||||
.set(*Key::from_slice(gen_key().as_ref()))
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.enable_metrics {
|
if config.enable_metrics {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::{io::BufReader, sync::Arc};
|
use std::{io::BufReader, sync::Arc};
|
||||||
|
|
||||||
|
use log::{debug, error, info, warn};
|
||||||
use rustls::internal::pemfile::{certs, rsa_private_keys};
|
use rustls::internal::pemfile::{certs, rsa_private_keys};
|
||||||
use rustls::sign::{RSASigningKey, SigningKey};
|
use rustls::sign::{RSASigningKey, SigningKey};
|
||||||
use rustls::Certificate;
|
use rustls::Certificate;
|
||||||
|
@ -8,7 +9,6 @@ use serde::de::{MapAccess, Visitor};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_repr::Deserialize_repr;
|
use serde_repr::Deserialize_repr;
|
||||||
use sodiumoxide::crypto::box_::PrecomputedKey;
|
use sodiumoxide::crypto::box_::PrecomputedKey;
|
||||||
use tracing::{debug, error, info, warn};
|
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::config::{ClientSecret, Config, UnstableOptions, VALIDATE_TOKENS};
|
use crate::config::{ClientSecret, Config, UnstableOptions, VALIDATE_TOKENS};
|
||||||
|
|
|
@ -10,11 +10,11 @@ use base64::DecodeError;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
|
use log::{debug, error, info, trace};
|
||||||
use prometheus::{Encoder, TextEncoder};
|
use prometheus::{Encoder, TextEncoder};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
|
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{debug, error, info, trace};
|
|
||||||
|
|
||||||
use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
|
use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
|
||||||
use crate::client::{FetchResult, DEFAULT_HEADERS, HTTP_CLIENT};
|
use crate::client::{FetchResult, DEFAULT_HEADERS, HTTP_CLIENT};
|
||||||
|
|
|
@ -4,6 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use crate::config::{ClientSecret, Config, UnstableOptions, OFFLINE_MODE, VALIDATE_TOKENS};
|
use crate::config::{ClientSecret, Config, UnstableOptions, OFFLINE_MODE, VALIDATE_TOKENS};
|
||||||
use crate::ping::{Request, Response, CONTROL_CENTER_PING_URL};
|
use crate::ping::{Request, Response, CONTROL_CENTER_PING_URL};
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
|
use log::{error, info, warn};
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rustls::sign::{CertifiedKey, SigningKey};
|
use rustls::sign::{CertifiedKey, SigningKey};
|
||||||
|
@ -11,7 +12,6 @@ use rustls::Certificate;
|
||||||
use rustls::{ClientHello, ResolvesServerCert};
|
use rustls::{ClientHello, ResolvesServerCert};
|
||||||
use sodiumoxide::crypto::box_::{PrecomputedKey, PRECOMPUTEDKEYBYTES};
|
use sodiumoxide::crypto::box_::{PrecomputedKey, PRECOMPUTEDKEYBYTES};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{error, info, warn};
|
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
pub struct ServerState {
|
pub struct ServerState {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
|
use log::{info, warn};
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tracing::{info, warn};
|
|
||||||
|
|
||||||
use crate::config::ClientSecret;
|
use crate::config::ClientSecret;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue