init commit

master
Edward Shen 2022-05-27 15:15:10 -07:00
commit a965314466
Signed by: edward
GPG Key ID: 19182661E818369F
7 changed files with 1795 additions and 0 deletions

2
.cargo/config.toml Normal file
View File

@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

1285
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

23
Cargo.toml Normal file
View File

@ -0,0 +1,23 @@
[package]
name = "koyori"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
arrayvec = "0.7"
async-trait = "0.1"
axum = "0.5"
futures = "0.3"
http = "0.2"
once_cell = "1"
prometheus-client = "0.16"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "process", "tracing"] }
hostname = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
console-subscriber = "0.1"

198
src/cpu.rs Normal file
View File

@ -0,0 +1,198 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;
use serde::Deserialize;
use tokio::process::Command;
use tracing::trace;
use crate::{F64Gauge, Metrics, HOSTNAME};
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct Label {
hostname: &'static str,
cpu: String,
}
#[derive(Default)]
pub struct Exporter {
usr: Family<Label, F64Gauge>,
nice: Family<Label, F64Gauge>,
sys: Family<Label, F64Gauge>,
iowait: Family<Label, F64Gauge>,
irq: Family<Label, F64Gauge>,
soft: Family<Label, F64Gauge>,
steal: Family<Label, F64Gauge>,
guest: Family<Label, F64Gauge>,
gnice: Family<Label, F64Gauge>,
idle: Family<Label, F64Gauge>,
}
#[async_trait]
impl Metrics for Exporter {
fn prefix(&self) -> &'static str {
"cpu"
}
async fn should_collect(&self) -> Result<()> {
Command::new("mpstat")
.output()
.await
.context("While checking mpstat")
.map(drop)
}
fn register(&self, sub_registry: &mut Registry) -> Result<()> {
sub_registry.register("usr", "usr time (mpstat)", Box::new(self.usr.clone()));
sub_registry.register("nice", "nice time (mpstat)", Box::new(self.nice.clone()));
sub_registry.register("sys", "sys time (mpstat)", Box::new(self.sys.clone()));
sub_registry.register(
"iowait",
"iowait time (mpstat)",
Box::new(self.iowait.clone()),
);
sub_registry.register("irq", "irq time (mpstat)", Box::new(self.irq.clone()));
sub_registry.register("soft", "soft time (mpstat)", Box::new(self.soft.clone()));
sub_registry.register("steal", "steal time (mpstat)", Box::new(self.steal.clone()));
sub_registry.register("guest", "guest time (mpstat)", Box::new(self.guest.clone()));
sub_registry.register(
"gnice",
"guest nice time (mpstat)",
Box::new(self.gnice.clone()),
);
sub_registry.register("idle ", "idle time (mpstat)", Box::new(self.idle.clone()));
Ok(())
}
async fn collect(&self) -> Result<()> {
trace!("Started");
let json = Command::new("mpstat")
.args(["-o", "JSON", "-P", "ALL", "5", "1"])
.output()
.await
.context("While collecting mpstat")?
.stdout;
let data: MpStatOutput = serde_json::from_slice(&json)?;
let statistics = data
.sysstat
.hosts
.into_iter()
.next()
.context("Getting the first host")?
.statistics;
let cpus = statistics
.into_iter()
.next()
.context("getting first stat measurement")?
.cpu_load;
for cpu in cpus {
let label = Label {
hostname: &HOSTNAME,
cpu: cpu.identifier,
};
self.usr.get_or_create(&label).set(cpu.usr);
self.nice.get_or_create(&label).set(cpu.nice);
self.sys.get_or_create(&label).set(cpu.sys);
self.iowait.get_or_create(&label).set(cpu.iowait);
self.irq.get_or_create(&label).set(cpu.irq);
self.soft.get_or_create(&label).set(cpu.soft);
self.steal.get_or_create(&label).set(cpu.steal);
self.guest.get_or_create(&label).set(cpu.guest);
self.gnice.get_or_create(&label).set(cpu.gnice);
self.idle.get_or_create(&label).set(cpu.idle);
}
trace!("Done");
Ok(())
}
}
#[derive(Deserialize)]
struct MpStatOutput {
sysstat: SysStat,
}
#[derive(Deserialize)]
struct SysStat {
hosts: Vec<Host>,
}
#[derive(Deserialize)]
struct Host {
statistics: Vec<Statistic>,
}
#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
struct Statistic {
cpu_load: Vec<CpuLoad>,
}
#[derive(Deserialize)]
struct CpuLoad {
#[serde(rename = "cpu")]
identifier: String,
usr: f64,
nice: f64,
sys: f64,
iowait: f64,
irq: f64,
soft: f64,
steal: f64,
guest: f64,
gnice: f64,
idle: f64,
}
#[cfg(test)]
mod mp_stat {
use anyhow::Result;
use serde_json::json;
use super::MpStatOutput;
#[test]
fn deserializes() -> Result<(), serde_json::Error> {
let raw = json!({
"sysstat": {
"hosts": [{
"nodename": "kurante",
"sysname": "Linux",
"release": "5.17.7-zen1-1-zen",
"machine": "x86_64",
"number-of-cpus": 16,
"date": "05/27/2022",
"statistics": [{
"timestamp": "12:46:14 PM",
"cpu-load": [
{"cpu": "all", "usr": 3.19, "nice": 0.03, "sys": 0.73, "iowait": 0.08, "irq": 0.14, "soft": 0.08, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 95.77},
{"cpu": "0", "usr": 2.41, "nice": 0.00, "sys": 0.80, "iowait": 0.00, "irq": 0.80, "soft": 0.20, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 95.78},
{"cpu": "1", "usr": 1.20, "nice": 0.20, "sys": 0.80, "iowait": 1.00, "irq": 0.00, "soft": 0.20, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 96.61},
{"cpu": "2", "usr": 3.58, "nice": 0.00, "sys": 1.19, "iowait": 0.20, "irq": 0.60, "soft": 0.40, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 94.04},
{"cpu": "3", "usr": 3.21, "nice": 0.00, "sys": 1.00, "iowait": 0.00, "irq": 0.20, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 95.59},
{"cpu": "4", "usr": 3.62, "nice": 0.00, "sys": 0.40, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 95.98},
{"cpu": "5", "usr": 3.21, "nice": 0.00, "sys": 0.40, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 96.39},
{"cpu": "6", "usr": 4.22, "nice": 0.00, "sys": 0.60, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 95.18},
{"cpu": "7", "usr": 2.81, "nice": 0.00, "sys": 0.40, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 96.79},
{"cpu": "8", "usr": 2.01, "nice": 0.00, "sys": 0.40, "iowait": 0.00, "irq": 0.20, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 97.38},
{"cpu": "9", "usr": 2.18, "nice": 0.00, "sys": 1.39, "iowait": 0.00, "irq": 0.20, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 96.23},
{"cpu": "10", "usr": 2.80, "nice": 0.00, "sys": 0.80, "iowait": 0.00, "irq": 0.00, "soft": 0.20, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 96.20},
{"cpu": "11", "usr": 5.18, "nice": 0.00, "sys": 1.00, "iowait": 0.00, "irq": 0.20, "soft": 0.20, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 93.43},
{"cpu": "12", "usr": 2.61, "nice": 0.00, "sys": 0.40, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 96.99},
{"cpu": "13", "usr": 4.83, "nice": 0.00, "sys": 0.40, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 94.77},
{"cpu": "14", "usr": 1.79, "nice": 0.20, "sys": 0.80, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 97.21},
{"cpu": "15", "usr": 5.41, "nice": 0.00, "sys": 0.80, "iowait": 0.00, "irq": 0.00, "soft": 0.00, "steal": 0.00, "guest": 0.00, "gnice": 0.00, "idle": 93.79}
]
}]
}]
}
});
serde_json::from_value::<MpStatOutput>(raw).map(drop)
}
}

