Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ mod stage;

pub use arrow_flight_read::ArrowFlightReadExec;
pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
pub use stage::{display_plan_graphviz, ExecutionTask, StageExec};
129 changes: 83 additions & 46 deletions src/execution_plans/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,13 @@ fn format_tasks_for_partition_isolator(tasks: &[ExecutionTask]) -> String {
const NUM_COLORS: usize = 6;
const COLOR_SCHEME: &str = "spectral6";

pub fn display_stage_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
/// This will render a regular or distributed datafusion plan as
/// Graphviz dot format.
/// You can view them on https://vis-js.com
///
/// It is also often useful to experiment with plan output using
/// https://datafusion-fiddle.vercel.app/
pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let mut f = String::new();

writeln!(
Expand All @@ -424,47 +430,59 @@ pub fn display_stage_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
COLOR_SCHEME
)?;

// draw all tasks first
plan.apply(|node| {
let stage = node
.as_any()
.downcast_ref::<StageExec>()
.expect("Expected StageExec");
for task in stage.tasks.iter() {
let partition_group = &task.partition_group;
let p = display_single_task(stage, partition_group)?;
writeln!(f, "{}", p)?;
}
Ok(TreeNodeRecursion::Continue)
})?;

// now draw edges between the tasks
if plan.as_any().downcast_ref::<StageExec>().is_some() {
// draw all tasks first
plan.apply(|node| {
let stage = node
.as_any()
.downcast_ref::<StageExec>()
.expect("Expected StageExec");
for task in stage.tasks.iter() {
let partition_group = &task.partition_group;
let p = display_single_task(stage, partition_group)?;
writeln!(f, "{}", p)?;
}
Ok(TreeNodeRecursion::Continue)
})?;

plan.apply(|node| {
let stage = node
.as_any()
.downcast_ref::<StageExec>()
.expect("Expected StageExec");
// now draw edges between the tasks

for child_stage in stage.child_stages_iter() {
for task in stage.tasks.iter() {
for child_task in child_stage.tasks.iter() {
let edges = display_inter_task_edges(stage, task, child_stage, child_task)?;
writeln!(
f,
"// edges from child stage {} task {} to stage {} task {}\n {}",
child_stage.num,
format_pg(&child_task.partition_group),
stage.num,
format_pg(&task.partition_group),
edges
)?;
plan.apply(|node| {
let stage = node
.as_any()
.downcast_ref::<StageExec>()
.expect("Expected StageExec");

for child_stage in stage.child_stages_iter() {
for task in stage.tasks.iter() {
for child_task in child_stage.tasks.iter() {
let edges = display_inter_task_edges(stage, task, child_stage, child_task)?;
writeln!(
f,
"// edges from child stage {} task {} to stage {} task {}\n {}",
child_stage.num,
format_pg(&child_task.partition_group),
stage.num,
format_pg(&task.partition_group),
edges
)?;
}
}
}
}

Ok(TreeNodeRecursion::Continue)
})?;
Ok(TreeNodeRecursion::Continue)
})?;
} else {
// single plan, not a stage tree
writeln!(f, "node[shape=none]")?;
let p = display_plan(
&plan,
&(0..plan.output_partitioning().partition_count()).collect::<Vec<_>>(),
0,
false,
)?;
writeln!(f, "{}", p)?;
}

writeln!(f, "}}")?;

Expand Down Expand Up @@ -497,15 +515,34 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
format_pg(partition_group)
)?;

writeln!(
f,
"{}",
display_plan(&stage.plan, partition_group, stage.num, true)?
)?;
writeln!(f, " }}")?;
writeln!(f, " }}")?;

Ok(f)
}

pub fn display_plan(
plan: &Arc<dyn ExecutionPlan>,
partition_group: &[usize],
stage_num: usize,
distributed: bool,
) -> Result<String> {
// draw all plans
// we need to label the nodes, including depth, to uniquely identify them within this task.
// the tree node API provides depth-first traversal, but we need breadth-first order to align
// with how we will draw edges below, so we walk the plan with an explicit queue instead.
let mut queue = VecDeque::from([&stage.plan]);
let mut queue = VecDeque::from([plan]);
let mut index = 0;

let mut f = String::new();
while let Some(plan) = queue.pop_front() {
index += 1;
let p = display_single_plan(plan.as_ref(), stage.num, partition_group, index)?;
let p = display_single_plan(plan.as_ref(), stage_num, partition_group, index)?;
writeln!(f, "{}", p)?;
for child in plan.children().iter() {
queue.push_back(child);
Expand All @@ -518,7 +555,7 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
Option<&'a Arc<dyn ExecutionPlan>>,
usize,
);
let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(&stage.plan, None, 0usize)]);
let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(plan, None, 0usize)]);
let mut found_isolator = false;
index = 0;
while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
Expand All @@ -533,7 +570,10 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
if let Some(parent) = maybe_parent {
let output_partitions = plan.output_partitioning().partition_count();

let partitions = if plan
let partitions = if !distributed {
#[deny(clippy::if_same_then_else)]
output_partitions
} else if plan
.as_any()
.downcast_ref::<ArrowFlightReadExec>()
.is_some()
Expand Down Expand Up @@ -567,12 +607,12 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
plan.name(),
stage.num,
stage_num,
node_format_pg(partition_group),
index,
i,
parent.name(),
stage.num,
stage_num,
node_format_pg(partition_group),
parent_idx,
i,
Expand All @@ -586,9 +626,6 @@ pub fn display_single_task(stage: &StageExec, partition_group: &[usize]) -> Resu
queue.push_back((child, Some(plan), index));
}
}
writeln!(f, " }}")?;
writeln!(f, " }}")?;

Ok(f)
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
pub use distributed_ext::DistributedExt;
pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
pub use execution_plans::{
display_stage_graphviz, ArrowFlightReadExec, ExecutionTask, PartitionIsolatorExec, StageExec,
display_plan_graphviz, ArrowFlightReadExec, ExecutionTask, PartitionIsolatorExec, StageExec,
};
pub use flight_service::{
ArrowFlightEndpoint, DefaultSessionBuilder, DistributedSessionBuilder,
Expand Down