Add prometheus endpoint

parent 96a6b76baa
commit 79f73ed68e

5 changed files with 105 additions and 1 deletion
Cargo.lock (generated, 38 changes)

@@ -1051,6 +1051,7 @@ dependencies = [
  "lru",
  "once_cell",
  "parking_lot",
+ "prometheus",
  "reqwest",
  "rustls",
  "serde",

@@ -1329,6 +1330,43 @@ dependencies = [
  "unicode-xid",
 ]
 
+[[package]]
+name = "procfs"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab8809e0c18450a2db0f236d2a44ec0b4c1412d0eb936233579f0990faa5d5cd"
+dependencies = [
+ "bitflags",
+ "byteorder",
+ "flate2",
+ "hex",
+ "lazy_static",
+ "libc",
+]
+
+[[package]]
+name = "prometheus"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "libc",
+ "memchr",
+ "parking_lot",
+ "procfs",
+ "protobuf",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf"
+version = "2.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "45604fc7a88158e7d514d8e22e14ac746081e7a70d7690074dd0029ee37458d6"
+
 [[package]]
 name = "quote"
 version = "1.0.9"
Cargo.toml

@@ -30,6 +30,7 @@ log = "0.4"
 lfu_cache = "1"
 lru = "0.6"
 parking_lot = "0.11"
+prometheus = { version = "0.12", features = [ "process" ] }
 reqwest = { version = "0.11", default_features = false, features = [ "json", "stream", "rustls-tls" ] }
 rustls = "0.19"
 serde = "1"
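The "process" feature above is what pulls procfs into Cargo.lock: it enables the prometheus crate's process collector, which exports CPU, memory, and file-descriptor metrics for the running process on Linux. As a minimal sketch (not part of this commit), registering that collector by hand would look roughly like this; depending on the crate version, the default registry may already register it automatically when the feature is enabled, and register_process_metrics is a hypothetical helper name:

// Sketch only (not from this commit): manually registering the process
// collector provided by the prometheus crate's `process` feature (Linux only).
// Its metrics land in the default registry next to the counters in src/metrics.rs.
use prometheus::process_collector::ProcessCollector;

fn register_process_metrics() -> prometheus::Result<()> {
    // Exposes process_cpu_seconds_total, process_resident_memory_bytes,
    // process_open_fds, and similar metrics for the current process.
    prometheus::register(Box::new(ProcessCollector::for_self()))
}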
src/main.rs

@@ -34,6 +34,7 @@ use crate::state::DynamicServerCert;
 
 mod cache;
 mod config;
+mod metrics;
 mod ping;
 mod routes;
 mod state;

@@ -102,6 +103,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
         ENCRYPTION_KEY.set(gen_key()).unwrap();
     }
 
+    // HTTP Server init
+
     let server = ServerState::init(&client_secret, &cli_args).await?;
     let data_0 = Arc::new(RwLockServerState(RwLock::new(server)));
     let data_1 = Arc::clone(&data_0);

@@ -161,6 +164,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     HttpServer::new(move || {
         App::new()
             .service(routes::token_data)
+            .service(routes::metrics)
             .service(routes::token_data_saver)
             .route("{tail:.*}", web::get().to(routes::default))
             .app_data(Data::from(Arc::clone(&data_1)))
src/metrics.rs (new file, 36 lines)

@@ -0,0 +1,36 @@
+use once_cell::sync::Lazy;
+use prometheus::{register_int_counter, IntCounter};
+
+pub static CACHE_HIT_COUNTER: Lazy<IntCounter> =
+    Lazy::new(|| register_int_counter!("cache.hit", "The number of cache hits").unwrap());
+
+pub static CACHE_MISS_COUNTER: Lazy<IntCounter> =
+    Lazy::new(|| register_int_counter!("cache.miss", "The number of cache misses").unwrap());
+
+pub static REQUESTS_TOTAL_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!("requests.total", "The total number of requests served.").unwrap()
+});
+
+pub static REQUESTS_DATA_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "requests.data",
+        "The number of requests served from the /data endpoint."
+    )
+    .unwrap()
+});
+
+pub static REQUESTS_DATA_SAVER_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "requests.data-saver",
+        "The number of requests served from the /data-saver endpoint."
+    )
+    .unwrap()
+});
+
+pub static REQUESTS_OTHER_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "requests.other",
+        "The total number of request not served by primary endpoints."
+    )
+    .unwrap()
+});
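For reference, here is a minimal, self-contained sketch (not from this commit) of the same once_cell::Lazy plus register_int_counter! pattern, from registration through text encoding. The counter name and static in the sketch are hypothetical, and the name uses underscores (cache_hit_example) because Prometheus metric names are restricted to the character set [a-zA-Z_:][a-zA-Z0-9_:]*.

// Sketch only: the Lazy + register_int_counter! pattern end to end.
// EXAMPLE_HIT_COUNTER and "cache_hit_example" are illustrative names.
use once_cell::sync::Lazy;
use prometheus::{register_int_counter, Encoder, IntCounter, TextEncoder};

static EXAMPLE_HIT_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
    // Registered in the default registry the first time the static is touched.
    register_int_counter!("cache_hit_example", "The number of cache hits").unwrap()
});

fn main() {
    EXAMPLE_HIT_COUNTER.inc();

    // Render everything in the default registry in the Prometheus text format,
    // which is what a /metrics endpoint serves.
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buffer)
        .unwrap();
    print!("{}", String::from_utf8(buffer).unwrap());
}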
src/routes.rs

@@ -13,6 +13,7 @@ use chrono::{DateTime, Utc};
 use futures::{Stream, TryStreamExt};
 use log::{debug, error, info, warn};
 use once_cell::sync::Lazy;
+use prometheus::{Encoder, TextEncoder};
 use reqwest::{Client, StatusCode};
 use serde::Deserialize;
 use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};

@@ -21,6 +22,10 @@ use thiserror::Error;
 use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
 use crate::client_api_version;
 use crate::config::{SEND_SERVER_VERSION, VALIDATE_TOKENS};
+use crate::metrics::{
+    CACHE_HIT_COUNTER, CACHE_MISS_COUNTER, REQUESTS_DATA_COUNTER, REQUESTS_DATA_SAVER_COUNTER,
+    REQUESTS_OTHER_COUNTER, REQUESTS_TOTAL_COUNTER,
+};
 use crate::state::RwLockServerState;
 
 pub const BASE64_CONFIG: base64::Config = base64::Config::new(base64::CharacterSet::UrlSafe, false);

@@ -46,7 +51,10 @@ impl Responder for ServerResponse {
     fn respond_to(self, req: &HttpRequest) -> HttpResponse {
         match self {
             Self::TokenValidationError(e) => e.respond_to(req),
-            Self::HttpResponse(resp) => resp.respond_to(req),
+            Self::HttpResponse(resp) => {
+                REQUESTS_TOTAL_COUNTER.inc();
+                resp.respond_to(req)
+            }
         }
     }
 }

@@ -58,6 +66,7 @@ async fn token_data(
     cache: Data<dyn Cache>,
     path: Path<(String, String, String)>,
 ) -> impl Responder {
+    REQUESTS_DATA_COUNTER.inc();
     let (token, chapter_hash, file_name) = path.into_inner();
     if VALIDATE_TOKENS.load(Ordering::Acquire) {
         if let Err(e) = validate_token(&state.0.read().precomputed_key, token, &chapter_hash) {

@@ -74,6 +83,7 @@ async fn token_data_saver(
     cache: Data<dyn Cache>,
     path: Path<(String, String, String)>,
 ) -> impl Responder {
+    REQUESTS_DATA_SAVER_COUNTER.inc();
     let (token, chapter_hash, file_name) = path.into_inner();
     if VALIDATE_TOKENS.load(Ordering::Acquire) {
         if let Err(e) = validate_token(&state.0.read().precomputed_key, token, &chapter_hash) {

@@ -86,6 +96,7 @@ async fn token_data_saver(
 
 #[allow(clippy::future_not_send)]
 pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl Responder {
+    REQUESTS_OTHER_COUNTER.inc();
     let path = &format!(
         "{}{}",
         state.0.read().image_server,

@@ -109,6 +120,17 @@ pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl R
     ServerResponse::HttpResponse(resp_builder.body(resp.bytes().await.unwrap_or_default()))
 }
 
+#[allow(clippy::future_not_send)]
+#[get("/metrics")]
+pub async fn metrics() -> impl Responder {
+    let metric_families = prometheus::gather();
+    let mut buffer = Vec::new();
+    TextEncoder::new()
+        .encode(&metric_families, &mut buffer)
+        .unwrap();
+    String::from_utf8(buffer).unwrap()
+}
+
 #[derive(Error, Debug)]
 enum TokenValidationError {
     #[error("Failed to decode base64 token.")]

@@ -198,6 +220,7 @@ async fn fetch_image(
 
     match cache.get(&key).await {
         Some(Ok((image, metadata))) => {
+            CACHE_HIT_COUNTER.inc();
             return construct_response(image, &metadata);
         }
         Some(Err(_)) => {

@@ -206,6 +229,8 @@ async fn fetch_image(
         _ => (),
     }
 
+    CACHE_MISS_COUNTER.inc();
+
     // It's important to not get a write lock before this request, else we're
     // holding the read lock until the await resolves.
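The metrics handler above unwraps both the encode step and the UTF-8 conversion, which is reasonable since the text encoder writes valid UTF-8 into an in-memory buffer. As a sketch only (not part of the commit), a variant that surfaces any failure as a 500 instead of panicking could look like the following; it reuses the same actix-web and prometheus imports as src/routes.rs, and metrics_fallible is a hypothetical name:

// Sketch only: a fallible variant of the /metrics handler above.
pub async fn metrics_fallible() -> HttpResponse {
    let mut buffer = Vec::new();
    if let Err(e) = TextEncoder::new().encode(&prometheus::gather(), &mut buffer) {
        return HttpResponse::InternalServerError().body(e.to_string());
    }
    match String::from_utf8(buffer) {
        Ok(body) => HttpResponse::Ok().body(body),
        Err(e) => HttpResponse::InternalServerError().body(e.to_string()),
    }
}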