#![warn(clippy::pedantic, clippy::nursery, clippy::cargo)] use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; use async_trait::async_trait; use axum::body::Full; use axum::response::{IntoResponse, Response}; use axum::routing::get; use axum::{Extension, Router, Server}; use futures::future::join_all; use futures::stream::iter; use futures::StreamExt; use http::header::CONTENT_TYPE; use once_cell::sync::Lazy; use prometheus_client::encoding::text::encode; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; use tokio::select; use tokio::sync::Notify; use tokio::time::Instant; use tracing::{debug, error, info}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{fmt, EnvFilter}; mod cpu; mod zfs; pub static HOSTNAME: Lazy = Lazy::new(|| { hostname::get() .expect("to get the hostname") .into_string() .expect("hostname to be valid utf-8") }); type MetricClient = dyn Metrics + Send + Sync; type U64Gauge = Gauge; type U32Gauge = Gauge; type F64Gauge = Gauge; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() // .with(console_subscriber::spawn()) .with(fmt::layer()) .with(EnvFilter::from_default_env()) .init(); let mut registry = Registry::default(); let namespaced_registry = registry.sub_registry_with_prefix("koyori"); let collectors: Vec> = iter([ Box::new(zfs::Exporter::default()) as Box, Box::new(cpu::Exporter::default()), ]) .filter_map(|collector| async { if let Err(e) = collector.should_collect().await { error!("Not collecting {}: {e}", collector.prefix()); None } else { info!("{} collector enabled.", collector.prefix()); Some(collector) } }) .collect() .await; info!("Started with {} collectors", collectors.len()); for collector in &collectors { collector.register(namespaced_registry.sub_registry_with_prefix(collector.prefix()))?; } let stop_signal = Arc::new(Notify::new()); let stop_signal_listener = Arc::clone(&stop_signal); let stop_signal_bool = Arc::new(AtomicBool::new(false)); let exporter_fut = tokio::spawn(periodically_collect( collectors, Arc::clone(&stop_signal), Arc::clone(&stop_signal_bool), )); tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); info!("Sending stop signal..."); stop_signal.notify_waiters(); stop_signal_bool.store(true, Ordering::Release); }); let registry = Arc::new(registry); let app = Router::new() .route("/metrics", get(|registry| async { metrics(registry) })) .layer(Extension(registry)); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); info!("Listening on {}", addr); let server_fut = Server::bind(&addr) .serve(app.into_make_service()) .with_graceful_shutdown(async move { stop_signal_listener.notified().await; info!("Stopping server..."); }); if let Err(e) = server_fut.await { error!("An error occurred while starting the server: {e}"); } match exporter_fut.await { Ok(Ok(_)) => Ok(()), Ok(Err(e)) => { error!("An error occurred while collecting metrics: {e}"); Err(e) } Err(e) => { error!("The exporter task panicked: {e}"); Err(e).context("While running the exporter task") } } } async fn periodically_collect( collectors: Vec>, stop_signal: Arc, stop_signal_bool: Arc, ) -> Result<()> { while !stop_signal_bool.load(Ordering::Acquire) { // Ensure a minimum 5s interval between collections let wait_until = Instant::now() + Duration::from_secs(5); debug!("Running collectors..."); let collectors_fut = join_all(collectors.iter().map(|c| c.collect())); // Wait on the collectors to complete, or stop immediately if a stop // signal is received. let collector_results = select! { _ = stop_signal.notified() => break, res = collectors_fut => res, }; for res in collector_results { if let Err(e) = res { error!("Error while collecting metrics: {e}"); } } tokio::time::sleep_until(wait_until).await; } Ok::<_, anyhow::Error>(()) } fn metrics(Extension(registry): Extension>) -> impl IntoResponse { let mut encoded = Vec::new(); encode(&mut encoded, ®istry).unwrap(); Response::builder() .header( CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8", ) .body(Full::from(encoded)) .unwrap() } #[async_trait] trait Metrics { fn prefix(&self) -> &'static str; async fn should_collect(&self) -> Result<()>; fn register(&self, sub_registry: &mut Registry) -> Result<()>; async fn collect(&self) -> Result<()>; }