Commit 27818a2

implement distributed EXPLAIN ANALYZE
This change adds support for displaying a distributed EXPLAIN ANALYZE output with metrics. It updates the TPCH validation tests to assert the EXPLAIN ANALYZE output for each query. I collected the single-node results [here](https://github.com/datafusion-contrib/datafusion-distributed/blob/js/full-explain-analyze-rebased-comparison/tests/tpch_validation_test.rs) - using the `output_rows` metric values, we can cross-check that the distributed plan metrics are correct.

Implementation notes:

- Adds `src/explain.rs` to store the main entrypoint for rendering the output string.
- We now rewrite the whole plan before rendering using `rewrite_distributed_plan_with_metrics()`, a new public API that takes an executed `DistributedExec` and wraps each node in the appropriate metrics.
- A user can call this method on an executed plan and traverse it to collect metrics from particular nodes, as sketched below (this may be hard, though, because all nodes are wrapped in `MetricsWrapperExec`...).
- Adds an `Option<DisplayCtx>` field to `DistributedExec` to contain extra display settings. We use this to smuggle the information into `display_plan_ascii` because it's only relevant in the distributed case.
- Significantly refactors `TaskMetricsRewriter` -> `StageMetricsWriter`. See comment.

Informs: #123

Other follow-up work:

- #185
- #184
- #188
- #189
- #190
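A minimal sketch of traversing a metrics-annotated plan after execution. Only the name `rewrite_distributed_plan_with_metrics` is confirmed by this commit; its exact signature is not shown here, so the commented call and the `print_output_rows` helper are illustrative assumptions built on the standard `ExecutionPlan::metrics()` accessor:

```rust
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: walk a (rewritten) plan tree and print the per-node
/// `output_rows` metric exposed through the standard ExecutionPlan::metrics() API.
fn print_output_rows(plan: &Arc<dyn ExecutionPlan>, indent: usize) {
    let rows = plan
        .metrics()
        .and_then(|m| m.output_rows())
        .unwrap_or(0);
    println!("{:indent$}{}: output_rows={rows}", "", plan.name(), indent = indent);
    for child in plan.children() {
        print_output_rows(child, indent + 2);
    }
}

// Assumed usage (the exact signature of the new API is not part of this page):
// let plan_with_metrics = rewrite_distributed_plan_with_metrics(executed_plan)?;
// print_output_rows(&plan_with_metrics, 0);
```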
1 parent f4b94f0 commit 27818a2

19 files changed (+1669, -193 lines)

src/distributed_physical_optimizer_rule.rs

Lines changed: 1 addition & 1 deletion
@@ -658,6 +658,6 @@ mod tests {
         let df = ctx.sql(query).await?;
 
         let physical_plan = df.create_physical_plan().await?;
-        Ok(display_plan_ascii(physical_plan.as_ref()))
+        Ok(display_plan_ascii(physical_plan.as_ref(), false))
     }
 }

src/execution_plans/distributed.rs

Lines changed: 33 additions & 5 deletions
@@ -4,14 +4,17 @@ use crate::execution_plans::common::require_one_child;
 use crate::protobuf::DistributedCodec;
 use crate::stage::{ExecutionTask, Stage};
 use datafusion::common::exec_err;
+use datafusion::common::internal_datafusion_err;
 use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use rand::Rng;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
+use std::sync::Mutex;
 use url::Url;
 
 /// [ExecutionPlan] that executes the inner plan in distributed mode.
@@ -20,14 +23,30 @@ use url::Url;
 /// channel resolver and assigned to each task in each stage.
 /// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
 /// over the wire.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct DistributedExec {
     pub plan: Arc<dyn ExecutionPlan>,
+    pub prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
 }
 
 impl DistributedExec {
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        Self { plan }
+        Self {
+            plan,
+            prepared_plan: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    /// Returns the plan which is lazily prepared on execute() and actually gets executed.
+    /// It is updated on every call to execute(). Returns an error if .execute() has not been called.
+    pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        self.prepared_plan
+            .lock()
+            .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
+            .clone()
+            .ok_or_else(|| {
+                internal_datafusion_err!("No prepared plan found. Was execute() called?")
+            })
     }
 
     fn prepare_plan(
@@ -98,7 +117,8 @@ impl ExecutionPlan for DistributedExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(DistributedExec {
-            plan: require_one_child(children)?,
+            plan: require_one_child(&children)?,
+            prepared_plan: self.prepared_plan.clone(),
         }))
     }
 
@@ -120,7 +140,15 @@
         let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
         let codec = DistributedCodec::new_combined_with_user(context.session_config());
 
-        let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
-        plan.execute(partition, context)
+        let prepared = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
+        {
+            let mut guard = self
+                .prepared_plan
+                .lock()
+                .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?;
+            *guard = Some(prepared.clone());
+        }
+
+        prepared.execute(partition, context)
     }
 }

src/execution_plans/metrics.rs

Lines changed: 2 additions & 2 deletions
@@ -11,7 +11,7 @@ use std::fmt::{Debug, Formatter};
 /// A transparent wrapper that delegates all execution to its child but returns custom metrics. This node is invisible during display.
 /// The structure of a plan tree is closely tied to the [TaskMetricsRewriter].
 pub struct MetricsWrapperExec {
-    inner: Arc<dyn ExecutionPlan>,
+    pub(crate) inner: Arc<dyn ExecutionPlan>,
     /// metrics for this plan node.
     metrics: MetricsSet,
     /// children is initially None. When used by the [TaskMetricsRewriter], the children will be updated
@@ -53,7 +53,7 @@ impl ExecutionPlan for MetricsWrapperExec {
     }
 
     fn properties(&self) -> &PlanProperties {
-        unimplemented!("MetricsWrapperExec does not implement properties")
+        self.inner.properties()
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

src/flight_service/do_get.rs

Lines changed: 3 additions & 2 deletions
@@ -143,8 +143,9 @@ impl ArrowFlightEndpoint {
         let stream = map_last_stream(stream, move |last| {
             if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
                 task_data_entries.remove(key.clone());
+                return last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el));
             }
-            last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el))
+            last
         });
 
         Ok(Response::new(Box::pin(stream.map_err(|err| match err {
@@ -164,7 +165,7 @@ fn collect_and_create_metrics_flight_data(
     plan: Arc<dyn ExecutionPlan>,
     incoming: FlightData,
 ) -> Result<FlightData, FlightError> {
-    // Get the metrics for the task executed on this worker. Separately, collect metrics for child tasks.
+    // Get the metrics for the task executed on this worker + child tasks.
     let mut result = TaskMetricsCollector::new()
         .collect(plan)
         .map_err(|err| FlightError::ProtocolError(err.to_string()))?;
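The `do_get.rs` change means that only the stream of the last remaining partition carries the metrics payload. A small self-contained sketch of that last-one-out check (the `on_partition_finished` helper is hypothetical, not an API in this crate):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// fetch_sub returns the counter's previous value, so the comparison with 1 is
// true exactly once: for whichever partition finishes last.
fn on_partition_finished(remaining: &Arc<AtomicUsize>) -> bool {
    remaining.fetch_sub(1, Ordering::SeqCst) == 1
}

fn main() {
    let remaining = Arc::new(AtomicUsize::new(3));
    let flags: Vec<bool> = (0..3).map(|_| on_partition_finished(&remaining)).collect();
    // Only the final call observes a previous value of 1.
    assert_eq!(flags, vec![false, false, true]);
}
```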

src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,8 @@ pub use flight_service::{
     DistributedSessionBuilderContext, MappedDistributedSessionBuilder,
     MappedDistributedSessionBuilderExt,
 };
+pub use metrics::rewrite_distributed_plan_with_metrics;
 pub use stage::{
     DistributedTaskContext, ExecutionTask, Stage, display_plan_ascii, display_plan_graphviz,
+    explain_analyze,
 };
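With these re-exports, downstream code would presumably import the new entry points from the crate root; the crate name `datafusion_distributed` below is an assumption based on the repository name:

```rust
// Assumed crate name; the re-exported items match the additions above.
use datafusion_distributed::{explain_analyze, rewrite_distributed_plan_with_metrics};
```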

src/metrics/metrics_collecting_stream.rs

Lines changed: 6 additions & 3 deletions
@@ -155,7 +155,10 @@ mod tests {
                 tasks: vec![TaskMetrics {
                     stage_key: Some(stage_key.clone()),
                     // use the task number to seed the test metrics set for convenience
-                    metrics: vec![make_test_metrics_set_proto_from_seed(stage_key.task_number)],
+                    metrics: vec![make_test_metrics_set_proto_from_seed(
+                        stage_key.task_number,
+                        4,
+                    )],
                 }],
             })),
         })
@@ -200,7 +203,7 @@ mod tests {
             assert_eq!(collected_metrics.len(), 1);
             assert_eq!(
                 collected_metrics[0],
-                make_test_metrics_set_proto_from_seed(stage_key.task_number)
+                make_test_metrics_set_proto_from_seed(stage_key.task_number, 4)
             );
         }
     }
@@ -209,7 +212,7 @@
     async fn test_metrics_collecting_stream_error_missing_stage_key() {
         let task_metrics_with_no_stage_key = TaskMetrics {
             stage_key: None,
-            metrics: vec![make_test_metrics_set_proto_from_seed(1)],
+            metrics: vec![make_test_metrics_set_proto_from_seed(1, 4)],
         };
 
         let invalid_app_metadata = FlightAppMetadata {

src/metrics/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -3,4 +3,5 @@ pub(crate) mod proto;
 mod task_metrics_collector;
 mod task_metrics_rewriter;
 pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
-pub(crate) use task_metrics_collector::TaskMetricsCollector;
+pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
+pub use task_metrics_rewriter::rewrite_distributed_plan_with_metrics;

src/metrics/proto.rs

Lines changed: 12 additions & 0 deletions
@@ -26,6 +26,18 @@ pub struct MetricsSetProto {
     pub metrics: Vec<MetricProto>,
 }
 
+impl MetricsSetProto {
+    pub fn new() -> Self {
+        Self {
+            metrics: Vec::new(),
+        }
+    }
+
+    pub fn push(&mut self, metric: MetricProto) {
+        self.metrics.push(metric)
+    }
+}
+
 /// MetricValueProto is a protobuf mirror of the [datafusion::physical_plan::metrics::MetricValue] enum.
 #[derive(Clone, PartialEq, Eq, ::prost::Oneof)]
 pub enum MetricValueProto {
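A small illustrative helper showing how the new `MetricsSetProto::new()`/`push()` pair might be used; how individual `MetricProto` values are built is outside this diff, so the hypothetical function only aggregates values it is handed:

```rust
// Illustrative aggregation helper (not part of the crate): collect already-built
// MetricProto values into a MetricsSetProto via the constructor and push method
// added above.
fn collect_into_set(metrics: impl IntoIterator<Item = MetricProto>) -> MetricsSetProto {
    let mut set = MetricsSetProto::new();
    for metric in metrics {
        set.push(metric);
    }
    set
}
```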

src/metrics/task_metrics_collector.rs

Lines changed: 12 additions & 24 deletions
@@ -232,25 +232,7 @@ mod tests {
         ctx
     }
 
-    /// runs a sql query and returns the coordinator StageExec
-    async fn plan_sql(ctx: &SessionContext, sql: &str) -> DistributedExec {
-        let df = ctx.sql(sql).await.unwrap();
-        let physical_distributed = df.create_physical_plan().await.unwrap();
-
-        let stage_exec = match physical_distributed
-            .as_any()
-            .downcast_ref::<DistributedExec>()
-        {
-            Some(stage_exec) => stage_exec.clone(),
-            None => panic!(
-                "expected StageExec from distributed optimization, got: {}",
-                physical_distributed.name()
-            ),
-        };
-        stage_exec
-    }
-
-    async fn execute_plan(stage_exec: &DistributedExec, ctx: &SessionContext) {
+    async fn execute_plan(stage_exec: Arc<dyn ExecutionPlan>, ctx: &SessionContext) {
         let task_ctx = ctx.task_ctx();
         let stream = stage_exec.execute(0, task_ctx).unwrap();
 
@@ -270,21 +252,27 @@ mod tests {
     async fn run_metrics_collection_e2e_test(sql: &str) {
         // Plan and execute the query
         let ctx = make_test_ctx().await;
-        let stage_exec = plan_sql(&ctx, sql).await;
-        execute_plan(&stage_exec, &ctx).await;
+        let df = ctx.sql(sql).await.unwrap();
+        let plan = df.create_physical_plan().await.unwrap();
+        execute_plan(plan.clone(), &ctx).await;
+
+        let dist_exec = plan
+            .as_any()
+            .downcast_ref::<DistributedExec>()
+            .expect("expected DistributedExec");
 
         // Assert to ensure the distributed test case is sufficiently complex.
-        let (stages, expected_stage_keys) = get_stages_and_stage_keys(&stage_exec);
+        let (stages, expected_stage_keys) = get_stages_and_stage_keys(dist_exec);
         assert!(
             expected_stage_keys.len() > 1,
             "expected more than 1 stage key in test. the plan was not distributed):\n{}",
-            DisplayableExecutionPlan::new(&stage_exec).indent(true)
+            DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
         );
 
         // Collect metrics for all tasks from the root StageExec.
         let collector = TaskMetricsCollector::new();
 
-        let result = collector.collect(stage_exec.plan.clone()).unwrap();
+        let result = collector.collect(dist_exec.plan.clone()).unwrap();
 
         // Ensure that there's metrics for each node for each task for each stage.
         for expected_stage_key in expected_stage_keys {
