@@ -95,8 +95,7 @@ pub fn materialize_topk(p: LogicalPlan) -> Result<LogicalPlan, DataFusionError>
9595 let out_name = out_schema. field ( out_i) . name ( ) ;
9696
9797 //let mut e = Expr::Column(f.qualified_column());
98- let mut e =
99- p. post_projection [ p. input_columns [ out_i] ] . clone ( ) ;
98+ let mut e = p. post_projection [ out_i] . clone ( ) ;
10099 if out_name != in_field. name ( ) {
101100 e = Expr :: Alias ( Box :: new ( e) , out_name. clone ( ) )
102101 }
@@ -420,3 +419,75 @@ fn make_sort_expr(
420419 _ => col,
421420 }
422421}
422+
423+ #[ cfg( test) ]
424+ mod tests {
425+ use datafusion:: {
426+ arrow:: datatypes:: Field ,
427+ logical_plan:: { col, sum, LogicalPlanBuilder } ,
428+ } ;
429+
430+ use super :: * ;
431+
432+ #[ test]
433+ fn topk_projection_column_switched ( ) {
434+ // A regression test for materialize_topk switching around projection expressions when their
435+ // order does not match the aggregate node's aggregation expression order. (Also, when
436+ // materialize_topk had this bug, the Projection node's DFSchema was left unchanged, making
437+ // it inconsistent with the expressions.)
438+
439+ let table_schema = Schema :: new ( vec ! [
440+ Field :: new( "group_field" , DataType :: Int64 , true ) ,
441+ Field :: new( "agg_sortby" , DataType :: Int64 , true ) ,
442+ Field :: new( "agg_1" , DataType :: Int64 , true ) ,
443+ Field :: new( "agg_2" , DataType :: Int64 , true ) ,
444+ ] ) ;
445+
446+ let scan_node = LogicalPlanBuilder :: scan_empty ( Some ( "table" ) , & table_schema, None )
447+ . unwrap ( )
448+ . build ( )
449+ . unwrap ( ) ;
450+
451+ let cluster_send =
452+ ClusterSendNode :: new ( Arc :: new ( scan_node) , vec ! [ vec![ ] ] , None ) . into_plan ( ) ;
453+
454+ let plan = LogicalPlanBuilder :: from ( cluster_send)
455+ . aggregate (
456+ vec ! [ col( "group_field" ) ] ,
457+ vec ! [ sum( col( "agg_sortby" ) ) , sum( col( "agg_1" ) ) , sum( col( "agg_2" ) ) ] ,
458+ )
459+ . unwrap ( )
460+ . project ( vec ! [
461+ col( "group_field" ) ,
462+ col( "SUM(table.agg_sortby)" ) ,
463+ col( "SUM(table.agg_2)" ) ,
464+ col( "SUM(table.agg_1)" ) ,
465+ ] )
466+ . expect ( "project to be valid" )
467+ . sort ( vec ! [ col( "SUM(table.agg_sortby)" ) . sort( false , false ) ] )
468+ . unwrap ( )
469+ . limit ( 10 )
470+ . unwrap ( )
471+ . build ( )
472+ . unwrap ( ) ;
473+
474+ let before_schema = plan. schema ( ) . clone ( ) ;
475+
476+ let plan = materialize_topk ( plan) . expect ( "materialize_topk to succeed" ) ;
477+
478+ let after_schema = plan. schema ( ) . clone ( ) ;
479+
480+ // Of course the schema shouldn't change.
481+ assert_eq ! ( before_schema, after_schema) ;
482+
483+ // We are testing that topk materialization doesn't switch the field order (of
484+ // SUM(table.agg_2) and SUM(table.agg_1)) in the projection above it.
485+ let expected = "\
486+ Projection: #table.group_field, #SUM(table.agg_sortby), #SUM(table.agg_2), #SUM(table.agg_1)\
487+ \n ClusterAggregateTopK, limit = 10, groupBy = [#table.group_field], aggr = [SUM(#table.agg_sortby), SUM(#table.agg_1), SUM(#table.agg_2)], sortBy = [SortColumn { agg_index: 0, asc: false, nulls_first: false }]\
488+ \n TableScan: table projection=None";
489+ let formatted = format ! ( "{:?}" , plan) ;
490+
491+ assert_eq ! ( expected, formatted) ;
492+ }
493+ }
0 commit comments