Skip to content

Commit 89af694

Browse files
jizezhang and alamb authored
remove repartition exec from coalesce batches optimizer (#19239)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18782. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Removes `RepartitionExec` from `CoalesceBatches` optimizer. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> No --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 79cfe8e commit 89af694

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1000
-1319
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2906,10 +2906,9 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
29062906
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
29072907
| | ProjectionExec: expr=[b@0 as b, count(Int64(1))@1 as count(*), count(Int64(1))@1 as count(Int64(1))] |
29082908
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(Int64(1))] |
2909-
| | CoalesceBatchesExec: target_batch_size=8192 |
2910-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2911-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
2912-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2909+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2910+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
2911+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
29132912
| | |
29142913
+---------------+------------------------------------------------------------------------------------------------------------+
29152914
"
@@ -2927,10 +2926,9 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
29272926
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
29282927
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
29292928
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
2930-
| | CoalesceBatchesExec: target_batch_size=8192 |
2931-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2932-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
2933-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2929+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
2930+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
2931+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
29342932
| | |
29352933
+---------------+----------------------------------------------------------------------------+
29362934
"
@@ -3342,10 +3340,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
33423340
| | CoalescePartitionsExec |
33433341
| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
33443342
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
3345-
| | CoalesceBatchesExec: target_batch_size=8192 |
3346-
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3347-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
3348-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3343+
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3344+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
3345+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
33493346
| | DataSourceExec: partitions=1, partition_sizes=[1] |
33503347
| | |
33513348
+---------------+----------------------------------------------------------------------------------------------------------------------------+
@@ -3399,10 +3396,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
33993396
| | CoalescePartitionsExec |
34003397
| | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
34013398
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
3402-
| | CoalesceBatchesExec: target_batch_size=8192 |
3403-
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3404-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3405-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3399+
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3400+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3401+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
34063402
| | DataSourceExec: partitions=1, partition_sizes=[1] |
34073403
| | |
34083404
+---------------+----------------------------------------------------------------------------------------------------------------------------+

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,6 @@ async fn explain_analyze_baseline_metrics() {
103103
"output_bytes=",
104104
expected_batch_count_after_repartition
105105
);
106-
107-
assert_metrics!(
108-
&formatted,
109-
"CoalesceBatchesExec: target_batch_size=4096",
110-
"metrics=[output_rows=5, elapsed_compute",
111-
"output_bytes=",
112-
expected_batch_count_after_repartition
113-
);
114106
}
115107

116108
assert_metrics!(
@@ -771,12 +763,11 @@ async fn test_physical_plan_display_indent() {
771763
SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]
772764
ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]
773765
AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
774-
CoalesceBatchesExec: target_batch_size=4096
775-
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000
776-
AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
777-
FilterExec: c12@1 < 10
778-
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
779-
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], file_type=csv, has_header=true
766+
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000
767+
AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]
768+
FilterExec: c12@1 < 10
769+
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
770+
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], file_type=csv, has_header=true
780771
"
781772
);
782773
}
@@ -813,12 +804,10 @@ async fn test_physical_plan_display_indent_multi_children() {
813804
actual,
814805
@r"
815806
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
816-
CoalesceBatchesExec: target_batch_size=4096
817-
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
818-
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
819-
CoalesceBatchesExec: target_batch_size=4096
820-
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
821-
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
807+
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
808+
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
809+
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
810+
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
822811
"
823812
);
824813
}

datafusion/core/tests/sql/joins.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,10 @@ async fn join_change_in_planner() -> Result<()> {
7272
actual,
7373
@r"
7474
SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10
75-
CoalesceBatchesExec: target_batch_size=8192
76-
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
77-
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
78-
CoalesceBatchesExec: target_batch_size=8192
79-
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
80-
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
75+
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
76+
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
77+
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
78+
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
8179
"
8280
);
8381
Ok(())
@@ -131,12 +129,10 @@ async fn join_no_order_on_filter() -> Result<()> {
131129
actual,
132130
@r"
133131
SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10
134-
CoalesceBatchesExec: target_batch_size=8192
135-
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
136-
StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
137-
CoalesceBatchesExec: target_batch_size=8192
138-
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
139-
StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
132+
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
133+
StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
134+
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1, maintains_sort_order=true
135+
StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
140136
"
141137
);
142138
Ok(())
@@ -172,12 +168,10 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
172168
actual,
173169
@r"
174170
SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10
175-
CoalesceBatchesExec: target_batch_size=8192
176-
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1
177-
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true
178-
CoalesceBatchesExec: target_batch_size=8192
179-
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1
180-
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true
171+
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1
172+
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true
173+
RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1
174+
StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true
181175
"
182176
);
183177
Ok(())

datafusion/physical-optimizer/src/coalesce_batches.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ use std::sync::Arc;
2525
use datafusion_common::assert_eq_or_internal_err;
2626
use datafusion_common::config::ConfigOptions;
2727
use datafusion_common::error::Result;
28-
use datafusion_physical_expr::Partitioning;
2928
use datafusion_physical_plan::{
3029
ExecutionPlan, async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
31-
repartition::RepartitionExec,
3230
};
3331

3432
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -57,23 +55,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
5755
let target_batch_size = config.execution.batch_size;
5856
plan.transform_up(|plan| {
5957
let plan_any = plan.as_any();
60-
let wrap_in_coalesce = plan_any
61-
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
62-
.downcast_ref::<RepartitionExec>()
63-
.map(|repart_exec| {
64-
!matches!(
65-
repart_exec.partitioning().clone(),
66-
Partitioning::RoundRobinBatch(_)
67-
)
68-
})
69-
.unwrap_or(false);
70-
71-
if wrap_in_coalesce {
72-
Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
73-
plan,
74-
target_batch_size,
75-
))))
76-
} else if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
58+
if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
7759
// Coalesce inputs to async functions to reduce number of async function invocations
7860
let children = async_exec.children();
7961
assert_eq_or_internal_err!(

0 commit comments

Comments (0)