@@ -96,7 +96,7 @@ pub fn materialize_topk(p: LogicalPlan) -> Result<LogicalPlan, DataFusionError>
9696
9797 //let mut e = Expr::Column(f.qualified_column());
9898 let mut e =
99- p. post_projection [ p . input_columns [ out_i] ] . clone ( ) ;
99+ p. post_projection [ out_i] . clone ( ) ;
100100 if out_name != in_field. name ( ) {
101101 e = Expr :: Alias ( Box :: new ( e) , out_name. clone ( ) )
102102 }
@@ -420,3 +420,62 @@ fn make_sort_expr(
420420 _ => col,
421421 }
422422}
423+
#[cfg(test)]
mod tests {
    use datafusion::arrow::datatypes::Field;
    use datafusion::logical_plan::{col, sum, LogicalPlanBuilder};

    use super::*;

    /// Regression test for `materialize_topk` reordering projection expressions.
    ///
    /// The projection sitting above the aggregate lists `SUM(table.agg_2)` before
    /// `SUM(table.agg_1)`, i.e. in a different order than the aggregate node's own
    /// aggregation expressions. The rewrite must keep the projection order (and therefore
    /// the plan schema) exactly as it was before materialization.
    #[test]
    fn topk_projection_column_switched() {
        // Four nullable Int64 columns: one grouping key and three aggregation inputs.
        let table_schema = Schema::new(
            ["group_field", "agg_sortby", "agg_1", "agg_2"]
                .iter()
                .map(|&name| Field::new(name, DataType::Int64, true))
                .collect::<Vec<_>>(),
        );

        let scan_node = LogicalPlanBuilder::scan_empty(Some("table"), &table_schema, None)
            .unwrap()
            .build()
            .unwrap();
        let cluster_send =
            ClusterSendNode::new(Arc::new(scan_node), vec![vec![]], None).into_plan();

        // Expression lists for each stage of the plan, spelled out up front.
        let group_by = vec![col("group_field")];
        let aggregates = vec![
            sum(col("agg_sortby")),
            sum(col("agg_1")),
            sum(col("agg_2")),
        ];
        // Deliberately lists agg_2 before agg_1, unlike `aggregates` above.
        let projected = vec![
            col("group_field"),
            col("SUM(table.agg_sortby)"),
            col("SUM(table.agg_2)"),
            col("SUM(table.agg_1)"),
        ];
        let ordering = vec![col("SUM(table.agg_sortby)").sort(false, false)];

        let aggregated = LogicalPlanBuilder::from(cluster_send)
            .aggregate(group_by, aggregates)
            .unwrap();
        let projection = aggregated.project(projected).expect("project to be valid");
        let sorted = projection.sort(ordering).unwrap();
        let input_plan = sorted.limit(10).unwrap().build().unwrap();

        let schema_before = input_plan.schema().clone();
        let rewritten = materialize_topk(input_plan).expect("materialize_topk to succeed");
        let schema_after = rewritten.schema().clone();

        // Materializing top-k must leave the output schema untouched.
        assert_eq!(schema_before, schema_after);

        // The projection keeps agg_2 ahead of agg_1 while the aggregate keeps its own order.
        // NOTE(review): the spaces following each `\n` are reproduced as they appear in the
        // reviewed text; confirm they match the indentation the plan's Debug output emits.
        let expected = "\
            Projection: #table.group_field, #SUM(table.agg_sortby), #SUM(table.agg_2), #SUM(table.agg_1)\
            \n ClusterAggregateTopK, limit = 10, groupBy = [#table.group_field], aggr = [SUM(#table.agg_sortby), SUM(#table.agg_1), SUM(#table.agg_2)], sortBy = [SortColumn { agg_index: 0, asc: false, nulls_first: false }]\
            \n TableScan: table projection=None";
        let formatted = format!("{:?}", rewritten);

        assert_eq!(expected, formatted);
    }
}
0 commit comments