Skip to content

Commit 12baee1

Browse files
committed
extend a scan projection if some pushed filters become unsupported
Consider the following optimizer-run scenario: 1. `supports_filters_pushdown` returns `Exact` on some filter, e.g. "a = 1", where the column "a" is not required by the query projection. 2. "a" is removed from the table provider projection by "optimize projection" rule. 3. `supports_filters_pushdown` changes a decision and returns `Inexact` on this filter the next time. e.g., input filters are changed and it prefers to use a new one. 4. "a" is not returned to the table provider projection which leads to filter that references a column which is not a part of the input schema. This patch fixes issue introducing the following logic within a filter push-down rule: 1. Collect columns that are not used in the current table provider scan projection, but required for filter expressions. Call it `additional_projection`. 2. If `additional_projection` is empty -- leave logic as is prior the patch. 3. Otherwise extend a table provider projection and wrap a plan with an additional projection node to preserve schema used prior to the rule.
1 parent 21266d6 commit 12baee1

File tree

3 files changed

+132
-21
lines changed

3 files changed

+132
-21
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 128 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ use datafusion_expr::utils::{
3838
conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
3939
};
4040
use datafusion_expr::{
41-
BinaryExpr, Expr, Filter, Operator, Projection, TableProviderFilterPushDown, and, or,
41+
BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, Projection,
42+
TableProviderFilterPushDown, and, or,
4243
};
4344

4445
use crate::optimizer::ApplyOrder;
@@ -1128,7 +1129,7 @@ impl OptimizerRule for PushDownFilter {
11281129
LogicalPlan::TableScan(scan) => {
11291130
let filter_predicates: Vec<_> = split_conjunction(&filter.predicate)
11301131
.into_iter()
1131-
// Add already pushed filters.
1132+
// Add already pushed filters to ensure that the rule is idempotent.
11321133
.chain(scan.filters.iter())
11331134
.unique()
11341135
.collect();
@@ -1162,26 +1163,106 @@ impl OptimizerRule for PushDownFilter {
11621163
let new_scan_filters: Vec<Expr> =
11631164
new_scan_filters.unique().cloned().collect();
11641165

1166+
let source_schema = scan.source.schema();
1167+
let mut additional_projection = HashSet::new();
1168+
11651169
// Compose predicates to be of `Unsupported` or `Inexact` pushdown type, and also include volatile filters
11661170
let new_predicate: Vec<Expr> = zip
1167-
.filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
1171+
.filter(|(expr, res)| {
1172+
if *res == TableProviderFilterPushDown::Exact {
1173+
return false;
1174+
}
1175+
// For each not exactly supported filter we must ensure that all columns are projected,
1176+
// so we collect all columns which are not currently projected.
1177+
expr.apply(|expr| {
1178+
if let Expr::Column(column) = expr
1179+
&& let Ok(idx) = source_schema.index_of(column.name())
1180+
&& scan
1181+
.projection
1182+
.as_ref()
1183+
.is_some_and(|p| !p.contains(&idx))
1184+
{
1185+
additional_projection.insert(idx);
1186+
}
1187+
Ok(TreeNodeRecursion::Continue)
1188+
})
1189+
.unwrap();
1190+
true
1191+
})
11681192
.map(|(pred, _)| pred)
11691193
.chain(volatile_filters)
11701194
.cloned()
11711195
.collect();
11721196

1173-
let new_scan = LogicalPlan::TableScan(TableScan {
1174-
filters: new_scan_filters,
1175-
..scan
1176-
});
1177-
1178-
Transformed::yes(new_scan).transform_data(|new_scan| {
1179-
if let Some(predicate) = conjunction(new_predicate) {
1180-
make_filter(predicate, Arc::new(new_scan)).map(Transformed::yes)
1197+
// Wraps with a filter if some filters are not supported exactly.
1198+
let filtered = move |plan| {
1199+
if let Some(new_predicate) = conjunction(new_predicate) {
1200+
Filter::try_new(new_predicate, Arc::new(plan))
1201+
.map(LogicalPlan::Filter)
11811202
} else {
1182-
Ok(Transformed::no(new_scan))
1203+
Ok(plan)
11831204
}
1184-
})
1205+
};
1206+
1207+
if additional_projection.is_empty() {
1208+
// No additional projection is required.
1209+
let new_scan = LogicalPlan::TableScan(TableScan {
1210+
filters: new_scan_filters,
1211+
..scan
1212+
});
1213+
return filtered(new_scan).map(Transformed::yes);
1214+
}
1215+
1216+
let scan_table_name = &scan.table_name;
1217+
let new_scan = filtered(
1218+
LogicalPlanBuilder::scan_with_filters_fetch(
1219+
scan_table_name.clone(),
1220+
Arc::clone(&scan.source),
1221+
scan.projection.clone().map(|mut projection| {
1222+
// Extend a projection.
1223+
projection.extend(additional_projection);
1224+
projection
1225+
}),
1226+
new_scan_filters,
1227+
scan.fetch,
1228+
)?
1229+
.build()?,
1230+
)?;
1231+
1232+
// Project fields required by the initial projection.
1233+
let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema(
1234+
scan.projection
1235+
.as_ref()
1236+
.map(|projection| {
1237+
projection
1238+
.iter()
1239+
.cloned()
1240+
.map(|idx| {
1241+
Expr::Column(Column::new(
1242+
Some(scan_table_name.clone()),
1243+
source_schema.field(idx).name(),
1244+
))
1245+
})
1246+
.collect()
1247+
})
1248+
.unwrap_or_else(|| {
1249+
source_schema
1250+
.fields()
1251+
.iter()
1252+
.map(|field| {
1253+
Expr::Column(Column::new(
1254+
Some(scan_table_name.clone()),
1255+
field.name(),
1256+
))
1257+
})
1258+
.collect()
1259+
}),
1260+
Arc::new(new_scan),
1261+
// Preserve a projected schema metadata.
1262+
scan.projected_schema,
1263+
)?);
1264+
1265+
Ok(Transformed::yes(new_plan))
11851266
}
11861267
LogicalPlan::Extension(extension_plan) => {
11871268
// This check prevents the Filter from being removed when the extension node has no children,
@@ -3205,7 +3286,7 @@ mod tests {
32053286
let plan = table_scan_with_pushdown_provider_builder(
32063287
TableProviderFilterPushDown::Inexact,
32073288
vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))],
3208-
Some(vec![0]),
3289+
Some(vec![0, 1]),
32093290
)?
32103291
.filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))?
32113292
.project(vec![col("a"), col("b")])?
@@ -3216,7 +3297,7 @@ mod tests {
32163297
@r"
32173298
Projection: a, b
32183299
Filter: a = Int64(10) AND b > Int64(11)
3219-
TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]
3300+
TableScan: test projection=[a, b], partial_filters=[a = Int64(10), b > Int64(11)]
32203301
"
32213302
)
32223303
}
@@ -3226,7 +3307,7 @@ mod tests {
32263307
let plan = table_scan_with_pushdown_provider_builder(
32273308
TableProviderFilterPushDown::Exact,
32283309
vec![],
3229-
Some(vec![0]),
3310+
Some(vec![0, 1]),
32303311
)?
32313312
.filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))?
32323313
.project(vec![col("a"), col("b")])?
@@ -3236,7 +3317,7 @@ mod tests {
32363317
plan,
32373318
@r"
32383319
Projection: a, b
3239-
TableScan: test projection=[a], full_filters=[a = Int64(10), b > Int64(11)]
3320+
TableScan: test projection=[a, b], full_filters=[a = Int64(10), b > Int64(11)]
32403321
"
32413322
)
32423323
}
@@ -4292,4 +4373,34 @@ mod tests {
42924373
"
42934374
)
42944375
}
4376+
4377+
#[test]
4378+
fn test_projection_is_updated_when_filter_becomes_unsupported() -> Result<()> {
4379+
let test_provider = PushDownProvider {
4380+
filter_support: TableProviderFilterPushDown::Unsupported,
4381+
};
4382+
4383+
let projected_schema = test_provider.schema().project(&[0])?;
4384+
let table_scan = LogicalPlan::TableScan(TableScan {
4385+
table_name: "test".into(),
4386+
// Emulate that there were pushed filters but now
4387+
// provider cannot support it.
4388+
filters: vec![col("b").eq(lit(1i64))],
4389+
projected_schema: Arc::new(DFSchema::try_from(projected_schema)?),
4390+
projection: Some(vec![0]),
4391+
source: Arc::new(test_provider),
4392+
fetch: None,
4393+
});
4394+
4395+
let plan = LogicalPlanBuilder::from(table_scan)
4396+
.filter(col("a").eq(lit(1i64)))?
4397+
.build()?;
4398+
4399+
assert_optimized_plan_equal!(plan,
4400+
@r"
4401+
Projection: test.a
4402+
Filter: a = Int64(1) AND b = Int64(1)
4403+
TableScan: test projection=[a, b]"
4404+
)
4405+
}
42954406
}

datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ logical_plan
6060
04)------Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15)
6161
05)--------Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount
6262
06)----------Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON")
63-
07)------------TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"), lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)]
63+
07)------------TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], partial_filters=[lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2), lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON")]
6464
08)--------Filter: (part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1)
65-
09)----------TableScan: part projection=[p_partkey, p_brand, p_size, p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)]
65+
09)----------TableScan: part projection=[p_partkey, p_brand, p_size, p_container], partial_filters=[part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15), part.p_size >= Int32(1)]
6666
physical_plan
6767
01)ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@0 as revenue]
6868
02)--AggregateExec: mode=Final, gby=[], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]

datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ logical_plan
6464
06)----------Inner Join: Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_2.avg(customer.c_acctbal)
6565
07)------------Projection: customer.c_phone, customer.c_acctbal
6666
08)--------------LeftAnti Join: customer.c_custkey = __correlated_sq_1.o_custkey
67-
09)----------------Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")])
67+
09)----------------Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")]) AND Boolean(true)
6868
10)------------------TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")]), Boolean(true)]
6969
11)----------------SubqueryAlias: __correlated_sq_1
7070
12)------------------TableScan: orders projection=[o_custkey]
@@ -87,7 +87,7 @@ physical_plan
8787
11)--------------------CoalescePartitionsExec
8888
12)----------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
8989
13)------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
90-
14)--------------------------FilterExec: substr(c_phone@1, 1, 2) IN (SET) ([13, 31, 23, 29, 30, 18, 17])
90+
14)--------------------------FilterExec: substr(c_phone@1, 1, 2) IN (SET) ([13, 31, 23, 29, 30, 18, 17]) AND true
9191
15)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
9292
16)------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_phone, c_acctbal], file_type=csv, has_header=false
9393
17)------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 4), input_partitions=4

0 commit comments

Comments
 (0)