Commit fa0152b

[IcebergIO] Use InternalRecordWrapper partition util (#33701)

* Use Iceberg InternalRecordWrapper partition util
* add test
* spotless

Parent: cc9cc22

4 files changed: +38 additions, -33 deletions
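The heart of the change is in RecordWriterManager.java below: instead of hand-coercing each field value through Literal, the writer now wraps incoming records in Iceberg's InternalRecordWrapper before computing partition keys. A minimal standalone sketch of that pattern (the schema, field names, and values here are hypothetical, not taken from the Beam codebase):

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class PartitionKeySketch {
  public static void main(String[] args) {
    // Hypothetical table: a long column plus a date column partitioned by identity.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "date", Types.DateType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("date").build();

    Record record = GenericRecord.create(schema);
    record.setField("id", 1L);
    record.setField("date", java.time.LocalDate.parse("2025-01-21"));

    // The wrapper converts user-facing values (e.g. LocalDate) to Iceberg's
    // internal representations on access, so the spec's partition transforms
    // receive the primitive types they expect.
    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    PartitionKey partitionKey = new PartitionKey(spec, schema);
    partitionKey.partition(wrapper.wrap(record));

    System.out.println(partitionKey.toPath()); // date=2025-01-21
  }
}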
Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }

sdks/java/io/iceberg/build.gradle

Lines changed: 1 addition & 0 deletions

@@ -55,6 +55,7 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
+  implementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   implementation library.java.hadoop_common
   runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 4 additions & 29 deletions

@@ -50,14 +50,11 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.expressions.Literal;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
-import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -106,12 +103,14 @@ class DestinationState {
     @VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
     private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
     private final List<Exception> exceptions = Lists.newArrayList();
+    private final InternalRecordWrapper wrapper; // wrapper that facilitates partitioning

     DestinationState(IcebergDestination icebergDestination, Table table) {
       this.icebergDestination = icebergDestination;
       this.schema = table.schema();
       this.spec = table.spec();
       this.routingPartitionKey = new PartitionKey(spec, schema);
+      this.wrapper = new InternalRecordWrapper(schema.asStruct());
       this.table = table;
       for (PartitionField partitionField : spec.fields()) {
         partitionFieldMap.put(partitionField.name(), partitionField);

@@ -156,7 +155,7 @@ class DestinationState {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      routingPartitionKey.partition(getPartitionableRecord(record));
+      routingPartitionKey.partition(wrapper.wrap(record));

       @Nullable RecordWriter writer = writers.getIfPresent(routingPartitionKey);
       if (writer == null && openWriters >= maxNumWriters) {

@@ -207,30 +206,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
             e);
       }
     }
-
-    /**
-     * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
-     * can be applied to the destination's {@link PartitionSpec}.
-     */
-    private Record getPartitionableRecord(Record record) {
-      if (spec.isUnpartitioned()) {
-        return record;
-      }
-      Record output = GenericRecord.create(schema);
-      for (PartitionField partitionField : spec.fields()) {
-        Transform<?, ?> transform = partitionField.transform();
-        Types.NestedField field = schema.findField(partitionField.sourceId());
-        String name = field.name();
-        Object value = record.getField(name);
-        @Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
-        if (literal == null || transform.isVoid() || transform.isIdentity()) {
-          output.setField(name, value);
-        } else {
-          output.setField(name, literal.value());
-        }
-      }
-      return output;
-    }
   }

   /**
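The deleted getPartitionableRecord helper round-tripped every value through Literal.of(value.toString()), i.e. through string parsing; InternalRecordWrapper instead converts values lazily to Iceberg's internal representations as they are accessed. As a rough illustration of what those internal representations are for temporal types, the sketch below uses Iceberg's DateTimeUtil (purely for demonstration; the Beam code never calls it directly):

import java.time.LocalDateTime;
import org.apache.iceberg.util.DateTimeUtil;

public class InternalRepresentationSketch {
  public static void main(String[] args) {
    LocalDateTime ts = LocalDateTime.parse("2025-01-21T13:18:20.053");
    // Dates are represented as days since 1970-01-01.
    int days = DateTimeUtil.daysFromDate(ts.toLocalDate());
    // Times are represented as microseconds since midnight.
    long timeMicros = DateTimeUtil.microsFromTime(ts.toLocalTime());
    // Timestamps are represented as microseconds since the epoch.
    long tsMicros = DateTimeUtil.microsFromTimestamp(ts);
    System.out.printf("days=%d, timeMicros=%d, tsMicros=%d%n", days, timeMicros, tsMicros);
  }
}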

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java

Lines changed: 32 additions & 3 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.iceberg;

+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;

@@ -27,9 +28,12 @@
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -451,10 +455,27 @@ public void testIdentityPartitioning() throws IOException {
             .addFloatField("float")
             .addDoubleField("double")
             .addStringField("str")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .addLogicalTypeField("datetime", SqlTypes.DATETIME)
+            .addDateTimeField("datetime_tz")
             .build();
-
+    String timestamp = "2025-01-21T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
     Row row =
-        Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build();
+        Row.withSchema(primitiveTypeSchema)
+            .addValues(
+                true,
+                1,
+                1L,
+                1.23f,
+                4.56,
+                "str",
+                localDateTime.toLocalDate(),
+                localDateTime.toLocalTime(),
+                localDateTime,
+                DateTime.parse(timestamp))
+            .build();
     org.apache.iceberg.Schema icebergSchema =
         IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema);
     PartitionSpec spec =

@@ -465,6 +486,10 @@ public void testIdentityPartitioning() throws IOException {
             .identity("float")
             .identity("double")
             .identity("str")
+            .identity("date")
+            .identity("time")
+            .identity("datetime")
+            .identity("datetime_tz")
             .build();
     WindowedValue<IcebergDestination> dest =
         getWindowedDestination("identity_partitioning", icebergSchema, spec);

@@ -479,8 +504,12 @@ public void testIdentityPartitioning() throws IOException {
     assertEquals(1, dataFile.getRecordCount());
     // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str
     List<String> expectedPartitions = new ArrayList<>();
+    List<String> dateTypes = Arrays.asList("date", "time", "datetime", "datetime_tz");
     for (Schema.Field field : primitiveTypeSchema.getFields()) {
-      Object val = row.getValue(field.getName());
+      Object val = checkStateNotNull(row.getValue(field.getName()));
+      if (dateTypes.contains(field.getName())) {
+        val = URLEncoder.encode(val.toString(), StandardCharsets.UTF_8.toString());
+      }
       expectedPartitions.add(field.getName() + "=" + val);
     }
     String expectedPartitionPath = String.join("/", expectedPartitions);