178
src/main.rs Normal file
View File

@ -0,0 +1,178 @@
#![warn(clippy::pedantic, clippy::nursery)]
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use async_trait::async_trait;
use axum::body::Full;
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::{Extension, Router, Server};
use futures::future::join_all;
use futures::stream::iter;
use futures::StreamExt;
use http::header::CONTENT_TYPE;
use once_cell::sync::Lazy;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use tokio::select;
use tokio::sync::Notify;
use tokio::time::Instant;
use tracing::{debug, error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};
mod cpu;
mod zfs;
pub static HOSTNAME: Lazy<String> = Lazy::new(|| {
hostname::get()
.expect("to get the hostname")
.into_string()
.expect("hostname to be valid utf-8")
});
type MetricClient = dyn Metrics + Send + Sync;
type U64Gauge = Gauge<u64, AtomicU64>;
type U32Gauge = Gauge<u32, AtomicU32>;
type F64Gauge = Gauge<f64, AtomicU64>;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
// .with(console_subscriber::spawn())
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
let mut registry = Registry::default();
let namespaced_registry = registry.sub_registry_with_prefix("koyori");
let collectors: Vec<Box<MetricClient>> = iter([
Box::new(zfs::Exporter::default()) as Box<MetricClient>,
Box::new(cpu::Exporter::default()),
])
.filter_map(|collector| async {
if let Err(e) = collector.should_collect().await {
error!("Not collecting {}: {e}", collector.prefix());
None
} else {
info!("{} collector enabled.", collector.prefix());
Some(collector)
}
})
.collect()
.await;
info!("Started with {} collectors", collectors.len());
for collector in &collectors {
collector.register(namespaced_registry.sub_registry_with_prefix(collector.prefix()))?;
}
let stop_signal = Arc::new(Notify::new());
let stop_signal_listener = Arc::clone(&stop_signal);
let stop_signal_bool = Arc::new(AtomicBool::new(false));
let exporter_fut = tokio::spawn(periodically_collect(
collectors,
Arc::clone(&stop_signal),
Arc::clone(&stop_signal_bool),
));
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("Sending stop signal...");
stop_signal.notify_waiters();
stop_signal_bool.store(true, Ordering::Release);
});
let registry = Arc::new(registry);
let app = Router::new()
.route("/metrics", get(|registry| async { metrics(registry) }))
.layer(Extension(registry));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
info!("Listening on {}", addr);
let server_fut = Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async move {
stop_signal_listener.notified().await;
info!("Stopping server...");
});
if let Err(e) = server_fut.await {
error!("An error occurred while starting the server: {e}");
}
match exporter_fut.await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => {
error!("An error occurred while collecting metrics: {e}");
Err(e)
}
Err(e) => {
error!("The exporter task panicked: {e}");
Err(e).context("While running the exporter task")
}
}
}
async fn periodically_collect(
collectors: Vec<Box<MetricClient>>,
stop_signal: Arc<Notify>,
stop_signal_bool: Arc<AtomicBool>,
) -> Result<()> {
while !stop_signal_bool.load(Ordering::Acquire) {
// Ensure a minimum 5s interval between collections
let wait_until = Instant::now() + Duration::from_secs(5);
debug!("Running collectors...");
let collectors_fut = join_all(collectors.iter().map(|c| c.collect()));
// Wait on the collectors to complete, or stop immediately if a stop
// signal is received.
let collector_results = select! {
_ = stop_signal.notified() => break,
res = collectors_fut => res,
};
for res in collector_results {
if let Err(e) = res {
error!("Error while collecting metrics: {e}");
}
}
tokio::time::sleep_until(wait_until).await;
}
Ok::<_, anyhow::Error>(())
}
fn metrics(Extension(registry): Extension<Arc<Registry>>) -> impl IntoResponse {
let mut encoded = Vec::new();
encode(&mut encoded, &registry).unwrap();
Response::builder()
.header(
CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(Full::from(encoded))
.unwrap()
}
#[async_trait]
trait Metrics {
fn prefix(&self) -> &'static str;
async fn should_collect(&self) -> Result<()>;
fn register(&self, sub_registry: &mut Registry) -> Result<()>;
async fn collect(&self) -> Result<()>;
}

108
src/zfs.rs Normal file
View File

@ -0,0 +1,108 @@
use anyhow::{Context, Result};
use arrayvec::ArrayVec;
use async_trait::async_trait;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;
use tokio::process::Command;
use tracing::trace;
use crate::{F64Gauge, Metrics, U32Gauge, U64Gauge, HOSTNAME};
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct Label {
hostname: &'static str,
zpool: String,
}
#[derive(Default)]
pub struct Exporter {
size: Family<Label, U64Gauge>,
allocated: Family<Label, U64Gauge>,
free: Family<Label, U64Gauge>,
fragmentation: Family<Label, U64Gauge>,
capacity: Family<Label, U64Gauge>,
dedupe: Family<Label, F64Gauge>,
health: Family<Label, U32Gauge>,
}
#[async_trait]
impl Metrics for Exporter {
fn prefix(&self) -> &'static str {
"zfs"
}
async fn should_collect(&self) -> Result<()> {
Command::new("zpool")
.arg("list")
.output()
.await
.context("While checking zpool list")
.map(drop)
}
fn register(&self, sub_registry: &mut Registry) -> Result<()> {
sub_registry.register("size", "zpool size", Box::new(self.size.clone()));
sub_registry.register(
"allocated",
"zpool allocated",
Box::new(self.allocated.clone()),
);
sub_registry.register("free", "zpool free", Box::new(self.free.clone()));
sub_registry.register(
"fragmentation",
"zpool fragmentation",
Box::new(self.fragmentation.clone()),
);
sub_registry.register(
"capacity",
"zpool capacity",
Box::new(self.capacity.clone()),
);
sub_registry.register("dedupe", "zpool dedupe", Box::new(self.dedupe.clone()));
sub_registry.register("health", "zpool health", Box::new(self.health.clone()));
Ok(())
}
async fn collect(&self) -> Result<()> {
trace!("Started");
let zpool = Command::new("zpool")
.args([
"list",
"-H",
"-p",
"-o",
"name,size,alloc,free,frag,cap,dedup,health",
])
.output()
.await?;
for string_data in String::from_utf8(zpool.stdout)?.lines() {
let mut info = string_data.split_whitespace();
let zpool_name = info.next().context("getting the zpool name")?;
let [size, alloc, free, frag, cap, dedup, health] = info
.collect::<ArrayVec<_, 7>>()
.into_inner()
.map_err(|_| anyhow::anyhow!("parsing zpool info"))?;
let label = Label {
hostname: &*HOSTNAME,
zpool: zpool_name.to_string(),
};
self.size.get_or_create(&label).set(size.parse()?);
self.allocated.get_or_create(&label).set(alloc.parse()?);
self.free.get_or_create(&label).set(free.parse()?);
self.fragmentation.get_or_create(&label).set(frag.parse()?);
self.capacity.get_or_create(&label).set(cap.parse()?);
self.dedupe.get_or_create(&label).set(dedup.parse()?);
self.health
.get_or_create(&label)
.set(if health == "ONLINE" { 0 } else { 1 });
}
trace!("Done");
Ok(())
}
}