@@ -37,7 +37,7 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result<LogicalPlan> {
3737 LogicalPlan :: Limit { n, input } => {
3838 let schema: & Arc < DFSchema > = input. schema ( ) ;
3939
40- let lift_up_result = lift_up_expensive_projections ( input, HashSet :: new ( ) ) ;
40+ let lift_up_result = lift_up_expensive_projections ( input, ColumnRecorder :: default ( ) ) ;
4141 pal_debug ! ( "lift_up_res: {:?}" , lift_up_result) ;
4242 match lift_up_result {
4343 Ok ( ( inner_plan, None ) ) => Ok ( LogicalPlan :: Limit {
@@ -107,8 +107,11 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result<LogicalPlan> {
107107 }
108108}
109109
110+ #[ derive( Default ) ]
110111struct ColumnRecorder {
111- columns : HashSet < Column > ,
112+ /// We use indexmap IndexSet because we want iteration order to be deterministic and
113+ /// specifically, to match left-to-right insertion order.
114+ columns : indexmap:: IndexSet < Column > ,
112115}
113116
114117impl ExpressionVisitor for ColumnRecorder {
@@ -180,21 +183,16 @@ fn looks_expensive(ex: &Expr) -> Result<bool> {
180183
181184fn lift_up_expensive_projections (
182185 plan : & LogicalPlan ,
183- used_columns : HashSet < Column > ,
186+ used_columns : ColumnRecorder ,
184187) -> Result < ( LogicalPlan , Option < Vec < Expr > > ) > {
185188 match plan {
186189 LogicalPlan :: Sort { expr, input } => {
187- let mut recorder = ColumnRecorder {
188- columns : used_columns,
189- } ;
190+ let mut recorder = used_columns;
190191 for ex in expr {
191192 recorder = ex. accept ( recorder) ?;
192193 }
193194
194- let used_columns = recorder. columns ;
195-
196- let ( new_input, lifted_projection) =
197- lift_up_expensive_projections ( & input, used_columns) ?;
195+ let ( new_input, lifted_projection) = lift_up_expensive_projections ( & input, recorder) ?;
198196 pal_debug ! (
199197 "Sort sees result:\n {:?};;;{:?};;;" ,
200198 new_input,
@@ -213,9 +211,7 @@ fn lift_up_expensive_projections(
213211 input,
214212 schema,
215213 } => {
216- let mut column_recorder = ColumnRecorder {
217- columns : HashSet :: new ( ) ,
218- } ;
214+ let mut column_recorder = ColumnRecorder :: default ( ) ;
219215
220216 let mut this_projection_exprs = Vec :: < usize > :: new ( ) ;
221217
@@ -241,7 +237,7 @@ fn lift_up_expensive_projections(
241237 already_retained_cols. push ( ( col. clone ( ) , Some ( alias. clone ( ) ) ) ) ;
242238 }
243239
244- if used_columns. contains ( & field. qualified_column ( ) ) {
240+ if used_columns. columns . contains ( & field. qualified_column ( ) ) {
245241 pal_debug ! (
246242 "Expr {}: used_columns contains field {:?}" ,
247243 i,
@@ -510,6 +506,44 @@ mod tests {
510506 Ok ( ( ) )
511507 }
512508
509+ /// Tests that multiple columns are retained in a deterministic order (and as a nice-to-have,
510+ /// they should be in the left-to-right order of appearance).
511+ #[ test]
512+ fn limit_sorted_plan_with_expensive_expr_retaining_multiple_columns ( ) -> Result < ( ) > {
513+ let table_scan = test_table_scan_abcd ( ) ?;
514+
515+ let case_expr = when ( col ( "d" ) . eq ( lit ( 3 ) ) , col ( "c" ) + lit ( 2 ) ) . otherwise ( lit ( 5 ) ) ?;
516+
517+ let plan = LogicalPlanBuilder :: from ( table_scan)
518+ . project ( [
519+ col ( "a" ) . alias ( "a1" ) ,
520+ col ( "b" ) . alias ( "b1" ) ,
521+ case_expr. alias ( "c1" ) ,
522+ ] ) ?
523+ . sort ( [ col ( "a1" ) . sort ( true , true ) ] ) ?
524+ . limit ( 50 ) ?
525+ . build ( ) ?;
526+
527+ let expected = "Limit: 50\
528+ \n Sort: #a1 ASC NULLS FIRST\
529+ \n Projection: #test.a AS a1, #test.b AS b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\
530+ \n TableScan: test projection=None";
531+
532+ let formatted = format ! ( "{:?}" , plan) ;
533+ assert_eq ! ( formatted, expected) ;
534+
535+ // We are testing that test.d deterministically comes before test.c in the inner Projection.
536+ let optimized_expected = "Projection: #a1, #b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\
537+ \n Limit: 50\
538+ \n Sort: #a1 ASC NULLS FIRST\
539+ \n Projection: #test.a AS a1, #test.b AS b1, #test.d, #test.c\
540+ \n TableScan: test projection=None";
541+
542+ assert_optimized_plan_eq ( & plan, optimized_expected) ;
543+
544+ Ok ( ( ) )
545+ }
546+
513547 /// Tests that we re-alias fields in the lifted up projection.
514548 #[ test]
515549 fn limit_sorted_plan_with_nonaliased_expensive_expr_optimized ( ) -> Result < ( ) > {
@@ -659,4 +693,15 @@ mod tests {
659693 pub fn test_table_scan ( ) -> Result < LogicalPlan > {
660694 test_table_scan_with_name ( "test" )
661695 }
696+
697+ pub fn test_table_scan_abcd ( ) -> Result < LogicalPlan > {
698+ let name = "test" ;
699+ let schema = Schema :: new ( vec ! [
700+ Field :: new( "a" , DataType :: UInt32 , false ) ,
701+ Field :: new( "b" , DataType :: UInt32 , false ) ,
702+ Field :: new( "c" , DataType :: UInt32 , false ) ,
703+ Field :: new( "d" , DataType :: UInt32 , false ) ,
704+ ] ) ;
705+ LogicalPlanBuilder :: scan_empty ( Some ( name) , & schema, None ) ?. build ( )
706+ }
662707}
0 commit comments