Compare commits


No commits in common. "d42b80d7e1ae581deceba877ef8f43dcf662d55e" and "5fdcfa5071cdd02dbef33598a74d6c34a1ee09dd" have entirely different histories.

7 changed files with 52 additions and 213 deletions

Cargo.lock generated

@@ -403,9 +403,6 @@ name = "bytes"
 version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
-dependencies = [
- "serde",
-]
 
 [[package]]
 name = "bytestring"
@@ -498,16 +495,6 @@ dependencies = [
  "syn",
 ]
 
-[[package]]
-name = "combine"
-version = "4.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2d47c1b11006b87e492b53b313bb699ce60e16613c4dddaa91f8f7c220ab2fa"
-dependencies = [
- "bytes 1.0.1",
- "memchr",
-]
-
 [[package]]
 name = "const_fn"
 version = "0.4.8"
@@ -1264,7 +1251,6 @@ dependencies = [
  "once_cell",
  "parking_lot",
  "prometheus",
- "redis",
  "reqwest",
  "rustls",
  "serde",
@@ -1681,21 +1667,6 @@ dependencies = [
  "rand_core",
 ]
 
-[[package]]
-name = "redis"
-version = "0.21.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bbc1838d8d0b423f325d6fac80c5f19109c7d16c8c37c584893dc17cf71c63d"
-dependencies = [
- "async-trait",
- "combine",
- "dtoa",
- "itoa",
- "percent-encoding",
- "sha1",
- "url",
-]
-
 [[package]]
 name = "redox_syscall"
 version = "0.2.9"

Cargo.toml

@@ -26,7 +26,7 @@ arc-swap = "1"
 async-trait = "0.1"
 base64 = "0.13"
 bincode = "1"
-bytes = { version = "1", features = [ "serde" ] }
+bytes = "1"
 chacha20 = "0.7"
 chrono = { version = "0.4", features = [ "serde" ] }
 clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
@@ -42,7 +42,6 @@ maxminddb = "0.20"
 md-5 = "0.9"
 parking_lot = "0.11"
 prometheus = { version = "0.12", features = [ "process" ] }
-redis = "0.21"
 reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] }
 rustls = "0.19"
 serde = "1"


@@ -66,16 +66,12 @@ metric_settings:
 # differently from the official client.
 extended_options:
   # Which cache type to use. By default, this is `on_disk`, but one can select
-  # `lfu`, `lru`, or `redis` to use a LFU, LRU, or redis instance in addition
-  # to the on-disk cache to improve lookup times. Generally speaking, using one
-  # is almost always better, but by how much depends on how much memory you let
-  # the node use, how large is your node, and which caching implementation you
-  # use.
+  # `lfu` or `lru` to use a LFU or LRU in addition to the on-disk cache to
+  # improve lookup times. Generally speaking, using one is almost always better,
+  # but by how much depends on how much memory you let the node use, how large
+  # is your node, and which caching implementation you use.
   # cache_type: on_disk
 
-  # The redis url to connect with. Does nothing if the cache type isn't`redis`.
-  # redis_url: "redis://127.0.0.1/"
-
   # The amount of memory the client should use when using an in-memory cache.
   # This does nothing if only the on-disk cache is used.
   # memory_quota: 0
@@ -93,6 +89,8 @@ extended_options:
   # "debug", "trace", and "off", which disables logging.
   # logging_level: info
 
+  # Warning: Experimental. Will cause problems.
+  #
   # Enables disk encryption where the key is stored in memory. In other words,
   # when the MD@H program is stopped, all cached files are irrecoverable.
   # Practically speaking, this isn't all too useful (and definitely hurts
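
For reference, the trimmed-down options can be modelled and parsed like this. The `ExtendedOptions` struct and the serde_yaml call below are an illustrative, self-contained sketch rather than the client's actual config types, and treating `memory_quota` as a byte count is an assumption based on the `Bytes` unit type used in the code.

// Illustrative sketch only: a stand-in for parsing the remaining extended_options.
// Assumes the serde (with the "derive" feature) and serde_yaml crates.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ExtendedOptions {
    // "on_disk" (the default), "lfu", or "lru"; "redis" is no longer accepted.
    cache_type: Option<String>,
    // Assumed to be a byte count for the in-memory cache.
    memory_quota: Option<u64>,
}

fn main() -> Result<(), serde_yaml::Error> {
    let yaml = "cache_type: lru\nmemory_quota: 268435456\n";
    let opts: ExtendedOptions = serde_yaml::from_str(yaml)?;
    println!("{:?}", opts);
    Ok(())
}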

src/cache/mem.rs vendored

@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
@@ -8,67 +7,10 @@ use bytes::Bytes;
 use futures::FutureExt;
 use lfu_cache::LfuCache;
 use lru::LruCache;
-use redis::{
-    Client as RedisClient, Commands, FromRedisValue, RedisError, RedisResult, ToRedisArgs,
-};
-use serde::{Deserialize, Serialize};
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
-use tracing::warn;
 
-#[derive(Clone, Serialize, Deserialize)]
-pub struct CacheValue {
-    data: Bytes,
-    metadata: ImageMetadata,
-    on_disk_size: u64,
-}
-
-impl CacheValue {
-    #[inline]
-    fn new(data: Bytes, metadata: ImageMetadata, on_disk_size: u64) -> Self {
-        Self {
-            data,
-            metadata,
-            on_disk_size,
-        }
-    }
-}
-
-impl FromRedisValue for CacheValue {
-    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
-        use bincode::ErrorKind;
-
-        if let redis::Value::Data(data) = v {
-            bincode::deserialize(data).map_err(|err| match *err {
-                ErrorKind::Io(e) => RedisError::from(e),
-                ErrorKind::Custom(e) => RedisError::from((
-                    redis::ErrorKind::ResponseError,
-                    "bincode deserialize failed",
-                    e,
-                )),
-                e => RedisError::from((
-                    redis::ErrorKind::ResponseError,
-                    "bincode deserialized failed",
-                    e.to_string(),
-                )),
-            })
-        } else {
-            Err(RedisError::from((
-                redis::ErrorKind::TypeError,
-                "Got non data type from redis db",
-            )))
-        }
-    }
-}
-
-impl ToRedisArgs for CacheValue {
-    fn write_redis_args<W>(&self, out: &mut W)
-    where
-        W: ?Sized + redis::RedisWrite,
-    {
-        out.write_arg(&bincode::serialize(self).expect("serialization to work"))
-    }
-}
+type CacheValue = (Bytes, ImageMetadata, u64);
 
 /// Use LRU as the eviction strategy
 pub type Lru = LruCache<CacheKey, CacheValue>;
@@ -76,27 +18,22 @@ pub type Lru = LruCache<CacheKey, CacheValue>;
 pub type Lfu = LfuCache<CacheKey, CacheValue>;
 
 /// Adapter trait for memory cache backends
-pub trait InternalMemoryCacheInitializer: InternalMemoryCache {
-    fn new() -> Self;
-}
-
 pub trait InternalMemoryCache: Sync + Send {
-    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>>;
+    fn unbounded() -> Self;
+    fn get(&mut self, key: &CacheKey) -> Option<&CacheValue>;
     fn push(&mut self, key: CacheKey, data: CacheValue);
     fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
 }
 
-impl InternalMemoryCacheInitializer for Lfu {
-    #[inline]
-    fn new() -> Self {
-        Self::unbounded()
-    }
-}
-
 impl InternalMemoryCache for Lfu {
     #[inline]
-    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
-        self.get(key).map(Cow::Borrowed)
+    fn unbounded() -> Self {
+        Self::unbounded()
+    }
+
+    #[inline]
+    fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
+        self.get(key)
     }
 
     #[inline]
@@ -110,17 +47,15 @@ impl InternalMemoryCache for Lfu {
     }
 }
 
-impl InternalMemoryCacheInitializer for Lru {
-    #[inline]
-    fn new() -> Self {
-        Self::unbounded()
-    }
-}
-
 impl InternalMemoryCache for Lru {
     #[inline]
-    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
-        self.get(key).map(Cow::Borrowed)
+    fn unbounded() -> Self {
+        Self::unbounded()
+    }
+
+    #[inline]
+    fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
+        self.get(key)
     }
 
     #[inline]
@@ -134,22 +69,6 @@ impl InternalMemoryCache for Lru {
     }
 }
 
-impl InternalMemoryCache for RedisClient {
-    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
-        Commands::get(self, key).ok().map(Cow::Owned)
-    }
-
-    fn push(&mut self, key: CacheKey, data: CacheValue) {
-        if let Err(e) = Commands::set::<_, _, ()>(self, key, data) {
-            warn!("Failed to push to redis: {}", e);
-        }
-    }
-
-    fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
-        unimplemented!("redis should handle its own memory")
-    }
-}
-
 /// Memory accelerated disk cache. Uses the internal cache implementation in
 /// memory to speed up reads.
 pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
@@ -161,7 +80,7 @@ pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
 
 impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
 where
-    MemoryCacheImpl: 'static + InternalMemoryCacheInitializer,
+    MemoryCacheImpl: 'static + InternalMemoryCache,
     ColdCache: 'static + Cache,
 {
     pub fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
@@ -169,7 +88,7 @@ where
         let new_self = Arc::new(Self {
             inner,
             cur_mem_size: AtomicU64::new(0),
-            mem_cache: Mutex::new(MemoryCacheImpl::new()),
+            mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
             master_sender: tx,
         });
 
@@ -194,7 +113,7 @@ where
             Self {
                 inner,
                 cur_mem_size: AtomicU64::new(0),
-                mem_cache: Mutex::new(MemoryCacheImpl::new()),
+                mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
                 master_sender: tx,
             },
             rx,
@@ -202,21 +121,6 @@ where
     }
 }
 
-impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
-where
-    MemoryCacheImpl: 'static + InternalMemoryCache,
-    ColdCache: 'static + Cache,
-{
-    pub fn new_with_cache(inner: ColdCache, init_mem_cache: MemoryCacheImpl) -> Self {
-        Self {
-            inner,
-            cur_mem_size: AtomicU64::new(0),
-            mem_cache: Mutex::new(init_mem_cache),
-            master_sender: channel(1).0,
-        }
-    }
-}
-
 async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
     cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
     max_mem_size: crate::units::Bytes,
@@ -242,20 +146,16 @@ async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
                 .mem_cache
                 .lock()
                 .await
-                .push(key, CacheValue::new(data, metadata, on_disk_size));
+                .push(key, (data, metadata, on_disk_size));
 
             // Pop if too large
             while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
-                let popped = cache.mem_cache.lock().await.pop().map(
-                    |(
-                        key,
-                        CacheValue {
-                            data,
-                            metadata,
-                            on_disk_size,
-                        },
-                    )| (key, data, metadata, on_disk_size),
-                );
+                let popped = cache
+                    .mem_cache
+                    .lock()
+                    .await
+                    .pop()
+                    .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
                 if let Some((_, _, _, size)) = popped {
                     cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
                 } else {
@@ -281,16 +181,12 @@ where
         key: &CacheKey,
     ) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
         match self.mem_cache.lock().now_or_never() {
-            Some(mut mem_cache) => {
-                match mem_cache.get(key).map(Cow::into_owned).map(
-                    |CacheValue { data, metadata, .. }| {
-                        Ok((CacheStream::Memory(MemStream(data)), metadata))
-                    },
-                ) {
-                    Some(v) => Some(v),
-                    None => self.inner.get(key).await,
-                }
-            }
+            Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
+                Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata))
+            }) {
+                Some(v) => Some(v),
+                None => self.inner.get(key).await,
+            },
             None => self.inner.get(key).await,
         }
     }
@@ -310,11 +206,10 @@ where
 
 #[cfg(test)]
 mod test_util {
-    use std::borrow::Cow;
     use std::cell::RefCell;
     use std::collections::{BTreeMap, HashMap};
 
-    use super::{CacheValue, InternalMemoryCache, InternalMemoryCacheInitializer};
+    use super::{CacheValue, InternalMemoryCache};
     use crate::cache::{
         Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
     };
@@ -380,15 +275,13 @@ mod test_util {
     #[derive(Default)]
     pub struct TestMemoryCache(pub BTreeMap<CacheKey, CacheValue>);
 
-    impl InternalMemoryCacheInitializer for TestMemoryCache {
-        fn new() -> Self {
+    impl InternalMemoryCache for TestMemoryCache {
+        fn unbounded() -> Self {
             Self::default()
         }
-    }
 
-    impl InternalMemoryCache for TestMemoryCache {
-        fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
-            self.0.get(key).map(Cow::Borrowed)
+        fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
+            self.0.get(key)
         }
 
         fn push(&mut self, key: CacheKey, data: CacheValue) {
@@ -413,7 +306,7 @@ mod cache_ops {
     use bytes::Bytes;
     use futures::{FutureExt, StreamExt};
 
-    use crate::cache::mem::{CacheValue, InternalMemoryCache};
+    use crate::cache::mem::InternalMemoryCache;
     use crate::cache::{Cache, CacheEntry, CacheKey, CacheStream, ImageMetadata, MemStream};
 
     use super::test_util::{TestDiskCache, TestMemoryCache};
@@ -433,7 +326,7 @@ mod cache_ops {
             last_modified: None,
         };
         let bytes = Bytes::from_static(b"abcd");
-        let value = CacheValue::new(bytes.clone(), metadata.clone(), 34);
+        let value = (bytes.clone(), metadata.clone(), 34);
 
         // Populate the cache, need to drop the lock else it's considered locked
         // when we actually call the cache
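
The net effect in mem.rs: `CacheValue` becomes a plain `(Bytes, ImageMetadata, u64)` tuple, and the separate `InternalMemoryCacheInitializer` trait is folded into `InternalMemoryCache` as an `unbounded()` constructor. A self-contained sketch of the same pattern is below; the `MemoryBackend` name, the `String` key, and the `u64`-for-metadata stand-in are illustrative, not the project's code.

// Minimal sketch of the simplified adapter trait, written against the lru and
// bytes crates; assumes an lru version that still provides LruCache::unbounded().
use bytes::Bytes;
use lru::LruCache;

type Key = String;
type Value = (Bytes, u64); // payload plus its on-disk size, like the new tuple CacheValue

trait MemoryBackend {
    fn unbounded() -> Self;
    fn get(&mut self, key: &Key) -> Option<&Value>;
    fn push(&mut self, key: Key, value: Value);
    fn pop(&mut self) -> Option<(Key, Value)>;
}

impl MemoryBackend for LruCache<Key, Value> {
    fn unbounded() -> Self {
        // Resolves to the inherent LruCache::unbounded(), not this trait method.
        Self::unbounded()
    }

    fn get(&mut self, key: &Key) -> Option<&Value> {
        self.get(key)
    }

    fn push(&mut self, key: Key, value: Value) {
        self.put(key, value);
    }

    fn pop(&mut self) -> Option<(Key, Value)> {
        self.pop_lru()
    }
}

fn main() {
    let mut cache = <LruCache<Key, Value> as MemoryBackend>::unbounded();
    MemoryBackend::push(&mut cache, "a/b".into(), (Bytes::from_static(b"abcd"), 4));
    assert_eq!(cache.get(&"a/b".to_string()).map(|v| v.1), Some(4));
}

Because the value type now only ever lives in process memory and is never serialized, the `serde` feature on the `bytes` crate and the `redis`/`combine` lock-file entries can be dropped, which is what the Cargo.toml and Cargo.lock hunks above do.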

src/cache/mod.rs vendored

@@ -12,7 +12,6 @@ use chacha20::Key;
 use chrono::{DateTime, FixedOffset};
 use futures::{Stream, StreamExt};
 use once_cell::sync::OnceCell;
-use redis::ToRedisArgs;
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use thiserror::Error;
@@ -36,15 +35,6 @@ pub mod mem;
 #[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord)]
 pub struct CacheKey(pub String, pub String, pub bool);
 
-impl ToRedisArgs for CacheKey {
-    fn write_redis_args<W>(&self, out: &mut W)
-    where
-        W: ?Sized + redis::RedisWrite,
-    {
-        out.write_arg_fmt(self)
-    }
-}
-
 impl Display for CacheKey {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         if self.2 {


@@ -109,7 +109,6 @@ pub struct Config {
     pub enable_metrics: bool,
     pub geoip_license_key: Option<ClientSecret>,
     pub proxy: Option<Url>,
-    pub redis_url: Option<Url>,
 }
 
 impl Config {
@@ -211,7 +210,6 @@ impl Config {
                 }
             }),
             proxy: cli_args.proxy,
-            redis_url: file_extended_options.redis_url,
         }
     }
 }
@@ -238,7 +236,6 @@ pub enum CacheType {
     OnDisk,
     Lru,
     Lfu,
-    Redis,
 }
 
 impl FromStr for CacheType {
@@ -249,7 +246,6 @@ impl FromStr for CacheType {
             "on_disk" => Ok(Self::OnDisk),
             "lru" => Ok(Self::Lru),
             "lfu" => Ok(Self::Lfu),
-            "redis" => Ok(Self::Redis),
             _ => Err(format!("Unknown option: {}", s)),
         }
     }
@@ -298,7 +294,6 @@ struct YamlExtendedOptions {
     enable_metrics: Option<bool>,
     logging_level: Option<LevelFilter>,
     cache_path: Option<PathBuf>,
-    redis_url: Option<Url>,
 }
 
 #[derive(Clap, Clone)]
@@ -346,7 +341,7 @@ struct CliArgs {
     #[clap(short, long)]
     pub config_path: Option<PathBuf>,
     /// Whether to use an in-memory cache in addition to the disk cache. Default
-    /// value is "on_disk", other options are "lfu", "lru", and "redis".
+    /// value is "on_disk", other options are "lfu" and "lru".
    #[clap(short = 't', long)]
     pub cache_type: Option<CacheType>,
     /// Whether or not to use a proxy for upstream requests. This affects all
@@ -462,7 +457,6 @@ mod config {
                 enable_metrics: None,
                 logging_level: Some(LevelFilter::Error),
                 cache_path: Some(PathBuf::from("b")),
-                redis_url: None,
             }),
         };
 
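
The user-visible effect of the `CacheType` change is that `redis` is no longer a valid value for `cache_type`. A self-contained sketch follows; the enum variants and match arms mirror the new version of the file, while the derives and `main` are added here only so the asserts run on their own.

// Standalone sketch of the CacheType parsing behaviour after this change:
// "redis" is no longer recognised and falls through to the error arm.
use std::str::FromStr;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum CacheType {
    OnDisk,
    Lru,
    Lfu,
}

impl FromStr for CacheType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "on_disk" => Ok(Self::OnDisk),
            "lru" => Ok(Self::Lru),
            "lfu" => Ok(Self::Lfu),
            _ => Err(format!("Unknown option: {}", s)),
        }
    }
}

fn main() {
    assert_eq!("lru".parse::<CacheType>(), Ok(CacheType::Lru));
    assert!("redis".parse::<CacheType>().is_err());
}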


@@ -21,7 +21,6 @@ use chacha20::Key;
 use config::Config;
 use maxminddb::geoip2;
 use parking_lot::RwLock;
-use redis::Client as RedisClient;
 use rustls::{NoClientAuth, ServerConfig};
 use sodiumoxide::crypto::stream::xchacha20::gen_key;
 use state::{RwLockServerState, ServerState};
@@ -82,7 +81,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
         .unstable_options
         .contains(&UnstableOptions::DisableTls);
     let bind_address = config.bind_address;
-    let redis_url = config.redis_url.clone();
 
     //
     // Logging and warnings
@@ -176,14 +174,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
         CacheType::OnDisk => cache,
         CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size),
         CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size),
-        CacheType::Redis => {
-            let url = redis_url.unwrap_or_else(|| {
-                url::Url::parse("redis://127.0.0.1/").expect("default redis url to be parsable")
-            });
-            info!("Trying to connect to redis instance at {}", url);
-            let mem_cache = RedisClient::open(url)?;
-            Arc::new(MemoryCache::new_with_cache(cache, mem_cache))
-        }
     };
 
     let cache_0 = Arc::clone(&cache);
@@ -299,6 +289,10 @@ fn print_preamble_and_warnings(args: &Config) -> Result<(), Box<dyn Error>> {
         build_string
     );
 
+    if args.ephemeral_disk_encryption {
+        error!("Encrypted files are _very_ broken; caveat emptor!");
+    }
+
     if !args.unstable_options.is_empty() {
         warn!("Unstable options are enabled. These options should not be used in production!");
     }