Skip to content

Commit 727d0a9

Browse files
Tokio metrics (#352)
1 parent f09fcb7 commit 727d0a9

File tree

9 files changed

+247
-5
lines changed

9 files changed

+247
-5
lines changed

.claude/settings.local.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
"Bash(cargo run:*)",
2020
"Bash(RUST_LOG=debug cargo run:*)",
2121
"Bash(cat:*)",
22-
"Bash(cargo clippy:*)"
22+
"Bash(cargo clippy:*)",
23+
"WebFetch(domain:github.com)",
24+
"Bash(ls:*)",
25+
"Bash(grep:*)"
2326
],
2427
"deny": [],
2528
"ask": []

Cargo.lock

Lines changed: 17 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/datafusion-app/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ futures = "0.3.30"
2727
indexmap = { features = ["serde"], version = "2.8.0" }
2828
itertools = "0.13.0"
2929
log = "0.4.22"
30+
metrics = { optional = true, version = "0.24.0" }
3031
num_cpus = "1.16.0"
3132
object_store = { features = ["aws"], optional = true, version = "0.12" }
3233
object_store_opendal = { optional = true, version = "0.54" }
@@ -36,6 +37,9 @@ opendal = { features = [
3637
parking_lot = "0.12.3"
3738
serde = { features = ["derive"], version = "1.0.197" }
3839
tokio = { features = ["macros", "rt-multi-thread"], version = "1.36.0" }
40+
tokio-metrics = { features = [
41+
"metrics-rs-integration",
42+
], optional = true, version = "0.4" }
3943
tokio-stream = { features = ["net"], version = "0.1.15" }
4044
tonic = { optional = true, version = "0.14" }
4145
url = { optional = true, version = "2.5.2" }
@@ -51,7 +55,7 @@ flightsql = ["dep:arrow-flight", "dep:base64", "dep:tonic"]
5155
functions-json = ["dep:datafusion-functions-json"]
5256
functions-parquet = ["dep:datafusion-functions-parquet"]
5357
huggingface = ["object_store_opendal", "opendal", "url"]
54-
observability = []
58+
observability = ["dep:metrics", "dep:tokio-metrics"]
5559
s3 = ["object_store/aws", "url"]
5660
udfs-wasm = ["dep:datafusion-udfs-wasm"]
5761
vortex = ["dep:vortex-datafusion"]

crates/datafusion-app/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,13 +323,19 @@ fn default_catalog_name() -> String {
323323
pub struct ObservabilityConfig {
324324
#[serde(default = "default_observability_schema_name")]
325325
pub schema_name: String,
326+
#[serde(default = "default_tokio_metrics_enabled")]
327+
pub tokio_metrics_enabled: bool,
328+
#[serde(default = "default_tokio_metrics_interval_secs")]
329+
pub tokio_metrics_interval_secs: u64,
326330
}
327331

328332
#[cfg(feature = "observability")]
329333
impl Default for ObservabilityConfig {
330334
fn default() -> Self {
331335
Self {
332336
schema_name: default_observability_schema_name(),
337+
tokio_metrics_enabled: default_tokio_metrics_enabled(),
338+
tokio_metrics_interval_secs: default_tokio_metrics_interval_secs(),
333339
}
334340
}
335341
}
@@ -343,3 +349,13 @@ fn default_observability() -> ObservabilityConfig {
343349
/// Serde default for `ObservabilityConfig::schema_name`.
fn default_observability_schema_name() -> String {
    String::from("observability")
}
352+
353+
#[cfg(feature = "observability")]
354+
/// Serde default for `ObservabilityConfig::tokio_metrics_enabled`; defaults to `true`.
fn default_tokio_metrics_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
357+
358+
#[cfg(feature = "observability")]
359+
/// Serde default for `ObservabilityConfig::tokio_metrics_interval_secs`, in seconds.
fn default_tokio_metrics_interval_secs() -> u64 {
    const DEFAULT_INTERVAL_SECS: u64 = 10;
    DEFAULT_INTERVAL_SECS
}

crates/datafusion-app/src/executor/dedicated.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,26 @@ impl DedicatedExecutor {
202202

203203
let handle = rx_handle.recv().expect("driver started");
204204

205+
// Start tokio metrics collection for the dedicated executor runtime
206+
#[cfg(feature = "observability")]
207+
let metrics_collector = {
208+
use crate::observability::TokioMetricsCollector;
209+
use std::time::Duration;
210+
211+
Some(TokioMetricsCollector::start(
212+
handle.clone(),
213+
"cpu_runtime".to_string(),
214+
Duration::from_secs(10),
215+
))
216+
};
217+
205218
let state = State {
206219
handle: Some(handle),
207220
start_shutdown: notify_shutdown,
208221
completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
209222
thread: Some(thread),
223+
#[cfg(feature = "observability")]
224+
_metrics_collector: metrics_collector,
210225
};
211226

212227
Self {
@@ -352,6 +367,10 @@ struct State {
352367

353368
/// The inner thread that can be used to join during drop.
354369
thread: Option<std::thread::JoinHandle<()>>,
370+
371+
/// Tokio metrics collector for the dedicated executor runtime.
372+
#[cfg(feature = "observability")]
373+
_metrics_collector: Option<crate::observability::TokioMetricsCollector>,
355374
}
356375

357376
// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones

crates/datafusion-app/src/observability/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ use tokio_stream::StreamExt;
3333

3434
use crate::config::ObservabilityConfig;
3535

36+
#[cfg(feature = "observability")]
37+
pub mod tokio_metrics;
38+
39+
#[cfg(feature = "observability")]
40+
pub use tokio_metrics::TokioMetricsCollector;
41+
3642
const REQUESTS_TABLE_NAME: &str = "requests";
3743

3844
#[derive(Clone, Debug)]
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use log::info;
2+
use std::time::Duration;
3+
use tokio::runtime::Handle;
4+
use tokio_metrics::RuntimeMonitor;
5+
6+
pub struct TokioMetricsCollector {
7+
_reporter_handle: tokio::task::JoinHandle<()>,
8+
}
9+
10+
impl TokioMetricsCollector {
11+
/// Start collecting metrics from the given runtime handle
12+
///
13+
/// # Arguments
14+
/// * `handle` - Tokio runtime handle to monitor
15+
/// * `runtime_name` - Name prefix for metrics (e.g., "io_runtime" or "cpu_runtime")
16+
/// * `interval` - How often to poll and report metrics
17+
pub fn start(handle: Handle, runtime_name: String, interval: Duration) -> Self {
18+
info!(
19+
"Starting tokio metrics collection for {} with interval {:?}",
20+
runtime_name, interval
21+
);
22+
23+
let runtime_monitor = RuntimeMonitor::new(&handle);
24+
25+
let reporter_handle = tokio::spawn(async move {
26+
let mut intervals = runtime_monitor.intervals();
27+
let mut interval_timer = tokio::time::interval(interval);
28+
interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
29+
30+
loop {
31+
interval_timer.tick().await;
32+
33+
if let Some(metrics) = intervals.next() {
34+
// Report metrics using metrics crate
35+
use metrics::{counter, gauge};
36+
37+
let prefix = &runtime_name;
38+
39+
// Always available metrics
40+
gauge!(format!("{}_workers_count", prefix)).set(metrics.workers_count as f64);
41+
counter!(format!("{}_total_park_count", prefix))
42+
.absolute(metrics.total_park_count);
43+
gauge!(format!("{}_max_park_count", prefix)).set(metrics.max_park_count as f64);
44+
gauge!(format!("{}_min_park_count", prefix)).set(metrics.min_park_count as f64);
45+
gauge!(format!("{}_total_busy_duration_us", prefix))
46+
.set(metrics.total_busy_duration.as_micros() as f64);
47+
gauge!(format!("{}_max_busy_duration_us", prefix))
48+
.set(metrics.max_busy_duration.as_micros() as f64);
49+
gauge!(format!("{}_min_busy_duration_us", prefix))
50+
.set(metrics.min_busy_duration.as_micros() as f64);
51+
gauge!(format!("{}_global_queue_depth", prefix))
52+
.set(metrics.global_queue_depth as f64);
53+
gauge!(format!("{}_elapsed_seconds", prefix))
54+
.set(metrics.elapsed.as_secs_f64());
55+
}
56+
}
57+
});
58+
59+
Self {
60+
_reporter_handle: reporter_handle,
61+
}
62+
}
63+
64+
/// Start monitoring the current runtime
65+
pub fn start_current(runtime_name: String, interval: Duration) -> Self {
66+
let handle = Handle::current();
67+
Self::start(handle, runtime_name, interval)
68+
}
69+
}
70+
71+
#[cfg(test)]
mod tests {
    use super::*;

    /// The reporter task must keep running across several reporting ticks.
    #[tokio::test]
    async fn test_metrics_collector_starts() {
        let collector = TokioMetricsCollector::start_current(
            "test_runtime".to_string(),
            Duration::from_millis(10),
        );

        // Yield long enough for the reporter to go through a few ticks; short
        // durations keep the test fast.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // A reporter that panicked or returned would show up as finished.
        assert!(
            !collector._reporter_handle.is_finished(),
            "metrics reporter task exited unexpectedly"
        );
    }
}

src/main.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,31 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
6161
env_logger::init();
6262
}
6363
let cfg = create_config(cli.config_path());
64+
65+
// Start tokio metrics collection for IO runtime when running servers
66+
#[cfg(any(feature = "flightsql", feature = "http"))]
67+
let _io_metrics_collector = {
68+
use datafusion_app::observability::TokioMetricsCollector;
69+
use std::time::Duration;
70+
71+
let is_server = match &cli.command {
72+
#[cfg(feature = "http")]
73+
Some(Command::ServeHttp { .. }) => true,
74+
#[cfg(feature = "flightsql")]
75+
Some(Command::ServeFlightSql { .. }) => true,
76+
_ => false,
77+
};
78+
79+
if is_server {
80+
Some(TokioMetricsCollector::start_current(
81+
"io_runtime".to_string(),
82+
Duration::from_secs(10),
83+
))
84+
} else {
85+
None
86+
}
87+
};
88+
6489
if let Some(Command::GenerateTpch {
6590
scale_factor,
6691
format,

src/server/mod.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,73 @@ fn describe_metrics() {
4040
"do_get_fallback_latency_ms",
4141
metrics::Unit::Milliseconds,
4242
"Do get fallback latency ms"
43-
)
43+
);
44+
45+
// Tokio runtime metrics descriptions
46+
#[cfg(any(feature = "http", feature = "flightsql"))]
47+
{
48+
// IO Runtime metrics
49+
describe_counter!(
50+
"io_runtime_workers_count",
51+
"Number of worker threads in IO runtime"
52+
);
53+
describe_counter!(
54+
"io_runtime_total_park_count",
55+
"Total park count for IO runtime"
56+
);
57+
describe_counter!(
58+
"io_runtime_total_noop_count",
59+
"Total noop count for IO runtime"
60+
);
61+
describe_counter!(
62+
"io_runtime_total_steal_count",
63+
"Total steal count for IO runtime"
64+
);
65+
describe_counter!(
66+
"io_runtime_num_remote_schedules",
67+
"Number of remote schedules for IO runtime"
68+
);
69+
describe_histogram!(
70+
"io_runtime_mean_poll_duration_us",
71+
metrics::Unit::Microseconds,
72+
"Mean poll duration for IO runtime"
73+
);
74+
describe_histogram!(
75+
"io_runtime_budget_forced_yield_count",
76+
"Budget forced yield count for IO runtime"
77+
);
78+
79+
// CPU Runtime metrics
80+
describe_counter!(
81+
"cpu_runtime_workers_count",
82+
"Number of worker threads in CPU runtime"
83+
);
84+
describe_counter!(
85+
"cpu_runtime_total_park_count",
86+
"Total park count for CPU runtime"
87+
);
88+
describe_counter!(
89+
"cpu_runtime_total_noop_count",
90+
"Total noop count for CPU runtime"
91+
);
92+
describe_counter!(
93+
"cpu_runtime_total_steal_count",
94+
"Total steal count for CPU runtime"
95+
);
96+
describe_counter!(
97+
"cpu_runtime_num_remote_schedules",
98+
"Number of remote schedules for CPU runtime"
99+
);
100+
describe_histogram!(
101+
"cpu_runtime_mean_poll_duration_us",
102+
metrics::Unit::Microseconds,
103+
"Mean poll duration for CPU runtime"
104+
);
105+
describe_histogram!(
106+
"cpu_runtime_budget_forced_yield_count",
107+
"Budget forced yield count for CPU runtime"
108+
);
109+
}
44110
}
45111

46112
pub fn try_start_metrics_server(metrics_addr: SocketAddr) -> Result<()> {

0 commit comments

Comments
 (0)