Skip to content

Commit 4acc60b

Browse files
fvaleye and ion-elgreco
authored
feat(tracing): add tracing spans to all I/O sections (#3795)
# Description

To better understand performance in the `delta-rs` crate, I added more tracing spans to capture detailed debug-level performance information. Python now uses `OpenTelemetry` to collect the tracing data emitted from Rust. With this change, we gain true end-to-end visibility: Python spans can serve as parents of Rust spans (and vice versa), ensuring a continuous trace across both runtimes.

# Related Issue(s)

- close #3641

# Documentation

- https://docs.rs/tracing/latest/tracing/

---------

Signed-off-by: Florian Valeye <[email protected]>
Co-authored-by: Ion Koutsouris <[email protected]>
1 parent acd75d6 commit 4acc60b

File tree

24 files changed

+1039
-393
lines changed

24 files changed

+1039
-393
lines changed

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -72,6 +72,13 @@ uuid = { version = "1" }
7272
async-trait = { version = "0.1" }
7373
futures = { version = "0.3" }
7474
tokio = { version = "1" }
75+
76+
# opentelemetry
77+
tracing-opentelemetry = { version = "0.32" }
78+
opentelemetry = { version = "0.31" }
79+
opentelemetry-otlp = { version = "0.31", features = ["http-proto"] }
80+
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
81+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
7582
num_cpus = { version = "1" }
7683

7784
[workspace.metadata.typos]

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ rstest = { version = "0.26.1" }
100100
serial_test = "3"
101101
tempfile = "3"
102102
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
103+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
103104

104105
[features]
105106
default = ["rustls"]

crates/core/src/delta_datafusion/find_files.rs

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use datafusion::physical_plan::filter::FilterExec;
1313
use datafusion::physical_plan::limit::LocalLimitExec;
1414
use datafusion::physical_plan::ExecutionPlan;
1515
use itertools::Itertools;
16+
use tracing::*;
1617

1718
use crate::delta_datafusion::{
1819
df_logical_schema, get_path_column, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
@@ -31,6 +32,15 @@ pub(crate) struct FindFiles {
3132
}
3233

3334
/// Finds files in a snapshot that match the provided predicate.
35+
#[instrument(
36+
skip_all,
37+
fields(
38+
version = snapshot.version(),
39+
has_predicate = predicate.is_some(),
40+
partition_scan = field::Empty,
41+
candidate_count = field::Empty
42+
)
43+
)]
3444
pub(crate) async fn find_files(
3545
snapshot: &EagerSnapshot,
3646
log_store: LogStoreRef,
@@ -53,24 +63,35 @@ pub(crate) async fn find_files(
5363

5464
if expr_properties.partition_only {
5565
let candidates = scan_memory_table(snapshot, predicate).await?;
56-
Ok(FindFiles {
66+
let result = FindFiles {
5767
candidates,
5868
partition_scan: true,
59-
})
69+
};
70+
Span::current().record("partition_scan", result.partition_scan);
71+
Span::current().record("candidate_count", result.candidates.len());
72+
Ok(result)
6073
} else {
6174
let candidates =
6275
find_files_scan(snapshot, log_store, session, predicate.to_owned()).await?;
6376

64-
Ok(FindFiles {
77+
let result = FindFiles {
6578
candidates,
6679
partition_scan: false,
67-
})
80+
};
81+
Span::current().record("partition_scan", result.partition_scan);
82+
Span::current().record("candidate_count", result.candidates.len());
83+
Ok(result)
6884
}
6985
}
70-
None => Ok(FindFiles {
71-
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
72-
partition_scan: true,
73-
}),
86+
None => {
87+
let result = FindFiles {
88+
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
89+
partition_scan: true,
90+
};
91+
Span::current().record("partition_scan", result.partition_scan);
92+
Span::current().record("candidate_count", result.candidates.len());
93+
Ok(result)
94+
}
7495
}
7596
}
7697

@@ -188,6 +209,14 @@ fn join_batches_with_add_actions(
188209
}
189210

190211
/// Determine which files contain a record that satisfies the predicate
212+
#[instrument(
213+
skip_all,
214+
fields(
215+
version = snapshot.version(),
216+
total_files = field::Empty,
217+
matching_files = field::Empty
218+
)
219+
)]
191220
async fn find_files_scan(
192221
snapshot: &EagerSnapshot,
193222
log_store: LogStoreRef,
@@ -204,6 +233,8 @@ async fn find_files_scan(
204233
})
205234
.collect();
206235

236+
Span::current().record("total_files", candidate_map.len());
237+
207238
let scan_config = DeltaScanConfigBuilder::default()
208239
.with_file_column(true)
209240
.build(snapshot)?;
@@ -240,12 +271,15 @@ async fn find_files_scan(
240271
let task_ctx = Arc::new(TaskContext::from(session));
241272
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
242273

243-
join_batches_with_add_actions(
274+
let result = join_batches_with_add_actions(
244275
path_batches,
245276
candidate_map,
246277
config.file_column_name.as_ref().unwrap(),
247278
true,
248-
)
279+
)?;
280+
281+
Span::current().record("matching_files", result.len());
282+
Ok(result)
249283
}
250284

251285
async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaResult<Vec<Add>> {

0 commit comments

Comments
 (0)