diff --git a/src/execution_plans/mod.rs b/src/execution_plans/mod.rs
index 7cd5a5d..c8ff514 100644
--- a/src/execution_plans/mod.rs
+++ b/src/execution_plans/mod.rs
@@ -4,4 +4,4 @@ mod stage;
 
 pub use arrow_flight_read::ArrowFlightReadExec;
 pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
-pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
+pub use stage::{display_plan_graphviz, ExecutionTask, StageExec};
diff --git a/src/execution_plans/stage.rs b/src/execution_plans/stage.rs
index a032bad..18ce8f7 100644
--- a/src/execution_plans/stage.rs
+++ b/src/execution_plans/stage.rs
@@ -411,7 +411,13 @@ fn format_tasks_for_partition_isolator(tasks: &[ExecutionTask]) -> String {
 const NUM_COLORS: usize = 6;
 const COLOR_SCHEME: &str = "spectral6";
 
-pub fn display_stage_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
+/// This renders a regular or distributed DataFusion plan in
+/// Graphviz dot format.
+/// You can view the output on https://vis-js.com
+///
+/// It is also often useful to experiment with plan output using
+/// https://datafusion-fiddle.vercel.app/
+pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
     let mut f = String::new();
 
     writeln!(
@@ -424,47 +430,59 @@ pub fn display_stage_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
         COLOR_SCHEME
     )?;
 
-    // draw all tasks first
-    plan.apply(|node| {
-        let stage = node
-            .as_any()
-            .downcast_ref::<StageExec>()
-            .expect("Expected StageExec");
-        for task in stage.tasks.iter() {
-            let partition_group = &task.partition_group;
-            let p = display_single_task(stage, partition_group)?;
-            writeln!(f, "{}", p)?;
-        }
-        Ok(TreeNodeRecursion::Continue)
-    })?;
-
-    // now draw edges between the tasks
+    if plan.as_any().downcast_ref::<StageExec>().is_some() {
+        // draw all tasks first
+        plan.apply(|node| {
+            let stage = node
+                .as_any()
+                .downcast_ref::<StageExec>()
+                .expect("Expected StageExec");
+            for task in stage.tasks.iter() {
+                let partition_group = &task.partition_group;
+                let p = display_single_task(stage, partition_group)?;
+                writeln!(f, "{}", p)?;
+            }
+            Ok(TreeNodeRecursion::Continue)
+        })?;
 
-    plan.apply(|node| {
-        let stage = node
-            .as_any()
-            .downcast_ref::<StageExec>()
-            .expect("Expected StageExec");
+        // now draw edges between the tasks
 
-        for child_stage in stage.child_stages_iter() {
-            for task in stage.tasks.iter() {
-                for child_task in child_stage.tasks.iter() {
-                    let edges = display_inter_task_edges(stage, task, child_stage, child_task)?;
-                    writeln!(
-                        f,
-                        "// edges from child stage {} task {} to stage {} task {}\n {}",
-                        child_stage.num,
-                        format_pg(&child_task.partition_group),
-                        stage.num,
-                        format_pg(&task.partition_group),
-                        edges
-                    )?;
+        plan.apply(|node| {
+            let stage = node
+                .as_any()
+                .downcast_ref::<StageExec>()
+                .expect("Expected StageExec");
+
+            for child_stage in stage.child_stages_iter() {
+                for task in stage.tasks.iter() {
+                    for child_task in child_stage.tasks.iter() {
+                        let edges = display_inter_task_edges(stage, task, child_stage, child_task)?;
+                        writeln!(
+                            f,
+                            "// edges from child stage {} task {} to stage {} task {}\n {}",
+                            child_stage.num,
+                            format_pg(&child_task.partition_group),
+                            stage.num,
+                            format_pg(&task.partition_group),
+                            edges
+                        )?;
+                    }
                 }
             }
-        }
-        Ok(TreeNodeRecursion::Continue)
-    })?;
+            Ok(TreeNodeRecursion::Continue)
+        })?;
+    } else {
+        // single plan, not a stage tree
+        writeln!(f, "node[shape=none]")?;
+        let p = display_plan(
+            &plan,
+            &(0..plan.output_partitioning().partition_count()).collect::<Vec<usize>>(),
+            0,
+            false,
+        )?;
+        writeln!(f, "{}", p)?;
+    }
 
     writeln!(f, "}}")?;
@@ -497,15 +515,34 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Result<String> {
         format_pg(partition_group)
     )?;
 
+    writeln!(
+        f,
+        "{}",
+        display_plan(&stage.plan, partition_group, stage.num, true)?
+    )?;
+    writeln!(f, " }}")?;
+    writeln!(f, " }}")?;
+
+    Ok(f)
+}
+
+pub fn display_plan(
+    plan: &Arc<dyn ExecutionPlan>,
+    partition_group: &[usize],
+    stage_num: usize,
+    distributed: bool,
+) -> Result<String> {
     // draw all plans
     // we need to label the nodes including depth to uniquely identify them within this task
     // the tree node API provides depth first traversal, but we need breadth to align with
     // how we will draw edges below, so we'll do that.
-    let mut queue = VecDeque::from([&stage.plan]);
+    let mut queue = VecDeque::from([plan]);
     let mut index = 0;
+
+    let mut f = String::new();
     while let Some(plan) = queue.pop_front() {
         index += 1;
-        let p = display_single_plan(plan.as_ref(), stage.num, partition_group, index)?;
+        let p = display_single_plan(plan.as_ref(), stage_num, partition_group, index)?;
         writeln!(f, "{}", p)?;
         for child in plan.children().iter() {
             queue.push_back(child);
@@ -518,7 +555,7 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Result<String> {
         Option<&'a Arc<dyn ExecutionPlan>>,
         usize,
     );
-    let mut queue: VecDeque = VecDeque::from([(&stage.plan, None, 0usize)]);
+    let mut queue: VecDeque = VecDeque::from([(plan, None, 0usize)]);
     let mut found_isolator = false;
     index = 0;
     while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
@@ -533,7 +570,10 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Result<String> {
         if let Some(parent) = maybe_parent {
             let output_partitions = plan.output_partitioning().partition_count();
 
-            let partitions = if plan
+            let partitions = if !distributed {
+                #[deny(clippy::if_same_then_else)]
+                output_partitions
+            } else if plan
                 .as_any()
                 .downcast_ref::<PartitionIsolatorExec>()
                 .is_some()
@@ -567,12 +607,12 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Result<String> {
                     f,
                     " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
                     plan.name(),
-                    stage.num,
+                    stage_num,
                     node_format_pg(partition_group),
                     index,
                     i,
                     parent.name(),
-                    stage.num,
+                    stage_num,
                     node_format_pg(partition_group),
                     parent_idx,
                     i,
@@ -586,9 +626,6 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Result<String> {
             queue.push_back((child, Some(plan), index));
         }
     }
-    writeln!(f, " }}")?;
-    writeln!(f, " }}")?;
-
     Ok(f)
 }
 
diff --git a/src/lib.rs b/src/lib.rs
index dd6b8ca..ccefea8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,7 +17,7 @@ pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
 pub use distributed_ext::DistributedExt;
 pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
 pub use execution_plans::{
-    display_stage_graphviz, ArrowFlightReadExec, ExecutionTask, PartitionIsolatorExec, StageExec,
+    display_plan_graphviz, ArrowFlightReadExec, ExecutionTask, PartitionIsolatorExec, StageExec,
 };
 pub use flight_service::{
     ArrowFlightEndpoint, DefaultSessionBuilder, DistributedSessionBuilder,
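
For reviewers, a minimal usage sketch of the renamed entry point. This is not part of the diff: the crate path `datafusion_distributed`, the tokio runtime, and the sample table/query are assumptions for illustration; only the `display_plan_graphviz(Arc<dyn ExecutionPlan>) -> Result<String>` signature and the re-export in `src/lib.rs` come from the change above.

```rust
// Hypothetical usage sketch (not part of this PR). Assumes the crate is
// consumed as `datafusion_distributed`, which re-exports
// `display_plan_graphviz` per the src/lib.rs hunk above.
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_distributed::display_plan_graphviz;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)").await?;

    // A plain (non-distributed) physical plan exercises the new `else`
    // branch: no StageExec at the root, so it is drawn as a single plan tree.
    let df = ctx.sql("SELECT column1 FROM t ORDER BY column1").await?;
    let plan = df.create_physical_plan().await?;

    // A StageExec root would instead take the task/inter-task-edge path.
    let dot = display_plan_graphviz(plan)?;
    println!("{dot}"); // paste into a Graphviz viewer, e.g. https://vis-js.com
    Ok(())
}
```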