Skip to content

Commit f6fe903

Browse files
committed
graphviz plan output done
1 parent c523af6 commit f6fe903

File tree

2 files changed

+70
-25
lines changed

2 files changed

+70
-25
lines changed

src/stage/display.rs

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -94,56 +94,101 @@ impl DisplayAs for ExecutionStage {
9494
pub fn display_stage_graphviz(stage: &ExecutionStage) -> Result<String> {
9595
let mut f = String::new();
9696

97+
let num_colors = 5; // this should aggree with the colorscheme chosen from
98+
// https://graphviz.org/doc/info/colors.html
99+
let colorscheme = "spectral5";
100+
97101
writeln!(f, "digraph G {{")?;
98102
writeln!(f, " node[shape=rect];")?;
99103
writeln!(f, " rankdir=BT;")?;
100104
writeln!(f, " ranksep=2;")?;
101-
writeln!(f, " edge[colorscheme=rdylbu11, penwidth=2.0];")?;
105+
writeln!(f, " edge[colorscheme={},penwidth=2.0];", colorscheme)?;
102106

103107
// we'll keep a stack of stage ref, parrent stage ref
104108
let mut stack: Vec<(&ExecutionStage, Option<&ExecutionStage>)> = vec![(stage, None)];
105109

106110
while let Some((stage, parent)) = stack.pop() {
107111
writeln!(f, " subgraph cluster_{} {{", stage.num)?;
112+
writeln!(f, " node[shape=record];")?;
108113
writeln!(f, " label=\"{}\";", stage.name())?;
109-
writeln!(f, " labeljust=l;")?;
114+
writeln!(f, " labeljust=r;")?;
115+
writeln!(f, " labelloc=b;")?; // this will put the label at the top as our
116+
// rankdir=BT
110117

111118
stage.tasks.iter().try_for_each(|task| {
119+
let lab = task
120+
.partition_group
121+
.iter()
122+
.map(|p| format!("<p{}>{}", p, p))
123+
.collect::<Vec<_>>()
124+
.join("|");
112125
writeln!(
113126
f,
114127
" \"{}_{}\"[label = \"{}\"]",
115128
stage.num,
116129
format_pg(&task.partition_group),
117-
format_pg(&task.partition_group)
130+
lab,
118131
)?;
119132

120133
if let Some(our_parent) = parent {
121134
our_parent.tasks.iter().try_for_each(|ptask| {
122-
ptask.partition_group.iter().try_for_each(|partition| {
123-
writeln!(
124-
f,
125-
" \"{}_{}\" -> \"{}_{}\"[tailport=n, headport=s, color={}]",
126-
stage.num,
127-
format_pg(&task.partition_group),
128-
our_parent.num,
129-
format_pg(&ptask.partition_group),
130-
partition + 1
131-
)
132-
})?;
133-
writeln!(
134-
f,
135-
" \"{}_{}\" -> \"{}_{}\"[tailport=n, headport=s, color={}]",
136-
stage.num,
137-
format_pg(&task.partition_group),
138-
our_parent.num,
139-
format_pg(&ptask.partition_group),
140-
&ptask.partition_group[0] + 1,
141-
)
135+
task.partition_group.iter().try_for_each(|partition| {
136+
ptask.partition_group.iter().try_for_each(|ppartition| {
137+
writeln!(
138+
f,
139+
" \"{}_{}\":p{}:n -> \"{}_{}\":p{}:s[color={}]",
140+
stage.num,
141+
format_pg(&task.partition_group),
142+
partition,
143+
our_parent.num,
144+
format_pg(&ptask.partition_group),
145+
ppartition,
146+
(partition) % num_colors + 1
147+
)
148+
})
149+
})
142150
})?;
143151
}
144152

145153
Ok::<(), std::fmt::Error>(())
146154
})?;
155+
156+
// now we try to force the left right nature of tasks to be honored
157+
writeln!(f, " {{")?;
158+
writeln!(f, " rank = same;")?;
159+
stage.tasks.iter().try_for_each(|task| {
160+
writeln!(
161+
f,
162+
" \"{}_{}\"",
163+
stage.num,
164+
format_pg(&task.partition_group)
165+
)?;
166+
167+
Ok::<(), std::fmt::Error>(())
168+
})?;
169+
writeln!(f, " }}")?;
170+
// combined with rank = same, the invisible edges will force the tasks to be
171+
// laid out in a single row within the stage
172+
for i in 0..stage.tasks.len() - 1 {
173+
writeln!(
174+
f,
175+
" \"{}_{}\":w -> \"{}_{}\":e[style=invis]",
176+
stage.num,
177+
format_pg(&stage.tasks[i].partition_group),
178+
stage.num,
179+
format_pg(&stage.tasks[i + 1].partition_group),
180+
)?;
181+
}
182+
183+
// add a node for the plan, its way too big! Alternatives to add it?
184+
/*writeln!(
185+
f,
186+
" \"{}_plan\"[label = \"{}\", shape=box];",
187+
stage.num,
188+
displayable(stage.plan.as_ref()).indent(false)
189+
)?;
190+
*/
191+
147192
writeln!(f, " }}")?;
148193

149194
for child in stage.child_stages_iter() {

tests/stage_planning.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ mod tests {
1717

1818
#[tokio::test]
1919
async fn stage_planning() -> Result<(), Box<dyn Error>> {
20-
let config = SessionConfig::new().with_target_partitions(3);
20+
let config = SessionConfig::new().with_target_partitions(10);
2121

22-
let rule = DistributedPhysicalOptimizerRule::default().with_maximum_partitions_per_task(2);
22+
let rule = DistributedPhysicalOptimizerRule::default().with_maximum_partitions_per_task(4);
2323

2424
let state = SessionStateBuilder::new()
2525
.with_default_features()

0 commit comments

Comments
 (0)