Skip to content

Commit 1bdf835

Browse files
wip
1 parent 3fe4b08 commit 1bdf835

File tree

8 files changed

+918
-649
lines changed

8 files changed

+918
-649
lines changed

src/execution_plans/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
1010
pub use partition_isolator::PartitionIsolatorExec;
1111
pub(crate) use stage::InputStage;
1212
pub use stage::display_plan_graphviz;
13-
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};
13+
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec, DisplayCtx};

src/execution_plans/stage.rs

Lines changed: 150 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::execution_plans::NetworkCoalesceExec;
3+
use crate::metrics::TaskMetricsRewriter;
34
use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
45
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
56
use datafusion::error::{DataFusionError, Result};
67
use datafusion::execution::TaskContext;
8+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
79
use datafusion::physical_plan::{
810
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
911
};
1012
use datafusion::prelude::SessionContext;
13+
use datafusion::sql::sqlparser::keywords::CHAIN;
1114
use itertools::Itertools;
1215
use rand::Rng;
1316
use std::collections::VecDeque;
@@ -91,6 +94,8 @@ pub struct StageExec {
9194
pub tasks: Vec<ExecutionTask>,
9295
/// tree depth of our location in the stage tree, used for display only
9396
pub depth: usize,
97+
98+
pub display_ctx: Option<DisplayCtx>,
9499
}
95100

96101
/// A [StageExec] that is the input of another [StageExec].
@@ -192,6 +197,7 @@ impl StageExec {
192197
.collect(),
193198
tasks: vec![ExecutionTask { url: None }; n_tasks],
194199
depth: 0,
200+
display_ctx: None,
195201
}
196202
}
197203

@@ -239,6 +245,7 @@ impl StageExec {
239245
inputs: assigned_input_stages,
240246
tasks: assigned_tasks,
241247
depth: self.depth,
248+
display_ctx: self.display_ctx.clone(),
242249
};
243250

