179 lines
5.3 KiB
Rust
179 lines
5.3 KiB
Rust
#![warn(clippy::pedantic, clippy::nursery, clippy::cargo)]
|
|
|
|
use std::net::SocketAddr;
|
|
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::{Context, Result};
|
|
use async_trait::async_trait;
|
|
use axum::body::Full;
|
|
use axum::response::{IntoResponse, Response};
|
|
use axum::routing::get;
|
|
use axum::{Extension, Router, Server};
|
|
use futures::future::join_all;
|
|
use futures::stream::iter;
|
|
use futures::StreamExt;
|
|
use http::header::CONTENT_TYPE;
|
|
use once_cell::sync::Lazy;
|
|
use prometheus_client::encoding::text::encode;
|
|
use prometheus_client::metrics::gauge::Gauge;
|
|
use prometheus_client::registry::Registry;
|
|
use tokio::select;
|
|
use tokio::sync::Notify;
|
|
use tokio::time::Instant;
|
|
use tracing::{debug, error, info};
|
|
use tracing_subscriber::layer::SubscriberExt;
|
|
use tracing_subscriber::util::SubscriberInitExt;
|
|
use tracing_subscriber::{fmt, EnvFilter};
|
|
|
|
mod cpu;
|
|
mod zfs;
|
|
|
|
pub static HOSTNAME: Lazy<String> = Lazy::new(|| {
|
|
hostname::get()
|
|
.expect("to get the hostname")
|
|
.into_string()
|
|
.expect("hostname to be valid utf-8")
|
|
});
|
|
|
|
/// Trait-object form of a [`Metrics`] implementation that is also `Send + Sync`.
///
/// Used boxed (`Box<MetricClient>`) so that different exporters can live in
/// the same collection.
type MetricClient = dyn Metrics + Send + Sync;
|
|
|
|
/// Gauge holding a `u64` value, backed by an `AtomicU64`.
type U64Gauge = Gauge<u64, AtomicU64>;
|
|
/// Gauge holding a `u32` value, backed by an `AtomicU32`.
type U32Gauge = Gauge<u32, AtomicU32>;
|
|
/// Gauge holding an `f64` value, backed by an `AtomicU64`.
// NOTE(review): presumably the float is stored via its bit pattern — confirm
// against the prometheus_client `Atomic<f64>` implementation.
type F64Gauge = Gauge<f64, AtomicU64>;
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
tracing_subscriber::registry()
|
|
// .with(console_subscriber::spawn())
|
|
.with(fmt::layer())
|
|
.with(EnvFilter::from_default_env())
|
|
.init();
|
|
|
|
let mut registry = Registry::default();
|
|
let namespaced_registry = registry.sub_registry_with_prefix("koyori");
|
|
|
|
let collectors: Vec<Box<MetricClient>> = iter([
|
|
Box::new(zfs::Exporter::default()) as Box<MetricClient>,
|
|
Box::new(cpu::Exporter::default()),
|
|
])
|
|
.filter_map(|collector| async {
|
|
if let Err(e) = collector.should_collect().await {
|
|
error!("Not collecting {}: {e}", collector.prefix());
|
|
None
|
|
} else {
|
|
info!("{} collector enabled.", collector.prefix());
|
|
Some(collector)
|
|
}
|
|
})
|
|
.collect()
|
|
.await;
|
|
|
|
info!("Started with {} collectors", collectors.len());
|
|
|
|
for collector in &collectors {
|
|
collector.register(namespaced_registry.sub_registry_with_prefix(collector.prefix()))?;
|
|
}
|
|
|
|
let stop_signal = Arc::new(Notify::new());
|
|
let stop_signal_listener = Arc::clone(&stop_signal);
|
|
let stop_signal_bool = Arc::new(AtomicBool::new(false));
|
|
let exporter_fut = tokio::spawn(periodically_collect(
|
|
collectors,
|
|
Arc::clone(&stop_signal),
|
|
Arc::clone(&stop_signal_bool),
|
|
));
|
|
|
|
tokio::spawn(async move {
|
|
tokio::signal::ctrl_c().await.unwrap();
|
|
info!("Sending stop signal...");
|
|
stop_signal.notify_waiters();
|
|
stop_signal_bool.store(true, Ordering::Release);
|
|
});
|
|
|
|
let registry = Arc::new(registry);
|
|
|
|
let app = Router::new()
|
|
.route("/metrics", get(|registry| async { metrics(registry) }))
|
|
.layer(Extension(registry));
|
|
|
|
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
|
|
info!("Listening on {}", addr);
|
|
let server_fut = Server::bind(&addr)
|
|
.serve(app.into_make_service())
|
|
.with_graceful_shutdown(async move {
|
|
stop_signal_listener.notified().await;
|
|
info!("Stopping server...");
|
|
});
|
|
|
|
if let Err(e) = server_fut.await {
|
|
error!("An error occurred while starting the server: {e}");
|
|
}
|
|
|
|
match exporter_fut.await {
|
|
Ok(Ok(_)) => Ok(()),
|
|
Ok(Err(e)) => {
|
|
error!("An error occurred while collecting metrics: {e}");
|
|
Err(e)
|
|
}
|
|
Err(e) => {
|
|
error!("The exporter task panicked: {e}");
|
|
Err(e).context("While running the exporter task")
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn periodically_collect(
|
|
collectors: Vec<Box<MetricClient>>,
|
|
stop_signal: Arc<Notify>,
|
|
stop_signal_bool: Arc<AtomicBool>,
|
|
) -> Result<()> {
|
|
while !stop_signal_bool.load(Ordering::Acquire) {
|
|
// Ensure a minimum 5s interval between collections
|
|
let wait_until = Instant::now() + Duration::from_secs(5);
|
|
debug!("Running collectors...");
|
|
|
|
let collectors_fut = join_all(collectors.iter().map(|c| c.collect()));
|
|
|
|
// Wait on the collectors to complete, or stop immediately if a stop
|
|
// signal is received.
|
|
let collector_results = select! {
|
|
_ = stop_signal.notified() => break,
|
|
res = collectors_fut => res,
|
|
};
|
|
|
|
for res in collector_results {
|
|
if let Err(e) = res {
|
|
error!("Error while collecting metrics: {e}");
|
|
}
|
|
}
|
|
tokio::time::sleep_until(wait_until).await;
|
|
}
|
|
|
|
Ok::<_, anyhow::Error>(())
|
|
}
|
|
|
|
fn metrics(Extension(registry): Extension<Arc<Registry>>) -> impl IntoResponse {
|
|
let mut encoded = Vec::new();
|
|
encode(&mut encoded, ®istry).unwrap();
|
|
Response::builder()
|
|
.header(
|
|
CONTENT_TYPE,
|
|
"application/openmetrics-text; version=1.0.0; charset=utf-8",
|
|
)
|
|
.body(Full::from(encoded))
|
|
.unwrap()
|
|
}
|
|
|
|
#[async_trait]
|
|
trait Metrics {
|
|
fn prefix(&self) -> &'static str;
|
|
|
|
async fn should_collect(&self) -> Result<()>;
|
|
|
|
fn register(&self, sub_registry: &mut Registry) -> Result<()>;
|
|
|
|
async fn collect(&self) -> Result<()>;
|
|
}
|