Skip to content

Commit 3c35de5

Browse files
display: use range to represent partitions in a task
This change makes the explain output more concise by changing the stage formatting from ``` Tasks: t0:[p0,p1,p2,p3,p4,p5] t1:[p0,p1,p2,p3,p4,p5] t2:[p0,p1,p2,p3,p4,p5] t3:[p0,p1,p2,p3,p4,p5] ``` to ``` Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5] ``` Closes: #192
1 parent 63d3273 commit 3c35de5

File tree

3 files changed

+199
-196
lines changed

3 files changed

+199
-196
lines changed

src/distributed_physical_optimizer_rule.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -462,14 +462,14 @@ mod tests {
462462
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
463463
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
464464
└──────────────────────────────────────────────────
465-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
465+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
466466
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
467467
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
468468
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
469469
│ CoalesceBatchesExec: target_batch_size=8192
470470
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
471471
└──────────────────────────────────────────────────
472-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
472+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
473473
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
474474
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
475475
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -490,14 +490,14 @@ mod tests {
490490
│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
491491
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
492492
└──────────────────────────────────────────────────
493-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
493+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
494494
│ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
495495
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
496496
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
497497
│ CoalesceBatchesExec: target_batch_size=8192
498498
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
499499
└──────────────────────────────────────────────────
500-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
500+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
501501
│ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=4
502502
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
503503
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
@@ -562,13 +562,13 @@ mod tests {
562562
│ CoalesceBatchesExec: target_batch_size=8192
563563
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=2
564564
└──────────────────────────────────────────────────
565-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
565+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
566566
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
567567
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
568568
│ CoalesceBatchesExec: target_batch_size=8192
569569
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
570570
└──────────────────────────────────────────────────
571-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
571+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
572572
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
573573
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
574574
│ CoalesceBatchesExec: target_batch_size=8192
@@ -577,7 +577,7 @@ mod tests {
577577
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
578578
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
579579
└──────────────────────────────────────────────────
580-
┌───── Stage 3 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
580+
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3]
581581
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
582582
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
583583
│ CoalesceBatchesExec: target_batch_size=8192
@@ -598,7 +598,7 @@ mod tests {
598598
│ SortPreservingMergeExec: [MinTemp@0 DESC]
599599
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
600600
└──────────────────────────────────────────────────
601-
┌───── Stage 1 ── Tasks: t0:[p0,p1] t1:[p2,p3]
601+
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
602602
│ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
603603
│ PartitionIsolatorExec: t0:[p0,p1,__] t1:[__,__,p0]
604604
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
@@ -615,12 +615,12 @@ mod tests {
615615
│ CoalescePartitionsExec
616616
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
617617
└──────────────────────────────────────────────────
618-
┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
618+
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
619619
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
620620
│ CoalesceBatchesExec: target_batch_size=8192
621621
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
622622
└──────────────────────────────────────────────────
623-
┌───── Stage 1 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
623+
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
624624
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
625625
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
626626
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]

src/stage.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::error::Result;
55
use datafusion::execution::TaskContext;
66
use datafusion::physical_plan::display::DisplayableExecutionPlan;
77
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
8-
use itertools::{Either, Itertools};
8+
use itertools::Either;
99
use std::collections::VecDeque;
1010
use std::sync::Arc;
1111
use url::Url;
@@ -334,9 +334,12 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
334334
let mut off = 0;
335335
for i in 0..n_tasks {
336336
result += &format!("t{i}:[");
337-
result += &(off..(off + input_partitions))
338-
.map(|v| format!("p{v}"))
339-
.join(",");
337+
let end = off + input_partitions - 1;
338+
if input_partitions == 1 {
339+
result += &format!("p{off}");
340+
} else {
341+
result += &format!("p{off}..p{end}");
342+
}
340343
result += "] ";
341344
off += if hash_shuffle { 0 } else { input_partitions }
342345
}

0 commit comments

Comments
 (0)