Skip to content

Commit 0ffd2f2

Browse files
viirya and claude
and authored
feat(datafusion): Add Timestamp scalar value conversion for predicate pushdown (apache#2069)
## Which issue does this PR close? - Closes #. ## What changes are included in this PR? Add support for converting DataFusion Timestamp scalar values to Iceberg Datum for predicate pushdown. This enables timestamp literal comparisons to be pushed down to the storage layer when DataFusion provides timestamp literals in ScalarValue form, improving query performance. Changes: - Add conversion for 2 timestamp time units (Microsecond, Nanosecond) to Iceberg's native representations - Add comprehensive unit tests covering all time units and edge cases - Add sqllogictest (timestamp_predicate_pushdown.slt) to validate end-to-end timestamp predicate filtering and pushdown behavior Implementation details: - Iceberg uses microseconds and nanoseconds as the native timestamp representations - Timezone information in ScalarValue is preserved but not used in conversion (Iceberg timestamp type is timezone-agnostic) ## Are these changes tested? --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent c56411a commit 0ffd2f2

File tree

5 files changed

+278
-1
lines changed

5 files changed

+278
-1
lines changed

crates/iceberg/src/arrow/schema.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
2424
use arrow_array::{
2525
BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
2626
FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27-
TimestampMicrosecondArray,
27+
TimestampMicrosecondArray, TimestampNanosecondArray,
2828
};
2929
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
3030
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
@@ -742,6 +742,12 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send
742742
(PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
743743
TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
744744
))),
745+
(PrimitiveType::TimestampNs, PrimitiveLiteral::Long(value)) => {
746+
Ok(Arc::new(TimestampNanosecondArray::new_scalar(*value)))
747+
}
748+
(PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
749+
TimestampNanosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
750+
))),
745751
(PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
746752
let array = Decimal128Array::from_value(*value, 1)
747753
.with_precision_and_scale(*precision as _, *scale as _)

crates/iceberg/src/spec/values/datum.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,18 @@ impl PartialOrd for Datum {
233233
PrimitiveType::Timestamptz,
234234
PrimitiveType::Timestamptz,
235235
) => val.partial_cmp(other_val),
236+
(
237+
PrimitiveLiteral::Long(val),
238+
PrimitiveLiteral::Long(other_val),
239+
PrimitiveType::TimestampNs,
240+
PrimitiveType::TimestampNs,
241+
) => val.partial_cmp(other_val),
242+
(
243+
PrimitiveLiteral::Long(val),
244+
PrimitiveLiteral::Long(other_val),
245+
PrimitiveType::TimestamptzNs,
246+
PrimitiveType::TimestamptzNs,
247+
) => val.partial_cmp(other_val),
236248
(
237249
PrimitiveLiteral::String(val),
238250
PrimitiveLiteral::String(other_val),

crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
269269
}
270270

271271
const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
272+
272273
/// Convert a scalar value to an iceberg datum.
273274
fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
274275
match value {
@@ -285,6 +286,13 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
285286
ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
286287
ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
287288
ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
289+
// Timestamp conversions
290+
// Note: TimestampSecond and TimestampMillisecond are not handled here because
291+
// DataFusion's type coercion always converts them to match the column type
292+
// (either TimestampMicrosecond or TimestampNanosecond) before predicate pushdown.
293+
// See unit tests for how those conversions would work if needed.
294+
ScalarValue::TimestampMicrosecond(Some(v), _) => Some(Datum::timestamp_micros(*v)),
295+
ScalarValue::TimestampNanosecond(Some(v), _) => Some(Datum::timestamp_nanos(*v)),
288296
_ => None,
289297
}
290298
}
@@ -501,6 +509,42 @@ mod tests {
501509
assert_eq!(predicate, None);
502510
}
503511

