
Commit 5db3ce0

Better display of partitions in network shuffle tasks
1 parent 8ae7c1a

4 files changed: +60 additions, -68 deletions


src/distributed_physical_optimizer_rule.rs

Lines changed: 8 additions & 8 deletions
@@ -413,14 +413,14 @@ mod tests {
 │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
 │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p4,p5,p6,p7]
+┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
 │ CoalesceBatchesExec: target_batch_size=8192
 │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p8,p9,p10,p11,p12,p13,p14,p15]
+┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
 │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]

@@ -441,14 +441,14 @@ mod tests {
 │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
 │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p4,p5,p6,p7]
+┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
 │ CoalesceBatchesExec: target_batch_size=8192
 │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p8,p9,p10,p11,p12,p13,p14,p15]
+┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
 │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]

@@ -511,13 +511,13 @@ mod tests {
 │ CoalesceBatchesExec: target_batch_size=8192
 │ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p4,p5,p6,p7]
+┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
 │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
 │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
 │ CoalesceBatchesExec: target_batch_size=8192
 │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p8,p9,p10,p11,p12,p13,p14,p15]
+┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
 │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
 │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
 │ CoalesceBatchesExec: target_batch_size=8192

@@ -526,7 +526,7 @@ mod tests {
 │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
 └──────────────────────────────────────────────────
-┌───── Stage 3 Tasks: t0:[p0,p1,p2,p3] t1:[p4,p5,p6,p7]
+┌───── Stage 3 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
 │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
 │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
 │ CoalesceBatchesExec: target_batch_size=8192

@@ -565,7 +565,7 @@ mod tests {
 │ CoalesceBatchesExec: target_batch_size=8192
 │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=1, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3] t1:[p4,p5,p6,p7]
+┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
 │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4
 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
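Every snapshot change in this file follows one rule: when a stage's output partitioning is a hash shuffle, each task now displays the same partition ids (t1:[p0,p1,p2,p3] instead of t1:[p4,p5,p6,p7]), because partition numbering no longer continues across tasks. A minimal sketch of that numbering rule: `task_partitions` is an illustrative helper, not an API from this repository, and the `hash_shuffle` flag stands in for the real code's inspection of the plan's output partitioning (shown in the next file).

// Hypothetical helper mirroring the new display rule; not project code.
fn task_partitions(n_tasks: usize, partitions_per_task: usize, hash_shuffle: bool) -> Vec<Vec<usize>> {
    (0..n_tasks)
        .map(|t| {
            // Hash-shuffled stages repeat the same ids in every task; other
            // stages keep numbering contiguously across tasks.
            let off = if hash_shuffle { 0 } else { t * partitions_per_task };
            (off..off + partitions_per_task).collect()
        })
        .collect()
}

fn main() {
    // Stage 1 above: Hash([RainToday@0], 8) across 2 tasks -> both show p0..p7.
    assert_eq!(task_partitions(2, 8, true), vec![(0..8).collect::<Vec<_>>(); 2]);
    // The old rendering numbered them disjointly: t1 showed p8..p15.
    assert_eq!(task_partitions(2, 8, false)[1], (8..16).collect::<Vec<_>>());
}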

src/execution_plans/stage.rs

Lines changed: 8 additions & 16 deletions
@@ -291,6 +291,7 @@ impl ExecutionPlan for StageExec {
 }

 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion::physical_expr::Partitioning;
 /// Be able to display a nice tree for stages.
 ///
 /// The challenge to doing this at the moment is that `TreeRenderVistor`

@@ -382,13 +383,7 @@ impl DisplayAs for StageExec {
             LTCORNER,
             HORIZONTAL.repeat(5),
             self.name,
-            format_tasks_for_stage(
-                self.tasks.len(),
-                self.plan
-                    .properties()
-                    .output_partitioning()
-                    .partition_count()
-            )
+            format_tasks_for_stage(self.tasks.len(), &self.plan)
         )?;

         let mut plan_str = String::new();

@@ -412,19 +407,16 @@ impl DisplayAs for StageExec {
             DisplayFormatType::TreeRender => write!(
                 f,
                 "{}",
-                format_tasks_for_stage(
-                    self.tasks.len(),
-                    self.plan
-                        .properties()
-                        .output_partitioning()
-                        .partition_count()
-                )
+                format_tasks_for_stage(self.tasks.len(), &self.plan)
             ),
         }
     }
 }

-fn format_tasks_for_stage(n_tasks: usize, input_partitions: usize) -> String {
+fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> String {
+    let partitioning = head.properties().output_partitioning();
+    let input_partitions = partitioning.partition_count();
+    let hash_shuffle = matches!(partitioning, Partitioning::Hash(_, _));
     let mut result = "Tasks: ".to_string();
     let mut off = 0;
     for i in 0..n_tasks {

@@ -433,7 +425,7 @@ fn format_tasks_for_stage(n_tasks: usize, input_partitions: usize) -> String {
             .map(|v| format!("p{v}"))
             .join(",");
         result += "] ";
-        off += input_partitions
+        off += if hash_shuffle { 0 } else { input_partitions }
     }
     result
 }
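Putting the hunks together: `format_tasks_for_stage` now receives the stage's head plan, derives the partition count from its output partitioning, and freezes the per-task offset at zero when that partitioning is `Partitioning::Hash`. Below is a runnable approximation with the plan head replaced by plain parameters; the `t{i}:[` prefix and the partition range are assumptions reconstructed from the rendered headers (those context lines are not in the hunk), and std's slice join stands in for the itertools-style `.join` above.

// Self-contained sketch of the new logic; the real function takes
// `&Arc<dyn ExecutionPlan>` and derives both extra parameters from it.
fn format_tasks(n_tasks: usize, input_partitions: usize, hash_shuffle: bool) -> String {
    let mut result = "Tasks: ".to_string();
    let mut off = 0;
    for i in 0..n_tasks {
        // Assumed from the rendered headers; this line is not in the hunk.
        result += &format!("t{i}:[");
        result += &(off..off + input_partitions)
            .map(|v| format!("p{v}"))
            .collect::<Vec<_>>()
            .join(",");
        result += "] ";
        // The behavioral change in this commit: a hash shuffle keeps the
        // offset at zero, so every task lists the same partition ids.
        off += if hash_shuffle { 0 } else { input_partitions };
    }
    result
}

fn main() {
    // Stage 2 in the snapshots above: 4 partitions per task, 2 tasks.
    assert_eq!(format_tasks(2, 4, true), "Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3] ");
    // Pre-commit behavior for the same stage:
    assert_eq!(format_tasks(2, 4, false), "Tasks: t0:[p0,p1,p2,p3] t1:[p4,p5,p6,p7] ");
}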

tests/distributed_aggregation.rs

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ mod tests {
 │ CoalesceBatchesExec: target_batch_size=8192
 │ NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=1, input_tasks=2
 └──────────────────────────────────────────────────
-┌───── Stage 1 Tasks: t0:[p0,p1,p2] t1:[p3,p4,p5]
+┌───── Stage 1 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2]
 │ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=2
 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
 │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
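As a quick cross-check, plugging this test's shape into the hypothetical `format_tasks` sketch from the stage.rs section reproduces the updated header: Stage 1's head is Hash([RainToday@0], 3) with 2 tasks.

// Reuses the illustrative format_tasks sketch above (not repo code);
// the trailing space comes from the final "] ".
assert_eq!(format_tasks(2, 3, true), "Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] ");
// Pre-commit rendering, matching the removed snapshot line:
assert_eq!(format_tasks(2, 3, false), "Tasks: t0:[p0,p1,p2] t1:[p3,p4,p5] ");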
