Add support for querying db via SIGUSR1
This commit is contained in:
parent
9a85d13bd9
commit
24eff63a5e
3 changed files with 62 additions and 0 deletions
34
Cargo.lock
generated
34
Cargo.lock
generated
|
@ -987,12 +987,15 @@ dependencies = [
|
||||||
"bincode",
|
"bincode",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"futures",
|
||||||
"headers",
|
"headers",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"omegaupload-common",
|
"omegaupload-common",
|
||||||
"rand",
|
"rand",
|
||||||
"rocksdb",
|
"rocksdb",
|
||||||
"serde",
|
"serde",
|
||||||
|
"signal-hook",
|
||||||
|
"signal-hook-tokio",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-http",
|
"tower-http",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -1453,6 +1456,37 @@ version = "1.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "signal-hook"
|
||||||
|
version = "0.3.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"signal-hook-registry",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "signal-hook-registry"
|
||||||
|
version = "1.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "signal-hook-tokio"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f6c5d32165ff8b94e68e7b3bdecb1b082e958c22434b363482cfb89dcd6f3ff8"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"libc",
|
||||||
|
"signal-hook",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "slab"
|
name = "slab"
|
||||||
version = "0.4.5"
|
version = "0.4.5"
|
||||||
|
|
|
@ -14,12 +14,15 @@ bincode = "1"
|
||||||
# to enable the feature
|
# to enable the feature
|
||||||
bytes = { version = "*", features = ["serde"] }
|
bytes = { version = "*", features = ["serde"] }
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
|
futures = "0.3"
|
||||||
# We just need to pull in whatever axum is pulling in
|
# We just need to pull in whatever axum is pulling in
|
||||||
headers = "*"
|
headers = "*"
|
||||||
lazy_static = "1"
|
lazy_static = "1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
rocksdb = { version = "0.17", default_features = false, features = ["zstd"] }
|
rocksdb = { version = "0.17", default_features = false, features = ["zstd"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
signal-hook = "0.3"
|
||||||
|
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
|
||||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||||
tower-http = { version = "0.1", features = ["fs"] }
|
tower-http = { version = "0.1", features = ["fs"] }
|
||||||
tracing = { version = "0.1" }
|
tracing = { version = "0.1" }
|
||||||
|
|
|
@ -13,6 +13,7 @@ use axum::http::StatusCode;
|
||||||
use axum::response::Html;
|
use axum::response::Html;
|
||||||
use axum::{service, AddExtensionLayer, Router};
|
use axum::{service, AddExtensionLayer, Router};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
use headers::HeaderMap;
|
use headers::HeaderMap;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use omegaupload_common::{Expiration, API_ENDPOINT};
|
use omegaupload_common::{Expiration, API_ENDPOINT};
|
||||||
|
@ -20,6 +21,8 @@ use rand::thread_rng;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rocksdb::{ColumnFamilyDescriptor, IteratorMode};
|
use rocksdb::{ColumnFamilyDescriptor, IteratorMode};
|
||||||
use rocksdb::{Options, DB};
|
use rocksdb::{Options, DB};
|
||||||
|
use signal_hook::consts::SIGUSR1;
|
||||||
|
use signal_hook_tokio::Signals;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use tower_http::services::ServeDir;
|
use tower_http::services::ServeDir;
|
||||||
use tracing::{error, instrument, trace};
|
use tracing::{error, instrument, trace};
|
||||||
|
@ -58,6 +61,10 @@ async fn main() -> Result<()> {
|
||||||
|
|
||||||
set_up_expirations(&db);
|
set_up_expirations(&db);
|
||||||
|
|
||||||
|
let signals = Signals::new(&[SIGUSR1])?;
|
||||||
|
let signals_handle = signals.handle();
|
||||||
|
let signals_task = tokio::spawn(handle_signals(signals, Arc::clone(&db)));
|
||||||
|
|
||||||
let root_service = service::get(ServeDir::new("static"))
|
let root_service = service::get(ServeDir::new("static"))
|
||||||
.handle_error(|_| Ok::<_, Infallible>(StatusCode::NOT_FOUND));
|
.handle_error(|_| Ok::<_, Infallible>(StatusCode::NOT_FOUND));
|
||||||
|
|
||||||
|
@ -81,6 +88,9 @@ async fn main() -> Result<()> {
|
||||||
|
|
||||||
// Must be called for correct shutdown
|
// Must be called for correct shutdown
|
||||||
DB::destroy(&Options::default(), PASTE_DB_PATH)?;
|
DB::destroy(&Options::default(), PASTE_DB_PATH)?;
|
||||||
|
|
||||||
|
signals_handle.close();
|
||||||
|
signals_task.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,6 +159,21 @@ fn set_up_expirations(db: &Arc<DB>) {
|
||||||
info!("Cleanup timers have been initialized.");
|
info!("Cleanup timers have been initialized.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_signals(mut signals: Signals, db: Arc<DB>) {
|
||||||
|
while let Some(signal) = signals.next().await {
|
||||||
|
match signal {
|
||||||
|
SIGUSR1 => {
|
||||||
|
let meta_cf = db.cf_handle(META_CF_NAME).unwrap();
|
||||||
|
info!(
|
||||||
|
"Active paste count: {}",
|
||||||
|
db.iterator_cf(meta_cf, IteratorMode::Start).count()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip(db, body), err)]
|
#[instrument(skip(db, body), err)]
|
||||||
async fn upload<const N: usize>(
|
async fn upload<const N: usize>(
|
||||||
Extension(db): Extension<Arc<DB>>,
|
Extension(db): Extension<Arc<DB>>,
|
||||||
|
|
Loading…
Reference in a new issue