Skip to content

Commit 011970d

Browse files
authored
[IcebergIO] Create tables with Iceberg Table Properties (#35496)
* [IcebergIO] Create tables with Iceberg Table properties * [IcebergIO] Create tables with Iceberg Table properties * [IcebergIO] Create tables with Iceberg Table properties * [IcebergIO] Create tables with Iceberg Table properties
1 parent 3d2e15b commit 011970d

File tree

10 files changed

+187
-6
lines changed

10 files changed

+187
-6
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 3
3+
"modification": 2
44
}

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575

7676
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7777
* Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/35397)).
78+
* [IcebergIO] Create tables with specified table properties ([#35496](https://github.com/apache/beam/pull/35496))
7879
* Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)).
7980
Beam now supports Milvus enrichment handler capabilities for vector, keyword,
8081
and hybrid search operations.

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergTableCreateConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.auto.value.AutoValue;
2121
import java.util.List;
22+
import java.util.Map;
2223
import org.apache.beam.sdk.schemas.Schema;
2324
import org.apache.iceberg.PartitionSpec;
2425
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +41,9 @@ public PartitionSpec getPartitionSpec() {
4041
@Pure
4142
public abstract @Nullable List<String> getPartitionFields();
4243

44+
@Pure
45+
public abstract @Nullable Map<String, String> getTableProperties();
46+
4347
@Pure
4448
public static Builder builder() {
4549
return new AutoValue_IcebergTableCreateConfig.Builder();
@@ -51,6 +55,8 @@ public abstract static class Builder {
5155

5256
public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);
5357

58+
public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);
59+
5460
@Pure
5561
public abstract IcebergTableCreateConfig build();
5662
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ public static Builder builder() {
124124
+ "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
125125
public abstract @Nullable List<String> getPartitionFields();
126126

127+
@SchemaFieldDescription(
128+
"Iceberg table properties to be set on the table when it is created.\n"
129+
+ "For more information on table properties,"
130+
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
131+
public abstract @Nullable Map<String, String> getTableProperties();
132+
127133
@AutoValue.Builder
128134
public abstract static class Builder {
129135
public abstract Builder setTable(String table);
@@ -144,6 +150,8 @@ public abstract static class Builder {
144150

145151
public abstract Builder setPartitionFields(List<String> partitionFields);
146152

153+
public abstract Builder setTableProperties(Map<String, String> tableProperties);
154+
147155
public abstract Configuration build();
148156
}
149157

@@ -209,6 +217,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
209217
FileFormat.PARQUET.toString(),
210218
rows.getSchema(),
211219
configuration.getPartitionFields(),
220+
configuration.getTableProperties(),
212221
configuration.getDrop(),
213222
configuration.getKeep(),
214223
configuration.getOnly()));

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.iceberg;
1919

2020
import java.util.List;
21+
import java.util.Map;
2122
import org.apache.beam.sdk.schemas.Schema;
2223
import org.apache.beam.sdk.util.RowFilter;
2324
import org.apache.beam.sdk.util.RowStringInterpolator;
@@ -33,17 +34,20 @@ class PortableIcebergDestinations implements DynamicDestinations {
3334
private final String fileFormat;
3435

3536
private final @Nullable List<String> partitionFields;
37+
private final @Nullable Map<String, String> tableProperties;
3638

3739
public PortableIcebergDestinations(
3840
String destinationTemplate,
3941
String fileFormat,
4042
Schema inputSchema,
4143
@Nullable List<String> partitionFields,
44+
@Nullable Map<String, String> tableProperties,
4245
@Nullable List<String> fieldsToDrop,
4346
@Nullable List<String> fieldsToKeep,
4447
@Nullable String onlyField) {
4548
this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
4649
this.partitionFields = partitionFields;
50+
this.tableProperties = tableProperties;
4751
RowFilter rf = new RowFilter(inputSchema);
4852

4953
if (fieldsToDrop != null) {
@@ -82,6 +86,7 @@ public IcebergDestination instantiateDestination(String dest) {
8286
IcebergTableCreateConfig.builder()
8387
.setSchema(getDataSchema())
8488
.setPartitionFields(partitionFields)
89+
.setTableProperties(tableProperties)
8590
.build())
8691
.setFileFormat(FileFormat.fromString(fileFormat))
8792
.build();

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
293293
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
294294
PartitionSpec partitionSpec =
295295
createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
296+
Map<String, String> tableProperties =
297+
createConfig != null && createConfig.getTableProperties() != null
298+
? createConfig.getTableProperties()
299+
: Maps.newHashMap();
296300

297301
synchronized (TABLE_CACHE) {
298302
// Create namespace if it does not exist yet
@@ -316,12 +320,13 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
316320
} catch (NoSuchTableException e) { // Otherwise, create the table
317321
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
318322
try {
319-
table = catalog.createTable(identifier, tableSchema, partitionSpec);
323+
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
320324
LOG.info(
321-
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
325+
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
322326
identifier,
323327
tableSchema,
324-
partitionSpec);
328+
partitionSpec,
329+
tableProperties);
325330
} catch (AlreadyExistsException ignored) {
326331
// race condition: another worker already created this table
327332
table = catalog.loadTable(identifier);

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,4 +543,109 @@ public void testWriteCreateTableWithPartitionSpec() {
543543
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
544544
assertEquals(expectedSpec, table.spec());
545545
}
546+
547+
@Test
548+
public void testWriteCreateTableWithTablePropertiesSpec() {
549+
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
550+
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
551+
552+
// Use real Iceberg table property keys
553+
Map<String, String> tableProperties =
554+
ImmutableMap.of(
555+
"write.format.default", "orc",
556+
"commit.retry.num-retries", "5",
557+
"read.split.target-size", "134217728");
558+
559+
Map<String, Object> config =
560+
ImmutableMap.of(
561+
"table",
562+
identifier,
563+
"catalog_properties",
564+
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location),
565+
"table_properties",
566+
tableProperties);
567+
568+
List<Row> rows = new ArrayList<>();
569+
for (int i = 0; i < 10; i++) {
570+
Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
571+
rows.add(row);
572+
}
573+
574+
PCollection<Row> result =
575+
testPipeline
576+
.apply("Records To Add", Create.of(rows))
577+
.setRowSchema(schema)
578+
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
579+
.get(SNAPSHOTS_TAG);
580+
581+
PAssert.that(result)
582+
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
583+
testPipeline.run().waitUntilFinish();
584+
585+
// Read back and check records are correct
586+
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
587+
PCollection<Row> readRows =
588+
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
589+
PAssert.that(readRows).containsInAnyOrder(rows);
590+
p.run().waitUntilFinish();
591+
592+
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
593+
// Assert that the table properties are set on the Iceberg table
594+
assertEquals("orc", table.properties().get("write.format.default"));
595+
assertEquals("5", table.properties().get("commit.retry.num-retries"));
596+
assertEquals("134217728", table.properties().get("read.split.target-size"));
597+
}
598+
599+
@Test
600+
public void testWriteCreateTableWithTableProperties() {
601+
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
602+
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
603+
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
604+
PartitionSpec spec = PartitionSpec.unpartitioned();
605+
Map<String, String> tableProperties =
606+
ImmutableMap.of(
607+
"write.format.default", "orc",
608+
"commit.retry.num-retries", "5",
609+
"read.split.target-size", "134217728");
610+
611+
// Create the table with properties
612+
warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec, tableProperties);
613+
614+
List<Row> rows = new ArrayList<>();
615+
for (int i = 0; i < 10; i++) {
616+
Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
617+
rows.add(row);
618+
}
619+
620+
Map<String, Object> config =
621+
ImmutableMap.of(
622+
"table",
623+
identifier,
624+
"catalog_properties",
625+
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));
626+
627+
PCollection<Row> result =
628+
testPipeline
629+
.apply("Records To Add", Create.of(rows))
630+
.setRowSchema(schema)
631+
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
632+
.get(SNAPSHOTS_TAG);
633+
634+
PAssert.that(result)
635+
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
636+
testPipeline.run().waitUntilFinish();
637+
638+
// Read back and check records are correct
639+
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
640+
PCollection<Row> readRows =
641+
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
642+
PAssert.that(readRows).containsInAnyOrder(rows);
643+
p.run().waitUntilFinish();
644+
645+
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
646+
// Assert that the table properties are set on the Iceberg table
647+
assertEquals("orc", table.properties().get("write.format.default"));
648+
assertEquals("5", table.properties().get("commit.retry.num-retries"));
649+
assertEquals("134217728", table.properties().get("read.split.target-size"));
650+
}
546651
}

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,22 @@ public DataFile writeRecords(
163163
}
164164

165165
public Table createTable(TableIdentifier tableId, Schema schema) {
166-
return createTable(tableId, schema, null);
166+
return createTable(tableId, schema, null, null);
167167
}
168168

169169
public Table createTable(
170170
TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) {
171171
someTableHasBeenCreated = true;
172-
return catalog.createTable(tableId, schema, partitionSpec);
172+
return catalog.createTable(tableId, schema, partitionSpec, null);
173+
}
174+
175+
public Table createTable(
176+
TableIdentifier tableId,
177+
Schema schema,
178+
@Nullable PartitionSpec partitionSpec,
179+
@Nullable Map<String, String> tableProperties) {
180+
someTableHasBeenCreated = true;
181+
return catalog.createTable(tableId, schema, partitionSpec, tableProperties);
173182
}
174183

175184
public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,11 @@ public void testStreamingWrite() throws IOException {
668668
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
669669
config.put("triggering_frequency_seconds", 4);
670670
config.put("partition_fields", Arrays.asList("bool_field", "modulo_5"));
671+
// Add table properties for testing
672+
Map<String, String> tableProperties = new HashMap<>();
673+
tableProperties.put("write.format.default", "parquet");
674+
tableProperties.put("commit.retry.num-retries", "3");
675+
config.put("table_properties", tableProperties);
671676

672677
// create elements from longs in range [0, 1000)
673678
PCollection<Row> input =
@@ -687,6 +692,8 @@ public void testStreamingWrite() throws IOException {
687692
List<Record> returnedRecords = readRecords(table);
688693
assertThat(
689694
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
695+
assertEquals("parquet", table.properties().get("write.format.default"));
696+
assertEquals("3", table.properties().get("commit.retry.num-retries"));
690697
}
691698

692699
@Test
@@ -971,4 +978,25 @@ public void runReadBetween(boolean useSnapshotBoundary, boolean streaming) throw
971978
PAssert.that(rows).containsInAnyOrder(expectedRows);
972979
pipeline.run().waitUntilFinish();
973980
}
981+
982+
@Test
983+
public void testWriteWithTableProperties() throws IOException {
984+
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
985+
Map<String, String> tableProperties = new HashMap<>();
986+
tableProperties.put("write.format.default", "parquet");
987+
tableProperties.put("commit.retry.num-retries", "3");
988+
config.put("table_properties", tableProperties);
989+
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
990+
input.apply(Managed.write(ICEBERG).withConfig(config));
991+
pipeline.run().waitUntilFinish();
992+
993+
Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
994+
// Read back and check records are correct
995+
List<Record> returnedRecords = readRecords(table);
996+
assertThat(
997+
returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
998+
// Assert that the table properties are set on the Iceberg table
999+
assertEquals("parquet", table.properties().get("write.format.default"));
1000+
assertEquals("3", table.properties().get("commit.retry.num-retries"));
1001+
}
9741002
}

website/www/site/content/en/documentation/io/managed-io.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ and Beam SQL is invoked via the Managed API under the hood.
9797
keep (<code>list[<span style="color: green;">str</span>]</code>)<br>
9898
only (<code style="color: green">str</code>)<br>
9999
partition_fields (<code>list[<span style="color: green;">str</span>]</code>)<br>
100+
table_properties (<code>map[<span style="color: green;">str</span>, <span style="color: green;">str</span>]</code>)<br>
100101
triggering_frequency_seconds (<code style="color: #f54251">int32</code>)<br>
101102
</td>
102103
</tr>
@@ -420,6 +421,18 @@ and Beam SQL is invoked via the Managed API under the hood.
420421
For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.
421422
</td>
422423
</tr>
424+
<tr>
425+
<td>
426+
table_properties
427+
</td>
428+
<td>
429+
<code>map[<span style="color: green;">str</span>, <span style="color: green;">str</span>]</code>
430+
</td>
431+
<td>
432+
Iceberg table properties to be set on the table when it is created.
433+
For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.
434+
</td>
435+
</tr>
423436
<tr>
424437
<td>
425438
triggering_frequency_seconds

0 commit comments

Comments
 (0)