Skip to content

Commit e6ec6c7

Browse files
committed
Observe metrics from the Tokio runtime
1 parent af01f6e commit e6ec6c7

File tree

3 files changed

+345
-0
lines changed

3 files changed

+345
-0
lines changed

.cargo/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
# modern CPU instructions and compatibility.
33
[target.x86_64-unknown-linux-gnu]
# Cargo only uses the first matching source of rustflags: a matching
# `target.<triple>.rustflags` entry replaces `build.rustflags` instead of being
# merged with it. The `tokio_unstable` cfg must therefore be repeated here, or
# it is silently dropped when building for this target.
rustflags = ["--cfg", "tokio_unstable", "-C", "target-cpu=x86-64-v2"]

# Enable the unstable Tokio APIs (runtime metrics) on every other target.
[build]
rustflags = ["--cfg", "tokio_unstable"]

crates/cli/src/telemetry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
// SPDX-License-Identifier: AGPL-3.0-only
55
// Please see LICENSE in the repository root for full details.
66

7+
mod tokio;
8+
79
use std::sync::{LazyLock, OnceLock};
810

911
use anyhow::Context as _;
@@ -60,6 +62,8 @@ pub fn setup(config: &TelemetryConfig) -> anyhow::Result<()> {
6062
init_tracer(&config.tracing).context("Failed to configure traces exporter")?;
6163
init_meter(&config.metrics).context("Failed to configure metrics exporter")?;
6264

65+
self::tokio::observe();
66+
6367
Ok(())
6468
}
6569

