@@ -83,8 +83,11 @@ fn materialize_topk_under_limit_sort(
8383 sort_input : & Arc < LogicalPlan > ,
8484) -> Result < Option < LogicalPlan > , DataFusionError > {
8585 let projection = extract_projections_and_havings ( & sort_input) ?;
86+ let Some ( projection) = projection else {
87+ return Ok ( None ) ;
88+ } ;
8689
87- let aggregate = projection. as_ref ( ) . map ( |p| p . input ) . unwrap_or ( sort_input ) ;
90+ let aggregate: & Arc < LogicalPlan > = projection. input ;
8891 match aggregate. as_ref ( ) {
8992 LogicalPlan :: Aggregate ( Aggregate {
9093 input : cluster_send,
@@ -109,7 +112,7 @@ fn materialize_topk_under_limit_sort(
109112 group_expr. len ( ) ,
110113 & sort_expr,
111114 sort_input. schema ( ) ,
112- projection. as_ref ( ) . map ( |c| c . input_columns . as_slice ( ) ) ,
115+ projection. input_columns . as_slice ( ) ,
113116 ) ? {
114117 sort_columns = sc;
115118 } else {
@@ -130,17 +133,13 @@ fn materialize_topk_under_limit_sort(
130133 group_expr : group_expr. clone ( ) ,
131134 aggregate_expr : aggr_expr. clone ( ) ,
132135 order_by : sort_columns,
133- having_expr : projection
134- . as_ref ( )
135- . map_or ( None , |p| p. having_expr . clone ( ) ) ,
136+ having_expr : projection. having_expr . clone ( ) ,
136137 schema : aggregate_schema. clone ( ) ,
137138 snapshots : cs. snapshots . clone ( ) ,
138139 } ) ,
139140 } ) ;
140- // TODO upgrade DF: Projection should always be Some, now. We are also in some
141- // cases (such as when there was no projection node) creating a spurious
142- // projection.
143- if let Some ( p) = projection {
141+ if projection. has_projection {
142+ let p = projection;
144143 let out_schema = p. schema ;
145144 let mut expr = Vec :: with_capacity ( p. input_columns . len ( ) ) ;
146145 for out_i in 0 ..p. input_columns . len ( ) {
@@ -274,6 +273,9 @@ struct ColumnProjection<'a> {
274273 post_projection : Vec < Expr > ,
275274 // Defined on `input` schema
276275 having_expr : Option < Expr > ,
276+ // True if there is some sort of projection seen.
277+ has_projection : bool ,
278+
277279}
278280
279281fn extract_projections_and_havings (
@@ -372,6 +374,7 @@ fn extract_projections_and_havings(
372374 schema,
373375 post_projection : new_post_projection,
374376 having_expr : inner_column_projection. having_expr ,
377+ has_projection : true ,
375378 } ;
376379
377380 return Ok ( Some ( column_projection) ) ;
@@ -423,6 +426,7 @@ fn extract_projections_and_havings(
423426 transformed_predicate
424427 } ,
425428 ) ,
429+ has_projection : inner_column_projection. has_projection ,
426430 } ;
427431
428432 return Ok ( Some ( column_projection) ) ;
@@ -444,6 +448,7 @@ fn extract_projections_and_havings(
444448 schema : in_schema,
445449 post_projection,
446450 having_expr : None ,
451+ has_projection : false ,
447452 } ;
448453 return Ok ( Some ( column_projection) ) ;
449454 }
@@ -454,7 +459,7 @@ fn extract_sort_columns(
454459 group_key_len : usize ,
455460 sort_expr : & [ SortExpr ] ,
456461 schema : & DFSchema ,
457- projection : Option < & [ usize ] > ,
462+ projection : & [ usize ] ,
458463) -> Result < Option < Vec < SortColumn > > , DataFusionError > {
459464 let mut sort_columns = Vec :: with_capacity ( sort_expr. len ( ) ) ;
460465 for e in sort_expr {
@@ -466,9 +471,7 @@ fn extract_sort_columns(
466471 match expr {
467472 Expr :: Column ( c) => {
468473 let mut index = field_index ( schema, c. relation . as_ref ( ) , & c. name ) ?;
469- if let Some ( p) = projection {
470- index = p[ index] ;
471- }
474+ index = projection[ index] ;
472475 if index < group_key_len {
473476 return Ok ( None ) ;
474477 }
0 commit comments