koyori/src/main.rs

179 lines
5.3 KiB
Rust

#![warn(clippy::pedantic, clippy::nursery, clippy::cargo)]
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use async_trait::async_trait;
use axum::body::Full;
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::{Extension, Router, Server};
use futures::future::join_all;
use futures::stream::iter;
use futures::StreamExt;
use http::header::CONTENT_TYPE;
use once_cell::sync::Lazy;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use tokio::select;
use tokio::sync::Notify;
use tokio::time::Instant;
use tracing::{debug, error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};
mod cpu;
mod zfs;
pub static HOSTNAME: Lazy<String> = Lazy::new(|| {
hostname::get()
.expect("to get the hostname")
.into_string()
.expect("hostname to be valid utf-8")
});
type MetricClient = dyn Metrics + Send + Sync;
type U64Gauge = Gauge<u64, AtomicU64>;
type U32Gauge = Gauge<u32, AtomicU32>;
type F64Gauge = Gauge<f64, AtomicU64>;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
// .with(console_subscriber::spawn())
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
let mut registry = Registry::default();
let namespaced_registry = registry.sub_registry_with_prefix("koyori");
let collectors: Vec<Box<MetricClient>> = iter([
Box::new(zfs::Exporter::default()) as Box<MetricClient>,
Box::new(cpu::Exporter::default()),
])
.filter_map(|collector| async {
if let Err(e) = collector.should_collect().await {
error!("Not collecting {}: {e}", collector.prefix());
None
} else {
info!("{} collector enabled.", collector.prefix());
Some(collector)
}
})
.collect()
.await;
info!("Started with {} collectors", collectors.len());
for collector in &collectors {
collector.register(namespaced_registry.sub_registry_with_prefix(collector.prefix()))?;
}
let stop_signal = Arc::new(Notify::new());
let stop_signal_listener = Arc::clone(&stop_signal);
let stop_signal_bool = Arc::new(AtomicBool::new(false));
let exporter_fut = tokio::spawn(periodically_collect(
collectors,
Arc::clone(&stop_signal),
Arc::clone(&stop_signal_bool),
));
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("Sending stop signal...");
stop_signal.notify_waiters();
stop_signal_bool.store(true, Ordering::Release);
});
let registry = Arc::new(registry);
let app = Router::new()
.route("/metrics", get(|registry| async { metrics(registry) }))
.layer(Extension(registry));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
info!("Listening on {}", addr);
let server_fut = Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async move {
stop_signal_listener.notified().await;
info!("Stopping server...");
});
if let Err(e) = server_fut.await {
error!("An error occurred while starting the server: {e}");
}
match exporter_fut.await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => {
error!("An error occurred while collecting metrics: {e}");
Err(e)
}
Err(e) => {
error!("The exporter task panicked: {e}");
Err(e).context("While running the exporter task")
}
}
}
async fn periodically_collect(
collectors: Vec<Box<MetricClient>>,
stop_signal: Arc<Notify>,
stop_signal_bool: Arc<AtomicBool>,
) -> Result<()> {
while !stop_signal_bool.load(Ordering::Acquire) {
// Ensure a minimum 5s interval between collections
let wait_until = Instant::now() + Duration::from_secs(5);
debug!("Running collectors...");
let collectors_fut = join_all(collectors.iter().map(|c| c.collect()));
// Wait on the collectors to complete, or stop immediately if a stop
// signal is received.
let collector_results = select! {
_ = stop_signal.notified() => break,
res = collectors_fut => res,
};
for res in collector_results {
if let Err(e) = res {
error!("Error while collecting metrics: {e}");
}
}
tokio::time::sleep_until(wait_until).await;
}
Ok::<_, anyhow::Error>(())
}
fn metrics(Extension(registry): Extension<Arc<Registry>>) -> impl IntoResponse {
let mut encoded = Vec::new();
encode(&mut encoded, &registry).unwrap();
Response::builder()
.header(
CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(Full::from(encoded))
.unwrap()
}
#[async_trait]
trait Metrics {
fn prefix(&self) -> &'static str;
async fn should_collect(&self) -> Result<()>;
fn register(&self, sub_registry: &mut Registry) -> Result<()>;
async fn collect(&self) -> Result<()>;
}