Skip to content

Commit 23d640a

Browse files
display: use range to represent partitions in a task (#194)
This change makes the explain output more concise by changing the stage formatting from ``` Tasks: t0:[p0,p1,p2,p3,p4,p5] t1:[p0,p1,p2,p3,p4,p5] t2:[p0,p1,p2,p3,p4,p5] t3:[p0,p1,p2,p3,p4,p5] ``` to ``` Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5] ``` Closes: #192
1 parent 9241841 commit 23d640a

File tree

6 files changed

+206
-203
lines changed

6 files changed

+206
-203
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -498,14 +498,14 @@ mod tests {
498498
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
499499
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
500500
└──────────────────────────────────────────────────
501-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
501+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
502502
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
503503
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
504504
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
505505
│ CoalesceBatchesExec: target_batch_size=8192
506506
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
507507
└──────────────────────────────────────────────────
508-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
508+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
509509
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
510510
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
511511
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -526,14 +526,14 @@ mod tests {
526526
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
527527
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
528528
└──────────────────────────────────────────────────
529-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
529+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
530530
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
531531
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
532532
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
533533
│ CoalesceBatchesExec: target_batch_size=8192
534534
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
535535
└──────────────────────────────────────────────────
536-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
536+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
537537
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
538538
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
539539
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -598,13 +598,13 @@ mod tests {
598598
│ CoalesceBatchesExec: target_batch_size=8192
599599
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=2
600600
└──────────────────────────────────────────────────
601-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
601+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
602602
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
603603
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
604604
│ CoalesceBatchesExec: target_batch_size=8192
605605
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
606606
└──────────────────────────────────────────────────
607-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
607+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
608608
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
609609
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
610610
│ CoalesceBatchesExec: target_batch_size=8192
@@ -613,7 +613,7 @@ mod tests {
613613
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
614614
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
615615
└──────────────────────────────────────────────────
616-
┌───── Stage 3 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
616+
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3]
617617
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
618618
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
619619
│ CoalesceBatchesExec: target_batch_size=8192
@@ -634,7 +634,7 @@ mod tests {
634634
│ SortPreservingMergeExec: [MinTemp@0 DESC]
635635
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
636636
└──────────────────────────────────────────────────
637-
┌───── Stage 1 ── Tasks: t0:[p0,p1] t1:[p2,p3]
637+
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
638638
│ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
639639
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
640640
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
@@ -651,12 +651,12 @@ mod tests {
651651
│ CoalescePartitionsExec
652652
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
653653
└──────────────────────────────────────────────────
654-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
654+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
655655
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
656656
│ CoalesceBatchesExec: target_batch_size=8192
657657
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
658658
└──────────────────────────────────────────────────
659-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
659+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
660660
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
661661
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
662662
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]

src/stage.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::error::Result;
55
use datafusion::execution::TaskContext;
66
use datafusion::physical_plan::display::DisplayableExecutionPlan;
77
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
8-
use itertools::{Either, Itertools};
8+
use itertools::Either;
99
use std::collections::VecDeque;
1010
use std::sync::Arc;
1111
use url::Url;
@@ -313,9 +313,12 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
313313
let mut off = 0;
314314
for i in 0..n_tasks {
315315
result += &format!("t{i}:[");
316-
result += &(off..(off + input_partitions))
317-
.map(|v| format!("p{v}"))
318-
.join(",");
316+
let end = off + input_partitions - 1;
317+
if input_partitions == 1 {
318+
result += &format!("p{off}");
319+
} else {
320+
result += &format!("p{off}..p{end}");
321+
}
319322
result += "] ";
320323
off += if hash_shuffle { 0 } else { input_partitions }
321324
}

tests/custom_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ mod tests {
6666
│ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
6767
│ [Stage 2] => NetworkShuffleExec: output_partitions=10, input_tasks=10
6868
└──────────────────────────────────────────────────
69-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t3:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t4:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t5:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t6:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t7:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t8:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t9:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
69+
┌───── Stage 2 ── Tasks: t0:[p0..p9] t1:[p0..p9] t2:[p0..p9] t3:[p0..p9] t4:[p0..p9] t5:[p0..p9] t6:[p0..p9] t7:[p0..p9] t8:[p0..p9] t9:[p0..p9]
7070
│ RepartitionExec: partitioning=Hash([], 10), input_partitions=1
7171
│ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
7272
│ [Stage 1] => NetworkShuffleExec: output_partitions=1, input_tasks=1
7373
└──────────────────────────────────────────────────
74-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
74+
┌───── Stage 1 ── Tasks: t0:[p0..p9]
7575
│ RepartitionExec: partitioning=Hash([numbers@0], 10), input_partitions=1
7676
│ FilterExec: numbers@0 > 1
7777
│ Int64ListExec: length=6

tests/distributed_aggregation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ mod tests {
5555
│ CoalesceBatchesExec: target_batch_size=8192
5656
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=2
5757
└──────────────────────────────────────────────────
58-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2]
58+
┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p0..p2]
5959
│ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=2
6060
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
6161
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
@@ -132,13 +132,13 @@ mod tests {
132132
│ CoalescePartitionsExec
133133
│ [Stage 2] => NetworkCoalesceExec: output_partitions=18, input_tasks=6
134134
└──────────────────────────────────────────────────
135-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] t2:[p0,p1,p2] t3:[p0,p1,p2] t4:[p0,p1,p2] t5:[p0,p1,p2]
135+
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2] t2:[p0..p2] t3:[p0..p2] t4:[p0..p2] t5:[p0..p2]
136136
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
137137
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
138138
│ CoalesceBatchesExec: target_batch_size=8192
139139
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
140140
└──────────────────────────────────────────────────
141-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17]
141+
┌───── Stage 1 ── Tasks: t0:[p0..p17] t1:[p0..p17] t2:[p0..p17]
142142
│ RepartitionExec: partitioning=Hash([RainToday@0], 18), input_partitions=1
143143
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
144144
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]

tests/stateful_execution_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ mod tests {
7070
│ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
7171
│ [Stage 2] => NetworkShuffleExec: output_partitions=10, input_tasks=10
7272
└──────────────────────────────────────────────────
73-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t3:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t4:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t5:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t6:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t7:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t8:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t9:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
73+
┌───── Stage 2 ── Tasks: t0:[p0..p9] t1:[p0..p9] t2:[p0..p9] t3:[p0..p9] t4:[p0..p9] t5:[p0..p9] t6:[p0..p9] t7:[p0..p9] t8:[p0..p9] t9:[p0..p9]
7474
│ RepartitionExec: partitioning=Hash([], 10), input_partitions=1
7575
│ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
7676
│ [Stage 1] => NetworkShuffleExec: output_partitions=1, input_tasks=1
7777
└──────────────────────────────────────────────────
78-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
78+
┌───── Stage 1 ── Tasks: t0:[p0..p9]
7979
│ RepartitionExec: partitioning=Hash([numbers@0], 10), input_partitions=1
8080
│ FilterExec: numbers@0 > 1
8181
│ StatefulInt64ListExec: length=6

0 commit comments

Comments
 (0)