Skip to content

Commit 39be8c3

Browse files
committed
Observe metrics from the Tokio runtime
1 parent be4df36 commit 39be8c3

File tree

3 files changed

+366
-1
lines changed

3 files changed

+366
-1
lines changed

.cargo/config.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
[build]
2+
rustflags = ["--cfg", "tokio_unstable"]
3+
14
# On x86_64, we target the x86-64-v2 psABI, as it is a good compromise between
25
# modern CPU instructions and compatibility.
36
[target.x86_64-unknown-linux-gnu]
4-
rustflags = ["-C", "target-cpu=x86-64-v2"]
7+
rustflags = ["--cfg", "tokio_unstable", "-C", "target-cpu=x86-64-v2"]

crates/cli/src/telemetry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
// SPDX-License-Identifier: AGPL-3.0-only
55
// Please see LICENSE in the repository root for full details.
66

7+
mod tokio;
8+
79
use std::sync::{LazyLock, OnceLock};
810

911
use anyhow::Context as _;
@@ -60,6 +62,8 @@ pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> {
6062
init_tracer(&config.tracing).context("Failed to configure traces exporter")?;
6163
init_meter(&config.metrics).context("Failed to configure metrics exporter")?;
6264

65+
self::tokio::observe();
66+
6367
Ok(())
6468
}
6569

