Skip to content

Commit 7030339

Browse files
authored
Merge pull request #31694 from bkirwi/tokio-metrics
Add a bunch of tokio metrics to prometheus
2 parents f6c606d + fd9cfd4 commit 7030339

File tree

7 files changed

+81
-14
lines changed

7 files changed

+81
-14
lines changed

src/clusterd/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use mz_http_util::DynamicFilterTarget;
2525
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
2626
use mz_ore::cli::{self, CliConfig};
2727
use mz_ore::error::ErrorExt;
28-
use mz_ore::metrics::MetricsRegistry;
28+
use mz_ore::metrics::{register_runtime_metrics, MetricsRegistry};
2929
use mz_ore::netio::{Listener, SocketAddr};
3030
use mz_ore::now::SYSTEM_TIME;
3131
use mz_persist_client::cache::PersistClientCache;
@@ -38,6 +38,7 @@ use mz_storage::storage_state::StorageInstanceContext;
3838
use mz_storage_client::client::proto_storage_server::ProtoStorageServer;
3939
use mz_storage_types::connections::ConnectionContext;
4040
use mz_txn_wal::operator::TxnsContext;
41+
use tokio::runtime::Handle;
4142
use tower::Service;
4243
use tracing::{error, info};
4344

@@ -184,6 +185,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
184185
.await?;
185186

186187
let tracing_handle = Arc::new(tracing_handle);
188+
register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);
187189

188190
// Keep this _after_ the mz_ore::tracing::configure call so that its panic
189191
// hook runs _before_ the one that sends things to sentry.

src/environmentd/src/environmentd/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrches
5454
use mz_ore::cli::{self, CliConfig, KeyValueArg};
5555
use mz_ore::error::ErrorExt;
5656
use mz_ore::metric;
57-
use mz_ore::metrics::MetricsRegistry;
57+
use mz_ore::metrics::{register_runtime_metrics, MetricsRegistry};
5858
use mz_ore::now::SYSTEM_TIME;
5959
use mz_ore::task::RuntimeExt;
6060
use mz_ore::url::SensitiveUrl;
@@ -724,6 +724,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
724724
},
725725
metrics_registry.clone(),
726726
))?;
727+
register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
727728

728729
let span = tracing::info_span!("environmentd::run").entered();
729730

src/ore/src/metrics.rs

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,10 @@ pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
182182
/// Delete-on-drop shadow of Prometheus [raw::UIntGaugeVec].
183183
pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
184184

185-
pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
186-
187185
use crate::assert_none;
188186

