Skip to content

Commit a08c2ec

Browse files
viirya and claude committed
feat(datafusion): Add Boolean predicate pushdown support
This commit adds comprehensive support for pushing down Boolean predicates to the Iceberg table scan layer, improving query performance by filtering data at the storage level.

Changes:
- Enhanced expr_to_predicate.rs to handle boolean column expressions:
  * Bare boolean columns in filters (e.g., WHERE is_active) are converted to column = true predicates
  * NOT of boolean columns (e.g., WHERE NOT is_active) are converted to column = false predicates
  * Added Boolean scalar value to Datum conversion
- Added comprehensive sqllogictest (boolean_predicate_pushdown.slt) with:
  * Tests for is_active = true/false with EXPLAIN verification
  * Tests for is_active != true with EXPLAIN verification
  * Tests for combined predicates (AND/OR)
  * Tests for IS NULL/IS NOT NULL on boolean columns
- Created test_boolean_table in engine setup for testing
- Updated test schedule and show_tables baseline

The EXPLAIN-based tests verify that predicates are pushed down to IcebergTableScan rather than only being evaluated in FilterExec; the remaining tests check query results.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent ac4e4b0 commit a08c2ec

File tree

5 files changed

+196
-2
lines changed

5 files changed

+196
-2
lines changed

crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,18 @@ pub fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
5151
fn convert_filter_to_predicate(expr: &Expr) -> Option<Predicate> {
5252
match to_iceberg_predicate(expr) {
5353
TransformedResult::Predicate(predicate) => Some(predicate),
54-
TransformedResult::Column(_) | TransformedResult::Literal(_) => {
55-
unreachable!("Not a valid expression: {:?}", expr)
54+
TransformedResult::Column(column) => {
55+
// A bare column in a filter context represents a boolean column check
56+
// Convert it to: column = true
57+
Some(Predicate::Binary(BinaryExpression::new(
58+
PredicateOperator::Eq,
59+
column,
60+
Datum::bool(true),
61+
)))
62+
}
63+
TransformedResult::Literal(_) => {
64+
// Literal values in filter context cannot be pushed down
65+
None
5666
}
5767
_ => None,
5868
}
@@ -75,6 +85,14 @@ fn to_iceberg_predicate(expr: &Expr) -> TransformedResult {
7585
let expr = to_iceberg_predicate(exp);
7686
match expr {
7787
TransformedResult::Predicate(p) => TransformedResult::Predicate(!p),
88+
TransformedResult::Column(column) => {
89+
// NOT of a bare boolean column: NOT col => col = false
90+
TransformedResult::Predicate(Predicate::Binary(BinaryExpression::new(
91+
PredicateOperator::Eq,
92+
column,
93+
Datum::bool(false),
94+
)))
95+
}
7896
_ => TransformedResult::NotTransformed,
7997
}
8098
}
@@ -254,6 +272,7 @@ const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
254272
/// Convert a scalar value to an iceberg datum.
255273
fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
256274
match value {
275+
ScalarValue::Boolean(Some(v)) => Some(Datum::bool(*v)),
257276
ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
258277
ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
259278
ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
@@ -509,6 +528,23 @@ mod tests {
509528
assert_eq!(predicate, expected_predicate);
510529
}
511530

531+
#[test]
532+
fn test_scalar_value_to_datum_boolean() {
533+
use datafusion::common::ScalarValue;
534+
535+
// Test boolean true
536+
let datum = super::scalar_value_to_datum(&ScalarValue::Boolean(Some(true)));
537+
assert_eq!(datum, Some(Datum::bool(true)));
538+
539+
// Test boolean false
540+
let datum = super::scalar_value_to_datum(&ScalarValue::Boolean(Some(false)));
541+
assert_eq!(datum, Some(Datum::bool(false)));
542+
543+
// Test None boolean
544+
let datum = super::scalar_value_to_datum(&ScalarValue::Boolean(None));
545+
assert_eq!(datum, None);
546+
}
547+
512548
#[test]
513549
fn test_predicate_conversion_with_like_starts_with() {
514550
let sql = "bar LIKE 'test%'";

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl DataFusionEngine {
9696
// Create partitioned test table (unpartitioned tables are now created via SQL)
9797
Self::create_partitioned_table(&catalog, &namespace).await?;
9898
Self::create_binary_table(&catalog, &namespace).await?;
99+
Self::create_boolean_table(&catalog, &namespace).await?;
99100

100101
Ok(Arc::new(
101102
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
@@ -162,4 +163,32 @@ impl DataFusionEngine {
162163

163164
Ok(())
164165
}
166+
167+
/// Create a test table with boolean type column
168+
/// Used for testing boolean predicate pushdown
169+
/// TODO: this can be removed when we support CREATE TABLE
170+
async fn create_boolean_table(
171+
catalog: &impl Catalog,
172+
namespace: &NamespaceIdent,
173+
) -> anyhow::Result<()> {
174+
let schema = Schema::builder()
175+
.with_fields(vec![
176+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
177+
NestedField::optional(2, "is_active", Type::Primitive(PrimitiveType::Boolean)).into(),
178+
NestedField::optional(3, "description", Type::Primitive(PrimitiveType::String)).into(),
179+
])
180+
.build()?;
181+
182+
catalog
183+
.create_table(
184+
namespace,
185+
TableCreation::builder()
186+
.name("test_boolean_table".to_string())
187+
.schema(schema)
188+
.build(),
189+
)
190+
.await?;
191+
192+
Ok(())
193+
}
165194
}

crates/sqllogictest/testdata/schedules/df_test.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ slt = "df_test/binary_predicate_pushdown.slt"
4646
engine = "df"
4747
slt = "df_test/like_predicate_pushdown.slt"
4848

49+
[[steps]]
50+
engine = "df"
51+
slt = "df_test/boolean_predicate_pushdown.slt"
52+
4953
[[steps]]
5054
engine = "df"
5155
slt = "df_test/drop_table.slt"
crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Insert test data into test_boolean_table
19+
statement ok
20+
INSERT INTO default.default.test_boolean_table VALUES
21+
(1, true, 'Active user'),
22+
(2, false, 'Inactive user'),
23+
(3, true, 'Premium member'),
24+
(4, false, 'Trial expired'),
25+
(5, true, 'Verified account'),
26+
(6, NULL, 'Pending verification')
27+
28+
# Verify boolean equality predicate is pushed down to IcebergTableScan
29+
query TT
30+
EXPLAIN SELECT * FROM default.default.test_boolean_table WHERE is_active = true
31+
----
32+
logical_plan
33+
01)Filter: default.default.test_boolean_table.is_active
34+
02)--TableScan: default.default.test_boolean_table projection=[id, is_active, description], partial_filters=[default.default.test_boolean_table.is_active]
35+
physical_plan
36+
01)CoalesceBatchesExec: target_batch_size=8192
37+
02)--FilterExec: is_active@1
38+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
39+
04)------CooperativeExec
40+
05)--------IcebergTableScan projection:[id,is_active,description] predicate:[is_active = true]
41+
42+
# Query with is_active = true
43+
query ITT rowsort
44+
SELECT * FROM default.default.test_boolean_table WHERE is_active = true
45+
----
46+
1 true Active user
47+
3 true Premium member
48+
5 true Verified account
49+
50+
# Verify boolean false predicate is pushed down to IcebergTableScan
51+
query TT
52+
EXPLAIN SELECT * FROM default.default.test_boolean_table WHERE is_active = false
53+
----
54+
logical_plan
55+
01)Filter: NOT default.default.test_boolean_table.is_active
56+
02)--TableScan: default.default.test_boolean_table projection=[id, is_active, description], partial_filters=[NOT default.default.test_boolean_table.is_active]
57+
physical_plan
58+
01)CoalesceBatchesExec: target_batch_size=8192
59+
02)--FilterExec: NOT is_active@1
60+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
61+
04)------CooperativeExec
62+
05)--------IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false]
63+
64+
# Query with is_active = false
65+
query ITT rowsort
66+
SELECT * FROM default.default.test_boolean_table WHERE is_active = false
67+
----
68+
2 false Inactive user
69+
4 false Trial expired
70+
71+
# Verify boolean NOT EQUAL predicate is pushed down (is_active != true is simplified to NOT is_active and reaches the scan as is_active = false)
72+
query TT
73+
EXPLAIN SELECT * FROM default.default.test_boolean_table WHERE is_active != true
74+
----
75+
logical_plan
76+
01)Filter: NOT default.default.test_boolean_table.is_active
77+
02)--TableScan: default.default.test_boolean_table projection=[id, is_active, description], partial_filters=[NOT default.default.test_boolean_table.is_active]
78+
physical_plan
79+
01)CoalesceBatchesExec: target_batch_size=8192
80+
02)--FilterExec: NOT is_active@1
81+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
82+
04)------CooperativeExec
83+
05)--------IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false]
84+
85+
# Query with is_active != true (matches only false rows; NULL rows are excluded because NULL != true evaluates to NULL)
86+
query ITT rowsort
87+
SELECT * FROM default.default.test_boolean_table WHERE is_active != true
88+
----
89+
2 false Inactive user
90+
4 false Trial expired
91+
92+
# Test combined boolean predicates with AND
93+
query ITT rowsort
94+
SELECT * FROM default.default.test_boolean_table WHERE is_active = true AND id > 2
95+
----
96+
3 true Premium member
97+
5 true Verified account
98+
99+
# Test combined boolean predicates with OR
100+
query ITT rowsort
101+
SELECT * FROM default.default.test_boolean_table WHERE is_active = true OR id = 2
102+
----
103+
1 true Active user
104+
2 false Inactive user
105+
3 true Premium member
106+
5 true Verified account
107+
108+
# Test IS NULL on boolean column
109+
query ITT
110+
SELECT * FROM default.default.test_boolean_table WHERE is_active IS NULL
111+
----
112+
6 NULL Pending verification
113+
114+
# Test IS NOT NULL on boolean column
115+
query ITT rowsort
116+
SELECT * FROM default.default.test_boolean_table WHERE is_active IS NOT NULL
117+
----
118+
1 true Active user
119+
2 false Inactive user
120+
3 true Premium member
121+
4 false Trial expired
122+
5 true Verified account

crates/sqllogictest/testdata/slts/df_test/show_tables.slt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ datafusion information_schema views VIEW
2828
default default test_binary_table BASE TABLE
2929
default default test_binary_table$manifests BASE TABLE
3030
default default test_binary_table$snapshots BASE TABLE
31+
default default test_boolean_table BASE TABLE
32+
default default test_boolean_table$manifests BASE TABLE
33+
default default test_boolean_table$snapshots BASE TABLE
3134
default default test_partitioned_table BASE TABLE
3235
default default test_partitioned_table$manifests BASE TABLE
3336
default default test_partitioned_table$snapshots BASE TABLE

0 commit comments

Comments
 (0)