Skip to content

Commit 727f513

Browse files
[IcebergIO] robust handling for filtering time types; expose new features to YAML SDK (#35515)
* robust handling for filter time types; expose new features to YAML sdk * fix test * pylint * pydocs * fix test * pydocs * Update sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java Co-authored-by: Derrick Williams <[email protected]> --------- Co-authored-by: Derrick Williams <[email protected]>
1 parent ccf9522 commit 727f513

File tree

7 files changed

+352
-37
lines changed

7 files changed

+352
-37
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 3
3+
"modification": 2
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@
1717
*/
1818
package org.apache.beam.sdk.io.iceberg;
1919

20+
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
21+
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
2022
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
2123
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2224

2325
import java.math.BigDecimal;
2426
import java.time.LocalDate;
2527
import java.time.LocalDateTime;
2628
import java.time.LocalTime;
29+
import java.time.format.DateTimeFormatter;
30+
import java.time.format.DateTimeFormatterBuilder;
31+
import java.time.format.DateTimeParseException;
32+
import java.util.Arrays;
2733
import java.util.HashSet;
2834
import java.util.List;
2935
import java.util.Map;
@@ -41,6 +47,10 @@
4147
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator;
4248
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParseException;
4349
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParser;
50+
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
51+
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.DateString;
52+
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.TimeString;
53+
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.TimestampString;
4454
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
4555
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
4656
import org.apache.iceberg.Schema;
@@ -312,6 +322,7 @@ private static Expression convertFieldAndLiteral(
312322
}
313323

314324
private static Object convertLiteral(SqlLiteral literal, String field, TypeID type) {
325+
SqlTypeName typeName = literal.getTypeName();
315326
switch (type) {
316327
case BOOLEAN:
317328
return literal.getValueAs(Boolean.class);
@@ -328,17 +339,69 @@ private static Object convertLiteral(SqlLiteral literal, String field, TypeID ty
328339
case STRING:
329340
return literal.getValueAs(String.class);
330341
case DATE:
331-
LocalDate date = LocalDate.parse(literal.getValueAs(String.class));
342+
LocalDate date;
343+
if (SqlTypeName.STRING_TYPES.contains(typeName)) {
344+
date = LocalDate.parse(literal.getValueAs(String.class));
345+
} else if (SqlTypeName.DATE.equals(typeName)) {
346+
DateString dateValue = literal.getValueAs(DateString.class);
347+
date = LocalDate.parse(dateValue.toString());
348+
} else {
349+
throw new IllegalArgumentException("Unexpected date type: " + literal.getTypeName());
350+
}
332351
return DateTimeUtil.daysFromDate(date);
333352
case TIME:
334-
LocalTime time = LocalTime.parse(literal.getValueAs(String.class));
353+
LocalTime time;
354+
if (SqlTypeName.STRING_TYPES.contains(typeName)) {
355+
time = LocalTime.parse(literal.getValueAs(String.class));
356+
} else if (SqlTypeName.TIME.equals(typeName)) {
357+
TimeString timeString = literal.getValueAs(TimeString.class);
358+
time = LocalTime.parse(timeString.toString());
359+
} else {
360+
throw new IllegalArgumentException("Unexpected date type: " + literal.getTypeName());
361+
}
335362
return DateTimeUtil.microsFromTime(time);
336363
case TIMESTAMP:
337-
LocalDateTime dateTime = LocalDateTime.parse(literal.getValueAs(String.class));
338-
return DateTimeUtil.microsFromTimestamp(dateTime);
364+
LocalDateTime datetime;
365+
if (SqlTypeName.STRING_TYPES.contains(typeName)) {
366+
String value = literal.getValueAs(String.class);
367+
datetime = getLocalDateTime(value);
368+
} else if (SqlTypeName.DATE.equals(typeName)) {
369+
DateString dateString = literal.getValueAs(DateString.class);
370+
datetime = LocalDateTime.of(LocalDate.parse(dateString.toString()), LocalTime.MIN);
371+
} else if (SqlTypeName.TIMESTAMP.equals(typeName)) {
372+
TimestampString timestampString = literal.getValueAs(TimestampString.class);
373+
datetime = getLocalDateTime(timestampString.toString());
374+
} else {
375+
throw new IllegalArgumentException("Unexpected timestamp type: " + literal.getTypeName());
376+
}
377+
return DateTimeUtil.microsFromTimestamp(datetime);
339378
default:
340379
throw new IllegalArgumentException(
341380
String.format("Unsupported filter type in field '%s': %s", field, type));
342381
}
343382
}
383+
384+
private static LocalDateTime getLocalDateTime(String value) {
385+
LocalDateTime datetime;
386+
for (DateTimeFormatter formatter : DATE_TIME_FORMATTERS) {
387+
try {
388+
datetime = LocalDateTime.parse(value, formatter);
389+
return datetime;
390+
} catch (DateTimeParseException ignored) {
391+
}
392+
}
393+
return LocalDateTime.of(LocalDate.parse(value), LocalTime.MIN);
394+
}
395+
396+
private static final List<DateTimeFormatter> DATE_TIME_FORMATTERS =
397+
Arrays.asList(
398+
DateTimeFormatter.ISO_LOCAL_DATE_TIME, // e.g., 2023-10-26T10:30:00[.SSSSSSSSS]
399+
new DateTimeFormatterBuilder()
400+
.parseCaseInsensitive()
401+
.append(ISO_LOCAL_DATE)
402+
.appendLiteral(' ')
403+
.append(ISO_LOCAL_TIME)
404+
.toFormatter(), // e.g. 2023-10-26 10:30:00[.SSSSSSSSS]
405+
ISO_LOCAL_DATE // For cases where you only have a date, then combine with a default time
406+
);
344407
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iceberg.Table;
4040
import org.apache.iceberg.TableProperties;
4141
import org.apache.iceberg.data.IdentityPartitionConverters;
42+
import org.apache.iceberg.data.InternalRecordWrapper;
4243
import org.apache.iceberg.data.Record;
4344
import org.apache.iceberg.data.parquet.GenericParquetReaders;
4445
import org.apache.iceberg.encryption.EncryptedFiles;
@@ -198,10 +199,12 @@ static List<SnapshotInfo> snapshotsBetween(
198199

199200
public static CloseableIterable<Record> maybeApplyFilter(
200201
CloseableIterable<Record> iterable, IcebergScanConfig scanConfig) {
202+
InternalRecordWrapper wrapper =
203+
new InternalRecordWrapper(scanConfig.getRequiredSchema().asStruct());
201204
Expression filter = scanConfig.getFilter();
202205
Evaluator evaluator = scanConfig.getEvaluator();
203206
if (filter != null && evaluator != null && filter.op() != Expression.Operation.TRUE) {
204-
return CloseableIterable.filter(iterable, evaluator::eval);
207+
return CloseableIterable.filter(iterable, record -> evaluator.eval(wrapper.wrap(record)));
205208
}
206209
return iterable;
207210
}

0 commit comments

Comments
 (0)