diff --git a/src/physical_optimizer.rs b/src/physical_optimizer.rs index 93af9d1..19a7296 100644 --- a/src/physical_optimizer.rs +++ b/src/physical_optimizer.rs @@ -190,7 +190,6 @@ mod tests { assert_snapshot!(plan, @r" ┌───── Stage 1 Task: partitions: 0,unassigned] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet - │ └────────────────────────────────────────────────── "); } @@ -209,18 +208,15 @@ mod tests { │partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192 │partitions [out:4 ] ArrowFlightReadExec: Stage 2 - │ └────────────────────────────────────────────────── ┌───── Stage 2 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=4 │partitions [out:4 ] ArrowFlightReadExec: Stage 1 - │ └────────────────────────────────────────────────── ┌───── Stage 1 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 │partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet - │ └────────────────────────────────────────────────── "); } @@ -239,20 +235,17 @@ mod tests { │partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192 │partitions [out:4 ] ArrowFlightReadExec: Stage 2 - │ └────────────────────────────────────────────────── ┌───── Stage 2 Task: partitions: 0,1,unassigned],Task: partitions: 2,3,unassigned] │partitions [out:4 <-- in:2 ] RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=2 │partitions [out:2 <-- in:4 ] PartitionIsolatorExec [providing upto 2 partitions] │partitions [out:4 ] ArrowFlightReadExec: Stage 1 - │ └────────────────────────────────────────────────── ┌───── Stage 1 Task: partitions: 0,1,unassigned],Task: partitions: 2,3,unassigned] │partitions [out:4 <-- in:2 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 │partitions [out:2 <-- in:1 ] PartitionIsolatorExec [providing upto 2 partitions] │partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet - │ └────────────────────────────────────────────────── "); } @@ -267,7 +260,6 @@ mod tests { │partitions [out:1 <-- in:1 ] HashJoinExec: mode=Partitioned, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet - │ └────────────────────────────────────────────────── "); } @@ -312,7 +304,6 @@ mod tests { │partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)] │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192 │partitions [out:4 ] ArrowFlightReadExec: Stage 4 - │ └────────────────────────────────────────────────── ┌───── Stage 2 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4 @@ -320,13 +311,10 @@ mod tests { │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192 │partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2] │partitions [out:4 ] ArrowFlightReadExec: Stage 1 - │ └────────────────────────────────────────────────── ┌───── Stage 1 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)] - │ - │ └────────────────────────────────────────────────── ┌───── Stage 4 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4 @@ -334,13 +322,10 @@ mod tests { │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192 │partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2] │partitions [out:4 ] ArrowFlightReadExec: Stage 3 - │ └────────────────────────────────────────────────── ┌───── Stage 3 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)] - │ - │ └────────────────────────────────────────────────── "); } @@ -353,7 +338,6 @@ mod tests { ┌───── Stage 1 Task: partitions: 0,unassigned] │partitions [out:1 <-- in:1 ] SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[false] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet - │ └────────────────────────────────────────────────── "); } @@ -367,18 +351,15 @@ mod tests { │partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[] │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192 │partitions [out:4 ] ArrowFlightReadExec: Stage 2 - │ └────────────────────────────────────────────────── ┌───── Stage 2 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4 │partitions [out:4 ] ArrowFlightReadExec: Stage 1 - │ └────────────────────────────────────────────────── ┌───── Stage 1 Task: partitions: 0..3,unassigned] │partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 │partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet - │ └────────────────────────────────────────────────── "); } diff --git a/src/stage/display.rs b/src/stage/display.rs index b4369d0..794d45d 100644 --- a/src/stage/display.rs +++ b/src/stage/display.rs @@ -48,8 +48,11 @@ impl DisplayAs for ExecutionStage { )?; let plan_str = display_plan_with_partition_in_out(self.plan.as_ref()) .map_err(|_| std::fmt::Error {})?; - let plan_str = - plan_str.replace('\n', &format!("\n{}{}", " ".repeat(self.depth), VERTICAL)); + let plan_str = plan_str + .split('\n') + .filter(|v| !v.is_empty()) + .collect::>() + .join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL)); writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?; write!( f, diff --git a/src/test_utils/plan.rs b/src/test_utils/plan.rs index b545451..69feeef 100644 --- a/src/test_utils/plan.rs +++ b/src/test_utils/plan.rs @@ -3,7 +3,9 @@ use datafusion::common::plan_err; use datafusion::common::tree_node::{Transformed, TreeNode}; use datafusion::error::DataFusionError; use datafusion::physical_expr::Partitioning; +use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; +use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; diff --git a/tests/distributed_aggregation.rs b/tests/distributed_aggregation.rs index fa59aa5..1a572f2 100644 --- a/tests/distributed_aggregation.rs +++ b/tests/distributed_aggregation.rs @@ -53,7 +53,6 @@ mod tests { │partitions [out:8 <-- in:8 ] ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))] │partitions [out:8 <-- in:8 ] AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] │partitions [out:8 ] ArrowFlightReadExec: Stage 2 - │ └────────────────────────────────────────────────── ┌───── Stage 2 Task: partitions: 0..2,unassigned] │partitions [out:3 <-- in:3 ] CoalesceBatchesExec: target_batch_size=8192 @@ -61,11 +60,9 @@ mod tests { │partitions [out:3 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 │partitions [out:1 <-- in:1 ] AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] │partitions [out:1 ] ArrowFlightReadExec: Stage 1 - │ └────────────────────────────────────────────────── ┌───── Stage 1 Task: partitions: 0,unassigned] │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet - │ └────────────────────────────────────────────────── ", );