
Commit f921a87

changes to allow nice graphviz of single node plans too (#136)
1 parent 61d2242 commit f921a87

File tree

3 files changed: +85 −48 lines

  src/execution_plans/mod.rs
  src/execution_plans/stage.rs
  src/lib.rs


src/execution_plans/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -4,4 +4,4 @@ mod stage;
 
 pub use arrow_flight_read::ArrowFlightReadExec;
 pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
-pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
+pub use stage::{display_plan_graphviz, ExecutionTask, StageExec};

src/execution_plans/stage.rs

Lines changed: 83 additions & 46 deletions
@@ -411,7 +411,13 @@ fn format_tasks_for_partition_isolator(tasks: &[ExecutionTask]) -> String {
 const NUM_COLORS: usize = 6;
 const COLOR_SCHEME: &str = "spectral6";
 
-pub fn display_stage_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
+/// This will render a regular or distributed datafusion plan in
+/// Graphviz dot format.
+/// You can view them on https://vis-js.com
+///
+/// Or it is often useful to experiment with plan output using
+/// https://datafusion-fiddle.vercel.app/
+pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
     let mut f = String::new();
 
     writeln!(
@@ -424,47 +430,59 @@ pub fn display_stage_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
         COLOR_SCHEME
     )?;
 
-    // draw all tasks first
-    plan.apply(|node| {
-        let stage = node
-            .as_any()
-            .downcast_ref::<StageExec>()
-            .expect("Expected StageExec");
-        for task in stage.tasks.iter() {
-            let partition_group = &task.partition_group;
-            let p = display_single_task(stage, partition_group)?;
-            writeln!(f, "{}", p)?;
-        }
-        Ok(TreeNodeRecursion::Continue)
-    })?;
-
-    // now draw edges between the tasks
+    if plan.as_any().downcast_ref::<StageExec>().is_some() {
+        // draw all tasks first
+        plan.apply(|node| {
+            let stage = node
+                .as_any()
+                .downcast_ref::<StageExec>()
+                .expect("Expected StageExec");
+            for task in stage.tasks.iter() {
+                let partition_group = &task.partition_group;
+                let p = display_single_task(stage, partition_group)?;
+                writeln!(f, "{}", p)?;
+            }
+            Ok(TreeNodeRecursion::Continue)
+        })?;
 
-    plan.apply(|node| {
-        let stage = node
-            .as_any()
-            .downcast_ref::<StageExec>()
-            .expect("Expected StageExec");
+        // now draw edges between the tasks
 
-        for child_stage in stage.child_stages_iter() {
-            for task in stage.tasks.iter() {
-                for child_task in child_stage.tasks.iter() {
-                    let edges = display_inter_task_edges(stage, task, child_stage, child_task)?;
-                    writeln!(
-                        f,
-                        "// edges from child stage {} task {} to stage {} task {}\n {}",
-                        child_stage.num,
-                        format_pg(&child_task.partition_group),
-                        stage.num,
-                        format_pg(&task.partition_group),
-                        edges
-                    )?;
+        plan.apply(|node| {
+            let stage = node
+                .as_any()
+                .downcast_ref::<StageExec>()
+                .expect("Expected StageExec");
+
+            for child_stage in stage.child_stages_iter() {
+                for task in stage.tasks.iter() {
+                    for child_task in child_stage.tasks.iter() {
+                        let edges = display_inter_task_edges(stage, task, child_stage, child_task)?;
+                        writeln!(
+                            f,
+                            "// edges from child stage {} task {} to stage {} task {}\n {}",
+                            child_stage.num,
+                            format_pg(&child_task.partition_group),
+                            stage.num,
+                            format_pg(&task.partition_group),
+                            edges
+                        )?;
+                    }
                 }
             }
-        }
 
-        Ok(TreeNodeRecursion::Continue)
-    })?;
+            Ok(TreeNodeRecursion::Continue)
+        })?;
+    } else {
+        // single plan, not a stage tree
+        writeln!(f, "node[shape=none]")?;
+        let p = display_plan(
+            &plan,
+            &(0..plan.output_partitioning().partition_count()).collect::<Vec<_>>(),
+            0,
+            false,
+        )?;
+        writeln!(f, "{}", p)?;
+    }
 
     writeln!(f, "}}")?;
 
@@ -497,15 +515,34 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
         format_pg(partition_group)
     )?;
 
+    writeln!(
+        f,
+        "{}",
+        display_plan(&stage.plan, partition_group, stage.num, true)?
+    )?;
+    writeln!(f, " }}")?;
+    writeln!(f, " }}")?;
+
+    Ok(f)
+}
+
+pub fn display_plan(
+    plan: &Arc<dyn ExecutionPlan>,
+    partition_group: &[usize],
+    stage_num: usize,
+    distributed: bool,
+) -> Result<String> {
     // draw all plans
     // we need to label the nodes including depth to uniquely identify them within this task
     // the tree node API provides depth first traversal, but we need breadth to align with
     // how we will draw edges below, so we'll do that.
-    let mut queue = VecDeque::from([&stage.plan]);
+    let mut queue = VecDeque::from([plan]);
     let mut index = 0;
+
+    let mut f = String::new();
     while let Some(plan) = queue.pop_front() {
         index += 1;
-        let p = display_single_plan(plan.as_ref(), stage.num, partition_group, index)?;
+        let p = display_single_plan(plan.as_ref(), stage_num, partition_group, index)?;
         writeln!(f, "{}", p)?;
         for child in plan.children().iter() {
             queue.push_back(child);
@@ -518,7 +555,7 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
         Option<&'a Arc<dyn ExecutionPlan>>,
         usize,
     );
-    let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(&stage.plan, None, 0usize)]);
+    let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(plan, None, 0usize)]);
     let mut found_isolator = false;
     index = 0;
     while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
@@ -533,7 +570,10 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
         if let Some(parent) = maybe_parent {
             let output_partitions = plan.output_partitioning().partition_count();
 
-            let partitions = if plan
+            let partitions = if !distributed {
+                #[deny(clippy::if_same_then_else)]
+                output_partitions
+            } else if plan
                 .as_any()
                 .downcast_ref::<ArrowFlightReadExec>()
                 .is_some()
@@ -567,12 +607,12 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
                     f,
                     " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
                     plan.name(),
-                    stage.num,
+                    stage_num,
                     node_format_pg(partition_group),
                     index,
                     i,
                     parent.name(),
-                    stage.num,
+                    stage_num,
                     node_format_pg(partition_group),
                     parent_idx,
                     i,
@@ -586,9 +626,6 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
             queue.push_back((child, Some(plan), index));
         }
     }
-    writeln!(f, " }}")?;
-    writeln!(f, " }}")?;
-
     Ok(f)
 }
 
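As a rough usage sketch (not part of this commit): with the rename, display_plan_graphviz can be handed an ordinary, non-distributed physical plan and will render it via the new else branch instead of hitting the "Expected StageExec" expect(). The crate path datafusion_distributed, the tokio runtime, and the CSV path/columns below are assumptions for illustration only.

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion_distributed::display_plan_graphviz; // assumed crate name

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // hypothetical local file, just to have something to plan against
    ctx.register_csv("example", "data/example.csv", CsvReadOptions::new())
        .await?;

    // An ordinary single-node physical plan: no StageExec at the root,
    // so the single-plan branch of display_plan_graphviz is exercised.
    let df = ctx.sql("SELECT a, count(*) FROM example GROUP BY a").await?;
    let plan = df.create_physical_plan().await?;

    // Returns the plan rendered as Graphviz dot text.
    let dot = display_plan_graphviz(plan)?;
    println!("{dot}");
    Ok(())
}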

src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
 pub use distributed_ext::DistributedExt;
 pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
 pub use execution_plans::{
-    display_stage_graphviz, ArrowFlightReadExec, ExecutionTask, PartitionIsolatorExec, StageExec,
+    display_plan_graphviz, ArrowFlightReadExec, ExecutionTask, PartitionIsolatorExec, StageExec,
 };
 pub use flight_service::{
     ArrowFlightEndpoint, DefaultSessionBuilder, DistributedSessionBuilder,
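Because of this re-export change, downstream code that imports the helper from the crate root needs the new name; the old display_stage_graphviz path no longer exists. A one-line sketch, assuming the crate is consumed as datafusion_distributed:

// before: use datafusion_distributed::display_stage_graphviz;
use datafusion_distributed::display_plan_graphviz;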

0 commit comments
