Skip to content

Commit fba488c

Browse files
committed
chore(cube): Cube tracing instrumentation
1 parent 6003159 commit fba488c

File tree

13 files changed

+40
-8
lines changed

13 files changed

+40
-8
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", branch = "cub
156156
tempfile = "3"
157157
tokio = { version = "1.43", features = ["macros", "rt", "sync"] }
158158
url = "2.5.4"
159+
tracing = "0.1.25"
160+
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
159161

160162
[profile.release]
161163
codegen-units = 1

datafusion/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ url = { workspace = true }
133133
uuid = { version = "1.15", features = ["v4", "js"] }
134134
xz2 = { version = "0.1", optional = true, features = ["static"] }
135135
zstd = { version = "0.13", optional = true, default-features = false }
136+
tracing = { workspace = true }
137+
tracing-futures = { workspace = true }
136138

137139
[dev-dependencies]
138140
async-trait = { workspace = true }

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub use row_filter::build_row_filter;
6565
pub use row_filter::can_expr_be_pushed_down_with_schemas;
6666
pub use row_group_filter::RowGroupAccessPlanFilter;
6767
pub use writer::plan_to_parquet;
68+
use tracing_futures::Instrument;
6869

6970
use log::debug;
7071

@@ -525,6 +526,8 @@ impl ExecutionPlan for ParquetExec {
525526
// TODO upgrade DF: we need reader_options_customizer used as in the commented code below.
526527
self.inner.execute(partition_index, ctx)
527528

529+
// TODO upgrade DF: We also want Ok(Box::pin(stream.instrument(tracing::trace_span!("read_files")))) applied, probably in inner.
530+
528531
// let reader_options_customizer =
529532
// get_reader_options_customizer(ctx.session_config());
530533

datafusion/execution/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ parking_lot = { workspace = true }
4949
rand = { workspace = true }
5050
tempfile = { workspace = true }
5151
url = { workspace = true }
52+
tracing = { workspace = true }
53+
tracing-futures = { workspace = true }
5254

5355
[dev-dependencies]
5456
chrono = { workspace = true }

datafusion/execution/src/stream.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,12 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
5858
/// [`Stream`]s there is no mechanism to prevent callers polling so returning
5959
/// `Ready(None)` is recommended.
6060
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
61+
62+
// Cube extension
63+
impl<T> RecordBatchStream for tracing_futures::Instrumented<T>
64+
where T: Stream<Item = Result<RecordBatch>>,
65+
T: RecordBatchStream {
66+
fn schema(&self) -> SchemaRef {
67+
self.inner().schema()
68+
}
69+
}

datafusion/physical-plan/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ parking_lot = { workspace = true }
6464
pin-project-lite = "^0.2.7"
6565
tokio = { workspace = true }
6666
serde = { version = "1.0.214", features = ["derive"] }
67-
tracing = "0.1.25"
68-
tracing-futures = { version = "0.2.5" }
67+
tracing = { workspace = true }
68+
tracing-futures = { workspace = true }
6969

7070
[dev-dependencies]
7171
criterion = { workspace = true, features = ["async_futures"] }

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use datafusion_physical_expr::{
4949
};
5050

5151
use itertools::Itertools;
52+
use tracing_futures::Instrument;
5253

5354
pub(crate) mod group_values;
5455
mod no_grouping;
@@ -338,9 +339,9 @@ enum StreamType {
338339
impl From<StreamType> for SendableRecordBatchStream {
339340
fn from(stream: StreamType) -> Self {
340341
match stream {
341-
StreamType::AggregateStream(stream) => Box::pin(stream),
342-
StreamType::GroupedHash(stream) => Box::pin(stream),
343-
StreamType::GroupedPriorityQueue(stream) => Box::pin(stream),
342+
StreamType::AggregateStream(stream) => Box::pin(stream.instrument(tracing::trace_span!("AggregateStream"))),
343+
StreamType::GroupedHash(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedHashAggregateStream"))),
344+
StreamType::GroupedPriorityQueue(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedTopKAggregateStream"))),
344345
}
345346
}
346347
}

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use std::fmt::Debug;
3535
use std::sync::Arc;
3636

3737
use crate::coalesce_partitions::CoalescePartitionsExec;
38+
use crate::cube_ext;
3839
use crate::display::DisplayableExecutionPlan;
3940
use crate::metrics::MetricsSet;
4041
use crate::projection::ProjectionExec;
@@ -884,7 +885,7 @@ pub async fn collect_partitioned(
884885
let mut join_set = JoinSet::new();
885886
// Execute the plan and collect the results into batches.
886887
streams.into_iter().enumerate().for_each(|(idx, stream)| {
887-
join_set.spawn(async move {
888+
cube_ext::spawn_on_joinset(&mut join_set, async move {
888889
let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
889890
(idx, result)
890891
});

datafusion/physical-plan/src/filter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ struct FilterExecStream {
500500
projection: Option<Vec<usize>>,
501501
}
502502

503+
#[tracing::instrument(level = "trace", skip(batch))]
503504
pub fn batch_filter(
504505
batch: &RecordBatch,
505506
predicate: &Arc<dyn PhysicalExpr>,

datafusion/physical-plan/src/limit.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,8 @@ impl LimitStream {
428428
self.fetch = 0;
429429
self.input = None; // Clear input so it can be dropped early
430430

431+
let span = tracing::trace_span!("truncate_batch"); // TODO upgrade DF: We could use the baseline_metrics values.
432+
let _guard = span.enter();
431433
// It is guaranteed that batch_rows is <= batch.num_rows
432434
Some(batch.slice(0, batch_rows))
433435
} else {

0 commit comments

Comments (0)