crates/cli/src/telemetry/tokio.rs

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use opentelemetry::KeyValue;
7+
8+
use super::METER;
9+
10+
/// Install metrics for the tokio runtime.
11+
#[allow(clippy::too_many_lines)]
12+
pub fn observe() {
13+
let handle = tokio::runtime::Handle::current();
14+
let metrics = handle.metrics();
15+
16+
{
17+
let metrics = metrics.clone();
18+
METER
19+
.u64_observable_gauge("tokio_runtime.workers")
20+
.with_description("The number of worker threads used by the runtime")
21+
.with_unit("{worker}")
22+
.with_callback(move |instrument| {
23+
instrument.observe(metrics.num_workers().try_into().unwrap_or(u64::MAX), &[]);
24+
})
25+
.build();
26+
}
27+
28+
{
29+
let metrics = metrics.clone();
30+
METER
31+
.u64_observable_gauge("tokio_runtime.blocking_threads")
32+
.with_description("The number of additional threads spawned by the runtime")
33+
.with_unit("{thread}")
34+
.with_callback(move |instrument| {
35+
instrument.observe(
36+
metrics
37+
.num_blocking_threads()
38+
.try_into()
39+
.unwrap_or(u64::MAX),
40+
&[],
41+
);
42+
})
43+
.build();
44+
}
45+
46+
{
47+
let metrics = metrics.clone();
48+
METER
49+
.u64_observable_gauge("tokio_runtime.active_tasks")
50+
.with_description("The number of active tasks in the runtime")
51+
.with_unit("{task}")
52+
.with_callback(move |instrument| {
53+
instrument.observe(
54+
metrics.num_alive_tasks().try_into().unwrap_or(u64::MAX),
55+
&[],
56+
);
57+
})
58+
.build();
59+
}
60+
61+
{
62+
let metrics = metrics.clone();
63+
METER
64+
.u64_observable_gauge("tokio_runtime.idle_blocking_threads")
65+
.with_description("The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls")
66+
.with_unit("{thread}")
67+
.with_callback(move |instrument| {
68+
instrument.observe(
69+
metrics
70+
.num_idle_blocking_threads()
71+
.try_into()
72+
.unwrap_or(u64::MAX),
73+
&[],
74+
);
75+
})
76+
.build();
77+
}
78+
79+
{
80+
let metrics = metrics.clone();
81+
METER
82+
.u64_observable_counter("tokio_runtime.remote_schedules")
83+
.with_description("The number of tasks scheduled from outside the runtime")
84+
.with_unit("{task}")
85+
.with_callback(move |instrument| {
86+
instrument.observe(metrics.remote_schedule_count(), &[]);
87+
})
88+
.build();
89+
}
90+
91+
{
92+
let metrics = metrics.clone();
93+
METER
94+
.u64_observable_counter("tokio_runtime.budget_forced_yields")
95+
.with_description("The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets")
96+
.with_unit("{yield}")
97+
.with_callback(move |instrument| {
98+
instrument.observe(metrics.budget_forced_yield_count(), &[]);
99+
})
100+
.build();
101+
}
102+
103+
{
104+
let metrics = metrics.clone();
105+
METER
106+
.u64_observable_counter("tokio_runtime.io_driver.fd_registrations")
107+
.with_description("The number of file descriptors that have been registered with the runtime's I/O driver")
108+
.with_unit("{fd}")
109+
.with_callback(move |instrument| {
110+
instrument.observe(metrics.io_driver_fd_registered_count(), &[]);
111+
})
112+
.build();
113+
}
114+
115+
{
116+
let metrics = metrics.clone();
117+
METER
118+
.u64_observable_counter("tokio_runtime.io_driver.fd_deregistrations")
119+
.with_description("The number of file descriptors that have been deregistered by the runtime's I/O driver")
120+
.with_unit("{fd}")
121+
.with_callback(move |instrument| {
122+
instrument.observe(metrics.io_driver_fd_deregistered_count(), &[]);
123+
})
124+
.build();
125+
}
126+
127+
{
128+
let metrics = metrics.clone();
129+
METER
130+
.u64_observable_counter("tokio_runtime.io_driver.fd_readies")
131+
.with_description("The number of ready events processed by the runtime's I/O driver")
132+
.with_unit("{event}")
133+
.with_callback(move |instrument| {
134+
instrument.observe(metrics.io_driver_ready_count(), &[]);
135+
})
136+
.build();
137+
}
138+
139+
{
140+
let metrics = metrics.clone();
141+
METER
142+
.u64_observable_gauge("tokio_runtime.global_queue_depth")
143+
.with_description(
144+
"The number of tasks currently scheduled in the runtime's global queue",
145+
)
146+
.with_unit("{task}")
147+
.with_callback(move |instrument| {
148+
instrument.observe(
149+
metrics.global_queue_depth().try_into().unwrap_or(u64::MAX),
150+
&[],
151+
);
152+
})
153+
.build();
154+
}
155+
156+
{
157+
let metrics = metrics.clone();
158+
METER
159+
.u64_observable_gauge("tokio_runtime.blocking_queue_depth")
160+
.with_description("The number of tasks currently scheduled in the blocking thread pool, spawned using `spawn_blocking`")
161+
.with_unit("{task}")
162+
.with_callback(move |instrument| {
163+
instrument.observe(
164+
metrics
165+
.blocking_queue_depth()
166+
.try_into()
167+
.unwrap_or(u64::MAX),
168+
&[],
169+
);
170+
})
171+
.build();
172+
}
173+
174+
{
175+
let metrics = metrics.clone();
176+
METER
177+
.u64_observable_counter("tokio_runtime.worker.park_count")
178+
.with_description("The total number of times the given worker thread has parked")
179+
.with_callback(move |instrument| {
180+
let num = metrics.num_workers();
181+
for i in 0..num {
182+
instrument.observe(metrics.worker_park_count(i), &[worker_idx(i)]);
183+
}
184+
})
185+
.build();
186+
}
187+
188+
{
189+
let metrics = metrics.clone();
190+
METER
191+
.u64_observable_counter("tokio_runtime.worker.noops")
192+
.with_description("The number of times the given worker thread unparked but performed no work before parking again")
193+
.with_unit("{operation}")
194+
.with_callback(move |instrument| {
195+
let num = metrics.num_workers();
196+
for i in 0..num {
197+
instrument.observe(
198+
metrics.worker_noop_count(i),
199+
&[worker_idx(i)],
200+
);
201+
}
202+
})
203+
.build();
204+
}
205+
206+
{
207+
let metrics = metrics.clone();
208+
METER
209+
.u64_observable_counter("tokio_runtime.worker.task_steals")
210+
.with_description(
211+
"The number of tasks the given worker thread stole from another worker thread",
212+
)
213+
.with_callback(move |instrument| {
214+
let num = metrics.num_workers();
215+
for i in 0..num {
216+
instrument.observe(metrics.worker_steal_count(i), &[worker_idx(i)]);
217+
}
218+
})
219+
.build();
220+
}
221+
222+
{
223+
let metrics = metrics.clone();
224+
METER
225+
.u64_observable_counter("tokio_runtime.worker.steal_operations")
226+
.with_description(
227+
"The number of times the given worker thread stole tasks from another worker thread",
228+
)
229+
.with_callback(move |instrument| {
230+
let num = metrics.num_workers();
231+
for i in 0..num {
232+
instrument.observe(metrics.worker_steal_operations(i), &[worker_idx(i)]);
233+
}
234+
})
235+
.build();
236+
}
237+
238+
{
239+
let metrics = metrics.clone();
240+
METER
241+
.u64_observable_counter("tokio_runtime.worker.polls")
242+
.with_description("The number of tasks the given worker thread has polled")
243+
.with_unit("{task}")
244+
.with_callback(move |instrument| {
245+
let num = metrics.num_workers();
246+
for i in 0..num {
247+
instrument.observe(metrics.worker_poll_count(i), &[worker_idx(i)]);
248+
}
249+
})
250+
.build();
251+
}
252+
253+
{
254+
let metrics = metrics.clone();
255+
METER
256+
.u64_observable_counter("tokio_runtime.worker.busy_duration")
257+
.with_description("The amount of time the given worker thread has been busy")
258+
.with_unit("ms")
259+
.with_callback(move |instrument| {
260+
let num = metrics.num_workers();
261+
for i in 0..num {
262+
instrument.observe(
263+
metrics
264+
.worker_total_busy_duration(i)
265+
.as_millis()
266+
.try_into()
267+
.unwrap_or(u64::MAX),
268+
&[worker_idx(i)],
269+
);
270+
}
271+
})
272+
.build();
273+
}
274+
275+
{
276+
let metrics = metrics.clone();
277+
METER
278+
.u64_observable_counter("tokio_runtime.worker.local_schedules")
279+
.with_description("The number of tasks scheduled from **within** the runtime on the given worker's local queue")
280+
.with_unit("{task}")
281+
.with_callback(move |instrument| {
282+
let num = metrics.num_workers();
283+
for i in 0..num {
284+
instrument.observe(
285+
metrics.worker_local_schedule_count(i),
286+
&[worker_idx(i)],
287+
);
288+
}
289+
})
290+
.build();
291+
}
292+
293+
{
294+
let metrics = metrics.clone();
295+
METER
296+
.u64_observable_counter("tokio_runtime.worker.overflows")
297+
.with_description(
298+
"The number of times the given worker thread saturated its local queue",
299+
)
300+
.with_callback(move |instrument| {
301+
let num = metrics.num_workers();
302+
for i in 0..num {
303+
instrument.observe(metrics.worker_overflow_count(i), &[worker_idx(i)]);
304+
}
305+
})
306+
.build();
307+
}
308+
309+
{
310+
let metrics = metrics.clone();
311+
METER
312+
.u64_observable_gauge("tokio_runtime.worker.local_queue_depth")
313+
.with_description(
314+
"The number of tasks currently scheduled in the given worker's local queue",
315+
)
316+
.with_unit("{task}")
317+
.with_callback(move |instrument| {
318+
let num = metrics.num_workers();
319+
for i in 0..num {
320+
instrument.observe(
321+
metrics
322+
.worker_local_queue_depth(i)
323+
.try_into()
324+
.unwrap_or(u64::MAX),
325+
&[worker_idx(i)],
326+
);
327+
}
328+
})
329+
.build();
330+
}
331+
332+
{
333+
let metrics = metrics.clone();
334+
METER
335+
.u64_observable_gauge("tokio_runtime.worker.mean_poll_time")
336+
.with_description("The mean duration of task polls, in nanoseconds")
337+
.with_unit("ns")
338+
.with_callback(move |instrument| {
339+
let num = metrics.num_workers();
340+
for i in 0..num {
341+
instrument.observe(
342+
metrics
343+
.worker_mean_poll_time(i)
344+
.as_nanos()
345+
.try_into()
346+
.unwrap_or(u64::MAX),
347+
&[worker_idx(i)],
348+
);
349+
}
350+
})
351+
.build();
352+
}
353+
}
354+
355+
/// Helper to construct a [`KeyValue`] with the worker index.
356+
fn worker_idx(i: usize) -> KeyValue {
357+
KeyValue::new("worker_idx", i.try_into().unwrap_or(i64::MAX))
358+
}

0 commit comments

Comments
 (0)