
Commit 0b95862

implement distributed EXPLAIN ANALYZE
This change adds support for displaying a distributed EXPLAIN ANALYZE output. It updates the TPCH validation tests to assert the EXPLAIN ANALYZE output for each query.

Implementation notes:
- Adds `src/explain.rs` to store the main entrypoint for rendering the output string.
  - I left a TODO about pushing some of the work to `DistributedExec` or a new node type.
- Adds an `Option<DisplayCtx>` field to `DistributedExec` to carry extra information for display purposes.
  - We use this to smuggle the information into `display_plan_ascii`, because it is only relevant in the distributed case.
  - Then, at display time, when displaying a task, we rewrite each task plan to use the metrics from the `DisplayCtx`.

Informs: #123

Remaining work:
- Disable metrics propagation when not running EXPLAIN ANALYZE, as it adds extra overhead.
- Consider refactoring explain.rs.
- Graphviz support.
- Add docs + an excalidraw diagram to explain the metrics protocol.
1 parent a638e6d commit 0b95862
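
For context, here is a minimal sketch of the intended call pattern; it is not code from this commit. It assumes a SessionContext already configured for distributed execution (workers registered and the DistributedPhysicalOptimizerRule installed) and that this crate is imported as `datafusion_distributed`. The key ordering constraint is that `explain_analyze` must be called after the plan has executed, because `DistributedExec` only stashes its prepared plan (and thus the collected metrics) during `execute()`.

use datafusion::physical_plan::collect;
use datafusion::prelude::SessionContext;
use datafusion_distributed::explain_analyze; // crate name assumed

async fn run_explain_analyze(ctx: &SessionContext, sql: &str) -> datafusion::error::Result<String> {
    let df = ctx.sql(sql).await?;
    let plan = df.create_physical_plan().await?;
    // Execute first: metrics are only recorded (and, in the distributed case,
    // shipped back from workers) while the plan actually runs.
    let _batches = collect(plan.clone(), ctx.task_ctx()).await?;
    // Render the plan annotated with the metrics gathered above.
    explain_analyze(plan)
}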

File tree

10 files changed
+2719 -67 lines changed

src/common/ttl_map.rs

Lines changed: 1 addition & 1 deletion
@@ -289,7 +289,7 @@ where
 mod tests {
     use super::*;
     use std::sync::atomic::Ordering;
-    use tokio::time::{Duration, sleep};
+    use tokio::time::Duration;
 
     #[tokio::test]
     async fn test_basic_insert_and_get() {

src/execution_plans/distributed.rs

Lines changed: 54 additions & 5 deletions
@@ -1,20 +1,24 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
 use crate::execution_plans::common::require_one_child;
-use crate::protobuf::DistributedCodec;
+use crate::protobuf::{DistributedCodec, StageKey};
+use crate::stage::DisplayCtx;
 use crate::{ExecutionTask, Stage};
+use bytes::Bytes;
 use datafusion::common::exec_err;
 use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use rand::Rng;
 use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;
+use std::sync::Mutex;
 use url::Url;
 
 /// [ExecutionPlan] that executes the inner plan in distributed mode.
 /// Before executing it, two modifications are lazily performed on the plan:
 /// 1. Assigns worker URLs to all the stages. A random set of URLs are sampled from the
 ///    channel resolver and assigned to each task in each stage.
@@ -23,11 +27,47 @@ use url::Url;
 #[derive(Debug, Clone)]
 pub struct DistributedExec {
     pub plan: Arc<dyn ExecutionPlan>,
+
+    pub prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
+    pub display_ctx: Option<DisplayCtx>,
 }
 
 impl DistributedExec {
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        Self { plan }
+        Self {
+            plan,
+            prepared_plan: Arc::new(Mutex::new(None)),
+            display_ctx: None,
+        }
+    }
+
+    /// Returns a special stage key used to identify the root "stage" of the distributed plan.
+    /// TODO: reconcile this with display_plan_graphviz
+    pub(crate) fn to_stage_key(&self) -> StageKey {
+        StageKey {
+            query_id: Bytes::new(),
+            stage_id: 0_u64,
+            task_number: 0,
+        }
+    }
+
+    pub(crate) fn with_display_ctx(&self, display_ctx: DisplayCtx) -> Self {
+        Self {
+            display_ctx: Some(display_ctx),
+            ..self.clone()
+        }
+    }
+
+    /// Returns the prepared plan, which is lazily prepared on execute(). It is updated on every
+    /// call. Returns an error if .execute() has not been called.
+    pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        self.prepared_plan
+            .lock()
+            .map_err(|e| DataFusionError::Internal(format!("Failed to lock prepared plan: {}", e)))?
+            .clone()
+            .ok_or(DataFusionError::Internal(
+                "No prepared plan found. Was execute() called?".to_string(),
+            ))
     }
 
     fn prepare_plan(
@@ -99,6 +139,8 @@ impl ExecutionPlan for DistributedExec {
     ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(DistributedExec {
             plan: require_one_child(&children)?,
+            prepared_plan: self.prepared_plan.clone(),
+            display_ctx: self.display_ctx.clone(),
         }))
     }
 
@@ -120,7 +162,14 @@ impl ExecutionPlan for DistributedExec {
         let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
         let codec = DistributedCodec::new_combined_with_user(context.session_config());
 
-        let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
-        plan.execute(partition, context)
+        let prepared = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
+        {
+            let mut guard = self.prepared_plan.lock().map_err(|e| {
+                DataFusionError::Internal(format!("Failed to lock prepared plan: {}", e))
+            })?;
+            *guard = Some(prepared.clone());
+        }
+
+        prepared.execute(partition, context)
     }
 }
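
The `prepared_plan: Arc<Mutex<Option<...>>>` field added above is a small "lazily written slot" pattern: `execute()` overwrites the slot on every call, and readers get an error until the first write happens. A generic, self-contained sketch of the same pattern (hypothetical names, not this repo's code):

use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct LazySlot<T: Clone> {
    inner: Arc<Mutex<Option<T>>>,
}

impl<T: Clone> LazySlot<T> {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(None)) }
    }

    /// Overwrites the slot on every call, mirroring how DistributedExec
    /// replaces its prepared plan on each execute().
    fn store(&self, value: T) {
        *self.inner.lock().unwrap() = Some(value);
    }

    /// Errors until something has been stored, mirroring prepared_plan().
    fn get(&self) -> Result<T, String> {
        self.inner
            .lock()
            .map_err(|e| format!("lock poisoned: {e}"))?
            .clone()
            .ok_or_else(|| "no value stored yet; was store() called?".to_string())
    }
}

fn main() {
    let slot: LazySlot<String> = LazySlot::new();
    assert!(slot.get().is_err());
    slot.store("prepared".to_string());
    assert_eq!(slot.get().unwrap(), "prepared");
}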

src/explain.rs

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+use crate::display_plan_ascii;
+use crate::execution_plans::DistributedExec;
+use crate::metrics::MetricsCollectorResult;
+use crate::metrics::TaskMetricsCollector;
+use crate::metrics::proto::MetricsSetProto;
+use crate::metrics::proto::df_metrics_set_to_proto;
+use crate::stage::DisplayCtx;
+use datafusion::error::DataFusionError;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use std::sync::Arc;
+
+/// explain_analyze renders an [ExecutionPlan] with metrics.
+pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataFusionError> {
+    // Check if the plan is distributed by looking for a root DistributedExec.
+    match executed.as_any().downcast_ref::<DistributedExec>() {
+        None => Ok(DisplayableExecutionPlan::with_metrics(executed.as_ref())
+            .indent(true)
+            .to_string()),
+        Some(dist_exec) => {
+            // If the plan was distributed, collect metrics from the coordinating stage exec.
+            // TODO: Should we move this into the DistributedExec itself or a new ExplainAnalyzeExec?
+            let MetricsCollectorResult {
+                task_metrics,
+                mut input_task_metrics,
+            } = TaskMetricsCollector::new().collect(dist_exec.prepared_plan()?)?;
+
+            input_task_metrics.insert(
+                dist_exec.to_stage_key(),
+                task_metrics
+                    .into_iter()
+                    .map(|metrics| df_metrics_set_to_proto(&metrics))
+                    .collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?,
+            );
+
+            let display_ctx = DisplayCtx::new(input_task_metrics);
+            Ok(display_plan_ascii(&dist_exec.with_display_ctx(display_ctx)))
+        }
+    }
+}
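
The `None` arm keeps non-distributed plans on the standard DataFusion renderer. A runnable sketch of that fallback path on a plain single-process SessionContext (crate name `datafusion_distributed` assumed):

use datafusion::physical_plan::collect;
use datafusion::prelude::SessionContext;
use datafusion_distributed::explain_analyze; // crate name assumed

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS one").await?;
    let plan = df.create_physical_plan().await?;
    // Run the plan so operators record metrics before rendering.
    let _ = collect(plan.clone(), ctx.task_ctx()).await?;
    // No DistributedExec at the root, so this takes the
    // DisplayableExecutionPlan::with_metrics fallback branch.
    println!("{}", explain_analyze(plan)?);
    Ok(())
}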

src/flight_service/do_get.rs

Lines changed: 2 additions & 1 deletion
@@ -143,8 +143,9 @@ impl ArrowFlightEndpoint {
         let stream = map_last_stream(stream, move |last| {
             if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
                 task_data_entries.remove(key.clone());
+                return last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el));
             }
-            last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el))
+            last
         });
 
         Ok(Response::new(Box::pin(stream.map_err(|err| match err {
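
After this change, only the partition stream that finishes last, i.e. the one whose `fetch_sub` observes the counter at 1, appends the task's metrics flight data, so per-task metrics cross the network once rather than once per partition. A self-contained sketch of that countdown idiom, independent of this repo's `map_last_stream` helper:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let num_partitions: usize = 4;
    // Shared across the N partition streams of one task.
    let remaining = Arc::new(AtomicUsize::new(num_partitions));
    let handles: Vec<_> = (0..num_partitions)
        .map(|p| {
            let remaining = Arc::clone(&remaining);
            thread::spawn(move || {
                // ... stream partition `p` to the client ...
                // fetch_sub returns the previous value, so exactly one
                // partition observes 1 and takes on the metrics work.
                if remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
                    println!("partition {p}: last one out, attach task metrics");
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}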

src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,7 @@ mod config_extension_ext;
 mod distributed_ext;
 mod distributed_physical_optimizer_rule;
 mod execution_plans;
+mod explain;
 mod flight_service;
 mod metrics;
 mod stage;
@@ -20,6 +21,7 @@ pub use distributed_physical_optimizer_rule::{
     DistributedPhysicalOptimizerRule, NetworkBoundaryExt,
 };
 pub use execution_plans::{NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec};
+pub use explain::explain_analyze;
 pub use flight_service::{
     ArrowFlightEndpoint, DefaultSessionBuilder, DistributedSessionBuilder,
     DistributedSessionBuilderContext, MappedDistributedSessionBuilder,

src/metrics/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -3,4 +3,5 @@ pub(crate) mod proto;
 mod task_metrics_collector;
 mod task_metrics_rewriter;
 pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
-pub(crate) use task_metrics_collector::TaskMetricsCollector;
+pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
+pub(crate) use task_metrics_rewriter::TaskMetricsRewriter;

src/metrics/task_metrics_rewriter.rs

Lines changed: 0 additions & 2 deletions
@@ -32,14 +32,12 @@ pub struct TaskMetricsRewriter {
 
 impl TaskMetricsRewriter {
     /// Create a new TaskMetricsRewriter. The provided metrics will be used to enrich the plan.
-    #[allow(dead_code)]
     pub fn new(metrics: Vec<MetricsSetProto>) -> Self {
         Self { metrics, idx: 0 }
     }
 
     /// enrich_task_with_metrics rewrites the plan by wrapping nodes. If the length of the provided metrics set vec does not
     /// match the number of nodes in the plan, an error will be returned.
-    #[allow(dead_code)]
     pub fn enrich_task_with_metrics(
         mut self,
         plan: Arc<dyn ExecutionPlan>,
