Skip to content

Commit fff6457

Browse files
committed
chore(cube): Cube tracing instrumentation
1 parent bbcc720 commit fff6457

File tree

13 files changed

+42
-10
lines changed

13 files changed

+42
-10
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", branch = "cub
141141
tempfile = "3"
142142
thiserror = "1.0.44"
143143
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
144+
tracing = "0.1.25"
145+
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
144146
url = "2.2"
145147

146148
[profile.release]

datafusion/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ url = { workspace = true }
137137
uuid = { version = "1.7", features = ["v4"] }
138138
xz2 = { version = "0.1", optional = true, features = ["static"] }
139139
zstd = { version = "0.13", optional = true, default-features = false }
140+
tracing = { workspace = true }
141+
tracing-futures = { workspace = true }
140142

141143
[dev-dependencies]
142144
arrow-buffer = { workspace = true }

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub use metrics::ParquetFileMetrics;
6767
use opener::ParquetOpener;
6868
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
6969
pub use writer::plan_to_parquet;
70+
use tracing_futures::Instrument;
7071

7172
/// Execution plan for reading one or more Parquet files.
7273
///
@@ -740,7 +741,7 @@ impl ExecutionPlan for ParquetExec {
740741
let stream =
741742
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;
742743

743-
Ok(Box::pin(stream))
744+
Ok(Box::pin(stream.instrument(tracing::trace_span!("read_files"))))
744745
}
745746

746747
fn metrics(&self) -> Option<MetricsSet> {

datafusion/execution/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,7 @@ parking_lot = { workspace = true }
4949
rand = { workspace = true }
5050
tempfile = { workspace = true }
5151
url = { workspace = true }
52+
tracing = { workspace = true }
53+
tracing-futures = { workspace = true }
54+
55+

datafusion/execution/src/stream.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,12 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
3131

3232
/// Trait for a [`Stream`] of [`RecordBatch`]es
3333
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
34+
35+
// Cube extension
36+
impl<T> RecordBatchStream for tracing_futures::Instrumented<T>
37+
where T: Stream<Item = Result<RecordBatch>>,
38+
T: RecordBatchStream {
39+
fn schema(&self) -> SchemaRef {
40+
self.inner().schema()
41+
}
42+
}

datafusion/physical-plan/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ pin-project-lite = "^0.2.7"
6767
rand = { workspace = true }
6868
tokio = { workspace = true }
6969
serde = { version = "1.0.214", features = ["derive"] }
70-
tracing = "0.1.25"
71-
tracing-futures = { version = "0.2.5" }
70+
tracing = { workspace = true }
71+
tracing-futures = { workspace = true }
7272

7373
[dev-dependencies]
7474
rstest = { workspace = true }

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use datafusion_physical_expr::{
4949

5050
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
5151
use itertools::Itertools;
52+
use tracing_futures::Instrument;
5253

5354
pub mod group_values;
5455
mod no_grouping;
@@ -248,9 +249,9 @@ enum StreamType {
248249
impl From<StreamType> for SendableRecordBatchStream {
249250
fn from(stream: StreamType) -> Self {
250251
match stream {
251-
StreamType::AggregateStream(stream) => Box::pin(stream),
252-
StreamType::GroupedHash(stream) => Box::pin(stream),
253-
StreamType::GroupedPriorityQueue(stream) => Box::pin(stream),
252+
StreamType::AggregateStream(stream) => Box::pin(stream.instrument(tracing::trace_span!("AggregateStream"))),
253+
StreamType::GroupedHash(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedHashAggregateStream"))),
254+
StreamType::GroupedPriorityQueue(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedTopKAggregateStream"))),
254255
}
255256
}
256257
}

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortE
4040
use datafusion_physical_expr_common::sort_expr::LexRequirement;
4141

4242
use crate::coalesce_partitions::CoalescePartitionsExec;
43+
use crate::cube_ext;
4344
use crate::display::DisplayableExecutionPlan;
4445
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
4546
pub use crate::metrics::Metric;
@@ -738,7 +739,7 @@ pub async fn collect_partitioned(
738739
let mut join_set = JoinSet::new();
739740
// Execute the plan and collect the results into batches.
740741
streams.into_iter().enumerate().for_each(|(idx, stream)| {
741-
join_set.spawn(async move {
742+
cube_ext::spawn_on_joinset(&mut join_set, async move {
742743
let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
743744
(idx, result)
744745
});

datafusion/physical-plan/src/filter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ struct FilterExecStream {
426426
projection: Option<Vec<usize>>,
427427
}
428428

429+
#[tracing::instrument(level = "trace", skip(batch))]
429430
pub fn batch_filter(
430431
batch: &RecordBatch,
431432
predicate: &Arc<dyn PhysicalExpr>,

datafusion/physical-plan/src/limit.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ impl LimitStream {
419419
self.fetch = 0;
420420
self.input = None; // clear input so it can be dropped early
421421

422+
let span = tracing::trace_span!("truncate_batch"); // TODO upgrade DF: We could use the baseline_metrics values.
423+
let _guard = span.enter();
422424
// It is guaranteed that batch_rows is <= batch.num_rows
423425
Some(batch.slice(0, batch_rows))
424426
} else {

0 commit comments

Comments (0)