@@ -21,36 +21,69 @@ use std::sync::Arc;
2121///
2222/// The latter gives results in more parallelism and less network.
2323pub fn push_aggregate_to_workers (
24- p : Arc < dyn ExecutionPlan > ,
24+ p_final : Arc < dyn ExecutionPlan > ,
2525) -> Result < Arc < dyn ExecutionPlan > , DataFusionError > {
26+ let p_final_agg: & AggregateExec ;
27+ let p_partial: & Arc < dyn ExecutionPlan > ;
28+ if let Some ( a) = p_final. as_any ( ) . downcast_ref :: < AggregateExec > ( ) {
29+ if matches ! ( a. mode( ) , AggregateMode :: Final | AggregateMode :: FinalPartitioned ) {
30+ p_final_agg = a;
31+ p_partial = a. input ( ) ;
32+ } else {
33+ return Ok ( p_final) ;
34+ }
35+ } else {
36+ return Ok ( p_final) ;
37+ }
38+
2639 let agg;
27- if let Some ( a) = p . as_any ( ) . downcast_ref :: < AggregateExec > ( ) {
40+ if let Some ( a) = p_partial . as_any ( ) . downcast_ref :: < AggregateExec > ( ) {
2841 agg = a;
2942 } else {
30- return Ok ( p ) ;
43+ return Ok ( p_final ) ;
3144 }
3245 if * agg. mode ( ) != AggregateMode :: Partial {
33- return Ok ( p ) ;
46+ return Ok ( p_final ) ;
3447 }
3548
36- if let Some ( cs) = agg. input ( ) . as_any ( ) . downcast_ref :: < ClusterSendExec > ( ) {
49+ let p_final_input: Arc < dyn ExecutionPlan > = if let Some ( cs) = agg. input ( ) . as_any ( ) . downcast_ref :: < ClusterSendExec > ( ) {
50+ let clustersend_input = p_partial. clone ( )
51+ . with_new_children ( vec ! [ cs. input_for_optimizations. clone( ) ] ) ?;
52+
3753 // Router plan, replace partial aggregate with cluster send.
38- Ok ( Arc :: new (
54+ Arc :: new (
3955 cs. with_changed_schema (
40- p. clone ( )
41- . with_new_children ( vec ! [ cs. input_for_optimizations. clone( ) ] ) ?,
56+ clustersend_input,
4257 ) ,
43- ) )
58+ )
4459 } else if let Some ( w) = agg. input ( ) . as_any ( ) . downcast_ref :: < WorkerExec > ( ) {
60+ let worker_input = p_partial. clone ( ) . with_new_children ( vec ! [ w. input. clone( ) ] ) ?;
61+
4562 // Worker plan, execute partial aggregate inside the worker.
46- Ok ( Arc :: new ( WorkerExec {
47- input : p . clone ( ) . with_new_children ( vec ! [ w . input . clone ( ) ] ) ? ,
63+ Arc :: new ( WorkerExec {
64+ input : worker_input ,
4865 max_batch_rows : w. max_batch_rows ,
4966 limit_and_reverse : w. limit_and_reverse . clone ( ) ,
50- } ) )
67+ } )
5168 } else {
52- Ok ( p)
53- }
69+ return Ok ( p_final) ;
70+ } ;
71+
72+ // We change AggregateMode::FinalPartitioned to AggregateMode::Final, because the ClusterSend
73+ // node ends up creating an incompatible partitioning for FinalPartitioned. Some other ideas,
74+ // like adding a RepartitionExec node, would just be redundant with the behavior of
75+ // AggregateExec::Final, and also, tricky to set up with the ideal number of partitions in the
76+ // middle of optimization passes. Having ClusterSend be able to pass through hash partitions in
77+ // some form is another option.
78+ let p_final_input_schema = p_final_input. schema ( ) ;
79+ Ok ( Arc :: new ( AggregateExec :: try_new (
80+ AggregateMode :: Final ,
81+ p_final_agg. group_expr ( ) . clone ( ) ,
82+ p_final_agg. aggr_expr ( ) . to_vec ( ) ,
83+ p_final_agg. filter_expr ( ) . to_vec ( ) ,
84+ p_final_input,
85+ p_final_input_schema,
86+ ) ?) )
5487}
5588
5689// TODO upgrade DF: this one was handled by something else but most likely only in sorted scenario
0 commit comments