Skip to content

Commit 6697447

Browse files
committed
extend a scan projection if some pushed filters become unsupported
Consider the following optimizer-run scenario: 1. `supports_filters_pushdown` returns `Exact` for some filter, e.g. "a = 1", where the column "a" is not required by the query projection. 2. "a" is removed from the table provider projection by the "optimize projection" rule. 3. `supports_filters_pushdown` changes its decision and returns `Inexact` for this filter the next time, e.g. because the input filters have changed and it prefers to use a new one. 4. "a" is not added back to the table provider projection, which leads to a filter that references a column that is not part of the input schema. This patch fixes the issue by introducing the following logic within the filter push-down rule: 1. Collect the columns that are not in the current table provider scan projection but are required by the filter expressions. Call this set `additional_projection`. 2. If `additional_projection` is empty, keep the logic as it was prior to the patch. 3. Otherwise, extend the table provider projection and wrap the plan with an additional projection node to preserve the schema used prior to the rule.
1 parent 556a3ee commit 6697447

File tree

1 file changed

+128
-17
lines changed

1 file changed

+128
-17
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 128 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ use datafusion_expr::utils::{
3838
conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
3939
};
4040
use datafusion_expr::{
41-
BinaryExpr, Expr, Filter, Operator, Projection, TableProviderFilterPushDown, and, or,
41+
BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, Projection,
42+
TableProviderFilterPushDown, and, or,
4243
};
4344

4445
use crate::optimizer::ApplyOrder;
@@ -1128,7 +1129,7 @@ impl OptimizerRule for PushDownFilter {
11281129
LogicalPlan::TableScan(scan) => {
11291130
let filter_predicates: Vec<_> = split_conjunction(&filter.predicate)
11301131
.into_iter()
1131-
// Add already pushed filters.
1132+
// Add already pushed filters to ensure that the rule is idempotent.
11321133
.chain(scan.filters.iter())
11331134
.unique()
11341135
.collect();
@@ -1162,26 +1163,106 @@ impl OptimizerRule for PushDownFilter {
11621163
let new_scan_filters: Vec<Expr> =
11631164
new_scan_filters.unique().cloned().collect();
11641165

1166+
let source_schema = scan.source.schema();
1167+
let mut additional_projection = HashSet::new();
1168+
11651169
// Compose predicates to be of `Unsupported` or `Inexact` pushdown type, and also include volatile filters
11661170
let new_predicate: Vec<Expr> = zip
1167-
.filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
1171+
.filter(|(expr, res)| {
1172+
if *res == TableProviderFilterPushDown::Exact {
1173+
return false;
1174+
}
1175+
// For each not exactly supported filter we must ensure that all columns are projected,
1176+
// so we collect all columns which are not currently projected.
1177+
expr.apply(|expr| {
1178+
if let Expr::Column(column) = expr
1179+
&& let Ok(idx) = source_schema.index_of(column.name())
1180+
&& scan
1181+
.projection
1182+
.as_ref()
1183+
.is_some_and(|p| !p.contains(&idx))
1184+
{
1185+
additional_projection.insert(idx);
1186+
}
1187+
Ok(TreeNodeRecursion::Continue)
1188+
})
1189+
.unwrap();
1190+
true
1191+
})
11681192
.map(|(pred, _)| pred)
11691193
.chain(volatile_filters)
11701194
.cloned()
11711195
.collect();
11721196

1173-
let new_scan = LogicalPlan::TableScan(TableScan {
1174-
filters: new_scan_filters,
1175-
..scan
1176-
});
1177-
1178-
Transformed::yes(new_scan).transform_data(|new_scan| {
1179-
if let Some(predicate) = conjunction(new_predicate) {
1180-
make_filter(predicate, Arc::new(new_scan)).map(Transformed::yes)
1197+
// Wraps with a filter if some filters are not supported exactly.
1198+
let filtered = move |plan| {
1199+
if let Some(new_predicate) = conjunction(new_predicate) {
1200+
Filter::try_new(new_predicate, Arc::new(plan))
1201+
.map(LogicalPlan::Filter)
11811202
} else {
1182-
Ok(Transformed::no(new_scan))
1203+
Ok(plan)
11831204
}
1184-
})
1205+
};
1206+
1207+
if additional_projection.is_empty() {
1208+
// No additional projection is required.
1209+
let new_scan = LogicalPlan::TableScan(TableScan {
1210+
filters: new_scan_filters,
1211+
..scan
1212+
});
1213+
return filtered(new_scan).map(Transformed::yes);
1214+
}
1215+
1216+
let scan_table_name = &scan.table_name;
1217+
let new_scan = filtered(
1218+
LogicalPlanBuilder::scan_with_filters_fetch(
1219+
scan_table_name.clone(),
1220+
Arc::clone(&scan.source),
1221+
scan.projection.clone().map(|mut projection| {
1222+
// Extend a projection.
1223+
projection.extend(additional_projection);
1224+
projection
1225+
}),
1226+
new_scan_filters,
1227+
scan.fetch,
1228+
)?
1229+
.build()?,
1230+
)?;
1231+
1232+
// Project fields required by the initial projection.
1233+
let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema(
1234+
scan.projection
1235+
.as_ref()
1236+
.map(|projection| {
1237+
projection
1238+
.iter()
1239+
.cloned()
1240+
.map(|idx| {
1241+
Expr::Column(Column::new(
1242+
Some(scan_table_name.clone()),
1243+
source_schema.field(idx).name(),
1244+
))
1245+
})
1246+
.collect()
1247+
})
1248+
.unwrap_or_else(|| {
1249+
source_schema
1250+
.fields()
1251+
.iter()
1252+
.map(|field| {
1253+
Expr::Column(Column::new(
1254+
Some(scan_table_name.clone()),
1255+
field.name(),
1256+
))
1257+
})
1258+
.collect()
1259+
}),
1260+
Arc::new(new_scan),
1261+
// Preserve a projected schema metadata.
1262+
scan.projected_schema,
1263+
)?);
1264+
1265+
Ok(Transformed::yes(new_plan))
11851266
}
11861267
LogicalPlan::Extension(extension_plan) => {
11871268
// This check prevents the Filter from being removed when the extension node has no children,
@@ -3207,7 +3288,7 @@ mod tests {
32073288
let plan = table_scan_with_pushdown_provider_builder(
32083289
TableProviderFilterPushDown::Inexact,
32093290
vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))],
3210-
Some(vec![0]),
3291+
Some(vec![0, 1]),
32113292
)?
32123293
.filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))?
32133294
.project(vec![col("a"), col("b")])?
@@ -3218,7 +3299,7 @@ mod tests {
32183299
@r"
32193300
Projection: a, b
32203301
Filter: a = Int64(10) AND b > Int64(11)
3221-
TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]
3302+
TableScan: test projection=[a, b], partial_filters=[a = Int64(10), b > Int64(11)]
32223303
"
32233304
)
32243305
}
@@ -3228,7 +3309,7 @@ mod tests {
32283309
let plan = table_scan_with_pushdown_provider_builder(
32293310
TableProviderFilterPushDown::Exact,
32303311
vec![],
3231-
Some(vec![0]),
3312+
Some(vec![0, 1]),
32323313
)?
32333314
.filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))?
32343315
.project(vec![col("a"), col("b")])?
@@ -3238,7 +3319,7 @@ mod tests {
32383319
plan,
32393320
@r"
32403321
Projection: a, b
3241-
TableScan: test projection=[a], full_filters=[a = Int64(10), b > Int64(11)]
3322+
TableScan: test projection=[a, b], full_filters=[a = Int64(10), b > Int64(11)]
32423323
"
32433324
)
32443325
}
@@ -4294,4 +4375,34 @@ mod tests {
42944375
"
42954376
)
42964377
}
4378+
4379+
#[test]
4380+
fn test_projection_is_updated_when_filter_becomes_unsupported() -> Result<()> {
4381+
let test_provider = PushDownProvider {
4382+
filter_support: TableProviderFilterPushDown::Unsupported,
4383+
};
4384+
4385+
let projected_schema = test_provider.schema().project(&[0])?;
4386+
let table_scan = LogicalPlan::TableScan(TableScan {
4387+
table_name: "test".into(),
4388+
// Emulate that there were pushed filters but now
4389+
// provider cannot support it.
4390+
filters: vec![col("b").eq(lit(1i64))],
4391+
projected_schema: Arc::new(DFSchema::try_from(projected_schema)?),
4392+
projection: Some(vec![0]),
4393+
source: Arc::new(test_provider),
4394+
fetch: None,
4395+
});
4396+
4397+
let plan = LogicalPlanBuilder::from(table_scan)
4398+
.filter(col("a").eq(lit(1i64)))?
4399+
.build()?;
4400+
4401+
assert_optimized_plan_equal!(plan,
4402+
@r"
4403+
Projection: test.a
4404+
Filter: a = Int64(1) AND b = Int64(1)
4405+
TableScan: test projection=[a, b]"
4406+
)
4407+
}
42974408
}

0 commit comments

Comments
 (0)