From 8b53b5e6de15c76b535eb1c957ca0aa840bf67a0 Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Mon, 24 Mar 2025 13:53:34 +0100
Subject: [PATCH 1/3] Enable the tokio unstable features

---
 .cargo/config.toml              | 5 ++++-
 .github/workflows/coverage.yaml | 2 +-
 crates/cli/build.rs             | 3 +++
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 81711ee47..307bcfe43 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,4 +1,7 @@
+[build]
+rustflags = ["--cfg", "tokio_unstable"]
+
 # On x86_64, we target the x86-64-v2 psABI, as it is a good compromise between
 # modern CPU instructions and compatibility.
 [target.x86_64-unknown-linux-gnu]
-rustflags = ["-C", "target-cpu=x86-64-v2"]
+rustflags = ["--cfg", "tokio_unstable", "-C", "target-cpu=x86-64-v2"]

diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml
index 025b09808..e7d6ab1c8 100644
--- a/.github/workflows/coverage.yaml
+++ b/.github/workflows/coverage.yaml
@@ -116,7 +116,7 @@ jobs:
         run: |
           cargo test --no-fail-fast --workspace
         env:
-          RUSTFLAGS: "-Cinstrument-coverage"
+          RUSTFLAGS: "-Cinstrument-coverage --cfg tokio_unstable"
           LLVM_PROFILE_FILE: "cargo-test-%p-%m.profraw"
           DATABASE_URL: postgresql://postgres:postgres@localhost/postgres
           SQLX_OFFLINE: "1"

diff --git a/crates/cli/build.rs b/crates/cli/build.rs
index e2a56ee14..2615b1284 100644
--- a/crates/cli/build.rs
+++ b/crates/cli/build.rs
@@ -6,6 +6,9 @@
 use vergen_gitcl::{Emitter, GitclBuilder, RustcBuilder};
 
 fn main() -> anyhow::Result<()> {
+    // Instruct rustc that we'll be using #[cfg(tokio_unstable)]
+    println!("cargo::rustc-check-cfg=cfg(tokio_unstable)");
+
     // At build time, we override the version through the environment variable
     // VERGEN_GIT_DESCRIBE. In some contexts, it means this variable is set but
     // empty, so we unset it here.

From fe7359fb4256b1be3179f2653d5a20d40b9cf428 Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Mon, 24 Mar 2025 13:54:23 +0100
Subject: [PATCH 2/3] Build the Tokio runtime manually, without #[tokio::main]

---
 crates/cli/src/main.rs | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index a43a792bc..469713113 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -49,8 +49,15 @@ impl sentry::TransportFactory for SentryTransportFactory {
     }
 }
 
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
+fn main() -> anyhow::Result<()> {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()?;
+
+    runtime.block_on(async_main())
+}
+
+async fn async_main() -> anyhow::Result<()> {
     // We're splitting the "fallible" part of main in another function to have a
     // chance to shutdown the telemetry exporters regardless of if there was an
     // error or not

From 0b764c15da3a12cec8f26cf165e9b2d2715a5edd Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Mon, 24 Mar 2025 13:54:50 +0100
Subject: [PATCH 3/3] Observe tokio runtime metrics

---
 crates/cli/src/main.rs            |  14 +-
 crates/cli/src/telemetry.rs       |   5 +
 crates/cli/src/telemetry/tokio.rs | 439 ++++++++++++++++++++++++++++++
 3 files changed, 455 insertions(+), 3 deletions(-)
 create mode 100644 crates/cli/src/telemetry/tokio.rs

diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index 469713113..92eacbd5f 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -50,9 +50,17 @@
 }
 
 fn main() -> anyhow::Result<()> {
-    let runtime = tokio::runtime::Builder::new_multi_thread()
-        .enable_all()
-        .build()?;
+    let mut builder = tokio::runtime::Builder::new_multi_thread();
+    builder.enable_all();
+
+    #[cfg(tokio_unstable)]
+    builder
+        .enable_metrics_poll_time_histogram()
+        .metrics_poll_time_histogram_configuration(tokio::runtime::HistogramConfiguration::log(
+            tokio::runtime::LogHistogram::default(),
+        ));
+
+    let runtime = builder.build()?;
 
     runtime.block_on(async_main())
 }
diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs
index 2401e4e47..79704d2e2 100644
--- a/crates/cli/src/telemetry.rs
+++ b/crates/cli/src/telemetry.rs
@@ -4,6 +4,8 @@
 // SPDX-License-Identifier: AGPL-3.0-only
 // Please see LICENSE in the repository root for full details.
 
+mod tokio;
+
 use std::sync::{LazyLock, OnceLock};
 
 use anyhow::Context as _;
@@ -60,6 +62,9 @@ pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> {
     init_tracer(&config.tracing).context("Failed to configure traces exporter")?;
     init_meter(&config.metrics).context("Failed to configure metrics exporter")?;
 
+    let handle = ::tokio::runtime::Handle::current();
+    self::tokio::observe(handle.metrics());
+
     Ok(())
 }
 
diff --git a/crates/cli/src/telemetry/tokio.rs b/crates/cli/src/telemetry/tokio.rs
new file mode 100644
index 000000000..a8c0ace13
--- /dev/null
+++ b/crates/cli/src/telemetry/tokio.rs
@@ -0,0 +1,439 @@
+// Copyright 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+use opentelemetry::KeyValue;
+use tokio::runtime::RuntimeMetrics;
+
+use super::METER;
+
+/// Install metrics for the tokio runtime.
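+///
+/// Each instrument below is registered as an OpenTelemetry observable: the
+/// callback captures its own clone of the [`RuntimeMetrics`] handle and
+/// samples it on every metrics export. Instruments that rely on APIs only
+/// available behind the `tokio_unstable` cfg are gated accordingly.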
+#[allow(clippy::too_many_lines)]
+pub fn observe(metrics: RuntimeMetrics) {
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.workers")
+            .with_description("The number of worker threads used by the runtime")
+            .with_unit("{worker}")
+            .with_callback(move |instrument| {
+                instrument.observe(metrics.num_workers().try_into().unwrap_or(u64::MAX), &[]);
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.blocking_threads")
+            .with_description("The number of additional threads spawned by the runtime")
+            .with_unit("{thread}")
+            .with_callback(move |instrument| {
+                instrument.observe(
+                    metrics
+                        .num_blocking_threads()
+                        .try_into()
+                        .unwrap_or(u64::MAX),
+                    &[],
+                );
+            })
+            .build();
+    }
+
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.global_queue_depth")
+            .with_description(
+                "The number of tasks currently scheduled in the runtime's global queue",
+            )
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                instrument.observe(
+                    metrics.global_queue_depth().try_into().unwrap_or(u64::MAX),
+                    &[],
+                );
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.idle_blocking_threads")
+            .with_description("The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls")
+            .with_unit("{thread}")
+            .with_callback(move |instrument| {
+                instrument.observe(
+                    metrics
+                        .num_idle_blocking_threads()
+                        .try_into()
+                        .unwrap_or(u64::MAX),
+                    &[],
+                );
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.remote_schedules")
+            .with_description("The number of tasks scheduled from outside the runtime")
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                instrument.observe(metrics.remote_schedule_count(), &[]);
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.budget_forced_yields")
+            .with_description("The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets")
+            .with_unit("{yield}")
+            .with_callback(move |instrument| {
+                instrument.observe(metrics.budget_forced_yield_count(), &[]);
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.io_driver.fd_registrations")
+            .with_description("The number of file descriptors that have been registered with the runtime's I/O driver")
+            .with_unit("{fd}")
+            .with_callback(move |instrument| {
+                instrument.observe(metrics.io_driver_fd_registered_count(), &[]);
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.io_driver.fd_deregistrations")
+            .with_description("The number of file descriptors that have been deregistered by the runtime's I/O driver")
+            .with_unit("{fd}")
+            .with_callback(move |instrument| {
+                instrument.observe(metrics.io_driver_fd_deregistered_count(), &[]);
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.io_driver.fd_readies")
+            .with_description("The number of ready events processed by the runtime's I/O driver")
+            .with_unit("{event}")
+            .with_callback(move |instrument| {
+                instrument.observe(metrics.io_driver_ready_count(), &[]);
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.blocking_queue_depth")
+            .with_description("The number of tasks currently scheduled in the blocking thread pool, spawned using `spawn_blocking`")
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                instrument.observe(
+                    metrics
+                        .blocking_queue_depth()
+                        .try_into()
+                        .unwrap_or(u64::MAX),
+                    &[],
+                );
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.park_count")
+            .with_description("The total number of times the given worker thread has parked")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(metrics.worker_park_count(i), &[worker_idx(i)]);
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.noops")
+            .with_description("The number of times the given worker thread unparked but performed no work before parking again")
+            .with_unit("{operation}")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(
+                        metrics.worker_noop_count(i),
+                        &[worker_idx(i)],
+                    );
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.task_steals")
+            .with_description(
+                "The number of tasks the given worker thread stole from another worker thread",
+            )
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(metrics.worker_steal_count(i), &[worker_idx(i)]);
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.steal_operations")
+            .with_description(
+                "The number of times the given worker thread stole tasks from another worker thread",
+            )
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(metrics.worker_steal_operations(i), &[worker_idx(i)]);
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.polls")
+            .with_description("The number of tasks the given worker thread has polled")
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(metrics.worker_poll_count(i), &[worker_idx(i)]);
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.busy_duration")
+            .with_description("The amount of time the given worker thread has been busy")
+            .with_unit("ms")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(
+                        metrics
+                            .worker_total_busy_duration(i)
+                            .as_millis()
+                            .try_into()
+                            .unwrap_or(u64::MAX),
+                        &[worker_idx(i)],
+                    );
+                }
+            })
+            .build();
+    }
+
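+    // Like the per-worker counters above, the instruments below emit one
+    // observation per worker thread, tagged with the `worker_idx` attribute,
+    // so each worker shows up as its own time series.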
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.local_schedules")
+            .with_description("The number of tasks scheduled from **within** the runtime on the given worker's local queue")
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(
+                        metrics.worker_local_schedule_count(i),
+                        &[worker_idx(i)],
+                    );
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_counter("tokio_runtime.worker.overflows")
+            .with_description(
+                "The number of times the given worker thread saturated its local queue",
+            )
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(metrics.worker_overflow_count(i), &[worker_idx(i)]);
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.worker.local_queue_depth")
+            .with_description(
+                "The number of tasks currently scheduled in the given worker's local queue",
+            )
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(
+                        metrics
+                            .worker_local_queue_depth(i)
+                            .try_into()
+                            .unwrap_or(u64::MAX),
+                        &[worker_idx(i)],
+                    );
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        let metrics = metrics.clone();
+        METER
+            .u64_observable_gauge("tokio_runtime.worker.mean_poll_time")
+            .with_description("The mean duration of task polls, in nanoseconds")
+            .with_unit("ns")
+            .with_callback(move |instrument| {
+                let num = metrics.num_workers();
+                for i in 0..num {
+                    instrument.observe(
+                        metrics
+                            .worker_mean_poll_time(i)
+                            .as_nanos()
+                            .try_into()
+                            .unwrap_or(u64::MAX),
+                        &[worker_idx(i)],
+                    );
+                }
+            })
+            .build();
+    }
+
+    #[cfg(tokio_unstable)]
+    {
+        if metrics.poll_time_histogram_enabled() {
+            // This adapts the histogram Tokio gives us to a format used by
+            // OpenTelemetry. We're cheating a bit here, as we're only mimicking
+            // a histogram using a counter.
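+            //
+            // For example, if Tokio reports per-bucket counts of [5, 3, 2],
+            // we observe the running totals [5, 8, 10] against the buckets'
+            // `le` upper bounds, matching the cumulative bucket semantics of
+            // Prometheus-style histograms.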
+
+            // Prepare the key-value pairs for the histogram buckets
+            let mut buckets: Vec<_> = (0..metrics.poll_time_histogram_num_buckets())
+                .map(|i| {
+                    let range = metrics.poll_time_histogram_bucket_range(i);
+                    let value = range.end.as_nanos().try_into().unwrap_or(i64::MAX);
+                    let kv = KeyValue::new("le", value);
+                    (i, kv)
+                })
+                .collect();
+
+            // Change the last bucket to +Inf
+            buckets.last_mut().unwrap().1 = KeyValue::new("le", "+Inf");
+
+            // Prepare the key-value pairs for each worker
+            let workers: Vec<_> = (0..metrics.num_workers())
+                .map(|i| (i, worker_idx(i)))
+                .collect();
+
+            let metrics = metrics.clone();
+            METER
+                .u64_observable_gauge("tokio_runtime.worker.poll_time_bucket")
+                .with_description("A histogram of the poll time of tasks, in nanoseconds")
+                // We don't set a unit here, as it would add it as a suffix to the metric name
+                .with_callback(move |instrument| {
+                    for (worker, worker_idx) in &workers {
+                        // Histogram buckets in OTEL accumulate values, whereas
+                        // Tokio gives us the count within each bucket, so we
+                        // have to sum them as we go through them
+                        let mut sum = 0;
+                        for (bucket, le) in &buckets {
+                            let count = metrics.poll_time_histogram_bucket_count(*worker, *bucket);
+                            sum += count;
+                            instrument.observe(sum, &[worker_idx.clone(), le.clone()]);
+                        }
+                    }
+                })
+                .build();
+        }
+    }
+
+    {
+        METER
+            .u64_observable_gauge("tokio_runtime.alive_tasks")
+            .with_description("The number of alive tasks in the runtime")
+            .with_unit("{task}")
+            .with_callback(move |instrument| {
+                instrument.observe(
+                    metrics.num_alive_tasks().try_into().unwrap_or(u64::MAX),
+                    &[],
+                );
+            })
+            .build();
+    }
+}
+
+/// Helper to construct a [`KeyValue`] with the worker index.
+#[allow(dead_code)]
+fn worker_idx(i: usize) -> KeyValue {
+    KeyValue::new("worker_idx", i.try_into().unwrap_or(i64::MAX))
+}