Skip to content

Commit c523af6

Browse files
committed
wip save on graphviz formatting
1 parent d6f90e3 commit c523af6

File tree

3 files changed

+58
-8
lines changed

3 files changed

+58
-8
lines changed

src/stage/display.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion::{
2929
},
3030
};
3131

32-
use crate::common::util::display_plan_with_partition_in_out;
32+
use crate::{common::util::display_plan_with_partition_in_out, task::format_pg};
3333

3434
use super::ExecutionStage;
3535

@@ -96,14 +96,59 @@ pub fn display_stage_graphviz(stage: &ExecutionStage) -> Result<String> {
9696

9797
writeln!(f, "digraph G {{")?;
9898
writeln!(f, " node[shape=rect];")?;
99+
writeln!(f, " rankdir=BT;")?;
100+
writeln!(f, " ranksep=2;")?;
101+
writeln!(f, " edge[colorscheme=rdylbu11, penwidth=2.0];")?;
99102

100-
let mut stack = vec![stage];
103+
// we'll keep a stack of stage ref, parrent stage ref
104+
let mut stack: Vec<(&ExecutionStage, Option<&ExecutionStage>)> = vec![(stage, None)];
101105

102-
while !stack.is_empty() {
103-
writeln!(f, " subgraph cluster_{} {{];", stage.num)?;
106+
while let Some((stage, parent)) = stack.pop() {
107+
writeln!(f, " subgraph cluster_{} {{", stage.num)?;
104108
writeln!(f, " label=\"{}\";", stage.name())?;
105-
writeln!(f, " labeljust=l")?;
109+
writeln!(f, " labeljust=l;")?;
110+
111+
stage.tasks.iter().try_for_each(|task| {
112+
writeln!(
113+
f,
114+
" \"{}_{}\"[label = \"{}\"]",
115+
stage.num,
116+
format_pg(&task.partition_group),
117+
format_pg(&task.partition_group)
118+
)?;
119+
120+
if let Some(our_parent) = parent {
121+
our_parent.tasks.iter().try_for_each(|ptask| {
122+
ptask.partition_group.iter().try_for_each(|partition| {
123+
writeln!(
124+
f,
125+
" \"{}_{}\" -> \"{}_{}\"[tailport=n, headport=s, color={}]",
126+
stage.num,
127+
format_pg(&task.partition_group),
128+
our_parent.num,
129+
format_pg(&ptask.partition_group),
130+
partition + 1
131+
)
132+
})?;
133+
writeln!(
134+
f,
135+
" \"{}_{}\" -> \"{}_{}\"[tailport=n, headport=s, color={}]",
136+
stage.num,
137+
format_pg(&task.partition_group),
138+
our_parent.num,
139+
format_pg(&ptask.partition_group),
140+
&ptask.partition_group[0] + 1,
141+
)
142+
})?;
143+
}
144+
145+
Ok::<(), std::fmt::Error>(())
146+
})?;
106147
writeln!(f, " }}")?;
148+
149+
for child in stage.child_stages_iter() {
150+
stack.push((child, Some(stage)));
151+
}
107152
}
108153

109154
writeln!(f, "}}")?;

src/task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl Display for ExecutionTask {
106106
}
107107
}
108108

109-
fn format_pg(partition_group: &[u64]) -> String {
109+
pub(crate) fn format_pg(partition_group: &[u64]) -> String {
110110
if partition_group.len() > 2 {
111111
format!(
112112
"{}..{}",

tests/stage_planning.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,16 @@ mod tests {
1010
use datafusion::physical_plan::{displayable, execute_stream};
1111
use datafusion::prelude::{SessionConfig, SessionContext};
1212
use datafusion_distributed::physical_optimizer::DistributedPhysicalOptimizerRule;
13+
use datafusion_distributed::stage::{display_stage_graphviz, ExecutionStage};
1314
use futures::TryStreamExt;
1415
use std::error::Error;
1516
use std::sync::Arc;
1617

1718
#[tokio::test]
1819
async fn stage_planning() -> Result<(), Box<dyn Error>> {
19-
let rule = DistributedPhysicalOptimizerRule::default(); //.with_maximum_partitions_per_task(3);
20+
let config = SessionConfig::new().with_target_partitions(3);
2021

21-
let config = SessionConfig::new().with_target_partitions(10);
22+
let rule = DistributedPhysicalOptimizerRule::default().with_maximum_partitions_per_task(2);
2223

2324
let state = SessionStateBuilder::new()
2425
.with_default_features()
@@ -57,6 +58,10 @@ mod tests {
5758
let physical_str = displayable(physical.as_ref()).indent(true);
5859
println!("\n\nPhysical Plan:\n{}", physical_str);
5960

61+
let physical_str =
62+
display_stage_graphviz(physical.as_any().downcast_ref::<ExecutionStage>().unwrap())?;
63+
println!("\n\nPhysical Plan:\n{}", physical_str);
64+
6065
assert_snapshot!(physical_str,
6166
@r"
6267
",

0 commit comments

Comments
 (0)