Skip to content

Commit fd634ab

Browse files
committed
[IcebergIO] Create tables with Iceberg table properties
1 parent 755082a commit fd634ab

File tree

8 files changed

+120
-2
lines changed

8 files changed

+120
-2
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 3
3+
"modification": 5
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.auto.value.AutoValue;
2121
import java.util.List;
22+
import java.util.Map;
2223
import org.apache.beam.sdk.schemas.Schema;
2324
import org.apache.iceberg.PartitionSpec;
2425
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +41,9 @@ public PartitionSpec getPartitionSpec() {
4041
@Pure
4142
public abstract @Nullable List<String> getPartitionFields();
4243

44+
@Pure
45+
public abstract @Nullable Map<String, String> getTableProperties();
46+
4347
@Pure
4448
public static Builder builder() {
4549
return new AutoValue_IcebergTableCreateConfig.Builder();
@@ -51,6 +55,8 @@ public abstract static class Builder {
5155

5256
public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);
5357

58+
public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);
59+
5460
@Pure
5561
public abstract IcebergTableCreateConfig build();
5662
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ public static Builder builder() {
124124
+ "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
125125
public abstract @Nullable List<String> getPartitionFields();
126126

127+
@SchemaFieldDescription(
128+
"Iceberg table properties to be set on the table when it is created. "
129+
+ "For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
130+
public abstract @Nullable Map<String, String> getTableProperties();
131+
127132
@AutoValue.Builder
128133
public abstract static class Builder {
129134
public abstract Builder setTable(String table);
@@ -144,6 +149,8 @@ public abstract static class Builder {
144149

145150
public abstract Builder setPartitionFields(List<String> partitionFields);
146151

152+
public abstract Builder setTableProperties(Map<String, String> tableProperties);
153+
147154
public abstract Configuration build();
148155
}
149156

@@ -209,6 +216,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
209216
FileFormat.PARQUET.toString(),
210217
rows.getSchema(),
211218
configuration.getPartitionFields(),
219+
configuration.getTableProperties(),
212220
configuration.getDrop(),
213221
configuration.getKeep(),
214222
configuration.getOnly()));

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.iceberg;
1919

2020
import java.util.List;
21+
import java.util.Map;
2122
import org.apache.beam.sdk.schemas.Schema;
2223
import org.apache.beam.sdk.util.RowFilter;
2324
import org.apache.beam.sdk.util.RowStringInterpolator;
@@ -33,17 +34,20 @@ class PortableIcebergDestinations implements DynamicDestinations {
3334
private final String fileFormat;
3435

3536
private final @Nullable List<String> partitionFields;
37+
private final @Nullable Map<String, String> tableProperties;
3638

3739
public PortableIcebergDestinations(
3840
String destinationTemplate,
3941
String fileFormat,
4042
Schema inputSchema,
4143
@Nullable List<String> partitionFields,
44+
@Nullable Map<String, String> tableProperties,
4245
@Nullable List<String> fieldsToDrop,
4346
@Nullable List<String> fieldsToKeep,
4447
@Nullable String onlyField) {
4548
this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
4649
this.partitionFields = partitionFields;
50+
this.tableProperties = tableProperties;
4751
RowFilter rf = new RowFilter(inputSchema);
4852

4953
if (fieldsToDrop != null) {
@@ -82,6 +86,7 @@ public IcebergDestination instantiateDestination(String dest) {
8286
IcebergTableCreateConfig.builder()
8387
.setSchema(getDataSchema())
8488
.setPartitionFields(partitionFields)
89+
.setTableProperties(tableProperties)
8590
.build())
8691
.setFileFormat(FileFormat.fromString(fileFormat))
8792
.build();

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
293293
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
294294
PartitionSpec partitionSpec =
295295
createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
296+
Map<String, String> tableProperties =
297+
(createConfig != null && createConfig.getTableProperties() != null)
298+
? createConfig.getTableProperties()
299+
: Maps.newHashMap();
296300

297301
synchronized (TABLE_CACHE) {
298302
// Create namespace if it does not exist yet
@@ -316,7 +320,7 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
316320
} catch (NoSuchTableException e) { // Otherwise, create the table
317321
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
318322
try {
319-
table = catalog.createTable(identifier, tableSchema, partitionSpec);
323+
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
320324
LOG.info(
321325
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
322326
identifier,

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,4 +543,56 @@ public void testWriteCreateTableWithPartitionSpec() {
543543
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
544544
assertEquals(expectedSpec, table.spec());
545545
}
546+
547+
@Test
548+
public void testWriteCreateTableWithTableProperties() {
549+
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
550+
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
551+
552+
// Use real Iceberg table property keys
553+
Map<String, String> tableProperties =
554+
ImmutableMap.of(
555+
"write.format.default", "orc",
556+
"commit.retry.num-retries", "5",
557+
"read.split.target-size", "134217728");
558+
559+
Map<String, Object> config =
560+
ImmutableMap.of(
561+
"table",
562+
identifier,
563+
"catalog_properties",
564+
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location),
565+
"table_properties",
566+
tableProperties);
567+
568+
List<Row> rows = new ArrayList<>();
569+
for (int i = 0; i < 10; i++) {
570+
Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
571+
rows.add(row);
572+
}
573+
574+
PCollection<Row> result =
575+
testPipeline
576+
.apply("Records To Add", Create.of(rows))
577+
.setRowSchema(schema)
578+
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
579+
.get(SNAPSHOTS_TAG);
580+
581+
PAssert.that(result)
582+
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
583+
testPipeline.run().waitUntilFinish();
584+
585+
// Read back and check records are correct
586+
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
587+
PCollection<Row> readRows =
588+
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
589+
PAssert.that(readRows).containsInAnyOrder(rows);
590+
p.run().waitUntilFinish();
591+
592+
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
593+
// Assert that the table properties are set on the Iceberg table
594+
assertEquals("orc", table.properties().get("write.format.default"));
595+
assertEquals("5", table.properties().get("commit.retry.num-retries"));
596+
assertEquals("134217728", table.properties().get("read.split.target-size"));
597+
}
546598
}

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,11 @@ public void testStreamingWrite() throws IOException {
668668
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
669669
config.put("triggering_frequency_seconds", 4);
670670
config.put("partition_fields", Arrays.asList("bool_field", "modulo_5"));
671+
// Add table properties for testing
672+
Map<String, String> tableProperties = new HashMap<>();
673+
tableProperties.put("write.format.default", "parquet");
674+
tableProperties.put("commit.retry.num-retries", "3");
675+
config.put("table_properties", tableProperties);
671676

672677
// create elements from longs in range [0, 1000)
673678
PCollection<Row> input =
@@ -687,6 +692,8 @@ public void testStreamingWrite() throws IOException {
687692
List<Record> returnedRecords = readRecords(table);
688693
assertThat(
689694
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
695+
assertEquals("parquet", table.properties().get("write.format.default"));
696+
assertEquals("3", table.properties().get("commit.retry.num-retries"));
690697
}
691698

692699
@Test
@@ -971,4 +978,25 @@ public void runReadBetween(boolean useSnapshotBoundary, boolean streaming) throw
971978
PAssert.that(rows).containsInAnyOrder(expectedRows);
972979
pipeline.run().waitUntilFinish();
973980
}
981+
982+
@Test
983+
public void testWriteWithTableProperties() throws IOException {
984+
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
985+
Map<String, String> tableProperties = new HashMap<>();
986+
tableProperties.put("write.format.default", "parquet");
987+
tableProperties.put("commit.retry.num-retries", "3");
988+
config.put("table_properties", tableProperties);
989+
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
990+
input.apply(Managed.write(ICEBERG).withConfig(config));
991+
pipeline.run().waitUntilFinish();
992+
993+
Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
994+
// Read back and check records are correct
995+
List<Record> returnedRecords = readRecords(table);
996+
assertThat(
997+
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
998+
// Assert that the table properties are set on the Iceberg table
999+
assertEquals("parquet", table.properties().get("write.format.default"));
1000+
assertEquals("3", table.properties().get("commit.retry.num-retries"));
1001+
}
9741002
}

website/www/site/content/en/documentation/io/managed-io.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ and Beam SQL is invoked via the Managed API under the hood.
9898
only (<code style="color: green">str</code>)<br>
9999
partition_fields (<code>list[<span style="color: green;">str</span>]</code>)<br>
100100
triggering_frequency_seconds (<code style="color: #f54251">int32</code>)<br>
101+
table_properties (<code>map[<span style="color: green;">str</span>, <span style="color: green;
102+
">str</span>]</code>)<br>
101103
</td>
102104
</tr>
103105
<tr>
@@ -431,6 +433,19 @@ For more information on partition transforms, please visit https://iceberg.apach
431433
For a streaming pipeline, sets the frequency at which snapshots are produced.
432434
</td>
433435
</tr>
436+
<tr>
437+
<td>
438+
table_properties
439+
</td>
440+
<td>
441+
<code>map[<span style="color: green;">str</span>, <span style="color: green;">str</span>]</code>
442+
</td>
443+
<td>
444+
Table properties that are set on the Iceberg table when it is created.
445+
446+
For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.
447+
</td>
448+
</tr>
434449
</table>
435450
</div>
436451

0 commit comments

Comments
 (0)