187+
pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
188+
189189
/// Access to non-delete-on-drop vector types
190190
pub mod raw {
191191
use prometheus::core::{AtomicU64, GenericGaugeVec};
@@ -217,13 +217,12 @@ impl MetricsRegistry {
217217
}
218218

219219
/// Registers a gauge whose value is computed when observed.
220-
pub fn register_computed_gauge<F, P>(
220+
pub fn register_computed_gauge<P>(
221221
&self,
222222
opts: MakeCollectorOpts,
223-
f: F,
223+
f: impl Fn() -> P::T + Send + Sync + 'static,
224224
) -> ComputedGenericGauge<P>
225225
where
226-
F: Fn() -> P::T + Send + Sync + 'static,
227226
P: Atomic + 'static,
228227
{
229228
let gauge = ComputedGenericGauge {
@@ -769,6 +768,65 @@ impl DurationMetric for &'_ mut f64 {
769768
}
770769
}
771770

771+
/// Register the Tokio runtime's metrics in our metrics registry.
772+
#[cfg(feature = "async")]
773+
pub fn register_runtime_metrics(
774+
name: &'static str,
775+
runtime_metrics: tokio::runtime::RuntimeMetrics,
776+
registry: &MetricsRegistry,
777+
) {
778+
macro_rules! register {
779+
($method:ident, $doc:literal) => {
780+
let metrics = runtime_metrics.clone();
781+
registry.register_computed_gauge::<prometheus::core::AtomicU64>(
782+
crate::metric!(
783+
name: concat!("mz_tokio_", stringify!($method)),
784+
help: $doc,
785+
const_labels: {"runtime" => name},
786+
),
787+
move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
788+
);
789+
};
790+
}
791+
792+
register!(
793+
num_workers,
794+
"The number of worker threads used by the runtime."
795+
);
796+
register!(
797+
num_alive_tasks,
798+
"The current number of alive tasks in the runtime."
799+
);
800+
register!(
801+
global_queue_depth,
802+
"The number of tasks currently scheduled in the runtime's global queue."
803+
);
804+
register!(
805+
num_blocking_threads,
806+
"The number of additional threads spawned by the runtime."
807+
);
808+
register!(
809+
num_idle_blocking_threads,
810+
"The number of idle threads which have spawned by the runtime for spawn_blocking calls."
811+
);
812+
register!(
813+
spawned_tasks_count,
814+
"The number of tasks spawned in this runtime since it was created."
815+
);
816+
register!(
817+
remote_schedule_count,
818+
"The number of tasks scheduled from outside of the runtime."
819+
);
820+
register!(
821+
budget_forced_yield_count,
822+
"The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
823+
);
824+
register!(
825+
blocking_queue_depth,
826+
"The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
827+
);
828+
}
829+
772830
#[cfg(test)]
773831
mod tests {
774832
use std::time::Duration;

src/persist-client/src/async_runtime.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use std::future::Future;
1313
use std::sync::atomic::{AtomicUsize, Ordering};
1414

15+
use mz_ore::metrics::{register_runtime_metrics, MetricsRegistry};
1516
use mz_ore::task::{JoinHandle, RuntimeExt};
1617
use tokio::runtime::{Builder, Runtime};
1718

@@ -41,9 +42,12 @@ pub struct IsolatedRuntime {
4142

4243
impl IsolatedRuntime {
4344
/// Creates a new isolated runtime.
44-
pub fn new(worker_threads: usize) -> IsolatedRuntime {
45-
let runtime = Builder::new_multi_thread()
46-
.worker_threads(worker_threads)
45+
pub fn new(metrics: &MetricsRegistry, worker_threads: Option<usize>) -> IsolatedRuntime {
46+
let mut runtime = Builder::new_multi_thread();
47+
if let Some(worker_threads) = worker_threads {
48+
runtime.worker_threads(worker_threads);
49+
}
50+
let runtime = runtime
4751
.thread_name_fn(|| {
4852
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
4953
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
@@ -54,6 +58,7 @@ impl IsolatedRuntime {
5458
.enable_all()
5559
.build()
5660
.expect("known to be valid");
61+
register_runtime_metrics("persist", runtime.metrics(), metrics);
5762
IsolatedRuntime {
5863
inner: Some(runtime),
5964
}
@@ -79,7 +84,7 @@ impl IsolatedRuntime {
7984

8085
impl Default for IsolatedRuntime {
8186
fn default() -> Self {
82-
IsolatedRuntime::new(num_cpus::get())
87+
IsolatedRuntime::new(&MetricsRegistry::new(), None)
8388
}
8489
}
8590

src/persist-client/src/cache.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ impl PersistClientCache {
8686
Arc::clone(&state_cache),
8787
pubsub_client.receiver,
8888
);
89-
let isolated_runtime = IsolatedRuntime::new(cfg.isolated_runtime_worker_threads);
89+
let isolated_runtime =
90+
IsolatedRuntime::new(registry, Some(cfg.isolated_runtime_worker_threads));
9091

9192
PersistClientCache {
9293
cfg,

src/persist-client/src/cli/admin.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
223223
.await?;
224224

225225
if force_downgrade_upper {
226-
let isolated_runtime = Arc::new(IsolatedRuntime::default());
226+
let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
227227
let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
228228
let shared_states = Arc::new(StateCache::new(
229229
&cfg,

src/persist-client/src/cli/inspect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ pub async fn blob_usage(args: &StateArgs) -> Result<(), anyhow::Error> {
647647
let consensus =
648648
make_consensus(&cfg, &args.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
649649
let blob = make_blob(&cfg, &args.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
650-
let isolated_runtime = Arc::new(IsolatedRuntime::default());
650+
let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
651651
let state_cache = Arc::new(StateCache::new(
652652
&cfg,
653653
Arc::clone(&metrics),

0 commit comments

Comments
 (0)