@@ -38,7 +38,8 @@ use datafusion_expr::utils::{
3838 conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
3939} ;
4040use datafusion_expr:: {
41- BinaryExpr , Expr , Filter , Operator , Projection , TableProviderFilterPushDown , and, or,
41+ BinaryExpr , Expr , Filter , LogicalPlanBuilder , Operator , Projection ,
42+ TableProviderFilterPushDown , and, or,
4243} ;
4344
4445use crate :: optimizer:: ApplyOrder ;
@@ -1128,7 +1129,7 @@ impl OptimizerRule for PushDownFilter {
11281129 LogicalPlan :: TableScan ( scan) => {
11291130 let filter_predicates: Vec < _ > = split_conjunction ( & filter. predicate )
11301131 . into_iter ( )
1131- // Add already pushed filters.
1132+ // Add already pushed filters to ensure that the rule is idempotent.
11321133 . chain ( scan. filters . iter ( ) )
11331134 . unique ( )
11341135 . collect ( ) ;
@@ -1162,26 +1163,106 @@ impl OptimizerRule for PushDownFilter {
11621163 let new_scan_filters: Vec < Expr > =
11631164 new_scan_filters. unique ( ) . cloned ( ) . collect ( ) ;
11641165
1166+ let source_schema = scan. source . schema ( ) ;
1167+ let mut additional_projection = HashSet :: new ( ) ;
1168+
11651169 // Compose predicates to be of `Unsupported` or `Inexact` pushdown type, and also include volatile filters
11661170 let new_predicate: Vec < Expr > = zip
1167- . filter ( |( _, res) | res != & TableProviderFilterPushDown :: Exact )
1171+ . filter ( |( expr, res) | {
1172+ if * res == TableProviderFilterPushDown :: Exact {
1173+ return false ;
1174+ }
1175+ // Every filter that is not pushed down exactly is re-applied above the scan, so all
1176+ // columns it references must be projected; collect the ones that currently are not.
1177+ expr. apply ( |expr| {
1178+ if let Expr :: Column ( column) = expr
1179+ && let Ok ( idx) = source_schema. index_of ( column. name ( ) )
1180+ && scan
1181+ . projection
1182+ . as_ref ( )
1183+ . is_some_and ( |p| !p. contains ( & idx) )
1184+ {
1185+ additional_projection. insert ( idx) ;
1186+ }
1187+ Ok ( TreeNodeRecursion :: Continue )
1188+ } )
1189+ . unwrap ( ) ;
1190+ true
1191+ } )
11681192 . map ( |( pred, _) | pred)
11691193 . chain ( volatile_filters)
11701194 . cloned ( )
11711195 . collect ( ) ;
11721196
1173- let new_scan = LogicalPlan :: TableScan ( TableScan {
1174- filters : new_scan_filters,
1175- ..scan
1176- } ) ;
1177-
1178- Transformed :: yes ( new_scan) . transform_data ( |new_scan| {
1179- if let Some ( predicate) = conjunction ( new_predicate) {
1180- make_filter ( predicate, Arc :: new ( new_scan) ) . map ( Transformed :: yes)
1197+ // Wraps with a filter if some filters are not supported exactly.
1198+ let filtered = move |plan| {
1199+ if let Some ( new_predicate) = conjunction ( new_predicate) {
1200+ Filter :: try_new ( new_predicate, Arc :: new ( plan) )
1201+ . map ( LogicalPlan :: Filter )
11811202 } else {
1182- Ok ( Transformed :: no ( new_scan ) )
1203+ Ok ( plan )
11831204 }
1184- } )
1205+ } ;
1206+
1207+ if additional_projection. is_empty ( ) {
1208+ // No additional projection is required.
1209+ let new_scan = LogicalPlan :: TableScan ( TableScan {
1210+ filters : new_scan_filters,
1211+ ..scan
1212+ } ) ;
1213+ return filtered ( new_scan) . map ( Transformed :: yes) ;
1214+ }
1215+
1216+ let scan_table_name = & scan. table_name ;
1217+ let new_scan = filtered (
1218+ LogicalPlanBuilder :: scan_with_filters_fetch (
1219+ scan_table_name. clone ( ) ,
1220+ Arc :: clone ( & scan. source ) ,
1221+ scan. projection . clone ( ) . map ( |mut projection| {
1222+ // Extend a projection.
1223+ projection. extend ( additional_projection) ;
1224+ projection
1225+ } ) ,
1226+ new_scan_filters,
1227+ scan. fetch ,
1228+ ) ?
1229+ . build ( ) ?,
1230+ ) ?;
1231+
1232+ // Project fields required by the initial projection.
1233+ let new_plan = LogicalPlan :: Projection ( Projection :: try_new_with_schema (
1234+ scan. projection
1235+ . as_ref ( )
1236+ . map ( |projection| {
1237+ projection
1238+ . iter ( )
1239+ . cloned ( )
1240+ . map ( |idx| {
1241+ Expr :: Column ( Column :: new (
1242+ Some ( scan_table_name. clone ( ) ) ,
1243+ source_schema. field ( idx) . name ( ) ,
1244+ ) )
1245+ } )
1246+ . collect ( )
1247+ } )
1248+ . unwrap_or_else ( || {
1249+ source_schema
1250+ . fields ( )
1251+ . iter ( )
1252+ . map ( |field| {
1253+ Expr :: Column ( Column :: new (
1254+ Some ( scan_table_name. clone ( ) ) ,
1255+ field. name ( ) ,
1256+ ) )
1257+ } )
1258+ . collect ( )
1259+ } ) ,
1260+ Arc :: new ( new_scan) ,
1261+ // Preserve the metadata of the original projected schema.
1262+ scan. projected_schema ,
1263+ ) ?) ;
1264+
1265+ Ok ( Transformed :: yes ( new_plan) )
11851266 }
11861267 LogicalPlan :: Extension ( extension_plan) => {
11871268 // This check prevents the Filter from being removed when the extension node has no children,
@@ -3207,7 +3288,7 @@ mod tests {
32073288 let plan = table_scan_with_pushdown_provider_builder (
32083289 TableProviderFilterPushDown :: Inexact ,
32093290 vec ! [ col( "a" ) . eq( lit( 10i64 ) ) , col( "b" ) . gt( lit( 11i64 ) ) ] ,
3210- Some ( vec ! [ 0 ] ) ,
3291+ Some ( vec ! [ 0 , 1 ] ) ,
32113292 ) ?
32123293 . filter ( and ( col ( "a" ) . eq ( lit ( 10i64 ) ) , col ( "b" ) . gt ( lit ( 11i64 ) ) ) ) ?
32133294 . project ( vec ! [ col( "a" ) , col( "b" ) ] ) ?
@@ -3218,7 +3299,7 @@ mod tests {
32183299 @r"
32193300 Projection: a, b
32203301 Filter: a = Int64(10) AND b > Int64(11)
3221- TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]
3302+ TableScan: test projection=[a, b ], partial_filters=[a = Int64(10), b > Int64(11)]
32223303 "
32233304 )
32243305 }
@@ -3228,7 +3309,7 @@ mod tests {
32283309 let plan = table_scan_with_pushdown_provider_builder (
32293310 TableProviderFilterPushDown :: Exact ,
32303311 vec ! [ ] ,
3231- Some ( vec ! [ 0 ] ) ,
3312+ Some ( vec ! [ 0 , 1 ] ) ,
32323313 ) ?
32333314 . filter ( and ( col ( "a" ) . eq ( lit ( 10i64 ) ) , col ( "b" ) . gt ( lit ( 11i64 ) ) ) ) ?
32343315 . project ( vec ! [ col( "a" ) , col( "b" ) ] ) ?
@@ -3238,7 +3319,7 @@ mod tests {
32383319 plan,
32393320 @r"
32403321 Projection: a, b
3241- TableScan: test projection=[a], full_filters=[a = Int64(10), b > Int64(11)]
3322+ TableScan: test projection=[a, b ], full_filters=[a = Int64(10), b > Int64(11)]
32423323 "
32433324 )
32443325 }
@@ -4294,4 +4375,34 @@ mod tests {
42944375 "
42954376 )
42964377 }
4378+
4379+ #[ test]
4380+ fn test_projection_is_updated_when_filter_becomes_unsupported ( ) -> Result < ( ) > {
4381+ let test_provider = PushDownProvider {
4382+ filter_support : TableProviderFilterPushDown :: Unsupported ,
4383+ } ;
4384+
4385+ let projeted_schema = test_provider. schema ( ) . project ( & [ 0 ] ) ?;
4386+ let table_scan = LogicalPlan :: TableScan ( TableScan {
4387+ table_name : "test" . into ( ) ,
4388+ // Emulate a scan that already carries a pushed-down filter on `b` which the provider
4389+ // now reports as `Unsupported`, while the scan projects only column index 0.
4390+ filters : vec ! [ col( "b" ) . eq( lit( 1i64 ) ) ] ,
4391+ projected_schema : Arc :: new ( DFSchema :: try_from ( projeted_schema) ?) ,
4392+ projection : Some ( vec ! [ 0 ] ) ,
4393+ source : Arc :: new ( test_provider) ,
4394+ fetch : None ,
4395+ } ) ;
4396+
4397+ let plan = LogicalPlanBuilder :: from ( table_scan)
4398+ . filter ( col ( "a" ) . eq ( lit ( 1i64 ) ) ) ?
4399+ . build ( ) ?;
4400+
4401+ assert_optimized_plan_equal ! ( plan,
4402+ @r"
4403+ Projection: test.a
4404+ Filter: a = Int64(1) AND b = Int64(1)
4405+ TableScan: test projection=[a, b]"
4406+ )
4407+ }
42974408}
0 commit comments