crates/cli/src/telemetry/tokio.rs

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use opentelemetry::KeyValue;
7+
8+
use super::METER;
9+
10+
/// Install metrics for the tokio runtime.
11+
#[allow(clippy::too_many_lines)]
12+
pub fn observe() {
13+
let handle = tokio::runtime::Handle::current();
14+
let metrics = handle.metrics();
15+
16+
{
17+
let metrics = metrics.clone();
18+
METER
19+
.u64_observable_gauge("tokio_runtime.workers")
20+
.with_description("The number of worker threads used by the runtime")
21+
.with_unit("{worker}")
22+
.with_callback(move |instrument| {
23+
instrument.observe(metrics.num_workers().try_into().unwrap_or(u64::MAX), &[]);
24+
});
25+
}
26+
27+
{
28+
let metrics = metrics.clone();
29+
METER
30+
.u64_observable_gauge("tokio_runtime.blocking_threads")
31+
.with_description("The number of additional threads spawned by the runtime")
32+
.with_unit("{thread}")
33+
.with_callback(move |instrument| {
34+
instrument.observe(
35+
metrics
36+
.num_blocking_threads()
37+
.try_into()
38+
.unwrap_or(u64::MAX),
39+
&[],
40+
);
41+
});
42+
}
43+
44+
{
45+
let metrics = metrics.clone();
46+
METER
47+
.u64_observable_gauge("tokio_runtime.active_tasks")
48+
.with_description("The number of active tasks in the runtime")
49+
.with_unit("{task}")
50+
.with_callback(move |instrument| {
51+
instrument.observe(
52+
metrics.num_alive_tasks().try_into().unwrap_or(u64::MAX),
53+
&[],
54+
);
55+
});
56+
}
57+
58+
{
59+
let metrics = metrics.clone();
60+
METER
61+
.u64_observable_gauge("tokio_runtime.idle_blocking_threads")
62+
.with_description("The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls")
63+
.with_unit("{thread}")
64+
.with_callback(move |instrument| {
65+
instrument.observe(
66+
metrics
67+
.num_idle_blocking_threads()
68+
.try_into()
69+
.unwrap_or(u64::MAX),
70+
&[],
71+
);
72+
})
73+
;
74+
}
75+
76+
{
77+
let metrics = metrics.clone();
78+
METER
79+
.u64_observable_counter("tokio_runtime.remote_schedules")
80+
.with_description("The number of tasks scheduled from outside the runtime")
81+
.with_unit("{task}")
82+
.with_callback(move |instrument| {
83+
instrument.observe(metrics.remote_schedule_count(), &[]);
84+
});
85+
}
86+
87+
{
88+
let metrics = metrics.clone();
89+
METER
90+
.u64_observable_counter("tokio_runtime.budget_forced_yields")
91+
.with_description("The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets")
92+
.with_unit("{yield}")
93+
.with_callback(move |instrument| {
94+
instrument.observe(metrics.budget_forced_yield_count(), &[]);
95+
});
96+
}
97+
98+
{
99+
let metrics = metrics.clone();
100+
METER
101+
.u64_observable_counter("tokio_runtime.io_driver.fd_registrations")
102+
.with_description("The number of file descriptors that have been registered with the runtime's I/O driver")
103+
.with_unit("{fd}")
104+
.with_callback(move |instrument| {
105+
instrument.observe(metrics.io_driver_fd_registered_count(), &[]);
106+
});
107+
}
108+
109+
{
110+
let metrics = metrics.clone();
111+
METER
112+
.u64_observable_counter("tokio_runtime.io_driver.fd_deregistrations")
113+
.with_description("The number of file descriptors that have been deregistered by the runtime's I/O driver")
114+
.with_unit("{fd}")
115+
.with_callback(move |instrument| {
116+
instrument.observe(metrics.io_driver_fd_deregistered_count(), &[]);
117+
});
118+
}
119+
120+
{
121+
let metrics = metrics.clone();
122+
METER
123+
.u64_observable_counter("tokio_runtime.io_driver.fd_readies")
124+
.with_description("The number of ready events processed by the runtime's I/O driver")
125+
.with_unit("{event}")
126+
.with_callback(move |instrument| {
127+
instrument.observe(metrics.io_driver_ready_count(), &[]);
128+
});
129+
}
130+
131+
{
132+
let metrics = metrics.clone();
133+
METER
134+
.u64_observable_gauge("tokio_runtime.global_queue_depth")
135+
.with_description(
136+
"The number of tasks currently scheduled in the runtime's global queue",
137+
)
138+
.with_unit("{task}")
139+
.with_callback(move |instrument| {
140+
instrument.observe(
141+
metrics.global_queue_depth().try_into().unwrap_or(u64::MAX),
142+
&[],
143+
);
144+
});
145+
}
146+
147+
{
148+
let metrics = metrics.clone();
149+
METER
150+
.u64_observable_gauge("tokio_runtime.blocking_queue_depth")
151+
.with_description("The number of tasks currently scheduled in the blocking thread pool, spawned using `spawn_blocking`")
152+
.with_unit("{task}")
153+
.with_callback(move |instrument| {
154+
instrument.observe(
155+
metrics
156+
.blocking_queue_depth()
157+
.try_into()
158+
.unwrap_or(u64::MAX),
159+
&[],
160+
);
161+
});
162+
}
163+
164+
{
165+
let metrics = metrics.clone();
166+
METER
167+
.u64_observable_counter("tokio_runtime.worker.park_count")
168+
.with_description("The total number of times the given worker thread has parked")
169+
.with_callback(move |instrument| {
170+
let num = metrics.num_workers();
171+
for i in 0..num {
172+
instrument.observe(metrics.worker_park_count(i), &[worker_idx(i)]);
173+
}
174+
});
175+
}
176+
177+
{
178+
let metrics = metrics.clone();
179+
METER
180+
.u64_observable_counter("tokio_runtime.worker.noops")
181+
.with_description("The number of times the given worker thread unparked but performed no work before parking again")
182+
.with_unit("{operation}")
183+
.with_callback(move |instrument| {
184+
let num = metrics.num_workers();
185+
for i in 0..num {
186+
instrument.observe(
187+
metrics.worker_noop_count(i),
188+
&[worker_idx(i)],
189+
);
190+
}
191+
});
192+
}
193+
194+
{
195+
let metrics = metrics.clone();
196+
METER
197+
.u64_observable_counter("tokio_runtime.worker.task_steals")
198+
.with_description(
199+
"The number of tasks the given worker thread stole from another worker thread",
200+
)
201+
.with_callback(move |instrument| {
202+
let num = metrics.num_workers();
203+
for i in 0..num {
204+
instrument.observe(metrics.worker_steal_count(i), &[worker_idx(i)]);
205+
}
206+
});
207+
}
208+
209+
{
210+
let metrics = metrics.clone();
211+
METER
212+
.u64_observable_counter("tokio_runtime.worker.steal_operations")
213+
.with_description(
214+
"The number of times the given worker thread stole tasks from another worker thread",
215+
)
216+
.with_callback(move |instrument| {
217+
let num = metrics.num_workers();
218+
for i in 0..num {
219+
instrument.observe(metrics.worker_steal_operations(i), &[worker_idx(i)]);
220+
}
221+
});
222+
}
223+
224+
{
225+
let metrics = metrics.clone();
226+
METER
227+
.u64_observable_counter("tokio_runtime.worker.polls")
228+
.with_description("The number of tasks the given worker thread has polled")
229+
.with_unit("{task}")
230+
.with_callback(move |instrument| {
231+
let num = metrics.num_workers();
232+
for i in 0..num {
233+
instrument.observe(metrics.worker_poll_count(i), &[worker_idx(i)]);
234+
}
235+
});
236+
}
237+
238+
{
239+
let metrics = metrics.clone();
240+
METER
241+
.u64_observable_counter("tokio_runtime.worker.busy_duration")
242+
.with_description("The amount of time the given worker thread has been busy")
243+
.with_unit("ms")
244+
.with_callback(move |instrument| {
245+
let num = metrics.num_workers();
246+
for i in 0..num {
247+
instrument.observe(
248+
metrics
249+
.worker_total_busy_duration(i)
250+
.as_millis()
251+
.try_into()
252+
.unwrap_or(u64::MAX),
253+
&[worker_idx(i)],
254+
);
255+
}
256+
});
257+
}
258+
259+
{
260+
let metrics = metrics.clone();
261+
METER
262+
.u64_observable_counter("tokio_runtime.worker.local_schedules")
263+
.with_description("The number of tasks scheduled from **within** the runtime on the given worker's local queue")
264+
.with_unit("{task}")
265+
.with_callback(move |instrument| {
266+
let num = metrics.num_workers();
267+
for i in 0..num {
268+
instrument.observe(
269+
metrics.worker_local_schedule_count(i),
270+
&[worker_idx(i)],
271+
);
272+
}
273+
});
274+
}
275+
276+
{
277+
let metrics = metrics.clone();
278+
METER
279+
.u64_observable_counter("tokio_runtime.worker.overflows")
280+
.with_description(
281+
"The number of times the given worker thread saturated its local queue",
282+
)
283+
.with_callback(move |instrument| {
284+
let num = metrics.num_workers();
285+
for i in 0..num {
286+
instrument.observe(metrics.worker_overflow_count(i), &[worker_idx(i)]);
287+
}
288+
});
289+
}
290+
291+
{
292+
let metrics = metrics.clone();
293+
METER
294+
.u64_observable_gauge("tokio_runtime.worker.local_queue_depth")
295+
.with_description(
296+
"The number of tasks currently scheduled in the given worker's local queue",
297+
)
298+
.with_unit("{task}")
299+
.with_callback(move |instrument| {
300+
let num = metrics.num_workers();
301+
for i in 0..num {
302+
instrument.observe(
303+
metrics
304+
.worker_local_queue_depth(i)
305+
.try_into()
306+
.unwrap_or(u64::MAX),
307+
&[worker_idx(i)],
308+
);
309+
}
310+
});
311+
}
312+
313+
{
314+
let metrics = metrics.clone();
315+
METER
316+
.u64_observable_gauge("tokio_runtime.worker.mean_poll_time")
317+
.with_description("The mean duration of task polls, in nanoseconds")
318+
.with_unit("ns")
319+
.with_callback(move |instrument| {
320+
let num = metrics.num_workers();
321+
for i in 0..num {
322+
instrument.observe(
323+
metrics
324+
.worker_mean_poll_time(i)
325+
.as_nanos()
326+
.try_into()
327+
.unwrap_or(u64::MAX),
328+
&[worker_idx(i)],
329+
);
330+
}
331+
});
332+
}
333+
}
334+
335+
/// Helper to construct a [`KeyValue`] with the worker index.
336+
fn worker_idx(i: usize) -> KeyValue {
337+
KeyValue::new("worker_idx", i.try_into().unwrap_or(i64::MAX))
338+
}

0 commit comments

Comments
 (0)