Skip to content

Commit 89556c2

Browse files
committed
fix: no longer remove needed coalesce
1 parent 0661ed7 commit 89556c2

File tree

4 files changed

+30
-49
lines changed

4 files changed

+30
-49
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
21-
use crate::physical_optimizer::sanity_checker::{
22-
assert_sanity_check, assert_sanity_check_err,
23-
};
21+
use crate::physical_optimizer::sanity_checker::assert_sanity_check;
2422
use crate::physical_optimizer::test_utils::{
2523
aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
2624
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
@@ -2307,40 +2305,27 @@ async fn test_preserve_needed_coalesce() -> Result<()> {
23072305
let plan = sort_exec(sort_key, plan);
23082306

23092307
// Starting plan: as in our test case.
2310-
assert_eq!(
2311-
get_plan_string(&plan),
2312-
vec![
2313-
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2314-
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2315-
" CoalescePartitionsExec",
2316-
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2317-
" UnionExec",
2318-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2319-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2320-
],
2321-
);
2308+
let starting_plan = vec![
2309+
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2310+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2311+
" CoalescePartitionsExec",
2312+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2313+
" UnionExec",
2314+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2315+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2316+
];
2317+
assert_eq!(get_plan_string(&plan), starting_plan);
2318+
23222319
// Test: plan is valid.
23232320
assert_sanity_check(&plan, true);
23242321

2325-
// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2322+
// EnforceSorting will not remove the coalesce, as it's required.
23262323
let optimizer = EnforceSorting::new();
23272324
let optimized = optimizer.optimize(plan, &Default::default())?;
2328-
assert_eq!(
2329-
get_plan_string(&optimized),
2330-
vec![
2331-
"SortPreservingMergeExec: [a@0 ASC]",
2332-
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
2333-
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2334-
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2335-
" UnionExec",
2336-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2337-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2338-
],
2339-
);
2325+
assert_eq!(get_plan_string(&optimized), starting_plan);
23402326

2341-
// Bug: Plan is now invalid.
2342-
let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
2343-
assert_sanity_check_err(&optimized, err);
2327+
// Test: plan is valid.
2328+
assert_sanity_check(&optimized, true);
23442329

23452330
Ok(())
23462331
}

datafusion/core/tests/physical_optimizer/sanity_checker.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,6 @@ pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool)
397397
);
398398
}
399399

400-
/// Assert reason for sanity check failure.
401-
pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
402-
let sanity_checker = SanityCheckPlan::new();
403-
let opts = ConfigOptions::default();
404-
let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
405-
assert!(error.message().contains(err));
406-
}
407-
408400
/// Check if the plan we created is as expected by comparing the plan
409401
/// formatted as a string.
410402
fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ use crate::enforce_sorting::sort_pushdown::{
4747
assign_initial_requirements, pushdown_sorts, SortPushDown,
4848
};
4949
use crate::utils::{
50-
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
51-
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
50+
add_sort_above, add_sort_above_with_check, is_aggregate, is_coalesce_partitions,
51+
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
5252
};
5353
use crate::PhysicalOptimizerRule;
5454

@@ -138,27 +138,25 @@ fn is_coalesce_to_remove(
138138
node: &Arc<dyn ExecutionPlan>,
139139
parent: &Arc<dyn ExecutionPlan>,
140140
) -> bool {
141-
node.as_any()
142-
.downcast_ref::<CoalescePartitionsExec>()
141+
node.as_any().downcast_ref::<CoalescePartitionsExec>()
143142
.map(|_coalesce| {
144143
// TODO(wiedld): find a more generalized approach that does not rely on
145144
// pattern matching the structure of the DAG
146145
// Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of:
147146
// * Repartition -> Coalesce -> Repartition
147+
// * Coalesce -> AggregateExec(input=hash-partitioned)
148148

149-
let parent_req_single_partition = matches!(
150-
parent.required_input_distribution()[0],
151-
Distribution::SinglePartition
152-
);
149+
let parent_req_single_partition = matches!(parent.required_input_distribution()[0], Distribution::SinglePartition)
150+
// handle aggregates with input=hashPartitioning with a single output partition
151+
|| (is_aggregate(parent) && parent.properties().output_partitioning().partition_count() <= 1);
153152

154153
// node above does not require single distribution
155154
!parent_req_single_partition
156155
// it doesn't immediately repartition
157156
|| is_repartition(parent)
158157
// any adjacent Coalesce->Sort can be replaced
159158
|| is_sort(parent)
160-
})
161-
.unwrap_or(false)
159+
}).unwrap_or(false)
162160
}
163161

164162
fn update_coalesce_ctx_children(

datafusion/physical-optimizer/src/utils.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion_physical_expr::LexRequirement;
2121
use datafusion_physical_expr_common::sort_expr::LexOrdering;
22+
use datafusion_physical_plan::aggregates::AggregateExec;
2223
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2324
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2425
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -100,6 +101,11 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
100101
plan.as_any().is::<RepartitionExec>()
101102
}
102103

104+
/// Checks whether the given operator is an [`AggregateExec`].
105+
pub fn is_aggregate(plan: &Arc<dyn ExecutionPlan>) -> bool {
106+
plan.as_any().is::<AggregateExec>()
107+
}
108+
103109
/// Checks whether the given operator is a limit;
104110
/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
105111
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {

0 commit comments

Comments
 (0)