Compare commits

..

No commits in common. "a8e5d09ff0fa5a5b382b6d6b988318bf8968c460" and "5b674317784a490ab767dfb22a3fad08cff9a402" have entirely different histories.

10 changed files with 173 additions and 192 deletions

1
Cargo.lock generated
View file

@ -1154,7 +1154,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"serde",
] ]
[[package]] [[package]]

View file

@ -26,7 +26,7 @@ ctrlc = "3"
dotenv = "0.15" dotenv = "0.15"
futures = "0.3" futures = "0.3"
once_cell = "1" once_cell = "1"
log = { version = "0.4", features = [ "serde" ] } log = "0.4"
lfu_cache = "1" lfu_cache = "1"
lru = "0.6" lru = "0.6"
parking_lot = "0.11" parking_lot = "0.11"

9
src/cache/disk.rs vendored
View file

@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt; use futures::StreamExt;
use log::{error, warn, LevelFilter}; use log::{error, warn, LevelFilter};
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
@ -14,8 +15,6 @@ use tokio::fs::remove_file;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use crate::units::Bytes;
use super::{ use super::{
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata, BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
}; };
@ -35,7 +34,7 @@ impl DiskCache {
/// Constructs a new low memory cache at the provided path and capacity. /// Constructs a new low memory cache at the provided path and capacity.
/// This internally spawns a task that will wait for filesystem /// This internally spawns a task that will wait for filesystem
/// notifications when a file has been written. /// notifications when a file has been written.
pub async fn new(disk_max_size: Bytes, disk_path: PathBuf) -> Arc<Self> { pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Self> {
let (db_tx, db_rx) = channel(128); let (db_tx, db_rx) = channel(128);
let db_pool = { let db_pool = {
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy()); let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy());
@ -76,7 +75,7 @@ impl DiskCache {
Arc::clone(&new_self), Arc::clone(&new_self),
db_rx, db_rx,
db_pool, db_pool,
disk_max_size.get() as u64 / 20 * 19, disk_max_size / 20 * 19,
)); ));
new_self new_self
@ -238,7 +237,7 @@ impl CallbackCache for DiskCache {
key: CacheKey, key: CacheKey,
image: BoxedImageStream, image: BoxedImageStream,
metadata: ImageMetadata, metadata: ImageMetadata,
on_complete: Sender<(CacheKey, bytes::Bytes, ImageMetadata, u64)>, on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
) -> Result<CacheStream, CacheError> { ) -> Result<CacheStream, CacheError> {
let channel = self.db_update_channel_sender.clone(); let channel = self.db_update_channel_sender.clone();

6
src/cache/mem.rs vendored
View file

@ -86,7 +86,7 @@ where
MemoryCacheImpl: 'static + InternalMemoryCache, MemoryCacheImpl: 'static + InternalMemoryCache,
ColdCache: 'static + Cache, ColdCache: 'static + Cache,
{ {
pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> { pub async fn new(inner: ColdCache, max_mem_size: u64) -> Arc<Self> {
let (tx, mut rx) = channel(100); let (tx, mut rx) = channel(100);
let new_self = Arc::new(Self { let new_self = Arc::new(Self {
inner, inner,
@ -98,7 +98,7 @@ where
let new_self_0 = Arc::clone(&new_self); let new_self_0 = Arc::clone(&new_self);
tokio::spawn(async move { tokio::spawn(async move {
let new_self = new_self_0; let new_self = new_self_0;
let max_mem_size = max_mem_size.get() / 20 * 19; let max_mem_size = max_mem_size / 20 * 19;
while let Some((key, bytes, metadata, size)) = rx.recv().await { while let Some((key, bytes, metadata, size)) = rx.recv().await {
// Add to memory cache // Add to memory cache
// We can add first because we constrain our memory usage to 95% // We can add first because we constrain our memory usage to 95%
@ -112,7 +112,7 @@ where
.push(key, (bytes, metadata, size)); .push(key, (bytes, metadata, size));
// Pop if too large // Pop if too large
while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 { while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size {
let popped = new_self let popped = new_self
.mem_cache .mem_cache
.lock() .lock()

View file

@ -1,103 +1,49 @@
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::hint::unreachable_unchecked;
use std::io::{ErrorKind, Write}; use std::io::{ErrorKind, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::IpAddr;
use std::num::{NonZeroU16, NonZeroU64}; use std::num::{NonZeroU16, NonZeroU64};
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use clap::{crate_authors, crate_description, crate_version, Clap}; use clap::{crate_authors, crate_description, crate_version, Clap};
use log::LevelFilter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
use crate::units::{KilobitsPerSecond, Mebibytes, Port}; use crate::units::{Kilobits, Mebibytes, Port};
// Validate tokens is an atomic because it's faster than locking on rwlock. // Validate tokens is an atomic because it's faster than locking on rwlock.
pub static VALIDATE_TOKENS: AtomicBool = AtomicBool::new(false); pub static VALIDATE_TOKENS: AtomicBool = AtomicBool::new(false);
// We use an atomic here because it's better for us to not pass the config
// everywhere.
pub static SEND_SERVER_VERSION: AtomicBool = AtomicBool::new(false);
pub static OFFLINE_MODE: AtomicBool = AtomicBool::new(false); pub static OFFLINE_MODE: AtomicBool = AtomicBool::new(false);
pub fn load_config() -> Result<Config, serde_yaml::Error> { pub fn load_config() -> Config {
// Load cli args first const CONFIG_PATH: &str = "./settings.yaml";
let cli_args: CliArgs = CliArgs::parse(); let config_file: Result<YamlArgs, _> = match File::open(CONFIG_PATH) {
Ok(file) => serde_yaml::from_reader(file),
Err(e) if e.kind() == ErrorKind::NotFound => {
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(CONFIG_PATH)
.unwrap();
// Load yaml file next file.write_all(include_bytes!("../settings.sample.yaml"))
let config_file: Result<YamlArgs, _> = { .unwrap();
let config_path = cli_args
.config_path
.as_deref()
.unwrap_or_else(|| Path::new("./settings.yaml"));
match File::open(config_path) {
Ok(file) => serde_yaml::from_reader(file),
Err(e) if e.kind() == ErrorKind::NotFound => {
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(config_path)
.unwrap();
let default_config = include_str!("../settings.sample.yaml"); return load_config();
file.write_all(default_config.as_bytes()).unwrap();
serde_yaml::from_str(default_config)
}
e => panic!(
"Failed to open file at {}: {:?}",
config_path.to_string_lossy(),
e
),
} }
_ => panic!(),
}; };
// generate config todo!()
let config = Config::from_cli_and_file(cli_args, config_file?);
// initialize globals
OFFLINE_MODE.store(
config
.unstable_options
.contains(&UnstableOptions::OfflineMode),
Ordering::Release,
);
Ok(config)
} }
/// Represents a fully parsed config file. pub struct Config {}
pub struct Config {
pub cache_type: CacheType,
pub cache_path: PathBuf,
pub shutdown_timeout: NonZeroU16,
pub log_level: LevelFilter,
pub client_secret: ClientSecret,
pub port: Port,
pub bind_address: SocketAddr,
pub external_address: Option<SocketAddr>,
pub ephemeral_disk_encryption: bool,
pub unstable_options: Vec<UnstableOptions>,
pub network_speed: KilobitsPerSecond,
pub disk_quota: Mebibytes,
pub memory_quota: Mebibytes,
pub override_upstream: Option<Url>,
}
impl Config {
fn from_cli_and_file(cli_args: CliArgs, file_args: YamlArgs) -> Self {
let log_level = match (cli_args.quiet, cli_args.verbose) {
(n, _) if n > 2 => LevelFilter::Off,
(2, _) => LevelFilter::Error,
(1, _) => LevelFilter::Warn,
// Use log level from file if no flags were provided to CLI
(0, 0) => file_args.extended_options.logging_level,
(_, 1) => LevelFilter::Debug,
(_, n) if n > 1 => LevelFilter::Trace,
// compiler can't figure it out
_ => unsafe { unreachable_unchecked() },
};
todo!()
}
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct YamlArgs { struct YamlArgs {
@ -113,7 +59,7 @@ struct YamlArgs {
struct YamlServerSettings { struct YamlServerSettings {
secret: ClientSecret, secret: ClientSecret,
port: Port, port: Port,
external_max_kilobits_per_second: KilobitsPerSecond, external_max_kilobits_per_second: Kilobits,
external_port: Option<Port>, external_port: Option<Port>,
graceful_shutdown_wait_seconds: Option<NonZeroU16>, graceful_shutdown_wait_seconds: Option<NonZeroU16>,
hostname: Option<IpAddr>, hostname: Option<IpAddr>,
@ -122,47 +68,29 @@ struct YamlServerSettings {
// this intentionally does not implement display or debug // this intentionally does not implement display or debug
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
pub struct ClientSecret(String); struct ClientSecret(String);
#[derive(Deserialize)] #[derive(Deserialize)]
struct YamlExtendedOptions { struct YamlExtendedOptions {
memory_quota: Option<NonZeroU64>, memory_quota: Option<NonZeroU64>,
#[serde(default)] #[serde(default)]
cache_type: CacheType, send_server_string: bool,
#[serde(default)]
cache_type: YamlCacheType,
#[serde(default)] #[serde(default)]
ephemeral_disk_encryption: bool, ephemeral_disk_encryption: bool,
#[serde(default)] #[serde(default)]
enable_metrics: bool, enable_metrics: bool,
#[serde(default = "default_logging_level")]
logging_level: LevelFilter,
} }
const fn default_logging_level() -> LevelFilter { #[derive(Deserialize)]
LevelFilter::Info enum YamlCacheType {
}
#[derive(Deserialize, Copy, Clone)]
#[serde(rename_all = "snake_case")]
pub enum CacheType {
OnDisk, OnDisk,
Lru, Lru,
Lfu, Lfu,
} }
impl FromStr for CacheType { impl Default for YamlCacheType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"on_disk" => Ok(Self::OnDisk),
"lru" => Ok(Self::Lru),
"lfu" => Ok(Self::Lfu),
_ => Err(format!("Unknown option: {}", s)),
}
}
}
impl Default for CacheType {
fn default() -> Self { fn default() -> Self {
Self::OnDisk Self::OnDisk
} }
@ -170,13 +98,13 @@ impl Default for CacheType {
#[derive(Clap, Clone)] #[derive(Clap, Clone)]
#[clap(version = crate_version!(), author = crate_authors!(), about = crate_description!())] #[clap(version = crate_version!(), author = crate_authors!(), about = crate_description!())]
struct CliArgs { pub struct CliArgs {
/// The port to listen on. /// The port to listen on.
#[clap(short, long, default_value = "42069")] #[clap(short, long, default_value = "42069")]
pub port: Port, pub port: Port,
/// How large, in bytes, the in-memory cache should be. Note that this does /// How large, in bytes, the in-memory cache should be. Note that this does
/// not include runtime memory usage. /// not include runtime memory usage.
#[clap(long)] #[clap(long, conflicts_with = "low-memory")]
pub memory_quota: Option<NonZeroU64>, pub memory_quota: Option<NonZeroU64>,
/// How large, in bytes, the on-disk cache should be. Note that actual /// How large, in bytes, the on-disk cache should be. Note that actual
/// values may be larger for metadata information. /// values may be larger for metadata information.
@ -188,6 +116,16 @@ struct CliArgs {
/// The network speed to advertise to Mangadex@Home control server. /// The network speed to advertise to Mangadex@Home control server.
#[clap(long)] #[clap(long)]
pub network_speed: NonZeroU64, pub network_speed: NonZeroU64,
/// Whether or not to provide the Server HTTP header to clients. This is
/// useful for debugging, but is generally not recommended for security
/// reasons.
#[clap(long, takes_value = false)]
pub send_server_string: bool,
/// Changes the caching behavior to avoid buffering images in memory, and
/// instead use the filesystem as the buffer backing. This is useful for
/// clients in low (< 1GB) RAM environments.
#[clap(short, long, conflicts_with("memory-quota"), takes_value = false)]
pub low_memory: bool,
/// Changes verbosity. Default verbosity is INFO, while increasing counts of /// Changes verbosity. Default verbosity is INFO, while increasing counts of
/// verbose flags increases the verbosity to DEBUG and TRACE, respectively. /// verbose flags increases the verbosity to DEBUG and TRACE, respectively.
#[clap(short, long, parse(from_occurrences))] #[clap(short, long, parse(from_occurrences))]
@ -208,8 +146,6 @@ struct CliArgs {
pub ephemeral_disk_encryption: bool, pub ephemeral_disk_encryption: bool,
#[clap(short, long)] #[clap(short, long)]
pub config_path: Option<PathBuf>, pub config_path: Option<PathBuf>,
#[clap(default_value = "on_disk")]
pub cache_type: CacheType,
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -218,6 +154,10 @@ pub enum UnstableOptions {
/// you know what you're dealing with. /// you know what you're dealing with.
OverrideUpstream, OverrideUpstream,
/// Use an LFU implementation for the in-memory cache instead of the default
/// LRU implementation.
UseLfu,
/// Disables token validation. Don't use this unless you know the /// Disables token validation. Don't use this unless you know the
/// ramifications of this command. /// ramifications of this command.
DisableTokenValidation, DisableTokenValidation,
@ -235,6 +175,7 @@ impl FromStr for UnstableOptions {
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
match s { match s {
"override-upstream" => Ok(Self::OverrideUpstream), "override-upstream" => Ok(Self::OverrideUpstream),
"use-lfu" => Ok(Self::UseLfu),
"disable-token-validation" => Ok(Self::DisableTokenValidation), "disable-token-validation" => Ok(Self::DisableTokenValidation),
"offline-mode" => Ok(Self::OfflineMode), "offline-mode" => Ok(Self::OfflineMode),
"disable-tls" => Ok(Self::DisableTls), "disable-tls" => Ok(Self::DisableTls),
@ -247,6 +188,7 @@ impl Display for UnstableOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self { match self {
Self::OverrideUpstream => write!(f, "override-upstream"), Self::OverrideUpstream => write!(f, "override-upstream"),
Self::UseLfu => write!(f, "use-lfu"),
Self::DisableTokenValidation => write!(f, "disable-token-validation"), Self::DisableTokenValidation => write!(f, "disable-token-validation"),
Self::OfflineMode => write!(f, "offline-mode"), Self::OfflineMode => write!(f, "offline-mode"),
Self::DisableTls => write!(f, "disable-tls"), Self::DisableTls => write!(f, "disable-tls"),

View file

@ -5,7 +5,8 @@
use std::env::{self, VarError}; use std::env::{self, VarError};
use std::error::Error; use std::error::Error;
use std::fmt::Display; use std::fmt::Display;
use std::num::ParseIntError; use std::hint::unreachable_unchecked;
use std::num::{NonZeroU64, ParseIntError};
use std::process; use std::process;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
@ -15,8 +16,9 @@ use actix_web::rt::{spawn, time, System};
use actix_web::web::{self, Data}; use actix_web::web::{self, Data};
use actix_web::{App, HttpResponse, HttpServer}; use actix_web::{App, HttpResponse, HttpServer};
use cache::{Cache, DiskCache}; use cache::{Cache, DiskCache};
use config::Config; use clap::Clap;
use log::{debug, error, info, warn}; use config::CliArgs;
use log::{debug, error, info, warn, LevelFilter};
use parking_lot::RwLock; use parking_lot::RwLock;
use rustls::{NoClientAuth, ServerConfig}; use rustls::{NoClientAuth, ServerConfig};
use simple_logger::SimpleLogger; use simple_logger::SimpleLogger;
@ -27,7 +29,7 @@ use thiserror::Error;
use crate::cache::mem::{Lfu, Lru}; use crate::cache::mem::{Lfu, Lru};
use crate::cache::{MemoryCache, ENCRYPTION_KEY}; use crate::cache::{MemoryCache, ENCRYPTION_KEY};
use crate::config::{CacheType, UnstableOptions, OFFLINE_MODE}; use crate::config::{UnstableOptions, OFFLINE_MODE};
use crate::state::DynamicServerCert; use crate::state::DynamicServerCert;
mod cache; mod cache;
@ -39,7 +41,12 @@ mod state;
mod stop; mod stop;
mod units; mod units;
const CLIENT_API_VERSION: usize = 31; #[macro_export]
macro_rules! client_api_version {
() => {
"31"
};
}
#[derive(Error, Debug)] #[derive(Error, Debug)]
enum ServerError { enum ServerError {
@ -59,23 +66,46 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Config loading // Config loading
// //
let config = config::load_config()?; let config = config::load_config();
let memory_quota = config.memory_quota;
let disk_quota = config.disk_quota; let cli_args = CliArgs::parse();
let cache_type = config.cache_type; let port = cli_args.port;
let cache_path = config.cache_path.clone(); let memory_max_size = cli_args
let disable_tls = config .memory_quota
.map(NonZeroU64::get)
.unwrap_or_default();
let disk_quota = cli_args.disk_quota;
let cache_path = cli_args.cache_path.clone();
let low_mem_mode = cli_args.low_memory;
let use_lfu = cli_args.unstable_options.contains(&UnstableOptions::UseLfu);
let disable_tls = cli_args
.unstable_options .unstable_options
.contains(&UnstableOptions::DisableTls); .contains(&UnstableOptions::DisableTls);
let bind_address = config.bind_address; OFFLINE_MODE.store(
cli_args
.unstable_options
.contains(&UnstableOptions::OfflineMode),
Ordering::Release,
);
// //
// Logging and warnings // Logging and warnings
// //
SimpleLogger::new().with_level(config.log_level).init()?; let log_level = match (cli_args.quiet, cli_args.verbose) {
(n, _) if n > 2 => LevelFilter::Off,
(2, _) => LevelFilter::Error,
(1, _) => LevelFilter::Warn,
(0, 0) => LevelFilter::Info,
(_, 1) => LevelFilter::Debug,
(_, n) if n > 1 => LevelFilter::Trace,
// compiler can't figure it out
_ => unsafe { unreachable_unchecked() },
};
if let Err(e) = print_preamble_and_warnings(&config) { SimpleLogger::new().with_level(log_level).init()?;
if let Err(e) = print_preamble_and_warnings(&cli_args) {
error!("{}", e); error!("{}", e);
return Err(e); return Err(e);
} }
@ -88,7 +118,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}; };
let client_secret_1 = client_secret.clone(); let client_secret_1 = client_secret.clone();
if config.ephemeral_disk_encryption { if cli_args.ephemeral_disk_encryption {
info!("Running with at-rest encryption!"); info!("Running with at-rest encryption!");
ENCRYPTION_KEY.set(gen_key()).unwrap(); ENCRYPTION_KEY.set(gen_key()).unwrap();
} }
@ -100,11 +130,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
let server = if OFFLINE_MODE.load(Ordering::Acquire) { let server = if OFFLINE_MODE.load(Ordering::Acquire) {
ServerState::init_offline() ServerState::init_offline()
} else { } else {
ServerState::init(&client_secret, &config).await? ServerState::init(&client_secret, &cli_args).await?
}; };
let data_0 = Arc::new(RwLockServerState(RwLock::new(server))); let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
let data_1 = Arc::clone(&data_0); let data_1 = Arc::clone(&data_0);
// Rustls only supports TLS 1.2 and 1.3.
let tls_config = {
let mut tls_config = ServerConfig::new(NoClientAuth::new());
tls_config.cert_resolver = Arc::new(DynamicServerCert);
tls_config
};
// //
// At this point, the server is ready to start, and starts the necessary // At this point, the server is ready to start, and starts the necessary
// threads. // threads.
@ -140,17 +177,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop { loop {
interval.tick().await; interval.tick().await;
debug!("Sending ping!"); debug!("Sending ping!");
ping::update_server_state(&client_secret_1, &config, &mut data).await; ping::update_server_state(&client_secret_1, &cli_args, &mut data).await;
} }
}); });
} }
let memory_max_size = memory_quota.into(); let cache = DiskCache::new(disk_quota, cache_path.clone()).await;
let cache = DiskCache::new(disk_quota.into(), cache_path.clone()).await; let cache: Arc<dyn Cache> = if low_mem_mode {
let cache: Arc<dyn Cache> = match cache_type { cache
CacheType::OnDisk => cache, } else if use_lfu {
CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size).await, MemoryCache::<Lfu, _>::new(cache, memory_max_size).await
CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size).await, } else {
MemoryCache::<Lru, _>::new(cache, memory_max_size).await
}; };
let cache_0 = Arc::clone(&cache); let cache_0 = Arc::clone(&cache);
@ -176,17 +214,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
}) })
.shutdown_timeout(60); .shutdown_timeout(60);
let server_bind_address = format!("0.0.0.0:{}", port);
if disable_tls { if disable_tls {
server.bind(bind_address)?.run().await?; server.bind(server_bind_address)?.run().await?;
} else { } else {
// Rustls only supports TLS 1.2 and 1.3. server
let tls_config = { .bind_rustls(server_bind_address, tls_config)?
let mut tls_config = ServerConfig::new(NoClientAuth::new()); .run()
tls_config.cert_resolver = Arc::new(DynamicServerCert); .await?;
tls_config
};
server.bind_rustls(bind_address, tls_config)?.run().await?;
} }
// Waiting for us to finish sending stop message // Waiting for us to finish sending stop message
@ -218,7 +253,7 @@ impl Display for InvalidCombination {
impl Error for InvalidCombination {} impl Error for InvalidCombination {}
fn print_preamble_and_warnings(args: &Config) -> Result<(), Box<dyn Error>> { fn print_preamble_and_warnings(args: &CliArgs) -> Result<(), Box<dyn Error>> {
println!(concat!( println!(concat!(
env!("CARGO_PKG_NAME"), env!("CARGO_PKG_NAME"),
" ", " ",

View file

@ -1,3 +1,4 @@
use std::num::NonZeroU64;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::{io::BufReader, sync::Arc}; use std::{io::BufReader, sync::Arc};
@ -11,13 +12,13 @@ use serde_repr::Deserialize_repr;
use sodiumoxide::crypto::box_::PrecomputedKey; use sodiumoxide::crypto::box_::PrecomputedKey;
use url::Url; use url::Url;
use crate::config::{Config, UnstableOptions, VALIDATE_TOKENS}; use crate::config::{CliArgs, VALIDATE_TOKENS};
use crate::state::{ use crate::state::{
RwLockServerState, PREVIOUSLY_COMPROMISED, PREVIOUSLY_PAUSED, TLS_CERTS, RwLockServerState, PREVIOUSLY_COMPROMISED, PREVIOUSLY_PAUSED, TLS_CERTS,
TLS_PREVIOUSLY_CREATED, TLS_SIGNING_KEY, TLS_PREVIOUSLY_CREATED, TLS_SIGNING_KEY,
}; };
use crate::units::{BytesPerSecond, Mebibytes, Port}; use crate::units::Port;
use crate::CLIENT_API_VERSION; use crate::{client_api_version, config::UnstableOptions};
pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping"; pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
@ -25,20 +26,22 @@ pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
pub struct Request<'a> { pub struct Request<'a> {
secret: &'a str, secret: &'a str,
port: Port, port: Port,
disk_space: Mebibytes, disk_space: u64,
network_speed: BytesPerSecond, network_speed: NonZeroU64,
build_version: usize, build_version: u64,
tls_created_at: Option<String>, tls_created_at: Option<String>,
} }
impl<'a> Request<'a> { impl<'a> Request<'a> {
fn from_config_and_state(secret: &'a str, config: &Config) -> Self { fn from_config_and_state(secret: &'a str, config: &CliArgs) -> Self {
Self { Self {
secret, secret,
port: config.port, port: config.port,
disk_space: config.disk_quota, disk_space: config.disk_quota,
network_speed: config.network_speed.into(), network_speed: config.network_speed,
build_version: CLIENT_API_VERSION, build_version: client_api_version!()
.parse()
.expect("to parse the build version"),
tls_created_at: TLS_PREVIOUSLY_CREATED tls_created_at: TLS_PREVIOUSLY_CREATED
.get() .get()
.map(|v| v.load().as_ref().clone()), .map(|v| v.load().as_ref().clone()),
@ -46,14 +49,17 @@ impl<'a> Request<'a> {
} }
} }
impl<'a> From<(&'a str, &Config)> for Request<'a> { #[allow(clippy::fallible_impl_from)]
fn from((secret, config): (&'a str, &Config)) -> Self { impl<'a> From<(&'a str, &CliArgs)> for Request<'a> {
fn from((secret, config): (&'a str, &CliArgs)) -> Self {
Self { Self {
secret, secret,
port: config.port, port: config.port,
disk_space: config.disk_quota, disk_space: config.disk_quota,
network_speed: config.network_speed.into(), network_speed: config.network_speed,
build_version: CLIENT_API_VERSION, build_version: client_api_version!()
.parse()
.expect("to parse the build version"),
tls_created_at: None, tls_created_at: None,
} }
} }
@ -161,7 +167,7 @@ impl std::fmt::Debug for Tls {
} }
} }
pub async fn update_server_state(secret: &str, cli: &Config, data: &mut Arc<RwLockServerState>) { pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwLockServerState>) {
let req = Request::from_config_and_state(secret, cli); let req = Request::from_config_and_state(secret, cli);
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await; let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await;

View file

@ -22,14 +22,15 @@ use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBY
use thiserror::Error; use thiserror::Error;
use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError}; use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
use crate::config::{OFFLINE_MODE, VALIDATE_TOKENS}; use crate::client_api_version;
use crate::config::{OFFLINE_MODE, SEND_SERVER_VERSION, VALIDATE_TOKENS};
use crate::metrics::{ use crate::metrics::{
CACHE_HIT_COUNTER, CACHE_MISS_COUNTER, REQUESTS_DATA_COUNTER, REQUESTS_DATA_SAVER_COUNTER, CACHE_HIT_COUNTER, CACHE_MISS_COUNTER, REQUESTS_DATA_COUNTER, REQUESTS_DATA_SAVER_COUNTER,
REQUESTS_OTHER_COUNTER, REQUESTS_TOTAL_COUNTER, REQUESTS_OTHER_COUNTER, REQUESTS_TOTAL_COUNTER,
}; };
use crate::state::RwLockServerState; use crate::state::RwLockServerState;
const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false); pub const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false);
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| { static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder() Client::builder()
@ -40,6 +41,15 @@ static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
.expect("Client initialization to work") .expect("Client initialization to work")
}); });
const SERVER_ID_STRING: &str = concat!(
env!("CARGO_CRATE_NAME"),
" ",
env!("CARGO_PKG_VERSION"),
" (",
client_api_version!(),
") - Conforming to spec revision b82043289",
);
enum ServerResponse { enum ServerResponse {
TokenValidationError(TokenValidationError), TokenValidationError(TokenValidationError),
HttpResponse(HttpResponse), HttpResponse(HttpResponse),
@ -216,6 +226,10 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
.insert_header((CACHE_CONTROL, "public, max-age=1209600")) .insert_header((CACHE_CONTROL, "public, max-age=1209600"))
.insert_header(("Timing-Allow-Origin", "https://mangadex.org")); .insert_header(("Timing-Allow-Origin", "https://mangadex.org"));
if SEND_SERVER_VERSION.load(Ordering::Acquire) {
builder.insert_header(("Server", SERVER_ID_STRING));
}
builder builder
} }

View file

@ -1,7 +1,7 @@
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use crate::config::{Config, UnstableOptions, OFFLINE_MODE, VALIDATE_TOKENS}; use crate::config::{CliArgs, UnstableOptions, OFFLINE_MODE, SEND_SERVER_VERSION, VALIDATE_TOKENS};
use crate::ping::{Request, Response, CONTROL_CENTER_PING_URL}; use crate::ping::{Request, Response, CONTROL_CENTER_PING_URL};
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use log::{error, info, warn}; use log::{error, info, warn};
@ -45,13 +45,18 @@ pub enum ServerInitError {
} }
impl ServerState { impl ServerState {
pub async fn init(secret: &str, config: &Config) -> Result<Self, ServerInitError> { pub async fn init(secret: &str, config: &CliArgs) -> Result<Self, ServerInitError> {
let resp = reqwest::Client::new() let resp = reqwest::Client::new()
.post(CONTROL_CENTER_PING_URL) .post(CONTROL_CENTER_PING_URL)
.json(&Request::from((secret, config))) .json(&Request::from((secret, config)))
.send() .send()
.await; .await;
if config.send_server_string {
warn!("Client will send Server header in responses. This is not recommended!");
SEND_SERVER_VERSION.store(true, Ordering::Release);
}
match resp { match resp {
Ok(resp) => match resp.json::<Response>().await { Ok(resp) => match resp.json::<Response>().await {
Ok(Response::Ok(mut resp)) => { Ok(Response::Ok(mut resp)) => {

View file

@ -22,37 +22,18 @@ impl Display for Port {
} }
} }
#[derive(Copy, Clone, Serialize, Deserialize, Default, Debug, Hash, Eq, PartialEq)] #[derive(Copy, Clone, Deserialize, Default, Debug, Hash, Eq, PartialEq)]
pub struct Mebibytes(usize); pub struct Mebibytes(usize);
impl Mebibytes { impl Mebibytes {
pub const fn get(self) -> usize { pub const fn as_bytes(&self) -> usize {
self.0 self.0 << 20
} }
}
pub struct Bytes(usize);
impl Bytes {
pub const fn get(&self) -> usize { pub const fn get(&self) -> usize {
self.0 self.0
} }
} }
impl From<Mebibytes> for Bytes {
fn from(mib: Mebibytes) -> Self {
Self(mib.0 << 20)
}
}
#[derive(Copy, Clone, Deserialize, Debug, Hash, Eq, PartialEq)] #[derive(Copy, Clone, Deserialize, Debug, Hash, Eq, PartialEq)]
pub struct KilobitsPerSecond(NonZeroU64); pub struct Kilobits(NonZeroU64);
#[derive(Copy, Clone, Serialize, Debug, Hash, Eq, PartialEq)]
pub struct BytesPerSecond(NonZeroU64);
impl From<KilobitsPerSecond> for BytesPerSecond {
fn from(kbps: KilobitsPerSecond) -> Self {
Self(unsafe { NonZeroU64::new_unchecked(kbps.0.get() * 125) })
}
}