@@ -37,7 +37,7 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result<LogicalPlan> {
3737 LogicalPlan :: Limit { n, input } => {
3838 let schema: & Arc < DFSchema > = input. schema ( ) ;
3939
40- let lift_up_result = lift_up_expensive_projections ( input, HashSet :: new ( ) ) ;
40+ let lift_up_result = lift_up_expensive_projections ( input, ColumnRecorder :: default ( ) ) ;
4141 pal_debug ! ( "lift_up_res: {:?}" , lift_up_result) ;
4242 match lift_up_result {
4343 Ok ( ( inner_plan, None ) ) => Ok ( LogicalPlan :: Limit {
@@ -107,15 +107,38 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result<LogicalPlan> {
107107 }
108108}
109109
110- struct ColumnRecorder {
111- columns : HashSet < Column > ,
110+ /// A `Vec<Column>` -- or, when we don't need that, a `()`.
111+ trait ColumnCollector {
112+ fn push ( & mut self , column : & Column ) ;
112113}
113114
114- impl ExpressionVisitor for ColumnRecorder {
115+ impl ColumnCollector for ( ) {
116+ fn push ( & mut self , _column : & Column ) { }
117+ }
118+
119+ impl ColumnCollector for Vec < Column > {
120+ fn push ( & mut self , column : & Column ) {
121+ self . push ( column. clone ( ) ) ;
122+ }
123+ }
124+
125+ #[ derive( Default ) ]
126+ struct ColumnRecorder < T : ColumnCollector > {
127+ column_hash : HashSet < Column > ,
128+ /// The purpose is to store a `Vec<Column>` in the order that the columns were seen, so that
129+ /// the intermediate projection layer looks "natural" instead of having columns in some sorted
130+ /// order or nondeterministic hash table ordering.
131+ collector : T ,
132+ }
133+
134+ impl < T : ColumnCollector > ExpressionVisitor for ColumnRecorder < T > {
115135 fn pre_visit ( mut self , expr : & Expr ) -> Result < Recursion < Self > > {
116136 match expr {
117137 Expr :: Column ( c) => {
118- self . columns . insert ( c. clone ( ) ) ;
138+ let inserted = self . column_hash . insert ( c. clone ( ) ) ;
139+ if inserted {
140+ self . collector . push ( c) ;
141+ }
119142 }
120143 Expr :: ScalarVariable ( _var_names) => {
121144 // expr_to_columns, with its ColumnNameVisitor includes ScalarVariable for some
@@ -180,21 +203,16 @@ fn looks_expensive(ex: &Expr) -> Result<bool> {
180203
181204fn lift_up_expensive_projections (
182205 plan : & LogicalPlan ,
183- used_columns : HashSet < Column > ,
206+ used_columns : ColumnRecorder < ( ) > ,
184207) -> Result < ( LogicalPlan , Option < Vec < Expr > > ) > {
185208 match plan {
186209 LogicalPlan :: Sort { expr, input } => {
187- let mut recorder = ColumnRecorder {
188- columns : used_columns,
189- } ;
210+ let mut recorder = used_columns;
190211 for ex in expr {
191212 recorder = ex. accept ( recorder) ?;
192213 }
193214
194- let used_columns = recorder. columns ;
195-
196- let ( new_input, lifted_projection) =
197- lift_up_expensive_projections ( & input, used_columns) ?;
215+ let ( new_input, lifted_projection) = lift_up_expensive_projections ( & input, recorder) ?;
198216 pal_debug ! (
199217 "Sort sees result:\n {:?};;;{:?};;;" ,
200218 new_input,
@@ -213,8 +231,9 @@ fn lift_up_expensive_projections(
213231 input,
214232 schema,
215233 } => {
216- let mut column_recorder = ColumnRecorder {
217- columns : HashSet :: new ( ) ,
234+ let mut column_recorder = ColumnRecorder :: < Vec < Column > > {
235+ column_hash : HashSet :: new ( ) ,
236+ collector : Vec :: new ( ) ,
218237 } ;
219238
220239 let mut this_projection_exprs = Vec :: < usize > :: new ( ) ;
@@ -241,7 +260,7 @@ fn lift_up_expensive_projections(
241260 already_retained_cols. push ( ( col. clone ( ) , Some ( alias. clone ( ) ) ) ) ;
242261 }
243262
244- if used_columns. contains ( & field. qualified_column ( ) ) {
263+ if used_columns. column_hash . contains ( & field. qualified_column ( ) ) {
245264 pal_debug ! (
246265 "Expr {}: used_columns contains field {:?}" ,
247266 i,
@@ -296,7 +315,7 @@ fn lift_up_expensive_projections(
296315 let mut expensive_expr_column_replacements = Vec :: < ( Column , Column ) > :: new ( ) ;
297316
298317 let mut generated_col_number = 0 ;
299- let needed_columns = column_recorder. columns ;
318+ let needed_columns = column_recorder. collector ;
300319 ' outer: for col in needed_columns {
301320 pal_debug ! ( "Processing column {:?} in needed_columns" , col) ;
302321
@@ -510,6 +529,44 @@ mod tests {
510529 Ok ( ( ) )
511530 }
512531
532+ /// Tests that multiple columns are retained in a deterministic order (and as a nice-to-have,
533+ /// they should be in the left-to-right order of appearance).
534+ #[ test]
535+ fn limit_sorted_plan_with_expensive_expr_retaining_multiple_columns ( ) -> Result < ( ) > {
536+ let table_scan = test_table_scan_abcd ( ) ?;
537+
538+ let case_expr = when ( col ( "d" ) . eq ( lit ( 3 ) ) , col ( "c" ) + lit ( 2 ) ) . otherwise ( lit ( 5 ) ) ?;
539+
540+ let plan = LogicalPlanBuilder :: from ( table_scan)
541+ . project ( [
542+ col ( "a" ) . alias ( "a1" ) ,
543+ col ( "b" ) . alias ( "b1" ) ,
544+ case_expr. alias ( "c1" ) ,
545+ ] ) ?
546+ . sort ( [ col ( "a1" ) . sort ( true , true ) ] ) ?
547+ . limit ( 50 ) ?
548+ . build ( ) ?;
549+
550+ let expected = "Limit: 50\
551+ \n Sort: #a1 ASC NULLS FIRST\
552+ \n Projection: #test.a AS a1, #test.b AS b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\
553+ \n TableScan: test projection=None";
554+
555+ let formatted = format ! ( "{:?}" , plan) ;
556+ assert_eq ! ( formatted, expected) ;
557+
558+ // We are testing that test.d deterministically comes before test.c in the inner Projection.
559+ let optimized_expected = "Projection: #a1, #b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\
560+ \n Limit: 50\
561+ \n Sort: #a1 ASC NULLS FIRST\
562+ \n Projection: #test.a AS a1, #test.b AS b1, #test.d, #test.c\
563+ \n TableScan: test projection=None";
564+
565+ assert_optimized_plan_eq ( & plan, optimized_expected) ;
566+
567+ Ok ( ( ) )
568+ }
569+
513570 /// Tests that we re-alias fields in the lifted up projection.
514571 #[ test]
515572 fn limit_sorted_plan_with_nonaliased_expensive_expr_optimized ( ) -> Result < ( ) > {
@@ -659,4 +716,15 @@ mod tests {
659716 pub fn test_table_scan ( ) -> Result < LogicalPlan > {
660717 test_table_scan_with_name ( "test" )
661718 }
719+
720+ pub fn test_table_scan_abcd ( ) -> Result < LogicalPlan > {
721+ let name = "test" ;
722+ let schema = Schema :: new ( vec ! [
723+ Field :: new( "a" , DataType :: UInt32 , false ) ,
724+ Field :: new( "b" , DataType :: UInt32 , false ) ,
725+ Field :: new( "c" , DataType :: UInt32 , false ) ,
726+ Field :: new( "d" , DataType :: UInt32 , false ) ,
727+ ] ) ;
728+ LogicalPlanBuilder :: scan_empty ( Some ( name) , & schema, None ) ?. build ( )
729+ }
662730}
0 commit comments