@@ -3,17 +3,23 @@ use crate::queryplanner::planning::WorkerExec;
33use crate :: queryplanner:: query_executor:: ClusterSendExec ;
44use crate :: queryplanner:: tail_limit:: TailLimitExec ;
55use crate :: queryplanner:: topk:: AggregateTopKExec ;
6+ use datafusion:: common:: tree_node:: { Transformed , TreeNode } ;
7+ use datafusion:: common:: { internal_datafusion_err, HashMap } ;
68use datafusion:: config:: ConfigOptions ;
79use datafusion:: error:: DataFusionError ;
8- use datafusion:: physical_expr:: LexOrdering ;
10+ use datafusion:: physical_expr:: { LexOrdering , LexRequirement , PhysicalSortRequirement } ;
911use datafusion:: physical_optimizer:: limit_pushdown:: LimitPushdown ;
1012use datafusion:: physical_optimizer:: PhysicalOptimizerRule as _;
1113use datafusion:: physical_plan:: aggregates:: { AggregateExec , AggregateMode } ;
1214use datafusion:: physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
1315use datafusion:: physical_plan:: limit:: GlobalLimitExec ;
16+ use datafusion:: physical_plan:: projection:: ProjectionExec ;
17+ use datafusion:: physical_plan:: sorts:: sort:: SortExec ;
1418use datafusion:: physical_plan:: sorts:: sort_preserving_merge:: SortPreservingMergeExec ;
1519use datafusion:: physical_plan:: union:: UnionExec ;
16- use datafusion:: physical_plan:: { ExecutionPlan , ExecutionPlanProperties } ;
20+ use datafusion:: physical_plan:: { ExecutionPlan , ExecutionPlanProperties , PhysicalExpr } ;
21+ use itertools:: Itertools as _;
22+ use std:: collections:: HashSet ;
1723use std:: sync:: Arc ;
1824
1925/// Transforms from:
@@ -227,3 +233,205 @@ pub fn add_limit_to_workers(
227233 p. with_new_children ( vec ! [ limit_optimized] )
228234 }
229235}
236+
237+ /// Because we disable `EnforceDistribution`, and because we add `SortPreservingMergeExec` in
238+ /// `ensure_partition_merge_with_acceptable_parent` so that Sorted ("inplace") aggregates work
239+ /// properly (which reduces memory usage), we in some cases have unnecessary
240+ /// `SortPreservingMergeExec` nodes underneath a `Sort` node with a different ordering. Or,
241+ /// perhaps, we added a `GlobalLimitExec` by `add_limit_to_workers` and we can push down the limit
242+ /// into a _matching_ `SortPreservingMergeExec` node.
243+ ///
244+ /// A minor complication: There may be projection nodes in between that rename things.
245+ pub fn replace_suboptimal_merge_sorts (
246+ p : Arc < dyn ExecutionPlan > ,
247+ ) -> Result < Arc < dyn ExecutionPlan > , DataFusionError > {
248+ if let Some ( sort) = p. as_any ( ) . downcast_ref :: < SortExec > ( ) {
249+ if sort. preserve_partitioning ( ) {
250+ // Let's not handle this.
251+ return Ok ( p) ;
252+ }
253+ let required_ordering = p
254+ . output_ordering ( )
255+ . cloned ( )
256+ . map ( LexRequirement :: from)
257+ . unwrap_or_default ( ) ;
258+ let new_input =
259+ replace_suboptimal_merge_sorts_helper ( & required_ordering, sort. fetch ( ) , sort. input ( ) ) ?;
260+ p. with_new_children ( vec ! [ new_input] )
261+ } else {
262+ Ok ( p)
263+ }
264+ }
265+
266+ /// Replaces SortPreservingMergeExec in the subtree with either a CoalescePartitions (if it doesn't
267+ /// match the ordering) or, if it does match the sort ordering, pushes down fetch information if
268+ /// appropriate.
269+ fn replace_suboptimal_merge_sorts_helper (
270+ required_ordering : & LexRequirement ,
271+ fetch : Option < usize > ,
272+ node : & Arc < dyn ExecutionPlan > ,
273+ ) -> Result < Arc < dyn ExecutionPlan > , DataFusionError > {
274+ let node_any = node. as_any ( ) ;
275+ if let Some ( spm) = node_any. downcast_ref :: < SortPreservingMergeExec > ( ) {
276+ // A SortPreservingMergeExec that sort_exprs is a prefix of, is an acceptable ordering. But
277+ // if there is no sort_exprs at all, we just use CoalescePartitions.
278+ if !required_ordering. is_empty ( ) {
279+ let spm_req = LexRequirement :: from (
280+ spm. properties ( )
281+ . output_ordering ( )
282+ . cloned ( )
283+ . unwrap_or ( LexOrdering :: default ( ) ) ,
284+ ) ;
285+ if !required_ordering. is_empty ( )
286+ && spm
287+ . properties ( )
288+ . eq_properties
289+ . requirements_compatible ( required_ordering, & spm_req)
290+ {
291+ // Okay, we have a matching SortPreservingMergeExec node!
292+
293+ let mut new_fetch: Option < usize > = fetch;
294+ let new_spm = if let Some ( fetch) = fetch {
295+ if let Some ( spm_fetch) = spm. fetch ( ) {
296+ if fetch < spm_fetch {
297+ Arc :: new ( spm. clone ( ) . with_fetch ( Some ( fetch) ) )
298+ } else {
299+ // spm fetch is tighter.
300+ new_fetch = Some ( spm_fetch) ;
301+ node. clone ( )
302+ }
303+ } else {
304+ Arc :: new ( spm. clone ( ) . with_fetch ( Some ( fetch) ) )
305+ }
306+ } else {
307+ node. clone ( )
308+ } ;
309+
310+ // Pass down spm's ordering, not sort_exprs, because we didn't touch spm besides the fetch..
311+
312+ let new_input = replace_suboptimal_merge_sorts_helper (
313+ & spm_req,
314+ new_fetch,
315+ new_spm
316+ . children ( )
317+ . first ( )
318+ . ok_or ( internal_datafusion_err ! ( "no child" ) ) ?,
319+ ) ?;
320+
321+ return new_spm. with_new_children ( vec ! [ new_input] ) ;
322+ }
323+ }
324+ // sort_exprs is _not_ a prefix of spm.expr()
325+ // Aside: if spm.expr() is a prefix of sort_exprs, maybe SortExec could take advantage.
326+
327+ // So it's not an acceptable ordering. Create a CoalescePartitions, and remove other nested SortPreservingMergeExecs.
328+ let new_input = replace_suboptimal_merge_sorts_helper (
329+ & LexRequirement :: new ( vec ! [ ] ) ,
330+ fetch,
331+ spm. input ( ) ,
332+ ) ?;
333+
334+ return Ok ( Arc :: new ( CoalescePartitionsExec :: new ( new_input) ) ) ;
335+ } else if let Some ( proj) = node_any. downcast_ref :: < ProjectionExec > ( ) {
336+ // TODO: Note that ProjectionExec has a TODO comment in DF's EnforceSorting optimizer (in sort_pushdown.rs).
337+ if let Some ( new_sort_exprs) =
338+ sort_exprs_underneath_projection ( required_ordering, proj. expr ( ) ) ?
339+ {
340+ let new_input =
341+ replace_suboptimal_merge_sorts_helper ( & new_sort_exprs, fetch, proj. input ( ) ) ?;
342+ node. clone ( ) . with_new_children ( vec ! [ new_input] )
343+ } else {
344+ Ok ( node. clone ( ) )
345+ }
346+ } else if let Some ( u) = node_any. downcast_ref :: < UnionExec > ( ) {
347+ let new_children: Result < Vec < _ > , DataFusionError > = u
348+ . inputs ( )
349+ . iter ( )
350+ . map ( |child| replace_suboptimal_merge_sorts_helper ( required_ordering, fetch, child) )
351+ . collect :: < Result < Vec < _ > , DataFusionError > > ( ) ;
352+ let new_children = new_children?;
353+ Ok ( Arc :: new ( UnionExec :: new ( new_children) ) )
354+ } else {
355+ Ok ( node. clone ( ) )
356+ }
357+ }
358+
359+ fn sort_exprs_underneath_projection (
360+ sort_exprs : & LexRequirement ,
361+ proj_expr : & [ ( Arc < dyn PhysicalExpr > , String ) ] ,
362+ ) -> Result < Option < LexRequirement > , DataFusionError > {
363+ let mut sort_expr_columns = HashSet :: < usize > :: new ( ) ;
364+ for expr in sort_exprs. iter ( ) {
365+ record_columns_used ( & mut sort_expr_columns, expr. expr . as_ref ( ) ) ;
366+ }
367+
368+ // sorted() just for determinism
369+ let sort_expr_columns: Vec < usize > = sort_expr_columns. into_iter ( ) . sorted ( ) . collect ( ) ;
370+ let mut replacement_map =
371+ HashMap :: < usize , datafusion:: physical_plan:: expressions:: Column > :: with_capacity (
372+ sort_expr_columns. len ( ) ,
373+ ) ;
374+
375+ for index in sort_expr_columns {
376+ let proj_lookup = proj_expr. get ( index) . ok_or_else ( || {
377+ DataFusionError :: Internal (
378+ "proj_expr lookup in sort_exprs_underneath_projection failed" . to_owned ( ) ,
379+ )
380+ } ) ?;
381+ let Some ( column_expr) = proj_lookup
382+ . 0
383+ . as_any ( )
384+ . downcast_ref :: < datafusion:: physical_plan:: expressions:: Column > ( )
385+ else {
386+ return Ok ( None ) ;
387+ } ;
388+ replacement_map. insert ( index, column_expr. clone ( ) ) ;
389+ }
390+
391+ // Now replace the columns in the sort_exprs with our different ones.
392+ let mut new_sort_exprs = Vec :: with_capacity ( sort_exprs. len ( ) ) ;
393+ for e in sort_exprs. iter ( ) {
394+ let transformed = replace_columns ( & replacement_map, & e. expr ) ?;
395+ new_sort_exprs. push ( PhysicalSortRequirement {
396+ expr : transformed,
397+ options : e. options ,
398+ } ) ;
399+ }
400+
401+ Ok ( Some ( LexRequirement :: new ( new_sort_exprs) ) )
402+ }
403+
404+ fn record_columns_used ( set : & mut HashSet < usize > , expr : & dyn PhysicalExpr ) {
405+ if let Some ( column) = expr
406+ . as_any ( )
407+ . downcast_ref :: < datafusion:: physical_plan:: expressions:: Column > ( )
408+ {
409+ set. insert ( column. index ( ) ) ;
410+ } else {
411+ for child in expr. children ( ) {
412+ record_columns_used ( set, child. as_ref ( ) ) ;
413+ }
414+ }
415+ }
416+
417+ fn replace_columns (
418+ replacement_map : & HashMap < usize , datafusion:: physical_plan:: expressions:: Column > ,
419+ expr : & Arc < dyn PhysicalExpr > ,
420+ ) -> Result < Arc < dyn PhysicalExpr > , DataFusionError > {
421+ Ok (
422+ TreeNode :: transform ( expr. clone ( ) , |node : Arc < dyn PhysicalExpr > | {
423+ if let Some ( column) = node
424+ . as_any ( )
425+ . downcast_ref :: < datafusion:: physical_plan:: expressions:: Column > ( )
426+ {
427+ let replacement = replacement_map. get ( & column. index ( ) ) . ok_or_else ( || {
428+ DataFusionError :: Internal ( "replace_columns has bad replacement_map" . to_owned ( ) )
429+ } ) ?;
430+ Ok ( Transformed :: yes ( Arc :: new ( replacement. clone ( ) ) ) )
431+ } else {
432+ Ok ( Transformed :: no ( node) )
433+ }
434+ } ) ?
435+ . data ,
436+ )
437+ }
0 commit comments