Skip to content

Commit 20ce7a5

Browse files
viiryaclaude
andauthored
feat(datafusion): Add Binary scalar value conversion for predicate pushdown (apache#2048)
## Which issue does this PR close? - Closes #. ## What changes are included in this PR? Add support for converting Binary and LargeBinary DataFusion ScalarValue types to Iceberg Datum, enabling binary predicates to be pushed down to the Iceberg storage layer. This conversion allows SQL queries with binary hex literals (X'...') to push predicates down to Iceberg, improving query performance by filtering data at the storage level rather than in DataFusion. The integration test verifies that binary predicates are successfully pushed down end-to-end: - Without conversion: predicate stays in FilterExec with predicate:[] - With conversion: predicate pushed to IcebergTableScan Other scalar types (Boolean, Timestamp, Decimal) were investigated but excluded because they are not reachable through practical usage: - Boolean: DataFusion aggressively optimizes comparisons (e.g., x=true becomes just x) before reaching the converter - Timestamp/Decimal: SQL literals are converted to strings/other types before reaching the converter ## Are these changes tested? --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent ee21563 commit 20ce7a5

File tree

5 files changed

+133
-0
lines changed

5 files changed

+133
-0
lines changed

crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
212212
ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
213213
ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
214214
ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
215+
ScalarValue::Binary(Some(v)) => Some(Datum::binary(v.clone())),
216+
ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
215217
ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
216218
ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
217219
_ => None,
@@ -429,4 +431,31 @@ mod tests {
429431
let predicate = convert_to_iceberg_predicate(sql);
430432
assert_eq!(predicate, None);
431433
}
434+
435+
#[test]
436+
fn test_scalar_value_to_datum_binary() {
437+
use datafusion::common::ScalarValue;
438+
439+
let bytes = vec![1u8, 2u8, 3u8];
440+
let datum = super::scalar_value_to_datum(&ScalarValue::Binary(Some(bytes.clone())));
441+
assert_eq!(datum, Some(Datum::binary(bytes.clone())));
442+
443+
let datum = super::scalar_value_to_datum(&ScalarValue::LargeBinary(Some(bytes.clone())));
444+
assert_eq!(datum, Some(Datum::binary(bytes)));
445+
446+
let datum = super::scalar_value_to_datum(&ScalarValue::Binary(None));
447+
assert_eq!(datum, None);
448+
}
449+
450+
#[test]
451+
fn test_predicate_conversion_with_binary() {
452+
let sql = "foo = 1 and bar = X'0102'";
453+
let predicate = convert_to_iceberg_predicate(sql).unwrap();
454+
// Binary literals are converted to Datum::binary
455+
// Note: SQL literal 1 is converted to Long by DataFusion
456+
let expected_predicate = Reference::new("foo")
457+
.equal_to(Datum::long(1))
458+
.and(Reference::new("bar").equal_to(Datum::binary(vec![1u8, 2u8])));
459+
assert_eq!(predicate, expected_predicate);
460+
}
432461
}

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ impl DataFusionEngine {
9595

9696
// Create partitioned test table (unpartitioned tables are now created via SQL)
9797
Self::create_partitioned_table(&catalog, &namespace).await?;
98+
Self::create_binary_table(&catalog, &namespace).await?;
9899

99100
Ok(Arc::new(
100101
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
@@ -134,4 +135,31 @@ impl DataFusionEngine {
134135

135136
Ok(())
136137
}
138+
139+
/// Create a test table with binary type column
140+
/// Used for testing binary predicate pushdown
141+
/// TODO: this can be removed when we support CREATE TABLE
142+
async fn create_binary_table(
143+
catalog: &impl Catalog,
144+
namespace: &NamespaceIdent,
145+
) -> anyhow::Result<()> {
146+
let schema = Schema::builder()
147+
.with_fields(vec![
148+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
149+
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Binary)).into(),
150+
])
151+
.build()?;
152+
153+
catalog
154+
.create_table(
155+
namespace,
156+
TableCreation::builder()
157+
.name("test_binary_table".to_string())
158+
.schema(schema)
159+
.build(),
160+
)
161+
.await?;
162+
163+
Ok(())
164+
}
137165
}

crates/sqllogictest/testdata/schedules/df_test.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,7 @@ slt = "df_test/create_table.slt"
2929
[[steps]]
3030
engine = "df"
3131
slt = "df_test/insert_into.slt"
32+
33+
[[steps]]
34+
engine = "df"
35+
slt = "df_test/binary_predicate_pushdown.slt"
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Test that binary predicates are pushed down to IcebergTableScan
19+
# This validates that ScalarValue::Binary is correctly converted to Datum::binary
20+
21+
# Verify EXPLAIN shows binary predicate is pushed down to IcebergTableScan
22+
query TT
23+
EXPLAIN SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
24+
----
25+
logical_plan
26+
01)Filter: default.default.test_binary_table.data = LargeBinary("1,2")
27+
02)--TableScan: default.default.test_binary_table projection=[id, data], partial_filters=[default.default.test_binary_table.data = LargeBinary("1,2")]
28+
physical_plan
29+
01)CoalesceBatchesExec: target_batch_size=8192
30+
02)--FilterExec: data@1 = 0102
31+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
32+
04)------CooperativeExec
33+
05)--------IcebergTableScan projection:[id,data] predicate:[data = 0102]
34+
35+
# Verify empty result from empty table
36+
query I?
37+
SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
38+
----
39+
40+
# Insert test data
41+
query I
42+
INSERT INTO default.default.test_binary_table VALUES (1, X'0102')
43+
----
44+
1
45+
46+
# Verify binary predicate filtering works
47+
query I?
48+
SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
49+
----
50+
1 0102
51+
52+
# Insert more rows with different binary values
53+
query I
54+
INSERT INTO default.default.test_binary_table VALUES (2, X'0304'), (3, X'0102'), (4, X'0506')
55+
----
56+
3
57+
58+
# Verify binary predicate filters correctly
59+
query I? rowsort
60+
SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
61+
----
62+
1 0102
63+
3 0102
64+
65+
# Verify different binary value
66+
query I?
67+
SELECT * FROM default.default.test_binary_table WHERE data = X'0506'
68+
----
69+
4 0506

crates/sqllogictest/testdata/slts/df_test/show_tables.slt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ datafusion information_schema routines VIEW
2525
datafusion information_schema schemata VIEW
2626
datafusion information_schema tables VIEW
2727
datafusion information_schema views VIEW
28+
default default test_binary_table BASE TABLE
29+
default default test_binary_table$manifests BASE TABLE
30+
default default test_binary_table$snapshots BASE TABLE
2831
default default test_partitioned_table BASE TABLE
2932
default default test_partitioned_table$manifests BASE TABLE
3033
default default test_partitioned_table$snapshots BASE TABLE

0 commit comments

Comments
 (0)