512+
#[test]
513+
fn test_scalar_value_to_datum_timestamp() {
514+
use datafusion::common::ScalarValue;
515+
516+
// Test TimestampMicrosecond - maps directly to Datum::timestamp_micros
517+
let ts_micros = 1672876800000000i64; // 2023-01-05 00:00:00 UTC in microseconds
518+
let datum =
519+
super::scalar_value_to_datum(&ScalarValue::TimestampMicrosecond(Some(ts_micros), None));
520+
assert_eq!(datum, Some(Datum::timestamp_micros(ts_micros)));
521+
522+
// Test TimestampNanosecond - maps to Datum::timestamp_nanos to preserve precision
523+
let ts_nanos = 1672876800000000500i64; // 2023-01-05 00:00:00.000000500 UTC in nanoseconds
524+
let datum =
525+
super::scalar_value_to_datum(&ScalarValue::TimestampNanosecond(Some(ts_nanos), None));
526+
assert_eq!(datum, Some(Datum::timestamp_nanos(ts_nanos)));
527+
528+
// Test None timestamp
529+
let datum = super::scalar_value_to_datum(&ScalarValue::TimestampMicrosecond(None, None));
530+
assert_eq!(datum, None);
531+
532+
// Note: TimestampSecond and TimestampMillisecond are not supported because
533+
// DataFusion's type coercion converts them to TimestampMicrosecond or TimestampNanosecond
534+
// before they reach scalar_value_to_datum in SQL queries.
535+
//
536+
// These return None (not pushed down):
537+
let ts_seconds = 1672876800i64; // 2023-01-05 00:00:00 UTC in seconds
538+
let datum =
539+
super::scalar_value_to_datum(&ScalarValue::TimestampSecond(Some(ts_seconds), None));
540+
assert_eq!(datum, None);
541+
542+
let ts_millis = 1672876800000i64; // 2023-01-05 00:00:00 UTC in milliseconds
543+
let datum =
544+
super::scalar_value_to_datum(&ScalarValue::TimestampMillisecond(Some(ts_millis), None));
545+
assert_eq!(datum, None);
546+
}
547+
504548
#[test]
505549
fn test_scalar_value_to_datum_binary() {
506550
use datafusion::common::ScalarValue;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[engines]
19+
df = { type = "datafusion" }
20+
21+
[[steps]]
22+
engine = "df"
23+
slt = "df_test/timestamp_predicate_pushdown.slt"
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Test timestamp predicate pushdown behavior
19+
#
20+
# When you CREATE TABLE with TIMESTAMP in DataFusion, it creates a TimestampNs column
21+
# (nanosecond precision) in Iceberg, since DataFusion's default TIMESTAMP type is nanoseconds.
22+
#
23+
# We use Datum::timestamp_nanos() for nanosecond timestamp predicates to preserve
24+
# full precision. This allows predicates to be correctly pushed down to Iceberg.
25+
26+
# Create test table with timestamp column
27+
statement ok
28+
CREATE TABLE default.default.test_timestamp_table (id INT NOT NULL, ts TIMESTAMP)
29+
30+
# Insert test data with timestamps
31+
# We use CAST to convert string timestamps to proper timestamp values
32+
query I
33+
INSERT INTO default.default.test_timestamp_table
34+
VALUES
35+
(1, CAST('2023-01-01 00:00:00' AS TIMESTAMP)),
36+
(2, CAST('2023-01-05 12:30:00' AS TIMESTAMP)),
37+
(3, CAST('2023-01-10 15:45:30' AS TIMESTAMP)),
38+
(4, CAST('2023-01-15 09:00:00' AS TIMESTAMP)),
39+
(5, CAST('2023-01-20 18:20:10' AS TIMESTAMP))
40+
----
41+
5
42+
43+
# Verify timestamp equality predicate IS pushed down
44+
query TT
45+
EXPLAIN SELECT * FROM default.default.test_timestamp_table WHERE ts = CAST('2023-01-05 12:30:00' AS TIMESTAMP)
46+
----
47+
logical_plan
48+
01)Filter: default.default.test_timestamp_table.ts = TimestampNanosecond(1672921800000000000, None)
49+
02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts = TimestampNanosecond(1672921800000000000, None)]
50+
physical_plan
51+
01)CoalesceBatchesExec: target_batch_size=8192
52+
02)--FilterExec: ts@1 = 1672921800000000000
53+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
54+
04)------CooperativeExec
55+
05)--------IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00]
56+
57+
# Verify timestamp equality filtering works
58+
query I?
59+
SELECT * FROM default.default.test_timestamp_table WHERE ts = CAST('2023-01-05 12:30:00' AS TIMESTAMP)
60+
----
61+
2 2023-01-05T12:30:00
62+
63+
# Verify timestamp greater than predicate IS pushed down
64+
query TT
65+
EXPLAIN SELECT * FROM default.default.test_timestamp_table WHERE ts > CAST('2023-01-10 00:00:00' AS TIMESTAMP)
66+
----
67+
logical_plan
68+
01)Filter: default.default.test_timestamp_table.ts > TimestampNanosecond(1673308800000000000, None)
69+
02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts > TimestampNanosecond(1673308800000000000, None)]
70+
physical_plan
71+
01)CoalesceBatchesExec: target_batch_size=8192
72+
02)--FilterExec: ts@1 > 1673308800000000000
73+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
74+
04)------CooperativeExec
75+
05)--------IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00]
76+
77+
# Verify timestamp greater than filtering
78+
query I? rowsort
79+
SELECT * FROM default.default.test_timestamp_table WHERE ts > CAST('2023-01-10 00:00:00' AS TIMESTAMP)
80+
----
81+
3 2023-01-10T15:45:30
82+
4 2023-01-15T09:00:00
83+
5 2023-01-20T18:20:10
84+
85+
# Test timestamp less than or equal filtering
86+
query I? rowsort
87+
SELECT * FROM default.default.test_timestamp_table WHERE ts <= CAST('2023-01-05 12:30:00' AS TIMESTAMP)
88+
----
89+
1 2023-01-01T00:00:00
90+
2 2023-01-05T12:30:00
91+
92+
# Verify timestamp range predicate (AND of two comparisons) IS pushed down
93+
query TT
94+
EXPLAIN SELECT * FROM default.default.test_timestamp_table
95+
WHERE ts >= CAST('2023-01-05 00:00:00' AS TIMESTAMP)
96+
AND ts <= CAST('2023-01-15 23:59:59' AS TIMESTAMP)
97+
----
98+
logical_plan
99+
01)Filter: default.default.test_timestamp_table.ts >= TimestampNanosecond(1672876800000000000, None) AND default.default.test_timestamp_table.ts <= TimestampNanosecond(1673827199000000000, None)
100+
02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts >= TimestampNanosecond(1672876800000000000, None), default.default.test_timestamp_table.ts <= TimestampNanosecond(1673827199000000000, None)]
101+
physical_plan
102+
01)CoalesceBatchesExec: target_batch_size=8192
103+
02)--FilterExec: ts@1 >= 1672876800000000000 AND ts@1 <= 1673827199000000000
104+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
105+
04)------CooperativeExec
106+
05)--------IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)]
107+
108+
# Test timestamp range predicate filtering
109+
query I? rowsort
110+
SELECT * FROM default.default.test_timestamp_table
111+
WHERE ts >= CAST('2023-01-05 00:00:00' AS TIMESTAMP)
112+
AND ts <= CAST('2023-01-15 23:59:59' AS TIMESTAMP)
113+
----
114+
2 2023-01-05T12:30:00
115+
3 2023-01-10T15:45:30
116+
4 2023-01-15T09:00:00
117+
118+
# Test timestamp predicate combined with other predicates
119+
query I? rowsort
120+
SELECT * FROM default.default.test_timestamp_table
121+
WHERE ts >= CAST('2023-01-10 00:00:00' AS TIMESTAMP) AND id < 5
122+
----
123+
3 2023-01-10T15:45:30
124+
4 2023-01-15T09:00:00
125+
126+
# Test timestamp NOT EQUAL predicate
127+
query I? rowsort
128+
SELECT * FROM default.default.test_timestamp_table WHERE ts != CAST('2023-01-05 12:30:00' AS TIMESTAMP)
129+
----
130+
1 2023-01-01T00:00:00
131+
3 2023-01-10T15:45:30
132+
4 2023-01-15T09:00:00
133+
5 2023-01-20T18:20:10
134+
135+
# Test timestamp less than filtering
136+
query I? rowsort
137+
SELECT * FROM default.default.test_timestamp_table WHERE ts < CAST('2023-01-05 00:00:00' AS TIMESTAMP)
138+
----
139+
1 2023-01-01T00:00:00
140+
141+
# Clean up: Drop the test table
142+
statement ok
143+
DROP TABLE default.default.test_timestamp_table
144+
145+
# ============================================================================
146+
# Test timestamp predicate pushdown with different precisions
147+
# ============================================================================
148+
149+
# Test with TIMESTAMP(6) - microsecond precision
150+
statement ok
151+
CREATE TABLE default.default.test_timestamp_micros (id INT NOT NULL, ts TIMESTAMP(6))
152+
153+
query I
154+
INSERT INTO default.default.test_timestamp_micros
155+
VALUES
156+
(1, CAST('2023-01-01 00:00:00' AS TIMESTAMP)),
157+
(2, CAST('2023-01-05 12:30:00' AS TIMESTAMP))
158+
----
159+
2
160+
161+
# Verify microsecond timestamp predicate is pushed down
162+
query TT
163+
EXPLAIN SELECT * FROM default.default.test_timestamp_micros WHERE ts > CAST('2023-01-01 00:00:00' AS TIMESTAMP)
164+
----
165+
logical_plan
166+
01)Filter: default.default.test_timestamp_micros.ts > TimestampMicrosecond(1672531200000000, None)
167+
02)--TableScan: default.default.test_timestamp_micros projection=[id, ts], partial_filters=[default.default.test_timestamp_micros.ts > TimestampMicrosecond(1672531200000000, None)]
168+
physical_plan
169+
01)CoalesceBatchesExec: target_batch_size=8192
170+
02)--FilterExec: ts@1 > 1672531200000000
171+
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
172+
04)------CooperativeExec
173+
05)--------IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-01 00:00:00]
174+
175+
query I?
176+
SELECT * FROM default.default.test_timestamp_micros WHERE ts > CAST('2023-01-01 00:00:00' AS TIMESTAMP)
177+
----
178+
2 2023-01-05T12:30:00
179+
180+
statement ok
181+
DROP TABLE default.default.test_timestamp_micros
182+
183+
# Test with TIMESTAMP(3) - millisecond precision
184+
# This should fail because Iceberg doesn't support millisecond precision
185+
statement error DataFusion error: External error: DataInvalid => Unsupported Arrow data type: Timestamp\(ms\)
186+
CREATE TABLE default.default.test_timestamp_millis (id INT NOT NULL, ts TIMESTAMP(3))
187+
188+
# Test with TIMESTAMP(0) - second precision
189+
# This should fail because Iceberg doesn't support second precision
190+
statement error DataFusion error: External error: DataInvalid => Unsupported Arrow data type: Timestamp\(s\)
191+
CREATE TABLE default.default.test_timestamp_seconds (id INT NOT NULL, ts TIMESTAMP(0))
192+

0 commit comments

Comments
 (0)