Skip to content

Commit 21cef58

Browse files
authored
Removes an extra line jump in distributed explains (#95)
1 parent bae85ca commit 21cef58

File tree

4 files changed

+7
-24
lines changed

4 files changed

+7
-24
lines changed

src/physical_optimizer.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ mod tests {
190190
assert_snapshot!(plan, @r"
191191
┌───── Stage 1 Task: partitions: 0,unassigned]
192192
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
193-
194193
└──────────────────────────────────────────────────
195194
");
196195
}
@@ -209,18 +208,15 @@ mod tests {
209208
│partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
210209
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
211210
│partitions [out:4 ] ArrowFlightReadExec: Stage 2
212-
213211
└──────────────────────────────────────────────────
214212
┌───── Stage 2 Task: partitions: 0..3,unassigned]
215213
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=4
216214
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
217-
218215
└──────────────────────────────────────────────────
219216
┌───── Stage 1 Task: partitions: 0..3,unassigned]
220217
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
221218
│partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
222219
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
223-
224220
└──────────────────────────────────────────────────
225221
");
226222
}
@@ -239,20 +235,17 @@ mod tests {
239235
│partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
240236
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
241237
│partitions [out:4 ] ArrowFlightReadExec: Stage 2
242-
243238
└──────────────────────────────────────────────────
244239
┌───── Stage 2 Task: partitions: 0,1,unassigned],Task: partitions: 2,3,unassigned]
245240
│partitions [out:4 <-- in:2 ] RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=2
246241
│partitions [out:2 <-- in:4 ] PartitionIsolatorExec [providing upto 2 partitions]
247242
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
248-
249243
└──────────────────────────────────────────────────
250244
┌───── Stage 1 Task: partitions: 0,1,unassigned],Task: partitions: 2,3,unassigned]
251245
│partitions [out:4 <-- in:2 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
252246
│partitions [out:2 <-- in:1 ] PartitionIsolatorExec [providing upto 2 partitions]
253247
│partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
254248
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
255-
256249
└──────────────────────────────────────────────────
257250
");
258251
}
@@ -267,7 +260,6 @@ mod tests {
267260
│partitions [out:1 <-- in:1 ] HashJoinExec: mode=Partitioned, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
268261
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
269262
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
270-
271263
└──────────────────────────────────────────────────
272264
");
273265
}
@@ -312,35 +304,28 @@ mod tests {
312304
│partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
313305
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
314306
│partitions [out:4 ] ArrowFlightReadExec: Stage 4
315-
316307
└──────────────────────────────────────────────────
317308
┌───── Stage 2 Task: partitions: 0..3,unassigned]
318309
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
319310
│partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
320311
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
321312
│partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
322313
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
323-
324314
└──────────────────────────────────────────────────
325315
┌───── Stage 1 Task: partitions: 0..3,unassigned]
326316
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
327317
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
328-
329-
330318
└──────────────────────────────────────────────────
331319
┌───── Stage 4 Task: partitions: 0..3,unassigned]
332320
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
333321
│partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
334322
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
335323
│partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
336324
│partitions [out:4 ] ArrowFlightReadExec: Stage 3
337-
338325
└──────────────────────────────────────────────────
339326
┌───── Stage 3 Task: partitions: 0..3,unassigned]
340327
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
341328
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
342-
343-
344329
└──────────────────────────────────────────────────
345330
");
346331
}
@@ -353,7 +338,6 @@ mod tests {
353338
┌───── Stage 1 Task: partitions: 0,unassigned]
354339
│partitions [out:1 <-- in:1 ] SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[false]
355340
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
356-
357341
└──────────────────────────────────────────────────
358342
");
359343
}
@@ -367,18 +351,15 @@ mod tests {
367351
│partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
368352
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
369353
│partitions [out:4 ] ArrowFlightReadExec: Stage 2
370-
371354
└──────────────────────────────────────────────────
372355
┌───── Stage 2 Task: partitions: 0..3,unassigned]
373356
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4
374357
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
375-
376358
└──────────────────────────────────────────────────
377359
┌───── Stage 1 Task: partitions: 0..3,unassigned]
378360
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
379361
│partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
380362
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
381-
382363
└──────────────────────────────────────────────────
383364
");
384365
}

src/stage/display.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,11 @@ impl DisplayAs for ExecutionStage {
4848
)?;
4949
let plan_str = display_plan_with_partition_in_out(self.plan.as_ref())
5050
.map_err(|_| std::fmt::Error {})?;
51-
let plan_str =
52-
plan_str.replace('\n', &format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
51+
let plan_str = plan_str
52+
.split('\n')
53+
.filter(|v| !v.is_empty())
54+
.collect::<Vec<_>>()
55+
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
5356
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
5457
write!(
5558
f,

src/test_utils/plan.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use datafusion::common::plan_err;
33
use datafusion::common::tree_node::{Transformed, TreeNode};
44
use datafusion::error::DataFusionError;
55
use datafusion::physical_expr::Partitioning;
6+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
67
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
8+
use datafusion::physical_plan::repartition::RepartitionExec;
79
use datafusion::physical_plan::ExecutionPlan;
810
use std::sync::Arc;
911

tests/distributed_aggregation.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,16 @@ mod tests {
5353
│partitions [out:8 <-- in:8 ] ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
5454
│partitions [out:8 <-- in:8 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
5555
│partitions [out:8 ] ArrowFlightReadExec: Stage 2
56-
5756
└──────────────────────────────────────────────────
5857
┌───── Stage 2 Task: partitions: 0..2,unassigned]
5958
│partitions [out:3 <-- in:3 ] CoalesceBatchesExec: target_batch_size=8192
6059
│partitions [out:3 <-- in:3 ] RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3
6160
│partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
6261
│partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
6362
│partitions [out:1 ] ArrowFlightReadExec: Stage 1
64-
6563
└──────────────────────────────────────────────────
6664
┌───── Stage 1 Task: partitions: 0,unassigned]
6765
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
68-
6966
└──────────────────────────────────────────────────
7067
",
7168
);

0 commit comments

Comments
 (0)