finish minimum version

feature/v32-tokens
Edward Shen 2021-03-22 17:47:56 -04:00
parent 562b439c25
commit 95e41f42c0
Signed by: edward
GPG Key ID: 19182661E818369F
9 changed files with 1332 additions and 167 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target
.env

859
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
actix-web = { version = "4.0.0-beta.4", features = [ "rustls" ] }
awc = "3.0.0-beta.3"
reqwest = { version = "0.11", features = ["json"] }
parking_lot = "0.11"
base64 = "0.13"
sodiumoxide = "0.2"
@ -22,4 +22,8 @@ rustls = "0.19"
simple_logger = "1"
lru = "0.6"
futures = "0.3"
bytes = "1"
bytes = "1"
cacache = "8"
ssri = "5"
bincode = "1"
ctrlc = "3"

202
src/cache.rs Normal file
View File

@ -0,0 +1,202 @@
use std::{fmt::Display, path::PathBuf};
use futures::future::join_all;
use log::warn;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use ssri::Integrity;
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct CacheKey(pub String, pub String, pub bool);
impl Display for CacheKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.2 {
write!(f, "saver/{}/{}", self.0, self.1)
} else {
write!(f, "data/{}/{}", self.0, self.1)
}
}
}
#[derive(Serialize, Deserialize)]
pub struct CachedImage {
pub data: Vec<u8>,
pub content_type: Option<Vec<u8>>,
pub content_length: Option<Vec<u8>>,
pub last_modified: Option<Vec<u8>>,
}
impl CachedImage {
fn len(&self) -> usize {
self.data.capacity()
+ self
.content_type
.as_ref()
.map(Vec::capacity)
.unwrap_or_default()
+ self
.content_length
.as_ref()
.map(Vec::capacity)
.unwrap_or_default()
+ self
.last_modified
.as_ref()
.map(Vec::capacity)
.unwrap_or_default()
}
fn shrink_to_fit(&mut self) {
self.data.shrink_to_fit();
self.content_length.as_mut().map(Vec::shrink_to_fit);
self.last_modified.as_mut().map(Vec::shrink_to_fit);
}
}
pub struct Cache {
in_memory: LruCache<CacheKey, CachedImage>,
memory_max_size: usize,
memory_cur_size: usize,
on_disk: LruCache<CacheKey, Integrity>,
disk_path: PathBuf,
disk_max_size: usize,
disk_cur_size: usize,
}
impl Cache {
pub fn new(memory_max_size: usize, disk_max_size: usize, disk_path: PathBuf) -> Self {
Self {
in_memory: LruCache::unbounded(),
memory_max_size,
memory_cur_size: 0,
on_disk: LruCache::unbounded(),
disk_path,
disk_max_size,
disk_cur_size: 0,
}
}
pub async fn get(&mut self, key: &CacheKey) -> Option<&CachedImage> {
if self.in_memory.contains(key) {
return self.in_memory.get(key);
}
if let Some(disk_key) = self.on_disk.pop(key) {
// extract from disk, if it exists
let bytes = cacache::read_hash(&self.disk_path, &disk_key).await.ok()?;
// We don't particularly care if we fail to delete from disk since
// if it's an error it means it's already been dropped.
cacache::remove_hash(&self.disk_path, &disk_key).await.ok();
self.disk_cur_size -= bytes.len();
let cached_image: CachedImage = match bincode::deserialize(&bytes) {
Ok(image) => image,
Err(e) => {
warn!("Failed to serialize on-disk data?! {}", e);
return None;
}
};
// put into in-memory
self.memory_cur_size += cached_image.len();
self.put(key.clone(), cached_image).await;
}
None
}
pub async fn put(&mut self, key: CacheKey, mut image: CachedImage) {
image.shrink_to_fit();
let mut hot_evicted = vec![];
let new_img_size = image.len();
if self.memory_max_size >= new_img_size {
// Evict oldest entires to make space for new image.
while new_img_size + self.memory_cur_size > self.memory_max_size {
match self.in_memory.pop_lru() {
Some((key, evicted_image)) => {
self.memory_cur_size -= evicted_image.len();
hot_evicted.push((key, evicted_image));
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
self.in_memory.put(key, image);
self.memory_cur_size += new_img_size;
} else {
// Image was larger than memory capacity, push directly into cold
// storage.
self.push_into_cold(key, image).await;
}
// Push evicted hot entires into cold storage.
for (key, image) in hot_evicted {
self.push_into_cold(key, image).await;
}
}
async fn push_into_cold(&mut self, key: CacheKey, image: CachedImage) {
let image = bincode::serialize(&image).unwrap(); // This should never fail.
let new_img_size = image.len();
let mut to_drop = vec![];
if self.disk_max_size >= new_img_size {
// Add images to drop from cold cache into a queue
while new_img_size + self.disk_cur_size > self.disk_max_size {
match self.on_disk.pop_lru() {
Some((key, disk_key)) => {
// Basically, the assumption here is that if the meta
// data was deleted, we can't assume that deleting the
// value will yield any free space back, so we assume a
// size of zero while this is the case. This also means
// that we automatically drop broken links as well.
let on_disk_size = cacache::metadata(&self.disk_path, key.to_string())
.await
.unwrap_or_default()
.map(|metadata| metadata.size)
.unwrap_or_default();
self.disk_cur_size -= on_disk_size;
to_drop.push(disk_key);
}
None => unreachable!(concat!(
"Invariant violated. Cache is empty but we already ",
"guaranteed we can remove items from cache to make space."
)),
}
}
} else {
warn!(
"Got request to push file larger than maximum disk cache. Refusing to insert on disk! {}",
key
);
return;
}
let mut futs = vec![];
for key in &to_drop {
futs.push(cacache::remove_hash(&self.disk_path, key));
}
// Run all cold caching in parallel, we don't care if the removal fails
// because it just means it's already been removed.
join_all(futs).await;
let new_disk_key = match cacache::write(&self.disk_path, key.to_string(), image).await {
Ok(key) => key,
Err(e) => {
warn!(
"failed to write to disk cache, dropping value instead: {}",
e
);
return;
}
};
self.on_disk.put(key.clone(), new_disk_key);
self.disk_cur_size += new_img_size;
}
}

View File

@ -2,135 +2,36 @@
#![allow(clippy::future_not_send)] // We're end users, so this is ok
use std::env::{self, VarError};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use std::{num::ParseIntError, sync::Arc};
use std::{num::ParseIntError, sync::atomic::Ordering};
use crate::ping::Tls;
use actix_web::rt::{spawn, time};
use actix_web::rt::{spawn, time, System};
use actix_web::web;
use actix_web::web::Data;
use actix_web::{App, HttpServer};
use awc::{error::SendRequestError, Client};
use log::{debug, error, info, warn};
use lru::LruCache;
use log::{error, warn, LevelFilter};
use parking_lot::RwLock;
use ping::{Request, Response};
use rustls::sign::{CertifiedKey, RSASigningKey};
use rustls::PrivateKey;
use rustls::{Certificate, NoClientAuth, ResolvesServerCert, ServerConfig};
use rustls::{NoClientAuth, ServerConfig};
use simple_logger::SimpleLogger;
use sodiumoxide::crypto::box_::PrecomputedKey;
use state::{RwLockServerState, ServerState};
use stop::send_stop;
use thiserror::Error;
use url::Url;
mod cache;
mod ping;
mod routes;
mod state;
mod stop;
const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
#[macro_export]
macro_rules! client_api_version {
() => {
"30"
};
}
struct ServerState {
precomputed_key: PrecomputedKey,
image_server: Url,
tls_config: Tls,
disabled_tokens: bool,
url: String,
cache: LruCache<(String, String, bool), CachedImage>,
}
struct CachedImage {
data: Vec<u8>,
content_type: Option<Vec<u8>>,
content_length: Option<Vec<u8>>,
last_modified: Option<Vec<u8>>,
}
impl ServerState {
async fn init(config: &Config) -> Result<Self, ()> {
let resp = Client::new()
.post(CONTROL_CENTER_PING_URL)
.send_json(&Request::from(config))
.await;
match resp {
Ok(mut resp) => match resp.json::<Response>().await {
Ok(resp) => {
let key = resp
.token_key
.and_then(|key| {
if let Some(key) = PrecomputedKey::from_slice(key.as_bytes()) {
Some(key)
} else {
error!("Failed to parse token key: got {}", key);
None
}
})
.unwrap();
if resp.compromised {
warn!("Got compromised response from control center!");
}
if resp.paused {
debug!("Got paused response from control center.");
}
info!("This client's URL has been set to {}", resp.url);
if resp.disabled_tokens {
info!("This client will not validated tokens");
}
Ok(Self {
precomputed_key: key,
image_server: resp.image_server,
tls_config: resp.tls.unwrap(),
disabled_tokens: resp.disabled_tokens,
url: resp.url,
cache: LruCache::new(1000),
})
}
Err(e) => {
warn!("Got malformed response: {}", e);
Err(())
}
},
Err(e) => match e {
SendRequestError::Timeout => {
error!("Response timed out to control server. Is MangaDex down?");
Err(())
}
e => {
warn!("Failed to send request: {}", e);
Err(())
}
},
}
}
}
pub struct RwLockServerState(RwLock<ServerState>);
impl ResolvesServerCert for RwLockServerState {
fn resolve(&self, _: rustls::ClientHello) -> Option<CertifiedKey> {
let read_guard = self.0.read();
Some(CertifiedKey {
cert: vec![Certificate(read_guard.tls_config.certificate.clone())],
key: Arc::new(Box::new(
RSASigningKey::new(&PrivateKey(read_guard.tls_config.private_key.clone())).unwrap(),
)),
ocsp: None,
sct_list: None,
})
}
}
#[derive(Error, Debug)]
enum ServerError {
#[error("There was a failure parsing config")]
@ -141,13 +42,29 @@ enum ServerError {
#[actix_web::main]
async fn main() -> Result<(), std::io::Error> {
// It's ok to fail early here, it would imply we have a invalid config.
dotenv::dotenv().ok();
SimpleLogger::new().init().unwrap();
SimpleLogger::new()
.with_level(LevelFilter::Info)
.init()
.unwrap();
let config = Config::new().unwrap();
let port = config.port;
let server = ServerState::init(&config).await.unwrap();
// Set ctrl+c to send a stop message
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
let client_secret = config.secret.clone();
ctrlc::set_handler(move || {
let client_secret = client_secret.clone();
System::new().block_on(async move {
send_stop(&client_secret).await;
});
r.store(false, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
let data_1 = Arc::clone(&data_0);
let data_2 = Arc::clone(&data_0);
@ -167,32 +84,52 @@ async fn main() -> Result<(), std::io::Error> {
HttpServer::new(move || {
App::new()
.service(routes::token_data)
.service(routes::no_token_data)
.service(routes::token_data_saver)
.service(routes::no_token_data_saver)
.route("{tail:.*}", web::get().to(routes::default))
.app_data(Data::from(Arc::clone(&data_1)))
})
.shutdown_timeout(60)
.bind_rustls(format!("0.0.0.0:{}", port), tls_config)?
.run()
.await
.await?;
// Waiting for us to finish sending stop message
while running.load(Ordering::SeqCst) {
std::thread::sleep(Duration::from_millis(250));
}
Ok(())
}
pub struct Config {
secret: String,
port: u16,
memory_quota: usize,
disk_quota: usize,
disk_path: PathBuf,
network_speed: usize,
}
impl Config {
fn new() -> Result<Self, ServerError> {
let secret = env::var("CLIENT_SECRET")?;
let port = env::var("PORT")?.parse::<u16>()?;
let disk_quota = env::var("MAX_STORAGE_BYTES")?.parse::<usize>()?;
let network_speed = env::var("MAX_NETWORK_SPEED")?.parse::<usize>()?;
let port = env::var("PORT")?.parse()?;
let disk_quota = env::var("DISK_CACHE_QUOTA_BYTES")?.parse()?;
let memory_quota = env::var("MEM_CACHE_QUOTA_BYTES")?.parse()?;
let network_speed = env::var("MAX_NETWORK_SPEED")?.parse()?;
let disk_path = env::var("DISK_CACHE_PATH")
.unwrap_or("./cache".to_string())
.parse()
.unwrap();
Ok(Self {
secret,
port,
disk_quota,
memory_quota,
disk_path,
network_speed,
})
}

View File

@ -1,14 +1,16 @@
use std::sync::Arc;
use awc::{error::SendRequestError, Client};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::box_::PrecomputedKey;
use url::Url;
use crate::{client_api_version, Config, RwLockServerState, CONTROL_CENTER_PING_URL};
use crate::state::RwLockServerState;
use crate::{client_api_version, Config};
#[derive(Serialize)]
pub const CONTROL_CENTER_PING_URL: &str = "https://api.mangadex.network/ping";
#[derive(Serialize, Debug)]
pub struct Request<'a> {
secret: &'a str,
port: u16,
@ -44,7 +46,7 @@ impl<'a> From<&'a Config> for Request<'a> {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct Response {
pub image_server: Url,
pub latest_build: usize,
@ -52,38 +54,40 @@ pub struct Response {
pub token_key: Option<String>,
pub compromised: bool,
pub paused: bool,
pub disabled_tokens: bool,
pub force_tokens: bool,
pub tls: Option<Tls>,
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct Tls {
pub created_at: String,
pub private_key: Vec<u8>,
pub certificate: Vec<u8>,
pub private_key: String,
pub certificate: String,
}
pub async fn update_server_state(req: &Config, data: &mut Arc<RwLockServerState>) {
let req = Request::from_config_and_state(req, data);
let resp = Client::new()
.post(CONTROL_CENTER_PING_URL)
.send_json(&req)
.await;
let client = reqwest::Client::new();
let resp = client.post(CONTROL_CENTER_PING_URL).json(&req).send().await;
match resp {
Ok(mut resp) => match resp.json::<Response>().await {
Ok(resp) => match resp.json::<Response>().await {
Ok(resp) => {
let mut write_guard = data.0.write();
write_guard.image_server = resp.image_server;
if let Some(key) = resp.token_key {
match PrecomputedKey::from_slice(key.as_bytes()) {
Some(key) => write_guard.precomputed_key = key,
None => error!("Failed to parse token key: got {}", key),
if let Some(key) = base64::decode(&key)
.ok()
.and_then(|k| PrecomputedKey::from_slice(&k))
{
write_guard.precomputed_key = key;
} else {
error!("Failed to parse token key: got {}", key);
}
}
write_guard.disabled_tokens = resp.disabled_tokens;
write_guard.force_tokens = resp.force_tokens;
if let Some(tls) = resp.tls {
write_guard.tls_config = tls;
@ -104,7 +108,7 @@ pub async fn update_server_state(req: &Config, data: &mut Arc<RwLockServerState>
Err(e) => warn!("Got malformed response: {}", e),
},
Err(e) => match e {
SendRequestError::Timeout => {
e if e.is_timeout() => {
error!("Response timed out to control server. Is MangaDex down?")
}
e => warn!("Failed to send request: {}", e),

View File

@ -7,19 +7,20 @@ use actix_web::http::header::{
};
use actix_web::web::Path;
use actix_web::{get, web::Data, HttpRequest, HttpResponse, Responder};
use awc::Client;
use base64::DecodeError;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream;
use log::{error, warn};
use log::{error, info, warn};
use serde::Deserialize;
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
use thiserror::Error;
use crate::{client_api_version, CachedImage, RwLockServerState};
use crate::cache::{CacheKey, CachedImage};
use crate::client_api_version;
use crate::state::RwLockServerState;
const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false);
pub const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false);
const SERVER_ID_STRING: &str = concat!(
env!("CARGO_CRATE_NAME"),
@ -50,7 +51,7 @@ async fn token_data(
path: Path<(String, String, String)>,
) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner();
if !state.0.read().disabled_tokens {
if state.0.read().force_tokens {
if let Err(e) = validate_token(&state.0.read().precomputed_key, token, &chapter_hash) {
return ServerResponse::TokenValidationError(e);
}
@ -65,7 +66,7 @@ async fn token_data_saver(
path: Path<(String, String, String)>,
) -> impl Responder {
let (token, chapter_hash, file_name) = path.into_inner();
if !state.0.read().disabled_tokens {
if state.0.read().force_tokens {
if let Err(e) = validate_token(&state.0.read().precomputed_key, token, &chapter_hash) {
return ServerResponse::TokenValidationError(e);
}
@ -91,6 +92,24 @@ async fn no_token_data_saver(
fetch_image(state, chapter_hash, file_name, true).await
}
pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl Responder {
let path = &format!(
"{}{}",
state.0.read().image_server,
req.path().chars().skip(1).collect::<String>()
);
info!("Got unknown path, just proxying: {}", path);
let resp = reqwest::get(path).await.unwrap();
let content_type = resp.headers().get(CONTENT_TYPE);
let mut resp_builder = HttpResponseBuilder::new(resp.status());
if let Some(content_type) = content_type {
resp_builder.insert_header((CONTENT_TYPE, content_type));
}
push_headers(&mut resp_builder);
ServerResponse::HttpResponse(resp_builder.body(resp.bytes().await.unwrap_or_default()))
}
#[derive(Error, Debug)]
enum TokenValidationError {
#[error("Failed to decode base64 token.")]
@ -165,26 +184,46 @@ async fn fetch_image(
file_name: String,
is_data_saver: bool,
) -> ServerResponse {
let key = (chapter_hash, file_name, is_data_saver);
let key = CacheKey(chapter_hash, file_name, is_data_saver);
if let Some(cached) = state.0.write().cache.get(&key) {
if let Some(cached) = state.0.write().cache.get(&key).await {
return construct_response(cached);
}
let mut state = state.0.write();
let resp = if is_data_saver {
Client::new().get(format!(
reqwest::get(format!(
"{}/data-saver/{}/{}",
state.image_server, &key.1, &key.2
))
} else {
Client::new().get(format!("{}/data/{}/{}", state.image_server, &key.1, &key.2))
reqwest::get(format!("{}/data/{}/{}", state.image_server, &key.1, &key.2))
}
.send()
.await;
match resp {
Ok(mut resp) => {
Ok(resp) => {
let content_type = resp.headers().get(CONTENT_TYPE);
let is_image = content_type
.map(|v| String::from_utf8_lossy(v.as_ref()).contains("image/"))
.unwrap_or_default();
if resp.status() != 200 || !is_image {
warn!(
"Got non-OK or non-image response code from upstream, proxying and not caching result.",
);
let mut resp_builder = HttpResponseBuilder::new(resp.status());
if let Some(content_type) = content_type {
resp_builder.insert_header((CONTENT_TYPE, content_type));
}
push_headers(&mut resp_builder);
return ServerResponse::HttpResponse(
resp_builder.body(resp.bytes().await.unwrap_or_default()),
);
}
let headers = resp.headers();
let content_type = headers.get(CONTENT_TYPE).map(AsRef::as_ref).map(Vec::from);
let content_length = headers
@ -192,7 +231,7 @@ async fn fetch_image(
.map(AsRef::as_ref)
.map(Vec::from);
let last_modified = headers.get(LAST_MODIFIED).map(AsRef::as_ref).map(Vec::from);
let body = resp.body().await;
let body = resp.bytes().await;
match body {
Ok(bytes) => {
let cached = CachedImage {
@ -202,7 +241,7 @@ async fn fetch_image(
last_modified,
};
let resp = construct_response(&cached);
state.cache.put(key, cached);
state.cache.put(key, cached).await;
return resp;
}
Err(e) => {

119
src/state.rs Normal file
View File

@ -0,0 +1,119 @@
use std::{io::BufReader, sync::Arc};
use crate::cache::Cache;
use crate::ping::{Request, Response, Tls, CONTROL_CENTER_PING_URL};
use crate::Config;
use log::{debug, error, info, warn};
use parking_lot::RwLock;
use rustls::internal::pemfile::{certs, rsa_private_keys};
use rustls::sign::{CertifiedKey, RSASigningKey};
use rustls::ResolvesServerCert;
use sodiumoxide::crypto::box_::PrecomputedKey;
use url::Url;
pub struct ServerState {
pub precomputed_key: PrecomputedKey,
pub image_server: Url,
pub tls_config: Tls,
pub force_tokens: bool,
pub url: String,
pub cache: Cache,
}
impl ServerState {
pub async fn init(config: &Config) -> Result<Self, ()> {
let resp = reqwest::Client::new()
.post(CONTROL_CENTER_PING_URL)
.json(&Request::from(config))
.send()
.await;
match resp {
Ok(resp) => match resp.json::<Response>().await {
Ok(resp) => {
let key = resp
.token_key
.and_then(|key| {
if let Some(key) = base64::decode(&key)
.ok()
.and_then(|k| PrecomputedKey::from_slice(&k))
{
Some(key)
} else {
error!("Failed to parse token key: got {}", key);
None
}
})
.unwrap();
if resp.compromised {
warn!("Got compromised response from control center!");
}
if resp.paused {
debug!("Got paused response from control center.");
}
info!("This client's URL has been set to {}", resp.url);
if resp.force_tokens {
info!("This client will validate tokens");
}
Ok(Self {
precomputed_key: key,
image_server: resp.image_server,
tls_config: resp.tls.unwrap(),
force_tokens: resp.force_tokens,
url: resp.url,
cache: Cache::new(
config.memory_quota,
config.disk_quota,
config.disk_path.clone(),
),
})
}
Err(e) => {
warn!("Got malformed response: {}", e);
Err(())
}
},
Err(e) => match e {
e if e.is_timeout() => {
error!("Response timed out to control server. Is MangaDex down?");
Err(())
}
e => {
warn!("Failed to send request: {}", e);
Err(())
}
},
}
}
}
pub struct RwLockServerState(pub RwLock<ServerState>);
impl ResolvesServerCert for RwLockServerState {
fn resolve(&self, _: rustls::ClientHello) -> Option<CertifiedKey> {
let read_guard = self.0.read();
let priv_key = rsa_private_keys(&mut BufReader::new(
read_guard.tls_config.private_key.as_bytes(),
))
.ok()?
.pop()
.unwrap();
let certs = certs(&mut BufReader::new(
read_guard.tls_config.certificate.as_bytes(),
))
.ok()?;
Some(CertifiedKey {
cert: certs,
key: Arc::new(Box::new(RSASigningKey::new(&priv_key).unwrap())),
ocsp: None,
sct_list: None,
})
}
}

View File

@ -1,3 +1,29 @@
struct StopRequest {
secret: String,
use log::{info, warn};
use serde::Serialize;
const CONTROL_CENTER_STOP_URL: &str = "https://api.mangadex.network/ping";
#[derive(Serialize)]
struct StopRequest<'a> {
secret: &'a str,
}
pub async fn send_stop(secret: &str) {
let request = StopRequest { secret };
let client = reqwest::Client::new();
match client
.post(CONTROL_CENTER_STOP_URL)
.json(&request)
.send()
.await
{
Ok(resp) => {
if resp.status() == 200 {
info!("Successfully sent stop message.");
} else {
warn!("Got weird response from server: {:?}", resp.headers());
}
}
Err(e) => warn!("Got error while sending stop message: {}", e),
}
}