Compare commits
No commits in common. "d42b80d7e1ae581deceba877ef8f43dcf662d55e" and "5fdcfa5071cdd02dbef33598a74d6c34a1ee09dd" have entirely different histories.
d42b80d7e1
...
5fdcfa5071
7 changed files with 52 additions and 213 deletions
29
Cargo.lock
generated
29
Cargo.lock
generated
|
@ -403,9 +403,6 @@ name = "bytes"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
|
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
|
||||||
dependencies = [
|
|
||||||
"serde",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bytestring"
|
name = "bytestring"
|
||||||
|
@ -498,16 +495,6 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "combine"
|
|
||||||
version = "4.6.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a2d47c1b11006b87e492b53b313bb699ce60e16613c4dddaa91f8f7c220ab2fa"
|
|
||||||
dependencies = [
|
|
||||||
"bytes 1.0.1",
|
|
||||||
"memchr",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "const_fn"
|
name = "const_fn"
|
||||||
version = "0.4.8"
|
version = "0.4.8"
|
||||||
|
@ -1264,7 +1251,6 @@ dependencies = [
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"prometheus",
|
"prometheus",
|
||||||
"redis",
|
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rustls",
|
"rustls",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -1681,21 +1667,6 @@ dependencies = [
|
||||||
"rand_core",
|
"rand_core",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "redis"
|
|
||||||
version = "0.21.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3bbc1838d8d0b423f325d6fac80c5f19109c7d16c8c37c584893dc17cf71c63d"
|
|
||||||
dependencies = [
|
|
||||||
"async-trait",
|
|
||||||
"combine",
|
|
||||||
"dtoa",
|
|
||||||
"itoa",
|
|
||||||
"percent-encoding",
|
|
||||||
"sha1",
|
|
||||||
"url",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "redox_syscall"
|
name = "redox_syscall"
|
||||||
version = "0.2.9"
|
version = "0.2.9"
|
||||||
|
|
|
@ -26,7 +26,7 @@ arc-swap = "1"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
bincode = "1"
|
bincode = "1"
|
||||||
bytes = { version = "1", features = [ "serde" ] }
|
bytes = "1"
|
||||||
chacha20 = "0.7"
|
chacha20 = "0.7"
|
||||||
chrono = { version = "0.4", features = [ "serde" ] }
|
chrono = { version = "0.4", features = [ "serde" ] }
|
||||||
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
|
clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
|
||||||
|
@ -42,7 +42,6 @@ maxminddb = "0.20"
|
||||||
md-5 = "0.9"
|
md-5 = "0.9"
|
||||||
parking_lot = "0.11"
|
parking_lot = "0.11"
|
||||||
prometheus = { version = "0.12", features = [ "process" ] }
|
prometheus = { version = "0.12", features = [ "process" ] }
|
||||||
redis = "0.21"
|
|
||||||
reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] }
|
reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] }
|
||||||
rustls = "0.19"
|
rustls = "0.19"
|
||||||
serde = "1"
|
serde = "1"
|
||||||
|
|
|
@ -66,16 +66,12 @@ metric_settings:
|
||||||
# differently from the official client.
|
# differently from the official client.
|
||||||
extended_options:
|
extended_options:
|
||||||
# Which cache type to use. By default, this is `on_disk`, but one can select
|
# Which cache type to use. By default, this is `on_disk`, but one can select
|
||||||
# `lfu`, `lru`, or `redis` to use a LFU, LRU, or redis instance in addition
|
# `lfu` or `lru` to use a LFU or LRU in addition to the on-disk cache to
|
||||||
# to the on-disk cache to improve lookup times. Generally speaking, using one
|
# improve lookup times. Generally speaking, using one is almost always better,
|
||||||
# is almost always better, but by how much depends on how much memory you let
|
# but by how much depends on how much memory you let the node use, how large
|
||||||
# the node use, how large is your node, and which caching implementation you
|
# is your node, and which caching implementation you use.
|
||||||
# use.
|
|
||||||
# cache_type: on_disk
|
# cache_type: on_disk
|
||||||
|
|
||||||
# The redis url to connect with. Does nothing if the cache type isn't`redis`.
|
|
||||||
# redis_url: "redis://127.0.0.1/"
|
|
||||||
|
|
||||||
# The amount of memory the client should use when using an in-memory cache.
|
# The amount of memory the client should use when using an in-memory cache.
|
||||||
# This does nothing if only the on-disk cache is used.
|
# This does nothing if only the on-disk cache is used.
|
||||||
# memory_quota: 0
|
# memory_quota: 0
|
||||||
|
@ -93,6 +89,8 @@ extended_options:
|
||||||
# "debug", "trace", and "off", which disables logging.
|
# "debug", "trace", and "off", which disables logging.
|
||||||
# logging_level: info
|
# logging_level: info
|
||||||
|
|
||||||
|
# Warning: Experimental. Will cause problems.
|
||||||
|
#
|
||||||
# Enables disk encryption where the key is stored in memory. In other words,
|
# Enables disk encryption where the key is stored in memory. In other words,
|
||||||
# when the MD@H program is stopped, all cached files are irrecoverable.
|
# when the MD@H program is stopped, all cached files are irrecoverable.
|
||||||
# Practically speaking, this isn't all too useful (and definitely hurts
|
# Practically speaking, this isn't all too useful (and definitely hurts
|
||||||
|
|
187
src/cache/mem.rs
vendored
187
src/cache/mem.rs
vendored
|
@ -1,4 +1,3 @@
|
||||||
use std::borrow::Cow;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -8,67 +7,10 @@ use bytes::Bytes;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use lfu_cache::LfuCache;
|
use lfu_cache::LfuCache;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use redis::{
|
|
||||||
Client as RedisClient, Commands, FromRedisValue, RedisError, RedisResult, ToRedisArgs,
|
|
||||||
};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::warn;
|
|
||||||
|
|
||||||
#[derive(Clone, Serialize, Deserialize)]
|
type CacheValue = (Bytes, ImageMetadata, u64);
|
||||||
pub struct CacheValue {
|
|
||||||
data: Bytes,
|
|
||||||
metadata: ImageMetadata,
|
|
||||||
on_disk_size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CacheValue {
|
|
||||||
#[inline]
|
|
||||||
fn new(data: Bytes, metadata: ImageMetadata, on_disk_size: u64) -> Self {
|
|
||||||
Self {
|
|
||||||
data,
|
|
||||||
metadata,
|
|
||||||
on_disk_size,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FromRedisValue for CacheValue {
|
|
||||||
fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
|
|
||||||
use bincode::ErrorKind;
|
|
||||||
|
|
||||||
if let redis::Value::Data(data) = v {
|
|
||||||
bincode::deserialize(data).map_err(|err| match *err {
|
|
||||||
ErrorKind::Io(e) => RedisError::from(e),
|
|
||||||
ErrorKind::Custom(e) => RedisError::from((
|
|
||||||
redis::ErrorKind::ResponseError,
|
|
||||||
"bincode deserialize failed",
|
|
||||||
e,
|
|
||||||
)),
|
|
||||||
e => RedisError::from((
|
|
||||||
redis::ErrorKind::ResponseError,
|
|
||||||
"bincode deserialized failed",
|
|
||||||
e.to_string(),
|
|
||||||
)),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
Err(RedisError::from((
|
|
||||||
redis::ErrorKind::TypeError,
|
|
||||||
"Got non data type from redis db",
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ToRedisArgs for CacheValue {
|
|
||||||
fn write_redis_args<W>(&self, out: &mut W)
|
|
||||||
where
|
|
||||||
W: ?Sized + redis::RedisWrite,
|
|
||||||
{
|
|
||||||
out.write_arg(&bincode::serialize(self).expect("serialization to work"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Use LRU as the eviction strategy
|
/// Use LRU as the eviction strategy
|
||||||
pub type Lru = LruCache<CacheKey, CacheValue>;
|
pub type Lru = LruCache<CacheKey, CacheValue>;
|
||||||
|
@ -76,27 +18,22 @@ pub type Lru = LruCache<CacheKey, CacheValue>;
|
||||||
pub type Lfu = LfuCache<CacheKey, CacheValue>;
|
pub type Lfu = LfuCache<CacheKey, CacheValue>;
|
||||||
|
|
||||||
/// Adapter trait for memory cache backends
|
/// Adapter trait for memory cache backends
|
||||||
pub trait InternalMemoryCacheInitializer: InternalMemoryCache {
|
|
||||||
fn new() -> Self;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait InternalMemoryCache: Sync + Send {
|
pub trait InternalMemoryCache: Sync + Send {
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>>;
|
fn unbounded() -> Self;
|
||||||
|
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue>;
|
||||||
fn push(&mut self, key: CacheKey, data: CacheValue);
|
fn push(&mut self, key: CacheKey, data: CacheValue);
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
|
fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InternalMemoryCacheInitializer for Lfu {
|
|
||||||
#[inline]
|
|
||||||
fn new() -> Self {
|
|
||||||
Self::unbounded()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InternalMemoryCache for Lfu {
|
impl InternalMemoryCache for Lfu {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
|
fn unbounded() -> Self {
|
||||||
self.get(key).map(Cow::Borrowed)
|
Self::unbounded()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
||||||
|
self.get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -110,17 +47,15 @@ impl InternalMemoryCache for Lfu {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InternalMemoryCacheInitializer for Lru {
|
|
||||||
#[inline]
|
|
||||||
fn new() -> Self {
|
|
||||||
Self::unbounded()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InternalMemoryCache for Lru {
|
impl InternalMemoryCache for Lru {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
|
fn unbounded() -> Self {
|
||||||
self.get(key).map(Cow::Borrowed)
|
Self::unbounded()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
||||||
|
self.get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -134,22 +69,6 @@ impl InternalMemoryCache for Lru {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InternalMemoryCache for RedisClient {
|
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
|
|
||||||
Commands::get(self, key).ok().map(Cow::Owned)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
|
||||||
if let Err(e) = Commands::set::<_, _, ()>(self, key, data) {
|
|
||||||
warn!("Failed to push to redis: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
|
||||||
unimplemented!("redis should handle its own memory")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Memory accelerated disk cache. Uses the internal cache implementation in
|
/// Memory accelerated disk cache. Uses the internal cache implementation in
|
||||||
/// memory to speed up reads.
|
/// memory to speed up reads.
|
||||||
pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
|
pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
|
||||||
|
@ -161,7 +80,7 @@ pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
|
||||||
|
|
||||||
impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
|
impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
|
||||||
where
|
where
|
||||||
MemoryCacheImpl: 'static + InternalMemoryCacheInitializer,
|
MemoryCacheImpl: 'static + InternalMemoryCache,
|
||||||
ColdCache: 'static + Cache,
|
ColdCache: 'static + Cache,
|
||||||
{
|
{
|
||||||
pub fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
pub fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
||||||
|
@ -169,7 +88,7 @@ where
|
||||||
let new_self = Arc::new(Self {
|
let new_self = Arc::new(Self {
|
||||||
inner,
|
inner,
|
||||||
cur_mem_size: AtomicU64::new(0),
|
cur_mem_size: AtomicU64::new(0),
|
||||||
mem_cache: Mutex::new(MemoryCacheImpl::new()),
|
mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
|
||||||
master_sender: tx,
|
master_sender: tx,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -194,7 +113,7 @@ where
|
||||||
Self {
|
Self {
|
||||||
inner,
|
inner,
|
||||||
cur_mem_size: AtomicU64::new(0),
|
cur_mem_size: AtomicU64::new(0),
|
||||||
mem_cache: Mutex::new(MemoryCacheImpl::new()),
|
mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
|
||||||
master_sender: tx,
|
master_sender: tx,
|
||||||
},
|
},
|
||||||
rx,
|
rx,
|
||||||
|
@ -202,21 +121,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
|
|
||||||
where
|
|
||||||
MemoryCacheImpl: 'static + InternalMemoryCache,
|
|
||||||
ColdCache: 'static + Cache,
|
|
||||||
{
|
|
||||||
pub fn new_with_cache(inner: ColdCache, init_mem_cache: MemoryCacheImpl) -> Self {
|
|
||||||
Self {
|
|
||||||
inner,
|
|
||||||
cur_mem_size: AtomicU64::new(0),
|
|
||||||
mem_cache: Mutex::new(init_mem_cache),
|
|
||||||
master_sender: channel(1).0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
||||||
cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
|
cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
|
||||||
max_mem_size: crate::units::Bytes,
|
max_mem_size: crate::units::Bytes,
|
||||||
|
@ -242,20 +146,16 @@ async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
||||||
.mem_cache
|
.mem_cache
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.push(key, CacheValue::new(data, metadata, on_disk_size));
|
.push(key, (data, metadata, on_disk_size));
|
||||||
|
|
||||||
// Pop if too large
|
// Pop if too large
|
||||||
while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
|
while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
|
||||||
let popped = cache.mem_cache.lock().await.pop().map(
|
let popped = cache
|
||||||
|(
|
.mem_cache
|
||||||
key,
|
.lock()
|
||||||
CacheValue {
|
.await
|
||||||
data,
|
.pop()
|
||||||
metadata,
|
.map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
|
||||||
on_disk_size,
|
|
||||||
},
|
|
||||||
)| (key, data, metadata, on_disk_size),
|
|
||||||
);
|
|
||||||
if let Some((_, _, _, size)) = popped {
|
if let Some((_, _, _, size)) = popped {
|
||||||
cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
|
cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
|
||||||
} else {
|
} else {
|
||||||
|
@ -281,16 +181,12 @@ where
|
||||||
key: &CacheKey,
|
key: &CacheKey,
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
|
) -> Option<Result<(CacheStream, ImageMetadata), super::CacheError>> {
|
||||||
match self.mem_cache.lock().now_or_never() {
|
match self.mem_cache.lock().now_or_never() {
|
||||||
Some(mut mem_cache) => {
|
Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
|
||||||
match mem_cache.get(key).map(Cow::into_owned).map(
|
Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata))
|
||||||
|CacheValue { data, metadata, .. }| {
|
}) {
|
||||||
Ok((CacheStream::Memory(MemStream(data)), metadata))
|
Some(v) => Some(v),
|
||||||
},
|
None => self.inner.get(key).await,
|
||||||
) {
|
},
|
||||||
Some(v) => Some(v),
|
|
||||||
None => self.inner.get(key).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => self.inner.get(key).await,
|
None => self.inner.get(key).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -310,11 +206,10 @@ where
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test_util {
|
mod test_util {
|
||||||
use std::borrow::Cow;
|
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
|
||||||
use super::{CacheValue, InternalMemoryCache, InternalMemoryCacheInitializer};
|
use super::{CacheValue, InternalMemoryCache};
|
||||||
use crate::cache::{
|
use crate::cache::{
|
||||||
Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
|
Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
|
||||||
};
|
};
|
||||||
|
@ -380,15 +275,13 @@ mod test_util {
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct TestMemoryCache(pub BTreeMap<CacheKey, CacheValue>);
|
pub struct TestMemoryCache(pub BTreeMap<CacheKey, CacheValue>);
|
||||||
|
|
||||||
impl InternalMemoryCacheInitializer for TestMemoryCache {
|
impl InternalMemoryCache for TestMemoryCache {
|
||||||
fn new() -> Self {
|
fn unbounded() -> Self {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl InternalMemoryCache for TestMemoryCache {
|
fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
|
||||||
fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
|
self.0.get(key)
|
||||||
self.0.get(key).map(Cow::Borrowed)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
fn push(&mut self, key: CacheKey, data: CacheValue) {
|
||||||
|
@ -413,7 +306,7 @@ mod cache_ops {
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
|
|
||||||
use crate::cache::mem::{CacheValue, InternalMemoryCache};
|
use crate::cache::mem::InternalMemoryCache;
|
||||||
use crate::cache::{Cache, CacheEntry, CacheKey, CacheStream, ImageMetadata, MemStream};
|
use crate::cache::{Cache, CacheEntry, CacheKey, CacheStream, ImageMetadata, MemStream};
|
||||||
|
|
||||||
use super::test_util::{TestDiskCache, TestMemoryCache};
|
use super::test_util::{TestDiskCache, TestMemoryCache};
|
||||||
|
@ -433,7 +326,7 @@ mod cache_ops {
|
||||||
last_modified: None,
|
last_modified: None,
|
||||||
};
|
};
|
||||||
let bytes = Bytes::from_static(b"abcd");
|
let bytes = Bytes::from_static(b"abcd");
|
||||||
let value = CacheValue::new(bytes.clone(), metadata.clone(), 34);
|
let value = (bytes.clone(), metadata.clone(), 34);
|
||||||
|
|
||||||
// Populate the cache, need to drop the lock else it's considered locked
|
// Populate the cache, need to drop the lock else it's considered locked
|
||||||
// when we actually call the cache
|
// when we actually call the cache
|
||||||
|
|
10
src/cache/mod.rs
vendored
10
src/cache/mod.rs
vendored
|
@ -12,7 +12,6 @@ use chacha20::Key;
|
||||||
use chrono::{DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use redis::ToRedisArgs;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
@ -36,15 +35,6 @@ pub mod mem;
|
||||||
#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord)]
|
#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord)]
|
||||||
pub struct CacheKey(pub String, pub String, pub bool);
|
pub struct CacheKey(pub String, pub String, pub bool);
|
||||||
|
|
||||||
impl ToRedisArgs for CacheKey {
|
|
||||||
fn write_redis_args<W>(&self, out: &mut W)
|
|
||||||
where
|
|
||||||
W: ?Sized + redis::RedisWrite,
|
|
||||||
{
|
|
||||||
out.write_arg_fmt(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for CacheKey {
|
impl Display for CacheKey {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
if self.2 {
|
if self.2 {
|
||||||
|
|
|
@ -109,7 +109,6 @@ pub struct Config {
|
||||||
pub enable_metrics: bool,
|
pub enable_metrics: bool,
|
||||||
pub geoip_license_key: Option<ClientSecret>,
|
pub geoip_license_key: Option<ClientSecret>,
|
||||||
pub proxy: Option<Url>,
|
pub proxy: Option<Url>,
|
||||||
pub redis_url: Option<Url>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
@ -211,7 +210,6 @@ impl Config {
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
proxy: cli_args.proxy,
|
proxy: cli_args.proxy,
|
||||||
redis_url: file_extended_options.redis_url,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -238,7 +236,6 @@ pub enum CacheType {
|
||||||
OnDisk,
|
OnDisk,
|
||||||
Lru,
|
Lru,
|
||||||
Lfu,
|
Lfu,
|
||||||
Redis,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromStr for CacheType {
|
impl FromStr for CacheType {
|
||||||
|
@ -249,7 +246,6 @@ impl FromStr for CacheType {
|
||||||
"on_disk" => Ok(Self::OnDisk),
|
"on_disk" => Ok(Self::OnDisk),
|
||||||
"lru" => Ok(Self::Lru),
|
"lru" => Ok(Self::Lru),
|
||||||
"lfu" => Ok(Self::Lfu),
|
"lfu" => Ok(Self::Lfu),
|
||||||
"redis" => Ok(Self::Redis),
|
|
||||||
_ => Err(format!("Unknown option: {}", s)),
|
_ => Err(format!("Unknown option: {}", s)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -298,7 +294,6 @@ struct YamlExtendedOptions {
|
||||||
enable_metrics: Option<bool>,
|
enable_metrics: Option<bool>,
|
||||||
logging_level: Option<LevelFilter>,
|
logging_level: Option<LevelFilter>,
|
||||||
cache_path: Option<PathBuf>,
|
cache_path: Option<PathBuf>,
|
||||||
redis_url: Option<Url>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clap, Clone)]
|
#[derive(Clap, Clone)]
|
||||||
|
@ -346,7 +341,7 @@ struct CliArgs {
|
||||||
#[clap(short, long)]
|
#[clap(short, long)]
|
||||||
pub config_path: Option<PathBuf>,
|
pub config_path: Option<PathBuf>,
|
||||||
/// Whether to use an in-memory cache in addition to the disk cache. Default
|
/// Whether to use an in-memory cache in addition to the disk cache. Default
|
||||||
/// value is "on_disk", other options are "lfu", "lru", and "redis".
|
/// value is "on_disk", other options are "lfu" and "lru".
|
||||||
#[clap(short = 't', long)]
|
#[clap(short = 't', long)]
|
||||||
pub cache_type: Option<CacheType>,
|
pub cache_type: Option<CacheType>,
|
||||||
/// Whether or not to use a proxy for upstream requests. This affects all
|
/// Whether or not to use a proxy for upstream requests. This affects all
|
||||||
|
@ -462,7 +457,6 @@ mod config {
|
||||||
enable_metrics: None,
|
enable_metrics: None,
|
||||||
logging_level: Some(LevelFilter::Error),
|
logging_level: Some(LevelFilter::Error),
|
||||||
cache_path: Some(PathBuf::from("b")),
|
cache_path: Some(PathBuf::from("b")),
|
||||||
redis_url: None,
|
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
14
src/main.rs
14
src/main.rs
|
@ -21,7 +21,6 @@ use chacha20::Key;
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use maxminddb::geoip2;
|
use maxminddb::geoip2;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use redis::Client as RedisClient;
|
|
||||||
use rustls::{NoClientAuth, ServerConfig};
|
use rustls::{NoClientAuth, ServerConfig};
|
||||||
use sodiumoxide::crypto::stream::xchacha20::gen_key;
|
use sodiumoxide::crypto::stream::xchacha20::gen_key;
|
||||||
use state::{RwLockServerState, ServerState};
|
use state::{RwLockServerState, ServerState};
|
||||||
|
@ -82,7 +81,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
.unstable_options
|
.unstable_options
|
||||||
.contains(&UnstableOptions::DisableTls);
|
.contains(&UnstableOptions::DisableTls);
|
||||||
let bind_address = config.bind_address;
|
let bind_address = config.bind_address;
|
||||||
let redis_url = config.redis_url.clone();
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// Logging and warnings
|
// Logging and warnings
|
||||||
|
@ -176,14 +174,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
CacheType::OnDisk => cache,
|
CacheType::OnDisk => cache,
|
||||||
CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size),
|
CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size),
|
||||||
CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size),
|
CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size),
|
||||||
CacheType::Redis => {
|
|
||||||
let url = redis_url.unwrap_or_else(|| {
|
|
||||||
url::Url::parse("redis://127.0.0.1/").expect("default redis url to be parsable")
|
|
||||||
});
|
|
||||||
info!("Trying to connect to redis instance at {}", url);
|
|
||||||
let mem_cache = RedisClient::open(url)?;
|
|
||||||
Arc::new(MemoryCache::new_with_cache(cache, mem_cache))
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let cache_0 = Arc::clone(&cache);
|
let cache_0 = Arc::clone(&cache);
|
||||||
|
@ -299,6 +289,10 @@ fn print_preamble_and_warnings(args: &Config) -> Result<(), Box<dyn Error>> {
|
||||||
build_string
|
build_string
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if args.ephemeral_disk_encryption {
|
||||||
|
error!("Encrypted files are _very_ broken; caveat emptor!");
|
||||||
|
}
|
||||||
|
|
||||||
if !args.unstable_options.is_empty() {
|
if !args.unstable_options.is_empty() {
|
||||||
warn!("Unstable options are enabled. These options should not be used in production!");
|
warn!("Unstable options are enabled. These options should not be used in production!");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue