Compare commits
2 commits
5b67431778
...
a8e5d09ff0
Author | SHA1 | Date | |
---|---|---|---|
a8e5d09ff0 | |||
b4f27c5f8c |
10 changed files with 192 additions and 173 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1154,6 +1154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -26,7 +26,7 @@ ctrlc = "3"
|
|||
dotenv = "0.15"
|
||||
futures = "0.3"
|
||||
once_cell = "1"
|
||||
log = "0.4"
|
||||
log = { version = "0.4", features = [ "serde" ] }
|
||||
lfu_cache = "1"
|
||||
lru = "0.6"
|
||||
parking_lot = "0.11"
|
||||
|
|
9
src/cache/disk.rs
vendored
9
src/cache/disk.rs
vendored
|
@ -6,7 +6,6 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
|||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::StreamExt;
|
||||
use log::{error, warn, LevelFilter};
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
|
@ -15,6 +14,8 @@ use tokio::fs::remove_file;
|
|||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::units::Bytes;
|
||||
|
||||
use super::{
|
||||
BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, CallbackCache, ImageMetadata,
|
||||
};
|
||||
|
@ -34,7 +35,7 @@ impl DiskCache {
|
|||
/// Constructs a new low memory cache at the provided path and capacity.
|
||||
/// This internally spawns a task that will wait for filesystem
|
||||
/// notifications when a file has been written.
|
||||
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Self> {
|
||||
pub async fn new(disk_max_size: Bytes, disk_path: PathBuf) -> Arc<Self> {
|
||||
let (db_tx, db_rx) = channel(128);
|
||||
let db_pool = {
|
||||
let db_url = format!("sqlite:{}/metadata.sqlite", disk_path.to_string_lossy());
|
||||
|
@ -75,7 +76,7 @@ impl DiskCache {
|
|||
Arc::clone(&new_self),
|
||||
db_rx,
|
||||
db_pool,
|
||||
disk_max_size / 20 * 19,
|
||||
disk_max_size.get() as u64 / 20 * 19,
|
||||
));
|
||||
|
||||
new_self
|
||||
|
@ -237,7 +238,7 @@ impl CallbackCache for DiskCache {
|
|||
key: CacheKey,
|
||||
image: BoxedImageStream,
|
||||
metadata: ImageMetadata,
|
||||
on_complete: Sender<(CacheKey, Bytes, ImageMetadata, u64)>,
|
||||
on_complete: Sender<(CacheKey, bytes::Bytes, ImageMetadata, u64)>,
|
||||
) -> Result<CacheStream, CacheError> {
|
||||
let channel = self.db_update_channel_sender.clone();
|
||||
|
||||
|
|
6
src/cache/mem.rs
vendored
6
src/cache/mem.rs
vendored
|
@ -86,7 +86,7 @@ where
|
|||
MemoryCacheImpl: 'static + InternalMemoryCache,
|
||||
ColdCache: 'static + Cache,
|
||||
{
|
||||
pub async fn new(inner: ColdCache, max_mem_size: u64) -> Arc<Self> {
|
||||
pub async fn new(inner: ColdCache, max_mem_size: crate::units::Bytes) -> Arc<Self> {
|
||||
let (tx, mut rx) = channel(100);
|
||||
let new_self = Arc::new(Self {
|
||||
inner,
|
||||
|
@ -98,7 +98,7 @@ where
|
|||
let new_self_0 = Arc::clone(&new_self);
|
||||
tokio::spawn(async move {
|
||||
let new_self = new_self_0;
|
||||
let max_mem_size = max_mem_size / 20 * 19;
|
||||
let max_mem_size = max_mem_size.get() / 20 * 19;
|
||||
while let Some((key, bytes, metadata, size)) = rx.recv().await {
|
||||
// Add to memory cache
|
||||
// We can add first because we constrain our memory usage to 95%
|
||||
|
@ -112,7 +112,7 @@ where
|
|||
.push(key, (bytes, metadata, size));
|
||||
|
||||
// Pop if too large
|
||||
while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size {
|
||||
while new_self.cur_mem_size.load(Ordering::Acquire) >= max_mem_size as u64 {
|
||||
let popped = new_self
|
||||
.mem_cache
|
||||
.lock()
|
||||
|
|
158
src/config.rs
158
src/config.rs
|
@ -1,49 +1,103 @@
|
|||
use std::fmt::{Display, Formatter};
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::hint::unreachable_unchecked;
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::net::IpAddr;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::num::{NonZeroU16, NonZeroU64};
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use clap::{crate_authors, crate_description, crate_version, Clap};
|
||||
use log::LevelFilter;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
use crate::units::{Kilobits, Mebibytes, Port};
|
||||
use crate::units::{KilobitsPerSecond, Mebibytes, Port};
|
||||
|
||||
// Validate tokens is an atomic because it's faster than locking on rwlock.
|
||||
pub static VALIDATE_TOKENS: AtomicBool = AtomicBool::new(false);
|
||||
// We use an atomic here because it's better for us to not pass the config
|
||||
// everywhere.
|
||||
pub static SEND_SERVER_VERSION: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
pub static OFFLINE_MODE: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
pub fn load_config() -> Config {
|
||||
const CONFIG_PATH: &str = "./settings.yaml";
|
||||
let config_file: Result<YamlArgs, _> = match File::open(CONFIG_PATH) {
|
||||
Ok(file) => serde_yaml::from_reader(file),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {
|
||||
let mut file = OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(CONFIG_PATH)
|
||||
.unwrap();
|
||||
pub fn load_config() -> Result<Config, serde_yaml::Error> {
|
||||
// Load cli args first
|
||||
let cli_args: CliArgs = CliArgs::parse();
|
||||
|
||||
file.write_all(include_bytes!("../settings.sample.yaml"))
|
||||
.unwrap();
|
||||
// Load yaml file next
|
||||
let config_file: Result<YamlArgs, _> = {
|
||||
let config_path = cli_args
|
||||
.config_path
|
||||
.as_deref()
|
||||
.unwrap_or_else(|| Path::new("./settings.yaml"));
|
||||
match File::open(config_path) {
|
||||
Ok(file) => serde_yaml::from_reader(file),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {
|
||||
let mut file = OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(config_path)
|
||||
.unwrap();
|
||||
|
||||
return load_config();
|
||||
let default_config = include_str!("../settings.sample.yaml");
|
||||
file.write_all(default_config.as_bytes()).unwrap();
|
||||
serde_yaml::from_str(default_config)
|
||||
}
|
||||
e => panic!(
|
||||
"Failed to open file at {}: {:?}",
|
||||
config_path.to_string_lossy(),
|
||||
e
|
||||
),
|
||||
}
|
||||
_ => panic!(),
|
||||
};
|
||||
|
||||
todo!()
|
||||
// generate config
|
||||
let config = Config::from_cli_and_file(cli_args, config_file?);
|
||||
|
||||
// initialize globals
|
||||
OFFLINE_MODE.store(
|
||||
config
|
||||
.unstable_options
|
||||
.contains(&UnstableOptions::OfflineMode),
|
||||
Ordering::Release,
|
||||
);
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
pub struct Config {}
|
||||
/// Represents a fully parsed config file.
|
||||
pub struct Config {
|
||||
pub cache_type: CacheType,
|
||||
pub cache_path: PathBuf,
|
||||
pub shutdown_timeout: NonZeroU16,
|
||||
pub log_level: LevelFilter,
|
||||
pub client_secret: ClientSecret,
|
||||
pub port: Port,
|
||||
pub bind_address: SocketAddr,
|
||||
pub external_address: Option<SocketAddr>,
|
||||
pub ephemeral_disk_encryption: bool,
|
||||
pub unstable_options: Vec<UnstableOptions>,
|
||||
pub network_speed: KilobitsPerSecond,
|
||||
pub disk_quota: Mebibytes,
|
||||
pub memory_quota: Mebibytes,
|
||||
pub override_upstream: Option<Url>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn from_cli_and_file(cli_args: CliArgs, file_args: YamlArgs) -> Self {
|
||||
let log_level = match (cli_args.quiet, cli_args.verbose) {
|
||||
(n, _) if n > 2 => LevelFilter::Off,
|
||||
(2, _) => LevelFilter::Error,
|
||||
(1, _) => LevelFilter::Warn,
|
||||
// Use log level from file if no flags were provided to CLI
|
||||
(0, 0) => file_args.extended_options.logging_level,
|
||||
(_, 1) => LevelFilter::Debug,
|
||||
(_, n) if n > 1 => LevelFilter::Trace,
|
||||
// compiler can't figure it out
|
||||
_ => unsafe { unreachable_unchecked() },
|
||||
};
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct YamlArgs {
|
||||
|
@ -59,7 +113,7 @@ struct YamlArgs {
|
|||
struct YamlServerSettings {
|
||||
secret: ClientSecret,
|
||||
port: Port,
|
||||
external_max_kilobits_per_second: Kilobits,
|
||||
external_max_kilobits_per_second: KilobitsPerSecond,
|
||||
external_port: Option<Port>,
|
||||
graceful_shutdown_wait_seconds: Option<NonZeroU16>,
|
||||
hostname: Option<IpAddr>,
|
||||
|
@ -68,29 +122,47 @@ struct YamlServerSettings {
|
|||
|
||||
// this intentionally does not implement display or debug
|
||||
#[derive(Deserialize, Serialize)]
|
||||
struct ClientSecret(String);
|
||||
pub struct ClientSecret(String);
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct YamlExtendedOptions {
|
||||
memory_quota: Option<NonZeroU64>,
|
||||
#[serde(default)]
|
||||
send_server_string: bool,
|
||||
#[serde(default)]
|
||||
cache_type: YamlCacheType,
|
||||
cache_type: CacheType,
|
||||
#[serde(default)]
|
||||
ephemeral_disk_encryption: bool,
|
||||
#[serde(default)]
|
||||
enable_metrics: bool,
|
||||
#[serde(default = "default_logging_level")]
|
||||
logging_level: LevelFilter,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
enum YamlCacheType {
|
||||
const fn default_logging_level() -> LevelFilter {
|
||||
LevelFilter::Info
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Copy, Clone)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CacheType {
|
||||
OnDisk,
|
||||
Lru,
|
||||
Lfu,
|
||||
}
|
||||
|
||||
impl Default for YamlCacheType {
|
||||
impl FromStr for CacheType {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"on_disk" => Ok(Self::OnDisk),
|
||||
"lru" => Ok(Self::Lru),
|
||||
"lfu" => Ok(Self::Lfu),
|
||||
_ => Err(format!("Unknown option: {}", s)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CacheType {
|
||||
fn default() -> Self {
|
||||
Self::OnDisk
|
||||
}
|
||||
|
@ -98,13 +170,13 @@ impl Default for YamlCacheType {
|
|||
|
||||
#[derive(Clap, Clone)]
|
||||
#[clap(version = crate_version!(), author = crate_authors!(), about = crate_description!())]
|
||||
pub struct CliArgs {
|
||||
struct CliArgs {
|
||||
/// The port to listen on.
|
||||
#[clap(short, long, default_value = "42069")]
|
||||
pub port: Port,
|
||||
/// How large, in bytes, the in-memory cache should be. Note that this does
|
||||
/// not include runtime memory usage.
|
||||
#[clap(long, conflicts_with = "low-memory")]
|
||||
#[clap(long)]
|
||||
pub memory_quota: Option<NonZeroU64>,
|
||||
/// How large, in bytes, the on-disk cache should be. Note that actual
|
||||
/// values may be larger for metadata information.
|
||||
|
@ -116,16 +188,6 @@ pub struct CliArgs {
|
|||
/// The network speed to advertise to Mangadex@Home control server.
|
||||
#[clap(long)]
|
||||
pub network_speed: NonZeroU64,
|
||||
/// Whether or not to provide the Server HTTP header to clients. This is
|
||||
/// useful for debugging, but is generally not recommended for security
|
||||
/// reasons.
|
||||
#[clap(long, takes_value = false)]
|
||||
pub send_server_string: bool,
|
||||
/// Changes the caching behavior to avoid buffering images in memory, and
|
||||
/// instead use the filesystem as the buffer backing. This is useful for
|
||||
/// clients in low (< 1GB) RAM environments.
|
||||
#[clap(short, long, conflicts_with("memory-quota"), takes_value = false)]
|
||||
pub low_memory: bool,
|
||||
/// Changes verbosity. Default verbosity is INFO, while increasing counts of
|
||||
/// verbose flags increases the verbosity to DEBUG and TRACE, respectively.
|
||||
#[clap(short, long, parse(from_occurrences))]
|
||||
|
@ -146,6 +208,8 @@ pub struct CliArgs {
|
|||
pub ephemeral_disk_encryption: bool,
|
||||
#[clap(short, long)]
|
||||
pub config_path: Option<PathBuf>,
|
||||
#[clap(default_value = "on_disk")]
|
||||
pub cache_type: CacheType,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
|
@ -154,10 +218,6 @@ pub enum UnstableOptions {
|
|||
/// you know what you're dealing with.
|
||||
OverrideUpstream,
|
||||
|
||||
/// Use an LFU implementation for the in-memory cache instead of the default
|
||||
/// LRU implementation.
|
||||
UseLfu,
|
||||
|
||||
/// Disables token validation. Don't use this unless you know the
|
||||
/// ramifications of this command.
|
||||
DisableTokenValidation,
|
||||
|
@ -175,7 +235,6 @@ impl FromStr for UnstableOptions {
|
|||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"override-upstream" => Ok(Self::OverrideUpstream),
|
||||
"use-lfu" => Ok(Self::UseLfu),
|
||||
"disable-token-validation" => Ok(Self::DisableTokenValidation),
|
||||
"offline-mode" => Ok(Self::OfflineMode),
|
||||
"disable-tls" => Ok(Self::DisableTls),
|
||||
|
@ -188,7 +247,6 @@ impl Display for UnstableOptions {
|
|||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::OverrideUpstream => write!(f, "override-upstream"),
|
||||
Self::UseLfu => write!(f, "use-lfu"),
|
||||
Self::DisableTokenValidation => write!(f, "disable-token-validation"),
|
||||
Self::OfflineMode => write!(f, "offline-mode"),
|
||||
Self::DisableTls => write!(f, "disable-tls"),
|
||||
|
|
101
src/main.rs
101
src/main.rs
|
@ -5,8 +5,7 @@
|
|||
use std::env::{self, VarError};
|
||||
use std::error::Error;
|
||||
use std::fmt::Display;
|
||||
use std::hint::unreachable_unchecked;
|
||||
use std::num::{NonZeroU64, ParseIntError};
|
||||
use std::num::ParseIntError;
|
||||
use std::process;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
@ -16,9 +15,8 @@ use actix_web::rt::{spawn, time, System};
|
|||
use actix_web::web::{self, Data};
|
||||
use actix_web::{App, HttpResponse, HttpServer};
|
||||
use cache::{Cache, DiskCache};
|
||||
use clap::Clap;
|
||||
use config::CliArgs;
|
||||
use log::{debug, error, info, warn, LevelFilter};
|
||||
use config::Config;
|
||||
use log::{debug, error, info, warn};
|
||||
use parking_lot::RwLock;
|
||||
use rustls::{NoClientAuth, ServerConfig};
|
||||
use simple_logger::SimpleLogger;
|
||||
|
@ -29,7 +27,7 @@ use thiserror::Error;
|
|||
|
||||
use crate::cache::mem::{Lfu, Lru};
|
||||
use crate::cache::{MemoryCache, ENCRYPTION_KEY};
|
||||
use crate::config::{UnstableOptions, OFFLINE_MODE};
|
||||
use crate::config::{CacheType, UnstableOptions, OFFLINE_MODE};
|
||||
use crate::state::DynamicServerCert;
|
||||
|
||||
mod cache;
|
||||
|
@ -41,12 +39,7 @@ mod state;
|
|||
mod stop;
|
||||
mod units;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! client_api_version {
|
||||
() => {
|
||||
"31"
|
||||
};
|
||||
}
|
||||
const CLIENT_API_VERSION: usize = 31;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
enum ServerError {
|
||||
|
@ -66,46 +59,23 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
// Config loading
|
||||
//
|
||||
|
||||
let config = config::load_config();
|
||||
|
||||
let cli_args = CliArgs::parse();
|
||||
let port = cli_args.port;
|
||||
let memory_max_size = cli_args
|
||||
.memory_quota
|
||||
.map(NonZeroU64::get)
|
||||
.unwrap_or_default();
|
||||
let disk_quota = cli_args.disk_quota;
|
||||
let cache_path = cli_args.cache_path.clone();
|
||||
let low_mem_mode = cli_args.low_memory;
|
||||
let use_lfu = cli_args.unstable_options.contains(&UnstableOptions::UseLfu);
|
||||
let disable_tls = cli_args
|
||||
let config = config::load_config()?;
|
||||
let memory_quota = config.memory_quota;
|
||||
let disk_quota = config.disk_quota;
|
||||
let cache_type = config.cache_type;
|
||||
let cache_path = config.cache_path.clone();
|
||||
let disable_tls = config
|
||||
.unstable_options
|
||||
.contains(&UnstableOptions::DisableTls);
|
||||
OFFLINE_MODE.store(
|
||||
cli_args
|
||||
.unstable_options
|
||||
.contains(&UnstableOptions::OfflineMode),
|
||||
Ordering::Release,
|
||||
);
|
||||
let bind_address = config.bind_address;
|
||||
|
||||
//
|
||||
// Logging and warnings
|
||||
//
|
||||
|
||||
let log_level = match (cli_args.quiet, cli_args.verbose) {
|
||||
(n, _) if n > 2 => LevelFilter::Off,
|
||||
(2, _) => LevelFilter::Error,
|
||||
(1, _) => LevelFilter::Warn,
|
||||
(0, 0) => LevelFilter::Info,
|
||||
(_, 1) => LevelFilter::Debug,
|
||||
(_, n) if n > 1 => LevelFilter::Trace,
|
||||
// compiler can't figure it out
|
||||
_ => unsafe { unreachable_unchecked() },
|
||||
};
|
||||
SimpleLogger::new().with_level(config.log_level).init()?;
|
||||
|
||||
SimpleLogger::new().with_level(log_level).init()?;
|
||||
|
||||
if let Err(e) = print_preamble_and_warnings(&cli_args) {
|
||||
if let Err(e) = print_preamble_and_warnings(&config) {
|
||||
error!("{}", e);
|
||||
return Err(e);
|
||||
}
|
||||
|
@ -118,7 +88,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
};
|
||||
let client_secret_1 = client_secret.clone();
|
||||
|
||||
if cli_args.ephemeral_disk_encryption {
|
||||
if config.ephemeral_disk_encryption {
|
||||
info!("Running with at-rest encryption!");
|
||||
ENCRYPTION_KEY.set(gen_key()).unwrap();
|
||||
}
|
||||
|
@ -130,18 +100,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
let server = if OFFLINE_MODE.load(Ordering::Acquire) {
|
||||
ServerState::init_offline()
|
||||
} else {
|
||||
ServerState::init(&client_secret, &cli_args).await?
|
||||
ServerState::init(&client_secret, &config).await?
|
||||
};
|
||||
let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
|
||||
let data_1 = Arc::clone(&data_0);
|
||||
|
||||
// Rustls only supports TLS 1.2 and 1.3.
|
||||
let tls_config = {
|
||||
let mut tls_config = ServerConfig::new(NoClientAuth::new());
|
||||
tls_config.cert_resolver = Arc::new(DynamicServerCert);
|
||||
tls_config
|
||||
};
|
||||
|
||||
//
|
||||
// At this point, the server is ready to start, and starts the necessary
|
||||
// threads.
|
||||
|
@ -177,18 +140,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
loop {
|
||||
interval.tick().await;
|
||||
debug!("Sending ping!");
|
||||
ping::update_server_state(&client_secret_1, &cli_args, &mut data).await;
|
||||
ping::update_server_state(&client_secret_1, &config, &mut data).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let cache = DiskCache::new(disk_quota, cache_path.clone()).await;
|
||||
let cache: Arc<dyn Cache> = if low_mem_mode {
|
||||
cache
|
||||
} else if use_lfu {
|
||||
MemoryCache::<Lfu, _>::new(cache, memory_max_size).await
|
||||
} else {
|
||||
MemoryCache::<Lru, _>::new(cache, memory_max_size).await
|
||||
let memory_max_size = memory_quota.into();
|
||||
let cache = DiskCache::new(disk_quota.into(), cache_path.clone()).await;
|
||||
let cache: Arc<dyn Cache> = match cache_type {
|
||||
CacheType::OnDisk => cache,
|
||||
CacheType::Lru => MemoryCache::<Lfu, _>::new(cache, memory_max_size).await,
|
||||
CacheType::Lfu => MemoryCache::<Lru, _>::new(cache, memory_max_size).await,
|
||||
};
|
||||
|
||||
let cache_0 = Arc::clone(&cache);
|
||||
|
@ -214,14 +176,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
})
|
||||
.shutdown_timeout(60);
|
||||
|
||||
let server_bind_address = format!("0.0.0.0:{}", port);
|
||||
if disable_tls {
|
||||
server.bind(server_bind_address)?.run().await?;
|
||||
server.bind(bind_address)?.run().await?;
|
||||
} else {
|
||||
server
|
||||
.bind_rustls(server_bind_address, tls_config)?
|
||||
.run()
|
||||
.await?;
|
||||
// Rustls only supports TLS 1.2 and 1.3.
|
||||
let tls_config = {
|
||||
let mut tls_config = ServerConfig::new(NoClientAuth::new());
|
||||
tls_config.cert_resolver = Arc::new(DynamicServerCert);
|
||||
tls_config
|
||||
};
|
||||
|
||||
server.bind_rustls(bind_address, tls_config)?.run().await?;
|
||||
}
|
||||
|
||||
// Waiting for us to finish sending stop message
|
||||
|
@ -253,7 +218,7 @@ impl Display for InvalidCombination {
|
|||
|
||||
impl Error for InvalidCombination {}
|
||||
|
||||
fn print_preamble_and_warnings(args: &CliArgs) -> Result<(), Box<dyn Error>> {
|
||||
fn print_preamble_and_warnings(args: &Config) -> Result<(), Box<dyn Error>> {
|
||||
println!(concat!(
|
||||
env!("CARGO_PKG_NAME"),
|
||||
" ",
|
||||
|
|
34
src/ping.rs
34
src/ping.rs
|
@ -1,4 +1,3 @@
|
|||
use std::num::NonZeroU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::{io::BufReader, sync::Arc};
|
||||
|
||||
|
@ -12,13 +11,13 @@ use serde_repr::Deserialize_repr;
|
|||
use sodiumoxide::crypto::box_::PrecomputedKey;
|
||||
use url::Url;
|
||||
|
||||
use crate::config::{CliArgs, VALIDATE_TOKENS};
|
||||
use crate::config::{Config, UnstableOptions, VALIDATE_TOKENS};
|
||||
use crate::state::{
|
||||
RwLockServerState, PREVIOUSLY_COMPROMISED, PREVIOUSLY_PAUSED, TLS_CERTS,
|
||||
TLS_PREVIOUSLY_CREATED, TLS_SIGNING_KEY,
|
||||
};
|
||||
use crate::units::Port;
|
||||
use crate::{client_api_version, config::UnstableOptions};
|
||||
use crate::units::{BytesPerSecond, Mebibytes, Port};
|
||||
use crate::CLIENT_API_VERSION;
|
||||
|
||||
pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
|
||||
|
||||
|
@ -26,22 +25,20 @@ pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
|
|||
pub struct Request<'a> {
|
||||
secret: &'a str,
|
||||
port: Port,
|
||||
disk_space: u64,
|
||||
network_speed: NonZeroU64,
|
||||
build_version: u64,
|
||||
disk_space: Mebibytes,
|
||||
network_speed: BytesPerSecond,
|
||||
build_version: usize,
|
||||
tls_created_at: Option<String>,
|
||||
}
|
||||
|
||||
impl<'a> Request<'a> {
|
||||
fn from_config_and_state(secret: &'a str, config: &CliArgs) -> Self {
|
||||
fn from_config_and_state(secret: &'a str, config: &Config) -> Self {
|
||||
Self {
|
||||
secret,
|
||||
port: config.port,
|
||||
disk_space: config.disk_quota,
|
||||
network_speed: config.network_speed,
|
||||
build_version: client_api_version!()
|
||||
.parse()
|
||||
.expect("to parse the build version"),
|
||||
network_speed: config.network_speed.into(),
|
||||
build_version: CLIENT_API_VERSION,
|
||||
tls_created_at: TLS_PREVIOUSLY_CREATED
|
||||
.get()
|
||||
.map(|v| v.load().as_ref().clone()),
|
||||
|
@ -49,17 +46,14 @@ impl<'a> Request<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::fallible_impl_from)]
|
||||
impl<'a> From<(&'a str, &CliArgs)> for Request<'a> {
|
||||
fn from((secret, config): (&'a str, &CliArgs)) -> Self {
|
||||
impl<'a> From<(&'a str, &Config)> for Request<'a> {
|
||||
fn from((secret, config): (&'a str, &Config)) -> Self {
|
||||
Self {
|
||||
secret,
|
||||
port: config.port,
|
||||
disk_space: config.disk_quota,
|
||||
network_speed: config.network_speed,
|
||||
build_version: client_api_version!()
|
||||
.parse()
|
||||
.expect("to parse the build version"),
|
||||
network_speed: config.network_speed.into(),
|
||||
build_version: CLIENT_API_VERSION,
|
||||
tls_created_at: None,
|
||||
}
|
||||
}
|
||||
|
@ -167,7 +161,7 @@ impl std::fmt::Debug for Tls {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn update_server_state(secret: &str, cli: &CliArgs, data: &mut Arc<RwLockServerState>) {
|
||||
pub async fn update_server_state(secret: &str, cli: &Config, data: &mut Arc<RwLockServerState>) {
|
||||
let req = Request::from_config_and_state(secret, cli);
|
||||
let client = reqwest::Client::new();
|
||||
let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await;
|
||||
|
|
|
@ -22,15 +22,14 @@ use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBY
|
|||
use thiserror::Error;
|
||||
|
||||
use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
|
||||
use crate::client_api_version;
|
||||
use crate::config::{OFFLINE_MODE, SEND_SERVER_VERSION, VALIDATE_TOKENS};
|
||||
use crate::config::{OFFLINE_MODE, VALIDATE_TOKENS};
|
||||
use crate::metrics::{
|
||||
CACHE_HIT_COUNTER, CACHE_MISS_COUNTER, REQUESTS_DATA_COUNTER, REQUESTS_DATA_SAVER_COUNTER,
|
||||
REQUESTS_OTHER_COUNTER, REQUESTS_TOTAL_COUNTER,
|
||||
};
|
||||
use crate::state::RwLockServerState;
|
||||
|
||||
pub const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false);
|
||||
const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false);
|
||||
|
||||
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
|
||||
Client::builder()
|
||||
|
@ -41,15 +40,6 @@ static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
|
|||
.expect("Client initialization to work")
|
||||
});
|
||||
|
||||
const SERVER_ID_STRING: &str = concat!(
|
||||
env!("CARGO_CRATE_NAME"),
|
||||
" ",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
" (",
|
||||
client_api_version!(),
|
||||
") - Conforming to spec revision b82043289",
|
||||
);
|
||||
|
||||
enum ServerResponse {
|
||||
TokenValidationError(TokenValidationError),
|
||||
HttpResponse(HttpResponse),
|
||||
|
@ -226,10 +216,6 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
|
|||
.insert_header((CACHE_CONTROL, "public, max-age=1209600"))
|
||||
.insert_header(("Timing-Allow-Origin", "https://mangadex.org"));
|
||||
|
||||
if SEND_SERVER_VERSION.load(Ordering::Acquire) {
|
||||
builder.insert_header(("Server", SERVER_ID_STRING));
|
||||
}
|
||||
|
||||
builder
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use crate::config::{CliArgs, UnstableOptions, OFFLINE_MODE, SEND_SERVER_VERSION, VALIDATE_TOKENS};
|
||||
use crate::config::{Config, UnstableOptions, OFFLINE_MODE, VALIDATE_TOKENS};
|
||||
use crate::ping::{Request, Response, CONTROL_CENTER_PING_URL};
|
||||
use arc_swap::ArcSwap;
|
||||
use log::{error, info, warn};
|
||||
|
@ -45,18 +45,13 @@ pub enum ServerInitError {
|
|||
}
|
||||
|
||||
impl ServerState {
|
||||
pub async fn init(secret: &str, config: &CliArgs) -> Result<Self, ServerInitError> {
|
||||
pub async fn init(secret: &str, config: &Config) -> Result<Self, ServerInitError> {
|
||||
let resp = reqwest::Client::new()
|
||||
.post(CONTROL_CENTER_PING_URL)
|
||||
.json(&Request::from((secret, config)))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
if config.send_server_string {
|
||||
warn!("Client will send Server header in responses. This is not recommended!");
|
||||
SEND_SERVER_VERSION.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
match resp {
|
||||
Ok(resp) => match resp.json::<Response>().await {
|
||||
Ok(Response::Ok(mut resp)) => {
|
||||
|
|
27
src/units.rs
27
src/units.rs
|
@ -22,18 +22,37 @@ impl Display for Port {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default, Debug, Hash, Eq, PartialEq)]
|
||||
#[derive(Copy, Clone, Serialize, Deserialize, Default, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct Mebibytes(usize);
|
||||
|
||||
impl Mebibytes {
|
||||
pub const fn as_bytes(&self) -> usize {
|
||||
self.0 << 20
|
||||
pub const fn get(self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Bytes(usize);
|
||||
|
||||
impl Bytes {
|
||||
pub const fn get(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Mebibytes> for Bytes {
|
||||
fn from(mib: Mebibytes) -> Self {
|
||||
Self(mib.0 << 20)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct Kilobits(NonZeroU64);
|
||||
pub struct KilobitsPerSecond(NonZeroU64);
|
||||
|
||||
#[derive(Copy, Clone, Serialize, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct BytesPerSecond(NonZeroU64);
|
||||
|
||||
impl From<KilobitsPerSecond> for BytesPerSecond {
|
||||
fn from(kbps: KilobitsPerSecond) -> Self {
|
||||
Self(unsafe { NonZeroU64::new_unchecked(kbps.0.get() * 125) })
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue