Skip to content

Commit e1f41df

Browse files
committed
Bring back graphviz visualization
1 parent 12b1196 commit e1f41df

File tree

6 files changed

+486
-478
lines changed

6 files changed

+486
-478
lines changed

src/execution_plans/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ mod stage;
77
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
88
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
99
pub use partition_isolator::PartitionIsolatorExec;
10+
pub use stage::display_plan_graphviz;
1011
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};

src/execution_plans/partition_isolator.rs

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl PartitionIsolatorExec {
8383
return Err(limit_tasks_err(input_partitions));
8484
}
8585

86-
let partition_count = (input_partitions as f64 / n_tasks as f64).ceil() as usize;
86+
let partition_count = Self::partition_groups(input_partitions, n_tasks)[0].len();
8787

8888
let properties = pending
8989
.input
@@ -105,14 +105,34 @@ impl PartitionIsolatorExec {
105105
Self::new_pending(input).ready(n_tasks)
106106
}
107107

108-
pub(crate) fn partition_group(&self, task_i: usize, n_tasks: usize) -> Vec<usize> {
109-
let Self::Ready(ready) = self else {
110-
return vec![];
111-
};
108+
pub(crate) fn partition_groups(input_partitions: usize, n_tasks: usize) -> Vec<Vec<usize>> {
109+
let q = input_partitions / n_tasks;
110+
let r = input_partitions % n_tasks;
111+
112+
let mut off = 0;
113+
(0..n_tasks)
114+
.map(|i| q + if i < r { 1 } else { 0 })
115+
.map(|n| {
116+
let result = (off..(off + n)).collect();
117+
off += n;
118+
result
119+
})
120+
.collect()
121+
}
112122

113-
let input_partitions = ready.input.output_partitioning().partition_count();
114-
let ppt = (input_partitions as f64 / n_tasks as f64).ceil() as usize;
115-
((ppt * task_i)..(ppt * (task_i + 1))).collect()
123+
pub(crate) fn partition_group(
124+
input_partitions: usize,
125+
task_i: usize,
126+
n_tasks: usize,
127+
) -> Vec<usize> {
128+
Self::partition_groups(input_partitions, n_tasks)[task_i].clone()
129+
}
130+
131+
pub(crate) fn input(&self) -> &Arc<dyn ExecutionPlan> {
132+
match self {
133+
PartitionIsolatorExec::Pending(v) => &v.input,
134+
PartitionIsolatorExec::Ready(v) => &v.input,
135+
}
116136
}
117137
}
118138

@@ -139,10 +159,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
139159
}
140160

141161
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
142-
match self {
143-
PartitionIsolatorExec::Pending(pending) => vec![&pending.input],
144-
PartitionIsolatorExec::Ready(ready) => vec![&ready.input],
145-
}
162+
vec![self.input()]
146163
}
147164

148165
fn with_new_children(
@@ -176,21 +193,18 @@ impl ExecutionPlan for PartitionIsolatorExec {
176193
let task_context = DistributedTaskContext::from_ctx(&context);
177194
let stage = StageExec::from_ctx(&context)?;
178195

179-
let partition_group = self.partition_group(task_context.task_index, stage.tasks.len());
196+
let input_partitions = self_ready.input.output_partitioning().partition_count();
180197

181-
let partitions_in_input = self_ready
182-
.input
183-
.properties()
184-
.output_partitioning()
185-
.partition_count();
198+
let partition_group =
199+
Self::partition_group(input_partitions, task_context.task_index, stage.tasks.len());
186200

187201
// if our partition group is [7,8,9] and we are asked for parittion 1,
188202
// then look up that index in our group and execute that partition, in this
189203
// example partition 8
190204

191205
let output_stream = match partition_group.get(partition) {
192206
Some(actual_partition_number) => {
193-
if *actual_partition_number >= partitions_in_input {
207+
if *actual_partition_number >= input_partitions {
194208
//trace!("{} returning empty stream", ctx_name);
195209
Ok(
196210
Box::pin(EmptyRecordBatchStream::new(self_ready.input.schema()))

0 commit comments

Comments
 (0)