1717
1818use std:: sync:: Arc ;
1919
20+ use crate :: physical_optimizer:: enforce_distribution:: {
21+ parquet_exec_with_stats, projection_exec_with_alias,
22+ } ;
23+ use crate :: physical_optimizer:: sanity_checker:: {
24+ assert_sanity_check, assert_sanity_check_err,
25+ } ;
2026use crate :: physical_optimizer:: test_utils:: {
2127 aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
2228 bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
@@ -28,6 +34,7 @@ use crate::physical_optimizer::test_utils::{
2834 spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec ,
2935} ;
3036
37+ use datafusion_physical_plan:: aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ;
3138use datafusion_physical_plan:: displayable;
3239use arrow:: compute:: SortOptions ;
3340use arrow:: datatypes:: SchemaRef ;
@@ -55,6 +62,8 @@ use datafusion_physical_plan::sorts::sort::SortExec;
5562
5663use rstest:: rstest;
5764
65+ use super :: test_utils:: schema;
66+
5867/// Create a csv exec for tests
5968fn csv_exec_ordered (
6069 schema : & SchemaRef ,
@@ -2280,3 +2289,85 @@ async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()>
22802289 assert_optimized ! ( expected_input, expected_no_change, physical_plan, true ) ;
22812290 Ok ( ( ) )
22822291}
2292+
2293+ fn single_partition_aggregate (
2294+ input : Arc < dyn ExecutionPlan > ,
2295+ alias_pairs : Vec < ( String , String ) > ,
2296+ ) -> Arc < dyn ExecutionPlan > {
2297+ let schema = schema ( ) ;
2298+ let group_by = alias_pairs
2299+ . iter ( )
2300+ . map ( |( column, alias) | ( col ( column, & input. schema ( ) ) . unwrap ( ) , alias. to_string ( ) ) )
2301+ . collect :: < Vec < _ > > ( ) ;
2302+ let group_by = PhysicalGroupBy :: new_single ( group_by) ;
2303+
2304+ Arc :: new (
2305+ AggregateExec :: try_new (
2306+ AggregateMode :: SinglePartitioned ,
2307+ group_by,
2308+ vec ! [ ] ,
2309+ vec ! [ ] ,
2310+ input,
2311+ schema,
2312+ )
2313+ . unwrap ( ) ,
2314+ )
2315+ }
2316+
2317+ #[ tokio:: test]
2318+ async fn test_preserve_needed_coalesce ( ) -> Result < ( ) > {
2319+ // Input to EnforceSorting, from our test case.
2320+ let plan = projection_exec_with_alias (
2321+ union_exec ( vec ! [ parquet_exec_with_stats( ) ; 2 ] ) ,
2322+ vec ! [
2323+ ( "a" . to_string( ) , "a" . to_string( ) ) ,
2324+ ( "b" . to_string( ) , "value" . to_string( ) ) ,
2325+ ] ,
2326+ ) ;
2327+ let plan = Arc :: new ( CoalescePartitionsExec :: new ( plan) ) ;
2328+ let schema = schema ( ) ;
2329+ let sort_key = LexOrdering :: new ( vec ! [ PhysicalSortExpr {
2330+ expr: col( "a" , & schema) . unwrap( ) ,
2331+ options: SortOptions :: default ( ) ,
2332+ } ] ) ;
2333+ let plan: Arc < dyn ExecutionPlan > =
2334+ single_partition_aggregate ( plan, vec ! [ ( "a" . to_string( ) , "a1" . to_string( ) ) ] ) ;
2335+ let plan = sort_exec ( sort_key, plan) ;
2336+
2337+ // Starting plan: as in our test case.
2338+ assert_eq ! (
2339+ get_plan_string( & plan) ,
2340+ vec![
2341+ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]" ,
2342+ " AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]" ,
2343+ " CoalescePartitionsExec" ,
2344+ " ProjectionExec: expr=[a@0 as a, b@1 as value]" ,
2345+ " UnionExec" ,
2346+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet" ,
2347+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet" ,
2348+ ] ,
2349+ ) ;
2350+ assert_sanity_check ( & plan, true ) ;
2351+
2352+ // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2353+ let optimizer = EnforceSorting :: new ( ) ;
2354+ let optimized = optimizer. optimize ( plan, & Default :: default ( ) ) ?;
2355+ assert_eq ! (
2356+ get_plan_string( & optimized) ,
2357+ vec![
2358+ "SortPreservingMergeExec: [a@0 ASC]" ,
2359+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]" ,
2360+ " AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]" ,
2361+ " ProjectionExec: expr=[a@0 as a, b@1 as value]" ,
2362+ " UnionExec" ,
2363+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet" ,
2364+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet" ,
2365+ ] ,
2366+ ) ;
2367+
2368+ // Plan is now invalid.
2369+ let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)" ;
2370+ assert_sanity_check_err ( & optimized, err) ;
2371+
2372+ Ok ( ( ) )
2373+ }
0 commit comments