Compare commits
2 commits
b1797dafd2
...
d4d22ec674
Author | SHA1 | Date | |
---|---|---|---|
d4d22ec674 | |||
353ee72713 |
7 changed files with 231 additions and 25 deletions
|
@ -42,7 +42,7 @@ serde_yaml = "0.8"
|
|||
sodiumoxide = "0.2"
|
||||
sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros", "offline" ] }
|
||||
thiserror = "1"
|
||||
tokio = { version = "1", features = [ "full", "parking_lot" ] }
|
||||
tokio = { version = "1", features = [ "rt-multi-thread", "macros", "fs", "sync", "parking_lot" ] }
|
||||
tokio-stream = { version = "0.1", features = [ "sync" ] }
|
||||
tokio-util = { version = "0.6", features = [ "codec" ] }
|
||||
tracing = "0.1"
|
||||
|
|
1
db_queries/insert_image.sql
Normal file
1
db_queries/insert_image.sql
Normal file
|
@ -0,0 +1 @@
|
|||
insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing
|
73
src/cache/compat.rs
vendored
73
src/cache/compat.rs
vendored
|
@ -77,3 +77,76 @@ impl<'de> Deserialize<'de> for LegacyImageContentType {
|
|||
deserializer.deserialize_str(LegacyImageContentTypeVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod parse {
|
||||
use std::error::Error;
|
||||
|
||||
use chrono::DateTime;
|
||||
|
||||
use crate::cache::ImageContentType;
|
||||
|
||||
use super::LegacyImageMetadata;
|
||||
|
||||
#[test]
|
||||
fn from_valid_legacy_format() -> Result<(), Box<dyn Error>> {
|
||||
let legacy_header = r#"{"content_type":"image/jpeg","last_modified":"Sat, 10 Apr 2021 10:55:22 GMT","size":117888}"#;
|
||||
let metadata: LegacyImageMetadata = serde_json::from_str(legacy_header)?;
|
||||
|
||||
assert_eq!(
|
||||
metadata.content_type.map(|v| v.0),
|
||||
Some(ImageContentType::Jpeg)
|
||||
);
|
||||
assert_eq!(metadata.size, Some(117888));
|
||||
assert_eq!(
|
||||
metadata.last_modified.map(|v| v.0),
|
||||
Some(DateTime::parse_from_rfc2822(
|
||||
"Sat, 10 Apr 2021 10:55:22 GMT"
|
||||
)?)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_metadata() -> Result<(), Box<dyn Error>> {
|
||||
let legacy_header = "{}";
|
||||
let metadata: LegacyImageMetadata = serde_json::from_str(legacy_header)?;
|
||||
|
||||
assert!(metadata.content_type.is_none());
|
||||
assert!(metadata.size.is_none());
|
||||
assert!(metadata.last_modified.is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_image_mime_value() {
|
||||
let legacy_header = r#"{"content_type":"image/not-a-real-image"}"#;
|
||||
assert!(serde_json::from_str::<LegacyImageMetadata>(legacy_header).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_date_time() {
|
||||
let legacy_header = r#"{"last_modified":"idk last tuesday?"}"#;
|
||||
assert!(serde_json::from_str::<LegacyImageMetadata>(legacy_header).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_size() {
|
||||
let legacy_header = r#"{"size":-1}"#;
|
||||
assert!(serde_json::from_str::<LegacyImageMetadata>(legacy_header).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wrong_image_type() {
|
||||
let legacy_header = r#"{"content_type":25}"#;
|
||||
assert!(serde_json::from_str::<LegacyImageMetadata>(legacy_header).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wrong_date_time_type() {
|
||||
let legacy_header = r#"{"last_modified":false}"#;
|
||||
assert!(serde_json::from_str::<LegacyImageMetadata>(legacy_header).is_err());
|
||||
}
|
||||
}
|
||||
|
|
78
src/cache/disk.rs
vendored
78
src/cache/disk.rs
vendored
|
@ -41,28 +41,30 @@ impl DiskCache {
|
|||
/// This internally spawns a task that will wait for filesystem
|
||||
/// notifications when a file has been written.
|
||||
pub async fn new(disk_max_size: Bytes, disk_path: PathBuf) -> Arc<Self> {
|
||||
let (db_tx, db_rx) = channel(128);
|
||||
let db_pool = {
|
||||
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy());
|
||||
let mut options = SqliteConnectOptions::from_str(&db_url)
|
||||
.unwrap()
|
||||
.create_if_missing(true);
|
||||
options.log_statements(LevelFilter::Trace);
|
||||
let db = SqlitePool::connect_with(options).await.unwrap();
|
||||
SqlitePool::connect_with(options).await.unwrap()
|
||||
};
|
||||
|
||||
Self::from_db_pool(db_pool, disk_max_size, disk_path).await
|
||||
}
|
||||
|
||||
async fn from_db_pool(pool: SqlitePool, disk_max_size: Bytes, disk_path: PathBuf) -> Arc<Self> {
|
||||
let (db_tx, db_rx) = channel(128);
|
||||
// Run db init
|
||||
sqlx::query_file!("./db_queries/init.sql")
|
||||
.execute(&mut db.acquire().await.unwrap())
|
||||
.execute(&mut pool.acquire().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
db
|
||||
};
|
||||
|
||||
// This is intentional.
|
||||
#[allow(clippy::cast_sign_loss)]
|
||||
let disk_cur_size = {
|
||||
let mut conn = db_pool.acquire().await.unwrap();
|
||||
let mut conn = pool.acquire().await.unwrap();
|
||||
sqlx::query!("SELECT IFNULL(SUM(size), 0) AS size FROM Images")
|
||||
.fetch_one(&mut conn)
|
||||
.await
|
||||
|
@ -80,7 +82,7 @@ impl DiskCache {
|
|||
tokio::spawn(db_listener(
|
||||
Arc::clone(&new_self),
|
||||
db_rx,
|
||||
db_pool,
|
||||
pool,
|
||||
disk_max_size.get() as u64 / 20 * 19,
|
||||
));
|
||||
|
||||
|
@ -239,12 +241,7 @@ async fn handle_db_put(
|
|||
// This is intentional.
|
||||
#[allow(clippy::cast_possible_wrap)]
|
||||
let casted_size = size as i64;
|
||||
let query = sqlx::query!(
|
||||
"insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing",
|
||||
key,
|
||||
casted_size,
|
||||
now,
|
||||
)
|
||||
let query = sqlx::query_file!("./db_queries/insert_image.sql", key, casted_size, now)
|
||||
.execute(transaction)
|
||||
.await;
|
||||
|
||||
|
@ -369,6 +366,59 @@ impl CallbackCache for DiskCache {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod disk_cache {
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use chrono::Utc;
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
use crate::units::Bytes;
|
||||
|
||||
use super::DiskCache;
|
||||
|
||||
#[tokio::test]
|
||||
async fn db_is_initialized() -> Result<(), Box<dyn Error>> {
|
||||
let conn = SqlitePool::connect("sqlite::memory:").await?;
|
||||
let _cache = DiskCache::from_db_pool(conn.clone(), Bytes(1000), PathBuf::new()).await;
|
||||
let res = sqlx::query("select * from Images").execute(&conn).await;
|
||||
assert!(res.is_ok());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn db_initializes_empty() -> Result<(), Box<dyn Error>> {
|
||||
let conn = SqlitePool::connect("sqlite::memory:").await?;
|
||||
let cache = DiskCache::from_db_pool(conn.clone(), Bytes(1000), PathBuf::new()).await;
|
||||
assert_eq!(cache.disk_cur_size.load(Ordering::SeqCst), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn db_can_load_from_existing() -> Result<(), Box<dyn Error>> {
|
||||
let conn = SqlitePool::connect("sqlite::memory:").await?;
|
||||
sqlx::query_file!("./db_queries/init.sql")
|
||||
.execute(&conn)
|
||||
.await?;
|
||||
|
||||
let now = Utc::now();
|
||||
sqlx::query_file!("./db_queries/insert_image.sql", "a", 4, now)
|
||||
.execute(&conn)
|
||||
.await?;
|
||||
|
||||
let now = Utc::now();
|
||||
sqlx::query_file!("./db_queries/insert_image.sql", "b", 15, now)
|
||||
.execute(&conn)
|
||||
.await?;
|
||||
|
||||
let cache = DiskCache::from_db_pool(conn.clone(), Bytes(1000), PathBuf::new()).await;
|
||||
assert_eq!(cache.disk_cur_size.load(Ordering::SeqCst), 19);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod db {
|
||||
use chrono::{DateTime, Utc};
|
||||
|
|
|
@ -198,7 +198,7 @@ impl std::fmt::Debug for ClientSecret {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Copy, Clone, Debug)]
|
||||
#[derive(Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CacheType {
|
||||
OnDisk,
|
||||
|
@ -348,7 +348,69 @@ mod sample_yaml {
|
|||
use crate::config::YamlArgs;
|
||||
|
||||
#[test]
|
||||
fn sample_yaml_parses() {
|
||||
fn parses() {
|
||||
assert!(serde_yaml::from_str::<YamlArgs>(include_str!("../settings.sample.yaml")).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod config {
|
||||
use std::path::PathBuf;
|
||||
|
||||
use log::LevelFilter;
|
||||
use tracing::level_filters::LevelFilter as TracingLevelFilter;
|
||||
|
||||
use crate::config::{CacheType, ClientSecret, Config, YamlExtendedOptions, YamlServerSettings};
|
||||
use crate::units::{KilobitsPerSecond, Mebibytes, Port};
|
||||
|
||||
use super::{CliArgs, YamlArgs};
|
||||
|
||||
#[test]
|
||||
fn cli_has_priority() {
|
||||
let cli_config = CliArgs {
|
||||
port: Port::new(1234),
|
||||
memory_quota: Some(Mebibytes::new(10)),
|
||||
disk_quota: Some(Mebibytes::new(10)),
|
||||
cache_path: Some(PathBuf::from("a")),
|
||||
network_speed: KilobitsPerSecond::new(10),
|
||||
verbose: 1,
|
||||
quiet: 0,
|
||||
unstable_options: vec![],
|
||||
override_upstream: None,
|
||||
ephemeral_disk_encryption: true,
|
||||
config_path: None,
|
||||
cache_type: Some(CacheType::Lfu),
|
||||
};
|
||||
|
||||
let yaml_args = YamlArgs {
|
||||
max_cache_size_in_mebibytes: Mebibytes::new(50),
|
||||
server_settings: YamlServerSettings {
|
||||
secret: ClientSecret(String::new()),
|
||||
port: Port::new(4321).expect("to work?"),
|
||||
external_max_kilobits_per_second: KilobitsPerSecond::new(50).expect("to work?"),
|
||||
external_port: None,
|
||||
graceful_shutdown_wait_seconds: None,
|
||||
hostname: None,
|
||||
external_ip: None,
|
||||
},
|
||||
extended_options: Some(YamlExtendedOptions {
|
||||
memory_quota: Some(Mebibytes::new(50)),
|
||||
cache_type: Some(CacheType::Lru),
|
||||
ephemeral_disk_encryption: Some(false),
|
||||
enable_metrics: None,
|
||||
logging_level: Some(LevelFilter::Error),
|
||||
cache_path: Some(PathBuf::from("b")),
|
||||
}),
|
||||
};
|
||||
|
||||
let config = Config::from_cli_and_file(cli_config, yaml_args);
|
||||
assert_eq!(Some(config.port), Port::new(1234));
|
||||
assert_eq!(config.memory_quota, Mebibytes::new(10));
|
||||
assert_eq!(config.disk_quota, Mebibytes::new(10));
|
||||
assert_eq!(config.cache_path, PathBuf::from("a"));
|
||||
assert_eq!(Some(config.network_speed), KilobitsPerSecond::new(10));
|
||||
assert_eq!(config.log_level, TracingLevelFilter::DEBUG);
|
||||
assert_eq!(config.ephemeral_disk_encryption, true);
|
||||
assert_eq!(config.cache_type, CacheType::Lfu);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ mod metrics;
|
|||
mod ping;
|
||||
mod routes;
|
||||
mod state;
|
||||
#[cfg(not(tarpaulin_include))]
|
||||
mod stop;
|
||||
mod units;
|
||||
|
||||
|
|
23
src/units.rs
23
src/units.rs
|
@ -5,13 +5,18 @@ use std::str::FromStr;
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Wrapper type for a port number.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct Port(NonZeroU16);
|
||||
|
||||
impl Port {
|
||||
pub const fn get(self) -> u16 {
|
||||
self.0.get()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new(amt: u16) -> Option<Self> {
|
||||
NonZeroU16::new(amt).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Port {
|
||||
|
@ -37,6 +42,13 @@ impl Display for Port {
|
|||
#[derive(Copy, Clone, Serialize, Deserialize, Default, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct Mebibytes(usize);
|
||||
|
||||
impl Mebibytes {
|
||||
#[cfg(test)]
|
||||
pub fn new(size: usize) -> Self {
|
||||
Mebibytes(size)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Mebibytes {
|
||||
type Err = ParseIntError;
|
||||
|
||||
|
@ -45,7 +57,7 @@ impl FromStr for Mebibytes {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct Bytes(usize);
|
||||
pub struct Bytes(pub usize);
|
||||
|
||||
impl Bytes {
|
||||
pub const fn get(&self) -> usize {
|
||||
|
@ -62,6 +74,13 @@ impl From<Mebibytes> for Bytes {
|
|||
#[derive(Copy, Clone, Deserialize, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct KilobitsPerSecond(NonZeroU64);
|
||||
|
||||
impl KilobitsPerSecond {
|
||||
#[cfg(test)]
|
||||
pub fn new(size: u64) -> Option<Self> {
|
||||
NonZeroU64::new(size).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for KilobitsPerSecond {
|
||||
type Err = ParseIntError;
|
||||
|
||||
|
|
Loading…
Reference in a new issue