Skip to content

Commit de97bfa

Browse files
committed
Improve the datafusion explain output
1 parent 797da3f commit de97bfa

File tree

5 files changed

+192
-10
lines changed

5 files changed

+192
-10
lines changed

crates/storage-query-datafusion/src/invocation_state/table.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use anyhow::anyhow;
1616
use datafusion::arrow::datatypes::SchemaRef;
1717
use datafusion::arrow::record_batch::RecordBatch;
1818
use datafusion::common::DataFusionError;
19+
use datafusion::physical_plan::metrics::Time;
1920
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
2021
use datafusion::physical_plan::{PhysicalExpr, SendableRecordBatchStream};
2122
use tokio::sync::mpsc::Sender;
@@ -100,6 +101,7 @@ impl<S: StatusHandle + Send + Sync + Debug + Clone + 'static> ScanPartition for
100101
_predicate: Option<Arc<dyn PhysicalExpr>>,
101102
batch_size: usize,
102103
limit: Option<usize>,
104+
_elapsed_compute: Time,
103105
) -> anyhow::Result<SendableRecordBatchStream> {
104106
let status = self.status_handle.clone();
105107
let partition_store_manager = self.partition_store_manager.clone();

crates/storage-query-datafusion/src/partition_store_scanner.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010

1111
use std::ops::RangeInclusive;
1212
use std::sync::Arc;
13+
use std::time::Instant;
1314
use std::{fmt::Debug, ops::ControlFlow};
1415

1516
use anyhow::anyhow;
1617
use datafusion::arrow::datatypes::SchemaRef;
1718
use datafusion::error::DataFusionError;
1819
use datafusion::execution::SendableRecordBatchStream;
1920
use datafusion::physical_plan::PhysicalExpr;
21+
use datafusion::physical_plan::metrics::Time;
2022
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
2123

2224
use restate_partition_store::{PartitionStore, PartitionStoreManager};
@@ -72,6 +74,7 @@ where
7274
S: ScanLocalPartition<Builder = RB>,
7375
RB: crate::table_util::Builder + Send + Sync + 'static,
7476
{
77+
#[allow(clippy::too_many_arguments)]
7578
fn scan_partition(
7679
&self,
7780
partition_id: PartitionId,
@@ -80,6 +83,7 @@ where
8083
predicate: Option<Arc<dyn PhysicalExpr>>,
8184
batch_size: usize,
8285
limit: Option<usize>,
86+
elapsed_compute: Time,
8387
) -> anyhow::Result<SendableRecordBatchStream> {
8488
let partition_store_manager = self.partition_store_manager.clone();
8589
let mut stream_builder = RecordBatchReceiverStream::builder(projection.clone(), 1);
@@ -94,9 +98,13 @@ where
9498
DataFusionError::External(err.into())
9599
})?;
96100

101+
// timer starts on first row, stops on scanner drop
102+
let mut elapsed_compute = ElapsedCompute::new(elapsed_compute);
103+
97104
let mut batch_sender = BatchSender::new(projection, tx, predicate, batch_size, limit);
98105

99106
S::for_each_row(&partition_store, range, move |row| {
107+
elapsed_compute.start();
100108
match S::append_row(batch_sender.builder_mut(), row) {
101109
Ok(()) => {}
102110
err => return ControlFlow::Break(err),
@@ -127,6 +135,7 @@ where
127135
predicate: Option<Arc<dyn PhysicalExpr>>,
128136
batch_size: usize,
129137
limit: Option<usize>,
138+
elapsed_compute: Time,
130139
) -> anyhow::Result<SendableRecordBatchStream> {
131140
self.scan_partition(
132141
partition_id,
@@ -135,6 +144,30 @@ where
135144
predicate,
136145
batch_size,
137146
limit,
147+
elapsed_compute,
138148
)
139149
}
140150
}
151+
152+
struct ElapsedCompute {
153+
time: Time,
154+
start: Option<Instant>,
155+
}
156+
157+
impl ElapsedCompute {
158+
fn new(time: Time) -> Self {
159+
Self { time, start: None }
160+
}
161+
162+
fn start(&mut self) {
163+
self.start.get_or_insert_with(Instant::now);
164+
}
165+
}
166+
167+
impl Drop for ElapsedCompute {
168+
fn drop(&mut self) {
169+
if let Some(start) = &self.start {
170+
self.time.add_elapsed(*start)
171+
}
172+
}
173+
}

crates/storage-query-datafusion/src/remote_query_scanner_manager.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use datafusion::arrow::datatypes::SchemaRef;
1818
use datafusion::execution::SendableRecordBatchStream;
1919

2020
use datafusion::physical_plan::PhysicalExpr;
21+
use datafusion::physical_plan::metrics::Time;
2122
use restate_core::Metadata;
2223
use restate_core::partitions::PartitionRouting;
2324
use restate_types::NodeId;
@@ -184,6 +185,7 @@ impl ScanPartition for RemotePartitionsScanner {
184185
predicate: Option<Arc<dyn PhysicalExpr>>,
185186
batch_size: usize,
186187
limit: Option<usize>,
188+
elapsed_compute: Time,
187189
) -> anyhow::Result<SendableRecordBatchStream> {
188190
match self.manager.get_partition_target_node(partition_id)? {
189191
PartitionLocation::Local => {
@@ -197,6 +199,7 @@ impl ScanPartition for RemotePartitionsScanner {
197199
predicate,
198200
batch_size,
199201
limit,
202+
elapsed_compute,
200203
)?)
201204
}
202205
PartitionLocation::Remote { node_id } => Ok(remote_scan_as_datafusion_stream(

crates/storage-query-datafusion/src/scanner_task.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use datafusion::arrow::datatypes::SchemaRef;
1616
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1717
use datafusion::physical_expr::expressions::DynamicFilterPhysicalExpr;
1818
use datafusion::physical_plan::PhysicalExpr;
19+
use datafusion::physical_plan::metrics::Time;
1920
use tokio::sync::mpsc;
2021
use tokio_stream::StreamExt as TokioStreamExt;
2122
use tracing::{debug, warn};
@@ -92,6 +93,7 @@ impl ScannerTask {
9293
request
9394
.limit
9495
.map(|limit| usize::try_from(limit).expect("limit to fit in a usize")),
96+
Time::new(),
9597
)?;
9698

9799
let (tx, rx) = mpsc::unbounded_channel();

0 commit comments

Comments
 (0)