244251
Ok(assigned_stage)
@@ -292,9 +299,27 @@ impl ExecutionPlan for StageExec {
292299

293300
fn with_new_children(
294301
self: Arc<Self>,
295-
_children: Vec<Arc<dyn ExecutionPlan>>,
302+
children: Vec<Arc<dyn ExecutionPlan>>,
296303
) -> Result<Arc<dyn ExecutionPlan>> {
297-
plan_err!("with_new_children() not supported for StageExec")
304+
let num_children = children.len();
305+
let child_stage_execs = children
306+
.into_iter()
307+
.filter(|child| child.as_any().downcast_ref::<StageExec>().is_some())
308+
.map(|child| child.as_any().downcast_ref::<StageExec>().unwrap().clone()).collect::<Vec<_>>();
309+
if child_stage_execs.len() != num_children {
310+
return plan_err!("not all children are StageExec");
311+
}
312+
let stage = StageExec{
313+
query_id: self.query_id.clone(),
314+
num: self.num,
315+
name: self.name.clone(),
316+
plan: self.plan.clone(),
317+
inputs: child_stage_execs.into_iter().map(|s| InputStage::Decoded(Arc::new(s))).collect(),
318+
tasks: self.tasks.clone(),
319+
depth: self.depth,
320+
display_ctx: self.display_ctx.clone(),
321+
};
322+
Ok(Arc::new(stage))
298323
}
299324

300325
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
@@ -347,6 +372,10 @@ impl ExecutionPlan for StageExec {
347372
use bytes::Bytes;
348373
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
349374
use datafusion::physical_expr::Partitioning;
375+
use datafusion::common::HashMap;
376+
use crate::metrics::proto::MetricsSetProto;
377+
use crate::protobuf::StageKey;
378+
350379
/// Be able to display a nice tree for stages.
351380
///
352381
/// The challenge to doing this at the moment is that `TreeRenderVistor`
@@ -367,9 +396,29 @@ const LDCORNER: &str = "└"; // Left bottom corner
367396
const VERTICAL: &str = "│"; // Vertical line
368397
const HORIZONTAL: &str = "─"; // Horizontal line
369398

399+
// Context used to display a StageExec, tasks, and plans.
400+
#[derive(Debug, Clone)]
401+
pub struct DisplayCtx {
402+
metrics: Arc<HashMap<StageKey, Vec<MetricsSetProto>>>,
403+
}
404+
405+
impl DisplayCtx {
406+
pub fn new(metrics: HashMap<StageKey, Vec<MetricsSetProto>>) -> Self {
407+
Self {
408+
metrics: Arc::new(metrics),
409+
}
410+
}
411+
}
412+
370413
impl StageExec {
371414
fn format(&self, plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> std::fmt::Result {
372-
let mut node_str = displayable(plan).one_line().to_string();
415+
// println!("plan {:?}", plan);
416+
let mut node_str = match &self.display_ctx {
417+
None => displayable(plan).one_line().to_string(),
418+
Some(_) => {
419+
DisplayableExecutionPlan::with_metrics(plan).one_line().to_string()
420+
}
421+
};
373422
node_str.pop();
374423
write!(f, "{} {node_str}", " ".repeat(indent))?;
375424

@@ -430,32 +479,90 @@ impl DisplayAs for StageExec {
430479
write!(f, "{}", self.name)
431480
}
432481
DisplayFormatType::Verbose => {
433-
writeln!(
434-
f,
435-
"{}{} {} {}",
436-
LTCORNER,
437-
HORIZONTAL.repeat(5),
438-
self.name,
439-
format_tasks_for_stage(self.tasks.len(), &self.plan)
440-
)?;
482+
match &self.display_ctx {
483+
None => {
484+
writeln!(
485+
f,
486+
"{}{} {} {}",
487+
LTCORNER,
488+
HORIZONTAL.repeat(5),
489+
self.name,
490+
format_tasks_for_stage(self.tasks.len(), &self.plan)
491+
)?;
441492

442-
let mut plan_str = String::new();
443-
self.format(self.plan.as_ref(), 0, &mut plan_str)?;
444-
let plan_str = plan_str
445-
.split('\n')
446-
.filter(|v| !v.is_empty())
447-
.collect::<Vec<_>>()
448-
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
449-
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
450-
write!(
451-
f,
452-
"{}{}{}",
453-
" ".repeat(self.depth),
454-
LDCORNER,
455-
HORIZONTAL.repeat(50)
456-
)?;
493+
let mut plan_str = String::new();
494+
self.format(self.plan.as_ref(), 0, &mut plan_str)?;
495+
let plan_str = plan_str
496+
.split('\n')
497+
.filter(|v| !v.is_empty())
498+
.collect::<Vec<_>>()
499+
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
500+
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
501+
// Add bottom border
502+
write!(
503+
f,
504+
"{}{}{}",
505+
" ".repeat(self.depth),
506+
LDCORNER,
507+
HORIZONTAL.repeat(50)
508+
)?;
457509

458-
Ok(())
510+
Ok(())
511+
}
512+
Some(display_ctx) => {
513+
for (i, _) in self.tasks.iter().enumerate() {
514+
if i > 0 {
515+
writeln!(f)?;
516+
}
517+
writeln!(
518+
f,
519+
"{}{}{}{} {}",
520+
" ".repeat(self.depth),
521+
LTCORNER,
522+
HORIZONTAL.repeat(5),
523+
format!(" {} ", self.name),
524+
format_task_for_stage(i, &self.plan),
525+
)?;
526+
// Uniquely identify the task.
527+
let key = StageKey {
528+
query_id: self.query_id.to_string(),
529+
stage_id: self.num as u64,
530+
task_number: i as u64,
531+
};
532+
533+
let mut plan_str = String::new();
534+
let plan = match display_ctx.metrics.get(&key) {
535+
Some(metrics) => {
536+
let result = TaskMetricsRewriter::new(metrics.to_owned()).enrich_task_with_metrics(self.plan.clone());
537+
if let Err(e) = result {
538+
write!(f, "Error enriching task with metrics: {}", e)?;
539+
return Err(std::fmt::Error);
540+
}
541+
result.unwrap()
542+
}
543+
None => {
544+
self.plan.clone()
545+
}
546+
};
547+
self.format(plan.as_ref(), 0, &mut plan_str)?;
548+
let plan_str = plan_str
549+
.split('\n')
550+
.filter(|v| !v.is_empty())
551+
.collect::<Vec<_>>()
552+
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
553+
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
554+
// Add bottom border
555+
write!(
556+
f,
557+
"{}{}{}",
558+
" ".repeat(self.depth),
559+
LDCORNER,
560+
HORIZONTAL.repeat(50)
561+
)?;
562+
}
563+
return Ok(());
564+
}
565+
}
459566
}
460567
DisplayFormatType::TreeRender => write!(
461568
f,
@@ -483,6 +590,22 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
483590
result
484591
}
485592

593+
fn format_task_for_stage(task_number: usize, head: &Arc<dyn ExecutionPlan>) -> String {
594+
let partitioning = head.properties().output_partitioning();
595+
let input_partitions = partitioning.partition_count();
596+
let hash_shuffle = matches!(partitioning, Partitioning::Hash(_, _));
597+
let off = task_number * if hash_shuffle { 0 } else { input_partitions };
598+
599+
let mut result = "Task ".to_string();
600+
result += &format!("t{task_number}:[");
601+
result += &(off..(off + input_partitions))
602+
.map(|v| format!("p{v}"))
603+
.join(",");
604+
result += "] ";
605+
606+
result
607+
}
608+
486609
fn format_tasks_for_partition_isolator(
487610
isolator: &PartitionIsolatorExec,
488611
tasks: &[ExecutionTask],

src/explain.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
2+
use crate::execution_plans::{DisplayCtx, StageExec};
3+
use crate::metrics::proto::df_metrics_set_to_proto;
4+
use crate::protobuf::StageKey;
5+
use std::sync::Arc;
6+
use datafusion::physical_plan::ExecutionPlan;
7+
use crate::metrics::TaskMetricsCollector;
8+
use datafusion::error::DataFusionError;
9+
use crate::metrics::MetricsCollectorResult;
10+
use crate::metrics::proto::MetricsSetProto;
11+
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
12+
use datafusion::common::tree_node::Transformed;
13+
use datafusion::common::tree_node::TreeNodeRecursion;
14+
15+
pub struct DisplayCtxReWriter {
16+
display_ctx: DisplayCtx,
17+
}
18+
19+
impl DisplayCtxReWriter {
20+
/// Create a new TaskMetricsRewriter. The provided metrics will be used to enrich the plan.
21+
pub fn new(display_ctx: DisplayCtx) -> Self {
22+
Self { display_ctx }
23+
}
24+
25+
/// populate injects the display context into the [StageExec] nodes in the plan.
26+
pub fn rewrite(
27+
mut self,
28+
plan: Arc<dyn ExecutionPlan>,
29+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
30+
let transformed = plan.rewrite(&mut self)?;
31+
Ok(transformed.data)
32+
}
33+
}
34+
35+
impl TreeNodeRewriter for DisplayCtxReWriter {
36+
type Node = Arc<dyn ExecutionPlan>;
37+
38+
fn f_down(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>, DataFusionError> {
39+
match plan.as_any().downcast_ref::<StageExec>() {
40+
Some(stage_exec) => {
41+
let mut copy = stage_exec.clone();
42+
copy.display_ctx = Some(self.display_ctx.clone());
43+
Ok(Transformed::new(Arc::new(copy), true, TreeNodeRecursion::Continue))
44+
},
45+
None => Err(DataFusionError::Internal("expected stage exec".to_string())),
46+
}
47+
}
48+
}
49+
50+
51+
pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataFusionError> {
52+
let plan = match executed.as_any().downcast_ref::<StageExec>() {
53+
None => executed,
54+
Some(stage_exec) => {
55+
let MetricsCollectorResult{task_metrics, mut input_task_metrics} = TaskMetricsCollector::new()
56+
.collect(stage_exec.plan.clone())?;
57+
input_task_metrics.insert(StageKey{
58+
query_id: stage_exec.query_id.to_string(),
59+
stage_id: stage_exec.num as u64,
60+
task_number: 0,
61+
}, task_metrics.into_iter()
62+
.map(|metrics| df_metrics_set_to_proto(&metrics))
63+
.collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?);
64+
65+
let display_ctx = DisplayCtx::new(input_task_metrics);
66+
DisplayCtxReWriter::new(display_ctx).rewrite(executed.clone())?
67+
},
68+
};
69+
70+
Ok(DisplayableExecutionPlan::new(plan.as_ref()).indent(true).to_string())
71+
}
72+

src/flight_service/do_get.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ mod tests {
324324
inputs: vec![],
325325
tasks,
326326
depth: 0,
327+
display_ctx: None,
327328
};
328329

329330
let task_keys = [

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod distributed_physical_optimizer_rule;
88
mod execution_plans;
99
mod flight_service;
1010
mod metrics;
11+
mod explain;
1112

1213
mod protobuf;
1314
#[cfg(any(feature = "integration", test))]
@@ -26,3 +27,4 @@ pub use flight_service::{
2627
DistributedSessionBuilderContext, MappedDistributedSessionBuilder,
2728
MappedDistributedSessionBuilderExt,
2829
};
30+
pub use explain::explain_analyze;

src/metrics/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@ pub(crate) mod proto;
33
mod task_metrics_collector;
44
mod task_metrics_rewriter;
55
pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
6-
pub(crate) use task_metrics_collector::TaskMetricsCollector;
6+
pub(crate) use task_metrics_collector::{TaskMetricsCollector, MetricsCollectorResult};
7+
pub(crate) use task_metrics_rewriter::TaskMetricsRewriter;
8+

src/protobuf/stage_proto.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ pub(crate) fn stage_from_proto(
207207
inputs,
208208
tasks: decode_tasks(msg.tasks)?,
209209
depth: 0,
210+
display_ctx: None,
210211
})
211212
}
212213

@@ -273,6 +274,7 @@ mod tests {
273274
inputs: vec![],
274275
tasks: vec![],
275276
depth: 0,
277+
display_ctx: None,
276278
};
277279

278280
// Convert to proto message

0 commit comments

Comments
 (0)