From 24eff63a5e777e0c9f546e57810defcbe7996201 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Sat, 30 Oct 2021 23:38:20 -0700 Subject: [PATCH] Add support for querying db via SIGUSR1 --- Cargo.lock | 34 ++++++++++++++++++++++++++++++++++ server/Cargo.toml | 3 +++ server/src/main.rs | 25 +++++++++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0420f2d..84270dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -987,12 +987,15 @@ dependencies = [ "bincode", "bytes", "chrono", + "futures", "headers", "lazy_static", "omegaupload-common", "rand", "rocksdb", "serde", + "signal-hook", + "signal-hook-tokio", "tokio", "tower-http", "tracing", @@ -1453,6 +1456,37 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +[[package]] +name = "signal-hook" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "signal-hook-tokio" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6c5d32165ff8b94e68e7b3bdecb1b082e958c22434b363482cfb89dcd6f3ff8" +dependencies = [ + "futures-core", + "libc", + "signal-hook", + "tokio", +] + [[package]] name = "slab" version = "0.4.5" diff --git a/server/Cargo.toml b/server/Cargo.toml index 00ee5fb..21900dd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,12 +14,15 @@ bincode = "1" # to enable the feature bytes = { version = "*", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } +futures = "0.3" # We just need to pull in whatever axum is pulling in headers = "*" lazy_static = "1" rand = "0.8" rocksdb = { version = "0.17", default_features = false, features = ["zstd"] } serde = { version = "1", features = ["derive"] } +signal-hook = "0.3" +signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tower-http = { version = "0.1", features = ["fs"] } tracing = { version = "0.1" } diff --git a/server/src/main.rs b/server/src/main.rs index 4ab7189..1e1be31 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -13,6 +13,7 @@ use axum::http::StatusCode; use axum::response::Html; use axum::{service, AddExtensionLayer, Router}; use chrono::Utc; +use futures::stream::StreamExt; use headers::HeaderMap; use lazy_static::lazy_static; use omegaupload_common::{Expiration, API_ENDPOINT}; @@ -20,6 +21,8 @@ use rand::thread_rng; use rand::Rng; use rocksdb::{ColumnFamilyDescriptor, IteratorMode}; use rocksdb::{Options, DB}; +use signal_hook::consts::SIGUSR1; +use signal_hook_tokio::Signals; use tokio::task; use tower_http::services::ServeDir; use tracing::{error, instrument, trace}; @@ -58,6 +61,10 @@ async fn main() -> Result<()> { set_up_expirations(&db); + let signals = Signals::new(&[SIGUSR1])?; + let signals_handle = signals.handle(); + let signals_task = tokio::spawn(handle_signals(signals, Arc::clone(&db))); + let root_service = service::get(ServeDir::new("static")) .handle_error(|_| Ok::<_, Infallible>(StatusCode::NOT_FOUND)); @@ -81,6 +88,9 @@ async fn main() -> Result<()> { // Must be called for correct shutdown DB::destroy(&Options::default(), PASTE_DB_PATH)?; + + signals_handle.close(); + signals_task.await?; Ok(()) } @@ -149,6 +159,21 @@ fn set_up_expirations(db: &Arc) { info!("Cleanup timers have been initialized."); } +async fn handle_signals(mut signals: Signals, db: Arc) { + while let Some(signal) = signals.next().await { + match signal { + SIGUSR1 => { + let meta_cf = db.cf_handle(META_CF_NAME).unwrap(); + info!( + "Active paste count: {}", + db.iterator_cf(meta_cf, IteratorMode::Start).count() + ); + } + _ => (), + } + } +} + #[instrument(skip(db, body), err)] async fn upload( Extension(db): Extension>,