Skip to content

Commit 9783fd5

Browse files
display: use range to represent partitions in a task
This change makes the explain output more concise by changing the stage formatting from

```
Tasks: t0:[p0,p1,p2,p3,p4,p5] t1:[p0,p1,p2,p3,p4,p5] t2:[p0,p1,p2,p3,p4,p5] t3:[p0,p1,p2,p3,p4,p5]
```

to

```
Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5]
```

Closes: #192
1 parent 63d3273 commit 9783fd5

File tree

6 files changed

+206
-203
lines changed

6 files changed

+206
-203
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -462,14 +462,14 @@ mod tests {
462462
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
463463
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
464464
└──────────────────────────────────────────────────
465-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
465+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
466466
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
467467
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
468468
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
469469
│ CoalesceBatchesExec: target_batch_size=8192
470470
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
471471
└──────────────────────────────────────────────────
472-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
472+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
473473
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
474474
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
475475
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -490,14 +490,14 @@ mod tests {
490490
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
491491
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
492492
└──────────────────────────────────────────────────
493-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
493+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
494494
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
495495
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
496496
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
497497
│ CoalesceBatchesExec: target_batch_size=8192
498498
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
499499
└──────────────────────────────────────────────────
500-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
500+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
501501
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
502502
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
503503
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -562,13 +562,13 @@ mod tests {
562562
│ CoalesceBatchesExec: target_batch_size=8192
563563
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=2
564564
└──────────────────────────────────────────────────
565-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
565+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
566566
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
567567
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
568568
│ CoalesceBatchesExec: target_batch_size=8192
569569
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
570570
└──────────────────────────────────────────────────
571-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
571+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
572572
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
573573
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
574574
│ CoalesceBatchesExec: target_batch_size=8192
@@ -577,7 +577,7 @@ mod tests {
577577
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
578578
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
579579
└──────────────────────────────────────────────────
580-
┌───── Stage 3 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
580+
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3]
581581
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
582582
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
583583
│ CoalesceBatchesExec: target_batch_size=8192
@@ -598,7 +598,7 @@ mod tests {
598598
│ SortPreservingMergeExec: [MinTemp@0 DESC]
599599
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
600600
└──────────────────────────────────────────────────
601-
┌───── Stage 1 ── Tasks: t0:[p0,p1] t1:[p2,p3]
601+
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
602602
│ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
603603
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
604604
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
@@ -615,12 +615,12 @@ mod tests {
615615
│ CoalescePartitionsExec
616616
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
617617
└──────────────────────────────────────────────────
618-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
618+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
619619
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
620620
│ CoalesceBatchesExec: target_batch_size=8192
621621
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
622622
└──────────────────────────────────────────────────
623-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
623+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
624624
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
625625
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
626626
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]

src/stage.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::error::Result;
55
use datafusion::execution::TaskContext;
66
use datafusion::physical_plan::display::DisplayableExecutionPlan;
77
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
8-
use itertools::{Either, Itertools};
8+
use itertools::Either;
99
use std::collections::VecDeque;
1010
use std::sync::Arc;
1111
use url::Url;
@@ -334,9 +334,12 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
334334
let mut off = 0;
335335
for i in 0..n_tasks {
336336
result += &format!("t{i}:[");
337-
result += &(off..(off + input_partitions))
338-
.map(|v| format!("p{v}"))
339-
.join(",");
337+
let end = off + input_partitions - 1;
338+
if input_partitions == 1 {
339+
result += &format!("p{off}");
340+
} else {
341+
result += &format!("p{off}..p{end}");
342+
}
340343
result += "] ";
341344
off += if hash_shuffle { 0 } else { input_partitions }
342345
}

tests/custom_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ mod tests {
6666
│ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
6767
│ [Stage 2] => NetworkShuffleExec: output_partitions=10, input_tasks=10
6868
└──────────────────────────────────────────────────
69-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t3:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t4:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t5:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t6:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t7:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t8:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t9:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
69+
┌───── Stage 2 ── Tasks: t0:[p0..p9] t1:[p0..p9] t2:[p0..p9] t3:[p0..p9] t4:[p0..p9] t5:[p0..p9] t6:[p0..p9] t7:[p0..p9] t8:[p0..p9] t9:[p0..p9]
7070
│ RepartitionExec: partitioning=Hash([], 10), input_partitions=1
7171
│ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
7272
│ [Stage 1] => NetworkShuffleExec: output_partitions=1, input_tasks=1
7373
└──────────────────────────────────────────────────
74-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
74+
┌───── Stage 1 ── Tasks: t0:[p0..p9]
7575
│ RepartitionExec: partitioning=Hash([numbers@0], 10), input_partitions=1
7676
│ FilterExec: numbers@0 > 1
7777
│ Int64ListExec: length=6

tests/distributed_aggregation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ mod tests {
5555
│ CoalesceBatchesExec: target_batch_size=8192
5656
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=2
5757
└──────────────────────────────────────────────────
58-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2]
58+
┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p0..p2]
5959
│ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=2
6060
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
6161
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
@@ -132,13 +132,13 @@ mod tests {
132132
│ CoalescePartitionsExec
133133
│ [Stage 2] => NetworkCoalesceExec: output_partitions=18, input_tasks=6
134134
└──────────────────────────────────────────────────
135-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] t2:[p0,p1,p2] t3:[p0,p1,p2] t4:[p0,p1,p2] t5:[p0,p1,p2]
135+
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2] t2:[p0..p2] t3:[p0..p2] t4:[p0..p2] t5:[p0..p2]
136136
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
137137
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
138138
│ CoalesceBatchesExec: target_batch_size=8192
139139
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3
140140
└──────────────────────────────────────────────────
141-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17]
141+
┌───── Stage 1 ── Tasks: t0:[p0..p17] t1:[p0..p17] t2:[p0..p17]
142142
│ RepartitionExec: partitioning=Hash([RainToday@0], 18), input_partitions=1
143143
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
144144
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]

tests/stateful_execution_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ mod tests {
7070
│ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10
7171
│ [Stage 2] => NetworkShuffleExec: output_partitions=10, input_tasks=10
7272
└──────────────────────────────────────────────────
73-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t3:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t4:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t5:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t6:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t7:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t8:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t9:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
73+
┌───── Stage 2 ── Tasks: t0:[p0..p9] t1:[p0..p9] t2:[p0..p9] t3:[p0..p9] t4:[p0..p9] t5:[p0..p9] t6:[p0..p9] t7:[p0..p9] t8:[p0..p9] t9:[p0..p9]
7474
│ RepartitionExec: partitioning=Hash([], 10), input_partitions=1
7575
│ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false]
7676
│ [Stage 1] => NetworkShuffleExec: output_partitions=1, input_tasks=1
7777
└──────────────────────────────────────────────────
78-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9]
78+
┌───── Stage 1 ── Tasks: t0:[p0..p9]
7979
│ RepartitionExec: partitioning=Hash([numbers@0], 10), input_partitions=1
8080
│ FilterExec: numbers@0 > 1
8181
│ StatefulInt64ListExec: length=6

0 commit comments

Comments
 (0)