Compare commits
No commits in common. "f8f4098faeeee99a3a9b9144278a5d0226a73e19" and "5da486d43d111aa187d48bbdb7d6173d844152e8" have entirely different histories.
f8f4098fae
...
5da486d43d
3 changed files with 6 additions and 157 deletions
147
src/cache/mem.rs
vendored
147
src/cache/mem.rs
vendored
|
@ -129,7 +129,7 @@ async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
||||||
MemoryCacheImpl: InternalMemoryCache,
|
MemoryCacheImpl: InternalMemoryCache,
|
||||||
ColdCache: Cache,
|
ColdCache: Cache,
|
||||||
{
|
{
|
||||||
let max_mem_size = mem_threshold(&max_mem_size);
|
let max_mem_size = max_mem_size.get() / 20 * 19;
|
||||||
while let Some(CacheEntry {
|
while let Some(CacheEntry {
|
||||||
key,
|
key,
|
||||||
data,
|
data,
|
||||||
|
@ -165,10 +165,6 @@ async fn internal_cache_listener<MemoryCacheImpl, ColdCache>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const fn mem_threshold(bytes: &crate::units::Bytes) -> usize {
|
|
||||||
bytes.get() / 20 * 19
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
|
impl<MemoryCacheImpl, ColdCache> Cache for MemoryCache<MemoryCacheImpl, ColdCache>
|
||||||
where
|
where
|
||||||
|
@ -207,7 +203,7 @@ where
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test_util {
|
mod test_util {
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use super::{CacheValue, InternalMemoryCache};
|
use super::{CacheValue, InternalMemoryCache};
|
||||||
use crate::cache::{
|
use crate::cache::{
|
||||||
|
@ -273,7 +269,7 @@ mod test_util {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct TestMemoryCache(pub BTreeMap<CacheKey, CacheValue>);
|
pub struct TestMemoryCache(pub HashMap<CacheKey, CacheValue>);
|
||||||
|
|
||||||
impl InternalMemoryCache for TestMemoryCache {
|
impl InternalMemoryCache for TestMemoryCache {
|
||||||
fn unbounded() -> Self {
|
fn unbounded() -> Self {
|
||||||
|
@ -289,12 +285,7 @@ mod test_util {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
fn pop(&mut self) -> Option<(CacheKey, CacheValue)> {
|
||||||
let mut cache = BTreeMap::new();
|
unimplemented!("shouldn't be needed for tests");
|
||||||
std::mem::swap(&mut cache, &mut self.0);
|
|
||||||
let mut iter = cache.into_iter();
|
|
||||||
let ret = iter.next();
|
|
||||||
self.0 = iter.collect();
|
|
||||||
ret
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -478,133 +469,5 @@ mod cache_ops {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod db_listener {
|
mod db_listener {
|
||||||
use std::error::Error;
|
use super::*;
|
||||||
use std::iter::FromIterator;
|
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use tokio::task;
|
|
||||||
|
|
||||||
use crate::cache::{Cache, CacheKey, ImageMetadata};
|
|
||||||
|
|
||||||
use super::test_util::{TestDiskCache, TestMemoryCache};
|
|
||||||
use super::{internal_cache_listener, MemoryCache};
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn put_into_memory() -> Result<(), Box<dyn Error>> {
|
|
||||||
let (cache, rx) = MemoryCache::<TestMemoryCache, _>::new_with_receiver(
|
|
||||||
TestDiskCache::default(),
|
|
||||||
crate::units::Bytes(0),
|
|
||||||
);
|
|
||||||
let cache = Arc::new(cache);
|
|
||||||
tokio::spawn(internal_cache_listener(
|
|
||||||
Arc::clone(&cache),
|
|
||||||
crate::units::Bytes(20),
|
|
||||||
rx,
|
|
||||||
));
|
|
||||||
|
|
||||||
// put small image into memory
|
|
||||||
let key = CacheKey("a".to_string(), "b".to_string(), false);
|
|
||||||
let metadata = ImageMetadata {
|
|
||||||
content_type: None,
|
|
||||||
content_length: Some(1),
|
|
||||||
last_modified: None,
|
|
||||||
};
|
|
||||||
let bytes = Bytes::from_static(b"abcd");
|
|
||||||
cache.put(key.clone(), bytes.clone(), metadata).await?;
|
|
||||||
|
|
||||||
// let the listener run first
|
|
||||||
for _ in 0..10 {
|
|
||||||
task::yield_now().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
cache.cur_mem_size.load(Ordering::SeqCst),
|
|
||||||
bytes.len() as u64
|
|
||||||
);
|
|
||||||
|
|
||||||
// Since we didn't populate the cache, fetching must be from memory, so
|
|
||||||
// this should succeed since the cache listener should push the item
|
|
||||||
// into cache
|
|
||||||
assert!(cache.get(&key).await.is_some());
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn pops_items() -> Result<(), Box<dyn Error>> {
|
|
||||||
let (cache, rx) = MemoryCache::<TestMemoryCache, _>::new_with_receiver(
|
|
||||||
TestDiskCache::default(),
|
|
||||||
crate::units::Bytes(0),
|
|
||||||
);
|
|
||||||
let cache = Arc::new(cache);
|
|
||||||
tokio::spawn(internal_cache_listener(
|
|
||||||
Arc::clone(&cache),
|
|
||||||
crate::units::Bytes(20),
|
|
||||||
rx,
|
|
||||||
));
|
|
||||||
|
|
||||||
// put small image into memory
|
|
||||||
let key_0 = CacheKey("a".to_string(), "b".to_string(), false);
|
|
||||||
let key_1 = CacheKey("c".to_string(), "d".to_string(), false);
|
|
||||||
let metadata = ImageMetadata {
|
|
||||||
content_type: None,
|
|
||||||
content_length: Some(1),
|
|
||||||
last_modified: None,
|
|
||||||
};
|
|
||||||
let bytes = Bytes::from_static(b"abcde");
|
|
||||||
|
|
||||||
cache.put(key_0, bytes.clone(), metadata.clone()).await?;
|
|
||||||
cache.put(key_1, bytes.clone(), metadata).await?;
|
|
||||||
|
|
||||||
// let the listener run first
|
|
||||||
task::yield_now().await;
|
|
||||||
for _ in 0..10 {
|
|
||||||
task::yield_now().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Items should be in cache now
|
|
||||||
assert_eq!(
|
|
||||||
cache.cur_mem_size.load(Ordering::SeqCst),
|
|
||||||
(bytes.len() * 2) as u64
|
|
||||||
);
|
|
||||||
|
|
||||||
let key_3 = CacheKey("e".to_string(), "f".to_string(), false);
|
|
||||||
let metadata = ImageMetadata {
|
|
||||||
content_type: None,
|
|
||||||
content_length: Some(1),
|
|
||||||
last_modified: None,
|
|
||||||
};
|
|
||||||
let bytes = Bytes::from_iter(b"0".repeat(16).into_iter());
|
|
||||||
let bytes_len = bytes.len();
|
|
||||||
cache.put(key_3, bytes, metadata).await?;
|
|
||||||
|
|
||||||
// let the listener run first
|
|
||||||
task::yield_now().await;
|
|
||||||
for _ in 0..10 {
|
|
||||||
task::yield_now().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Items should have been evicted, only 16 bytes should be there now
|
|
||||||
assert_eq!(cache.cur_mem_size.load(Ordering::SeqCst), bytes_len as u64);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod mem_threshold {
|
|
||||||
use crate::units::Bytes;
|
|
||||||
|
|
||||||
use super::mem_threshold;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn small_amount_works() {
|
|
||||||
assert_eq!(mem_threshold(&Bytes(100)), 95);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn large_amount_cannot_overflow() {
|
|
||||||
assert_eq!(mem_threshold(&Bytes(usize::MAX)), 17524406870024074020);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
2
src/cache/mod.rs
vendored
2
src/cache/mod.rs
vendored
|
@ -32,7 +32,7 @@ mod disk;
|
||||||
mod fs;
|
mod fs;
|
||||||
pub mod mem;
|
pub mod mem;
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord)]
|
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
|
||||||
pub struct CacheKey(pub String, pub String, pub bool);
|
pub struct CacheKey(pub String, pub String, pub bool);
|
||||||
|
|
||||||
impl Display for CacheKey {
|
impl Display for CacheKey {
|
||||||
|
|
14
src/stop.rs
14
src/stop.rs
|
@ -30,17 +30,3 @@ pub async fn send_stop(secret: &ClientSecret) {
|
||||||
Err(e) => warn!("Got error while sending stop message: {}", e),
|
Err(e) => warn!("Got error while sending stop message: {}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod stop {
|
|
||||||
use super::CONTROL_CENTER_STOP_URL;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn stop_url_does_not_have_ping_in_url() {
|
|
||||||
// This looks like a dumb test, yes, but it ensures that clients don't
|
|
||||||
// get marked compromised because apparently just sending a json obj
|
|
||||||
// with just the secret is acceptable to the ping endpoint, which messes
|
|
||||||
// up non-trivial client configs.
|
|
||||||
assert!(!CONTROL_CENTER_STOP_URL.contains("ping"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue