@@ -244,6 +244,7 @@ impl NormalizationRule for PushPredicateIntoScan {
                 meta,
                 range,
                 covered_deserializers,
+                cover_mapping,
             } in &mut scan_op.index_infos
             {
                 if range.is_some() {
@@ -265,26 +266,37 @@ impl NormalizationRule for PushPredicateIntoScan {
                 }
                 changed = true;
 
-                let mut deserializers = Vec::with_capacity(meta.column_ids.len());
-                let mut cover_count = 0;
+                *covered_deserializers = None;
+                *cover_mapping = None;
+
+                // try index covered
+                let mut mapping_slots = vec![usize::MAX; scan_op.columns.len()];
+                let mut needs_mapping = false;
                 let index_column_types = match &meta.value_ty {
                     LogicalType::Tuple(tys) => tys,
                     ty => slice::from_ref(ty),
                 };
-                for (i, column_id) in meta.column_ids.iter().enumerate() {
-                    for column in scan_op.columns.values() {
-                        deserializers.push(
-                            if column.id().map(|id| id == *column_id).unwrap_or(false) {
-                                cover_count += 1;
-                                column.datatype().serializable()
-                            } else {
-                                index_column_types[i].skip_serializable()
-                            },
-                        );
+                let mut deserializers = Vec::with_capacity(meta.column_ids.len());
+
+                for (idx, column_id) in meta.column_ids.iter().enumerate() {
+                    if let Some((scan_idx, column)) =
+                        scan_op.columns.values().enumerate().find(|(_, column)| {
+                            column.id().map(|id| id == *column_id).unwrap_or(false)
+                        })
+                    {
+                        mapping_slots[scan_idx] = idx;
+                        needs_mapping |= scan_idx != idx;
+                        deserializers.push(column.datatype().serializable());
+                    } else {
+                        deserializers.push(index_column_types[idx].skip_serializable());
                     }
                 }
-                if cover_count == scan_op.columns.len() {
+
+                if mapping_slots.iter().all(|slot| *slot != usize::MAX) {
                     *covered_deserializers = Some(deserializers);
+                    if needs_mapping {
+                        *cover_mapping = Some(mapping_slots);
+                    }
                 }
             }
             return Ok(changed);
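
Side note on how the new field is meant to be consumed: `cover_mapping[scan_idx]` records which position in the index tuple feeds scan column `scan_idx`, and it is only stored when that order differs from the scan's own column order. A minimal read-side sketch, assuming the decoded index tuple keeps one slot per index column (skipped columns as placeholders); the function name and signature are illustrative, not this crate's actual executor API:

    // Hypothetical helper, not part of this commit: permute values decoded from a
    // covering index back into the scan's projection order.
    fn reorder_covered_values<T: Clone>(decoded: &[T], cover_mapping: &[usize]) -> Vec<T> {
        cover_mapping
            .iter()
            .map(|&index_pos| decoded[index_pos].clone())
            .collect()
    }

With the `idx_c2_c3_c1` index used in the test below, the decoded slots arrive as [c2, c3, c1] and `cover_mapping` is `[2, 0]`, so the reordered output is [c1, c2], matching the scan's projection of c1 then c2.
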
@@ -354,17 +366,24 @@ impl PushPredicateIntoScan {
 #[cfg(all(test, not(target_arch = "wasm32")))]
 mod tests {
     use crate::binder::test::build_t1_table;
+    use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, TableName};
     use crate::errors::DatabaseError;
     use crate::expression::range_detacher::Range;
     use crate::expression::{BinaryOperator, ScalarExpression};
     use crate::optimizer::heuristic::batch::HepBatchStrategy;
     use crate::optimizer::heuristic::optimizer::HepOptimizer;
     use crate::optimizer::rule::normalization::NormalizationRuleImpl;
+    use crate::planner::operator::filter::FilterOperator;
+    use crate::planner::operator::table_scan::TableScanOperator;
     use crate::planner::operator::Operator;
+    use crate::planner::{Childrens, LogicalPlan};
     use crate::storage::rocksdb::RocksTransaction;
+    use crate::types::index::{IndexInfo, IndexMeta, IndexType};
     use crate::types::value::DataValue;
     use crate::types::LogicalType;
-    use std::collections::Bound;
+    use std::collections::{BTreeMap, Bound};
+    use std::sync::Arc;
+    use ulid::Ulid;
 
     #[test]
     fn test_push_predicate_into_scan() -> Result<(), DatabaseError> {
@@ -400,6 +419,171 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_cover_mapping_matches_scan_order() -> Result<(), DatabaseError> {
+        let table_name: TableName = Arc::from("mock_table");
+        let c1_id = Ulid::new();
+        let c2_id = Ulid::new();
+        let c3_id = Ulid::new();
+
+        let mut c1 = ColumnCatalog::new(
+            "c1".to_string(),
+            false,
+            ColumnDesc::new(LogicalType::Integer, Some(0), false, None)?,
+        );
+        c1.set_ref_table(table_name.clone(), c1_id, false);
+        let c1_ref = ColumnRef::from(c1.clone());
+
+        let mut c2 = ColumnCatalog::new(
+            "c2".to_string(),
+            false,
+            ColumnDesc::new(LogicalType::Integer, None, false, None)?,
+        );
+        c2.set_ref_table(table_name.clone(), c2_id, false);
+        let c2_ref = ColumnRef::from(c2.clone());
+
+        let mut c3 = ColumnCatalog::new(
+            "c3".to_string(),
+            false,
+            ColumnDesc::new(LogicalType::Integer, None, false, None)?,
+        );
+        c3.set_ref_table(table_name.clone(), c3_id, false);
+
+        let mut columns = BTreeMap::new();
+        columns.insert(0, c1_ref.clone());
+        columns.insert(1, c2_ref.clone());
+
+        let index_meta_reordered = Arc::new(IndexMeta {
+            id: 0,
+            column_ids: vec![c2_id, c3_id, c1_id],
+            table_name: table_name.clone(),
+            pk_ty: LogicalType::Integer,
+            value_ty: LogicalType::Tuple(vec![
+                LogicalType::Integer,
+                LogicalType::Integer,
+                LogicalType::Integer,
+            ]),
+            name: "idx_c2_c3_c1".to_string(),
+            ty: IndexType::Composite,
+        });
+        let index_meta_aligned = Arc::new(IndexMeta {
+            id: 1,
+            column_ids: vec![c1_id, c2_id],
+            table_name: table_name.clone(),
+            pk_ty: LogicalType::Integer,
+            value_ty: LogicalType::Tuple(vec![LogicalType::Integer, LogicalType::Integer]),
+            name: "idx_c1_c2".to_string(),
+            ty: IndexType::Composite,
+        });
+
+        let scan_plan = LogicalPlan::new(
+            Operator::TableScan(TableScanOperator {
+                table_name: table_name.clone(),
+                primary_keys: vec![c1_id],
+                columns,
+                limit: (None, None),
+                index_infos: vec![
+                    IndexInfo {
+                        meta: index_meta_reordered,
+                        range: None,
+                        covered_deserializers: None,
+                        cover_mapping: None,
+                    },
+                    IndexInfo {
+                        meta: index_meta_aligned,
+                        range: None,
+                        covered_deserializers: None,
+                        cover_mapping: None,
+                    },
+                ],
+                with_pk: false,
+            }),
+            Childrens::None,
+        );
+
+        let c1_gt = ScalarExpression::Binary {
+            op: BinaryOperator::Gt,
+            left_expr: Box::new(ScalarExpression::column_expr(c1_ref.clone())),
+            right_expr: Box::new(ScalarExpression::Constant(DataValue::Int32(0))),
+            evaluator: None,
+            ty: LogicalType::Boolean,
+        };
+        let c2_gt = ScalarExpression::Binary {
+            op: BinaryOperator::Gt,
+            left_expr: Box::new(ScalarExpression::column_expr(c2_ref.clone())),
+            right_expr: Box::new(ScalarExpression::Constant(DataValue::Int32(0))),
+            evaluator: None,
+            ty: LogicalType::Boolean,
+        };
+        let predicate = ScalarExpression::Binary {
+            op: BinaryOperator::And,
+            left_expr: Box::new(c1_gt),
+            right_expr: Box::new(c2_gt),
+            evaluator: None,
+            ty: LogicalType::Boolean,
+        };
+
+        let filter_plan = LogicalPlan::new(
+            Operator::Filter(FilterOperator {
+                predicate,
+                is_optimized: false,
+                having: false,
+            }),
+            Childrens::Only(Box::new(scan_plan)),
+        );
+
+        let best_plan = HepOptimizer::new(filter_plan)
+            .batch(
+                "push_cover_mapping".to_string(),
+                HepBatchStrategy::once_topdown(),
+                vec![NormalizationRuleImpl::PushPredicateIntoScan],
+            )
+            .find_best::<RocksTransaction>(None)?;
+
+        let table_scan = best_plan.childrens.pop_only();
+        if let Operator::TableScan(op) = &table_scan.operator {
+            let index_infos = &op.index_infos;
+            assert_eq!(index_infos.len(), 2);
+
+            // verify the first index (reordered scan columns) still uses mapping
+            let reordered_index = &index_infos[0];
+            let deserializers = reordered_index
+                .covered_deserializers
+                .as_ref()
+                .expect("expected covering deserializers");
+            assert_eq!(deserializers.len(), 3);
+            assert_eq!(
+                deserializers[0],
+                c2_ref.datatype().serializable(),
+                "first deserializer should align with c2"
+            );
+            assert_eq!(
+                deserializers[1],
+                c3.datatype().skip_serializable(),
+                "non-projected index column should be skipped"
+            );
+            assert_eq!(
+                deserializers[2],
+                c1_ref.datatype().serializable(),
+                "last deserializer should align with c1"
+            );
+            let mapping = reordered_index.cover_mapping.as_ref().map(|m| m.as_slice());
+            assert_eq!(mapping, Some(&[2, 0][..]));
+
+            // verify the second index matches scan order exactly so mapping is omitted
+            let ordered_index = &index_infos[1];
+            assert!(ordered_index.covered_deserializers.is_some());
+            assert!(
+                ordered_index.cover_mapping.is_none(),
+                "mapping should be None when index/scan order already match"
+            );
+        } else {
+            unreachable!("expected table scan");
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn test_push_predicate_through_join_in_left_join() -> Result<(), DatabaseError> {
         let table_state = build_t1_table()?;