Skip to content

Commit 4230ce8

Browse files
feat: Add tracing instrumentation for plan creation
Introduces TracingQueryPlanner and rule instrumentation for analyzer, logical optimizer, and physical optimizer rules with tracing spans. Phase spans group individual rule executions and capture plan diffs when rules modify plans. Ergonomic API via RuleInstrumentationOptions::full() and instrument_rules_with_info_spans! macro.
1 parent 2527512 commit 4230ce8

27 files changed

+16731
-117
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ authors = ["DataDog <[email protected]>"]
3737
rust-version = "1.86.0"
3838

3939
[workspace.dependencies]
40+
async-trait = "0.1"
4041
datafusion = { version = "51.0.0", default-features = false }
4142
datafusion-tracing = { path = "datafusion-tracing", version = "51.0.0" }
4243
futures = "0.3"

README.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ use datafusion::{
7373
prelude::*,
7474
};
7575
use datafusion_tracing::{
76-
instrument_with_info_spans, pretty_format_compact_batch, InstrumentationOptions,
76+
instrument_rules_with_info_spans, instrument_with_info_spans,
77+
pretty_format_compact_batch, InstrumentationOptions, RuleInstrumentationOptions,
7778
};
7879
use std::sync::Arc;
7980
use tracing::field;
@@ -83,8 +84,8 @@ async fn main() -> Result<()> {
8384
// Initialize tracing subscriber as usual
8485
// (See examples/otlp.rs for a complete example).
8586

86-
// Set up tracing options (you can customize these).
87-
let options = InstrumentationOptions::builder()
87+
// Set up execution plan tracing options (you can customize these).
88+
let exec_options = InstrumentationOptions::builder()
8889
.record_metrics(true)
8990
.preview_limit(5)
9091
.preview_fn(Arc::new(|batch: &RecordBatch| {
@@ -95,7 +96,7 @@ async fn main() -> Result<()> {
9596
.build();
9697

9798
let instrument_rule = instrument_with_info_spans!(
98-
options: options,
99+
options: exec_options,
99100
env = field::Empty,
100101
region = field::Empty,
101102
);
@@ -105,8 +106,19 @@ async fn main() -> Result<()> {
105106
.with_physical_optimizer_rule(instrument_rule)
106107
.build();
107108

109+
// Instrument all rules (analyzer, logical optimizer, physical optimizer)
110+
// Physical plan creation tracing is automatically enabled when physical_optimizer is set
111+
let rule_options = RuleInstrumentationOptions::full().with_plan_diff();
112+
let session_state = instrument_rules_with_info_spans!(
113+
options: rule_options,
114+
state: session_state
115+
);
116+
108117
let ctx = SessionContext::new_with_state(session_state);
109118

119+
// Execute a query - the entire lifecycle is now traced:
120+
// SQL Parsing -> Logical Plan -> Analyzer Rules -> Optimizer Rules ->
121+
// Physical Plan Creation -> Physical Optimizer Rules -> Execution
110122
let results = ctx.sql("SELECT 1").await?.collect().await?;
111123
println!(
112124
"Query Results:\n{}",

datafusion-tracing/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ documentation = "https://docs.rs/datafusion-tracing"
3333
homepage = "https://github.com/datafusion-contrib/datafusion-tracing"
3434

3535
[dependencies]
36+
async-trait = { workspace = true }
3637
comfy-table = { version = "7.2" }
3738
datafusion = { workspace = true }
3839
delegate = "0.13"
3940
futures = { workspace = true }
4041
pin-project = "1.1"
42+
similar = { version = "2.7", default-features = false, features = ["text"] }
4143
tracing = { workspace = true }
4244
tracing-futures = { workspace = true }
4345
unicode-width = "0.2"

datafusion-tracing/src/instrument_rule.rs renamed to datafusion-tracing/src/exec_instrument_rule.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
//
1818
// This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc.
1919

20-
use crate::instrumented::InstrumentedExec;
21-
use crate::instrumented::SpanCreateFn;
20+
use crate::instrumented_exec::InstrumentedExec;
21+
use crate::instrumented_exec::SpanCreateFn;
2222
use crate::options::InstrumentationOptions;
2323
use datafusion::common::runtime::{JoinSetTracer, set_join_set_tracer};
2424
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};

datafusion-tracing/src/lib.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
//! prelude::*,
6464
//! };
6565
//! use datafusion_tracing::{
66-
//! instrument_with_info_spans, pretty_format_compact_batch, InstrumentationOptions,
66+
//! instrument_rules_with_info_spans, instrument_with_info_spans,
67+
//! pretty_format_compact_batch, InstrumentationOptions, RuleInstrumentationOptions,
6768
//! };
6869
//! use std::sync::Arc;
6970
//! use tracing::field;
@@ -73,8 +74,8 @@
7374
//! // Initialize tracing subscriber as usual
7475
//! // (See examples/otlp.rs for a complete example).
7576
//!
76-
//! // Set up tracing options (you can customize these).
77-
//! let options = InstrumentationOptions::builder()
77+
//! // Set up execution plan tracing options (you can customize these).
78+
//! let exec_options = InstrumentationOptions::builder()
7879
//! .record_metrics(true)
7980
//! .preview_limit(5)
8081
//! .preview_fn(Arc::new(|batch: &RecordBatch| {
@@ -85,7 +86,7 @@
8586
//! .build();
8687
//!
8788
//! let instrument_rule = instrument_with_info_spans!(
88-
//! options: options,
89+
//! options: exec_options,
8990
//! env = field::Empty,
9091
//! region = field::Empty,
9192
//! );
@@ -95,8 +96,19 @@
9596
//! .with_physical_optimizer_rule(instrument_rule)
9697
//! .build();
9798
//!
99+
//! // Instrument all rules (analyzer, logical optimizer, physical optimizer)
100+
//! // Physical plan creation tracing is automatically enabled when physical_optimizer is set
101+
//! let rule_options = RuleInstrumentationOptions::full().with_plan_diff();
102+
//! let session_state = instrument_rules_with_info_spans!(
103+
//! options: rule_options,
104+
//! state: session_state
105+
//! );
106+
//!
98107
//! let ctx = SessionContext::new_with_state(session_state);
99108
//!
109+
//! // Execute a query - the entire lifecycle is now traced:
110+
//! // SQL Parsing -> Logical Plan -> Analyzer Rules -> Optimizer Rules ->
111+
//! // Physical Plan Creation -> Physical Optimizer Rules -> Execution
100112
//! let results = ctx.sql("SELECT 1").await?.collect().await?;
101113
//! println!(
102114
//! "Query Results:\n{}",
@@ -110,21 +122,33 @@
110122
//! A more complete example can be found in the [examples directory](https://github.com/datafusion-contrib/datafusion-tracing/tree/main/examples).
111123
//!
112124
113-
mod instrument_rule;
114-
mod instrumented;
115-
mod instrumented_macros;
125+
// Execution plan instrumentation (wraps ExecutionPlan nodes with tracing)
126+
mod exec_instrument_macros;
127+
mod exec_instrument_rule;
128+
mod instrumented_exec;
129+
130+
// Rule instrumentation (wraps analyzer/optimizer/physical optimizer rules with tracing)
131+
mod rule_instrumentation;
132+
mod rule_instrumentation_macros;
133+
134+
// Shared utilities
116135
mod metrics;
117136
mod node;
118137
mod options;
138+
mod planner;
119139
mod preview;
120140
mod preview_utils;
141+
mod rule_options;
121142
mod utils;
122143

123144
// Hide implementation details from documentation.
124-
// This function is only public because it needs to be accessed by the macros,
125-
// but it's not intended for direct use by consumers of this crate.
145+
// These functions are only public because they need to be accessed by the macros,
146+
// but they're not intended for direct use by consumers of this crate.
147+
#[doc(hidden)]
148+
pub use exec_instrument_rule::new_instrument_rule;
126149
#[doc(hidden)]
127-
pub use instrument_rule::new_instrument_rule;
150+
pub use rule_instrumentation::instrument_session_state;
128151

129152
pub use options::InstrumentationOptions;
130153
pub use preview_utils::pretty_format_compact_batch;
154+
pub use rule_options::RuleInstrumentationOptions;

datafusion-tracing/src/options.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ pub struct InstrumentationOptions {
4343
pub custom_fields: HashMap<String, String>,
4444
}
4545

46+
impl std::fmt::Debug for InstrumentationOptions {
47+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48+
f.debug_struct("InstrumentationOptions")
49+
.field("record_metrics", &self.record_metrics)
50+
.field("preview_limit", &self.preview_limit)
51+
.field(
52+
"preview_fn",
53+
&if self.preview_fn.is_some() {
54+
"Some(PreviewFn)"
55+
} else {
56+
"None"
57+
},
58+
)
59+
.field("custom_fields", &self.custom_fields)
60+
.finish()
61+
}
62+
}
63+
4664
impl InstrumentationOptions {
4765
/// Creates a new builder for `InstrumentationOptions`.
4866
pub fn builder() -> InstrumentationOptionsBuilder {

datafusion-tracing/src/planner.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
//
18+
// This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc.
19+
20+
use async_trait::async_trait;
21+
use datafusion::common::Result;
22+
use datafusion::execution::SessionStateBuilder;
23+
use datafusion::execution::context::{QueryPlanner, SessionState};
24+
use datafusion::logical_expr::LogicalPlan;
25+
use datafusion::physical_plan::{ExecutionPlan, displayable};
26+
use std::sync::Arc;
27+
use tracing::{Instrument, Level};
28+
29+
/// A `QueryPlanner` that instruments the creation of the physical plan.
30+
///
31+
/// This is automatically applied when instrumenting a `SessionState` with physical
32+
/// optimizer instrumentation enabled (PhaseOnly or Full).
33+
#[derive(Debug)]
34+
pub(crate) struct TracingQueryPlanner {
35+
inner: Arc<dyn QueryPlanner + Send + Sync>,
36+
level: Level,
37+
}
38+
39+
impl TracingQueryPlanner {
40+
/// Create a new `TracingQueryPlanner` that wraps the provided inner planner at a specific level.
41+
fn new_with_level(inner: Arc<dyn QueryPlanner + Send + Sync>, level: Level) -> Self {
42+
Self { inner, level }
43+
}
44+
45+
/// Wraps the query planner of an existing `SessionState` with tracing instrumentation at a specific level.
46+
///
47+
/// This preserves any custom `QueryPlanner` that may already be configured in the state,
48+
/// ensuring that tracing is added as a layer on top of existing functionality.
49+
pub(crate) fn instrument_state_with_level(
50+
state: SessionState,
51+
level: Level,
52+
) -> SessionState {
53+
let current_planner = state.query_planner().clone();
54+
let wrapped_planner = Arc::new(Self::new_with_level(current_planner, level));
55+
56+
SessionStateBuilder::from(state)
57+
.with_query_planner(wrapped_planner)
58+
.build()
59+
}
60+
}
61+
62+
#[async_trait]
63+
impl QueryPlanner for TracingQueryPlanner {
64+
async fn create_physical_plan(
65+
&self,
66+
logical_plan: &LogicalPlan,
67+
session_state: &SessionState,
68+
) -> Result<Arc<dyn ExecutionPlan>> {
69+
let span = match self.level {
70+
Level::TRACE => tracing::trace_span!(
71+
"create_physical_plan",
72+
logical_plan = tracing::field::Empty,
73+
physical_plan = tracing::field::Empty,
74+
error = tracing::field::Empty
75+
),
76+
Level::DEBUG => tracing::debug_span!(
77+
"create_physical_plan",
78+
logical_plan = tracing::field::Empty,
79+
physical_plan = tracing::field::Empty,
80+
error = tracing::field::Empty
81+
),
82+
Level::INFO => tracing::info_span!(
83+
"create_physical_plan",
84+
logical_plan = tracing::field::Empty,
85+
physical_plan = tracing::field::Empty,
86+
error = tracing::field::Empty
87+
),
88+
Level::WARN => tracing::warn_span!(
89+
"create_physical_plan",
90+
logical_plan = tracing::field::Empty,
91+
physical_plan = tracing::field::Empty,
92+
error = tracing::field::Empty
93+
),
94+
Level::ERROR => tracing::error_span!(
95+
"create_physical_plan",
96+
logical_plan = tracing::field::Empty,
97+
physical_plan = tracing::field::Empty,
98+
error = tracing::field::Empty
99+
),
100+
};
101+
102+
// Record the logical plan as a formatted string with schema
103+
let logical_plan_str = logical_plan.display_indent_schema().to_string();
104+
span.record("logical_plan", logical_plan_str.as_str());
105+
106+
let physical_plan = self
107+
.inner
108+
.create_physical_plan(logical_plan, session_state)
109+
.instrument(span.clone())
110+
.await;
111+
112+
match &physical_plan {
113+
Ok(plan) => {
114+
// Record the physical plan as a formatted string
115+
let physical_plan_str =
116+
displayable(plan.as_ref()).indent(true).to_string();
117+
span.record("physical_plan", physical_plan_str.as_str());
118+
}
119+
Err(e) => {
120+
span.record("error", e.to_string().as_str());
121+
}
122+
}
123+
124+
physical_plan
125+
}
126+
}

0 commit comments

Comments
 (0)