diff --git a/Cargo.lock b/Cargo.lock
index 1957860..99bf0d6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -403,6 +403,9 @@ name = "bytes"
 version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "bytestring"
@@ -495,6 +498,16 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "combine"
+version = "4.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2d47c1b11006b87e492b53b313bb699ce60e16613c4dddaa91f8f7c220ab2fa"
+dependencies = [
+ "bytes 1.0.1",
+ "memchr",
+]
+
 [[package]]
 name = "const_fn"
 version = "0.4.8"
@@ -1251,6 +1264,7 @@ dependencies = [
  "once_cell",
  "parking_lot",
  "prometheus",
+ "redis",
  "reqwest",
  "rustls",
  "serde",
@@ -1667,6 +1681,21 @@ dependencies = [
  "rand_core",
 ]
 
+[[package]]
+name = "redis"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3bbc1838d8d0b423f325d6fac80c5f19109c7d16c8c37c584893dc17cf71c63d"
+dependencies = [
+ "async-trait",
+ "combine",
+ "dtoa",
+ "itoa",
+ "percent-encoding",
+ "sha1",
+ "url",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.2.9"
diff --git a/Cargo.toml b/Cargo.toml
index acf831c..4b31e5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ arc-swap = "1"
 async-trait = "0.1"
 base64 = "0.13"
 bincode = "1"
-bytes = "1"
+bytes = { version = "1", features = [ "serde" ] }
 chacha20 = "0.7"
 chrono = { version = "0.4", features = [ "serde" ] }
 clap = { version = "3.0.0-beta.2", features = [ "wrap_help" ] }
@@ -42,6 +42,7 @@ maxminddb = "0.20"
 md-5 = "0.9"
 parking_lot = "0.11"
 prometheus = { version = "0.12", features = [ "process" ] }
+redis = "0.21"
 reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] }
 rustls = "0.19"
 serde = "1"
diff --git a/settings.sample.yaml b/settings.sample.yaml
index aa9ddf7..13e00f8 100644
--- a/settings.sample.yaml
+++ b/settings.sample.yaml
@@ -66,12 +66,16 @@ metric_settings:
 # differently from the official client.
 extended_options:
   # Which cache type to use. By default, this is `on_disk`, but one can select
-  # `lfu` or `lru` to use a LFU or LRU in addition to the on-disk cache to
-  # improve lookup times. Generally speaking, using one is almost always better,
-  # but by how much depends on how much memory you let the node use, how large
-  # is your node, and which caching implementation you use.
+  # `lfu`, `lru`, or `redis` to use an LFU, LRU, or redis instance in addition
+  # to the on-disk cache to improve lookup times. Generally speaking, using one
+  # is almost always better, but by how much depends on how much memory you let
+  # the node use, how large is your node, and which caching implementation you
+  # use.
   # cache_type: on_disk
 
+  # The redis URL to connect to. Does nothing if the cache type isn't `redis`.
+  # redis_url: "redis://127.0.0.1/"
+
   # The amount of memory the client should use when using an in-memory cache.
   # This does nothing if only the on-disk cache is used.
   # memory_quota: 0
diff --git a/src/cache/mem.rs b/src/cache/mem.rs
index 4182c37..1efc52a 100644
--- a/src/cache/mem.rs
+++ b/src/cache/mem.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
@@ -7,10 +8,67 @@ use bytes::Bytes;
 use futures::FutureExt;
 use lfu_cache::LfuCache;
 use lru::LruCache;
+use redis::{
+    Client as RedisClient, Commands, FromRedisValue, RedisError, RedisResult, ToRedisArgs,
+};
+use serde::{Deserialize, Serialize};
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
+use tracing::warn;
 
-type CacheValue = (Bytes, ImageMetadata, u64);
+#[derive(Clone, Serialize, Deserialize)]
+pub struct CacheValue {
+    data: Bytes,
+    metadata: ImageMetadata,
+    on_disk_size: u64,
+}
+
+impl CacheValue {
+    #[inline]
+    fn new(data: Bytes, metadata: ImageMetadata, on_disk_size: u64) -> Self {
+        Self {
+            data,
+            metadata,
+            on_disk_size,
+        }
+    }
+}
+
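+// A `CacheValue` crosses the redis boundary as an opaque bincode blob: the
+// `ToRedisArgs` impl below serializes it with bincode, and `FromRedisValue`
+// decodes the same encoding.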
+impl FromRedisValue for CacheValue {
+    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
+        use bincode::ErrorKind;
+
+        if let redis::Value::Data(data) = v {
+            bincode::deserialize(data).map_err(|err| match *err {
+                ErrorKind::Io(e) => RedisError::from(e),
+                ErrorKind::Custom(e) => RedisError::from((
+                    redis::ErrorKind::ResponseError,
+                    "bincode deserialize failed",
+                    e,
+                )),
+                e => RedisError::from((
+                    redis::ErrorKind::ResponseError,
+                    "bincode deserialize failed",
+                    e.to_string(),
+                )),
+            })
+        } else {
+            Err(RedisError::from((
+                redis::ErrorKind::TypeError,
+                "Got non data type from redis db",
+            )))
+        }
+    }
+}
+
+impl ToRedisArgs for CacheValue {
+    fn write_redis_args<W>(&self, out: &mut W)
+    where
+        W: ?Sized + redis::RedisWrite,
+    {
+        out.write_arg(&bincode::serialize(self).expect("serialization to work"))
+    }
+}
 
 /// Use LRU as the eviction strategy
 pub type Lru = LruCache<CacheKey, CacheValue>;
@@ -18,22 +76,27 @@ pub type Lru = LruCache<CacheKey, CacheValue>;
 pub type Lfu = LfuCache<CacheKey, CacheValue>;
 
 /// Adapter trait for memory cache backends
+pub trait InternalMemoryCacheInitializer: InternalMemoryCache {
+    fn new() -> Self;
+}
+
 pub trait InternalMemoryCache: Sync + Send {
-    fn unbounded() -> Self;
-    fn get(&mut self, key: &CacheKey) -> Option<&CacheValue>;
+    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>>;
     fn push(&mut self, key: CacheKey, data: CacheValue);
     fn pop(&mut self) -> Option<(CacheKey, CacheValue)>;
 }
 
-impl InternalMemoryCache for Lfu {
+impl InternalMemoryCacheInitializer for Lfu {
     #[inline]
-    fn unbounded() -> Self {
+    fn new() -> Self {
         Self::unbounded()
     }
+}
 
+impl InternalMemoryCache for Lfu {
     #[inline]
-    fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
-        self.get(key)
+    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
+        self.get(key).map(Cow::Borrowed)
     }
 
     #[inline]
@@ -47,15 +110,17 @@ impl InternalMemoryCache for Lfu {
     }
 }
 
-impl InternalMemoryCache for Lru {
+impl InternalMemoryCacheInitializer for Lru {
     #[inline]
-    fn unbounded() -> Self {
+    fn new() -> Self {
         Self::unbounded()
     }
+}
 
+impl InternalMemoryCache for Lru {
     #[inline]
-    fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
-        self.get(key)
+    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
+        self.get(key).map(Cow::Borrowed)
     }
 
     #[inline]
@@ -69,6 +134,22 @@ impl InternalMemoryCache for Lru {
     }
 }
 
+impl InternalMemoryCache for RedisClient {
+    fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
+        Commands::get(self, key).ok().map(Cow::Owned)
+    }
+
+    fn push(&mut self, key: CacheKey, data: CacheValue) {
+        if let Err(e) = Commands::set::<_, _, ()>(self, key, data) {
+            warn!("Failed to push to redis: {}", e);
+        }
+    }
+
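+    // Eviction is left to the redis server itself (e.g. its maxmemory policy),
+    // so this backend is never asked to pop entries.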
+    fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
+        unimplemented!("redis should handle its own memory")
+    }
+}
+
 /// Memory accelerated disk cache. Uses the internal cache implementation in
 /// memory to speed up reads.
 pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
@@ -80,7 +161,7 @@ pub struct MemoryCache<MemoryCacheImpl, ColdCache> {
 
 impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
 where
-    MemoryCacheImpl: 'static + InternalMemoryCache,
+    MemoryCacheImpl: 'static + InternalMemoryCacheInitializer,
     ColdCache: 'static + Cache,
 {
     pub fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
@@ -88,7 +169,7 @@ where
         let new_self = Arc::new(Self {
             inner,
             cur_mem_size: AtomicU64::new(0),
-            mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
+            mem_cache: Mutex::new(MemoryCacheImpl::new()),
             master_sender: tx,
         });
 
@@ -113,7 +194,7 @@ where
             Self {
                 inner,
                 cur_mem_size: AtomicU64::new(0),
-                mem_cache: Mutex::new(MemoryCacheImpl::unbounded()),
+                mem_cache: Mutex::new(MemoryCacheImpl::new()),
                 master_sender: tx,
             },
             rx,
@@ -121,6 +202,21 @@ where
         )
     }
 }
 
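+// Constructor for a backend that manages its own memory (currently the redis
+// client): no eviction listener is spawned, and the channel receiver created
+// here is dropped immediately.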
+impl<MemoryCacheImpl, ColdCache> MemoryCache<MemoryCacheImpl, ColdCache>
+where
+    MemoryCacheImpl: 'static + InternalMemoryCache,
+    ColdCache: 'static + Cache,
+{
+    pub fn new_with_cache(inner: ColdCache, init_mem_cache: MemoryCacheImpl) -> Self {
+        Self {
+            inner,
+            cur_mem_size: AtomicU64::new(0),
+            mem_cache: Mutex::new(init_mem_cache),
+            master_sender: channel(1).0,
+        }
+    }
+}
+
 async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
     cache: Arc<MemoryCache<MemoryCacheImpl, ColdCache>>,
     max_mem_size: crate::units::Bytes,
@@ -146,16 +242,20 @@ async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
             .mem_cache
             .lock()
             .await
-            .push(key, (data, metadata, on_disk_size));
+            .push(key, CacheValue::new(data, metadata, on_disk_size));
 
         // Pop if too large
         while cache.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
-            let popped = cache
-                .mem_cache
-                .lock()
-                .await
-                .pop()
-                .map(|(key, (bytes, metadata, size))| (key, bytes, metadata, size));
+            let popped = cache.mem_cache.lock().await.pop().map(
+                |(
+                    key,
+                    CacheValue {
+                        data,
+                        metadata,
+                        on_disk_size,
+                    },
+                )| (key, data, metadata, on_disk_size),
+            );
             if let Some((_, _, _, size)) = popped {
                 cache.cur_mem_size.fetch_sub(size as u64, Ordering::Release);
             } else {
@@ -181,12 +281,16 @@ where
         key: &CacheKey,
     ) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
         match self.mem_cache.lock().now_or_never() {
-            Some(mut mem_cache) => match mem_cache.get(key).map(|(bytes, metadata, _)| {
-                Ok((CacheStream::Memory(MemStream(bytes.clone())), *metadata))
-            }) {
-                Some(v) => Some(v),
-                None => self.inner.get(key).await,
-            },
+            Some(mut mem_cache) => {
+                match mem_cache.get(key).map(Cow::into_owned).map(
+                    |CacheValue { data, metadata, .. }| {
+                        Ok((CacheStream::Memory(MemStream(data)), metadata))
+                    },
+                ) {
+                    Some(v) => Some(v),
+                    None => self.inner.get(key).await,
+                }
+            }
             None => self.inner.get(key).await,
         }
     }
@@ -206,10 +310,11 @@ where
 
 #[cfg(test)]
 mod test_util {
+    use std::borrow::Cow;
     use std::cell::RefCell;
     use std::collections::{BTreeMap, HashMap};
 
-    use super::{CacheValue, InternalMemoryCache};
+    use super::{CacheValue, InternalMemoryCache, InternalMemoryCacheInitializer};
     use crate::cache::{
         Cache, CacheEntry, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
     };
@@ -275,13 +380,15 @@ mod test_util {
     #[derive(Default)]
     pub struct TestMemoryCache(pub BTreeMap<CacheKey, CacheValue>);
 
-    impl InternalMemoryCache for TestMemoryCache {
-        fn unbounded() -> Self {
+    impl InternalMemoryCacheInitializer for TestMemoryCache {
+        fn new() -> Self {
             Self::default()
         }
+    }
 
-        fn get(&mut self, key: &CacheKey) -> Option<&CacheValue> {
-            self.0.get(key)
+    impl InternalMemoryCache for TestMemoryCache {
+        fn get(&mut self, key: &CacheKey) -> Option<Cow<CacheValue>> {
+            self.0.get(key).map(Cow::Borrowed)
         }
 
         fn push(&mut self, key: CacheKey, data: CacheValue) {
@@ -306,7 +413,7 @@ mod cache_ops {
     use bytes::Bytes;
     use futures::{FutureExt, StreamExt};
 
-    use crate::cache::mem::InternalMemoryCache;
+    use crate::cache::mem::{CacheValue, InternalMemoryCache};
     use crate::cache::{Cache, CacheEntry, CacheKey, CacheStream, ImageMetadata, MemStream};
 
     use super::test_util::{TestDiskCache, TestMemoryCache};
@@ -326,7 +433,7 @@ mod cache_ops {
             last_modified: None,
         };
         let bytes = Bytes::from_static(b"abcd");
-        let value = (bytes.clone(), metadata.clone(), 34);
+        let value = CacheValue::new(bytes.clone(), metadata.clone(), 34);
 
         // Populate the cache, need to drop the lock else it's considered locked
         // when we actually call the cache
diff --git a/src/cache/mod.rs b/src/cache/mod.rs
index bc5e259..7fc1699 100644
--- a/src/cache/mod.rs
+++ b/src/cache/mod.rs
@@ -12,6 +12,7 @@ use chacha20::Key;
 use chrono::{DateTime, FixedOffset};
 use futures::{Stream, StreamExt};
 use once_cell::sync::OnceCell;
+use redis::ToRedisArgs;
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use thiserror::Error;
@@ -35,6 +36,15 @@ pub mod mem;
 #[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord)]
 pub struct CacheKey(pub String, pub String, pub bool);
 
+impl ToRedisArgs for CacheKey {
+    fn write_redis_args<W>(&self, out: &mut W)
+    where
+        W: ?Sized + redis::RedisWrite,
+    {
+        out.write_arg_fmt(self)
+    }
+}
+
 impl Display for CacheKey {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         if self.2 {
diff --git a/src/config.rs b/src/config.rs
index 5018777..9dbd673 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -109,6 +109,7 @@ pub struct Config {
     pub enable_metrics: bool,
     pub geoip_license_key: Option<ClientSecret>,
     pub proxy: Option<Url>,
+    pub redis_url: Option<Url>,
 }
 
 impl Config {
@@ -210,6 +211,7 @@ impl Config {
                 }
             }),
             proxy: cli_args.proxy,
+            redis_url: file_extended_options.redis_url,
         }
     }
 }
@@ -236,6 +238,7 @@ pub enum CacheType {
     OnDisk,
     Lru,
     Lfu,
+    Redis,
 }
 
 impl FromStr for CacheType {
@@ -246,6 +249,7 @@ impl FromStr for CacheType {
             "on_disk" => Ok(Self::OnDisk),
             "lru" => Ok(Self::Lru),
             "lfu" => Ok(Self::Lfu),
+            "redis" => Ok(Self::Redis),
             _ => Err(format!("Unknown option: {}", s)),
         }
     }
@@ -294,6 +298,7 @@ struct YamlExtendedOptions {
     enable_metrics: Option<bool>,
     logging_level: Option<LevelFilter>,
     cache_path: Option<PathBuf>,
+    redis_url: Option<Url>,
 }
 
 #[derive(Clap, Clone)]
@@ -341,7 +346,7 @@ struct CliArgs {
     #[clap(short, long)]
     pub config_path: Option<PathBuf>,
     /// Whether to use an in-memory cache in addition to the disk cache. Default
-    /// value is "on_disk", other options are "lfu" and "lru".
+    /// value is "on_disk", other options are "lfu", "lru", and "redis".
     #[clap(short = 't', long)]
     pub cache_type: Option<CacheType>,
     /// Whether or not to use a proxy for upstream requests. This affects all
@@ -457,6 +462,7 @@ mod config {
                 enable_metrics: None,
                 logging_level: Some(LevelFilter::Error),
                 cache_path: Some(PathBuf::from("b")),
+                redis_url: None,
             }),
         };
 
diff --git a/src/main.rs b/src/main.rs
index 87fa085..464c91b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -21,6 +21,7 @@ use chacha20::Key;
 use config::Config;
 use maxminddb::geoip2;
 use parking_lot::RwLock;
+use redis::Client as RedisClient;
 use rustls::{NoClientAuth, ServerConfig};
 use sodiumoxide::crypto::stream::xchacha20::gen_key;
 use state::{RwLockServerState, ServerState};
@@ -81,6 +82,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
         .unstable_options
         .contains(&UnstableOptions::DisableTls);
     let bind_address = config.bind_address;
+    let redis_url = config.redis_url.clone();
 
     //
     // Logging and warnings
@@ -174,6 +176,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
         CacheType::OnDisk => cache,
         CacheType::Lru => MemoryCache::<Lru, _>::new(cache, memory_max_size),
         CacheType::Lfu => MemoryCache::<Lfu, _>::new(cache, memory_max_size),
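+        // Connect to the configured redis instance, falling back to a local
+        // default when `redis_url` is not set.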
+        CacheType::Redis => {
+            let url = redis_url.unwrap_or_else(|| {
+                url::Url::parse("redis://127.0.0.1/").expect("default redis url to be parsable")
+            });
+            info!("Trying to connect to redis instance at {}", url);
+            let mem_cache = RedisClient::open(url)?;
+            Arc::new(MemoryCache::new_with_cache(cache, mem_cache))
+        }
     };
 
     let cache_0 = Arc::clone(&cache);