Skip to content

Commit a7b77ad

Browse files
committed
[IcebergIO] Create tables with Iceberg table properties
1 parent ee74a3e commit a7b77ad

File tree

7 files changed

+85
-13
lines changed

7 files changed

+85
-13
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 5
3+
"modification": 2
44
}

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575

7676
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7777
* Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/35397)).
78+
* [IcebergIO] Create tables with the specified table properties ([#35496](https://github.com/apache/beam/pull/35496)).
7879

7980
## Breaking Changes
8081

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,26 @@ public org.apache.iceberg.catalog.Catalog catalog() {
8484
}
8585

8686
public void createTable(
87-
String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
87+
String tableIdentifier,
88+
Schema tableSchema,
89+
@Nullable List<String> partitionFields,
90+
@Nullable Map<String, String> tableProperties) {
8891
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
8992
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
9093
PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
9194
try {
92-
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
95+
catalog()
96+
.createTable(
97+
icebergIdentifier,
98+
icebergSchema,
99+
icebergSpec,
100+
tableProperties == null ? Maps.newHashMap() : tableProperties);
93101
LOG.info(
94-
"Created table '{}' with schema: {}\n, partition spec: {}",
102+
"Created table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
95103
icebergIdentifier,
96104
icebergSchema,
97-
icebergSpec);
105+
icebergSpec,
106+
tableProperties);
98107
} catch (AlreadyExistsException e) {
99108
throw new TableAlreadyExistsException(e);
100109
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
293293
@Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
294294
PartitionSpec partitionSpec =
295295
createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
296-
// Ensure table properties are not null, Iceberg's createTable does not accept null values.
297296
Map<String, String> tableProperties =
298-
(createConfig != null && createConfig.getTableProperties() != null)
297+
createConfig != null && createConfig.getTableProperties() != null
299298
? createConfig.getTableProperties()
300299
: Maps.newHashMap();
301300

@@ -323,10 +322,11 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
323322
try {
324323
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
325324
LOG.info(
326-
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
325+
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
327326
identifier,
328327
tableSchema,
329-
partitionSpec);
328+
partitionSpec,
329+
tableProperties);
330330
} catch (AlreadyExistsException ignored) {
331331
// race condition: another worker already created this table
332332
table = catalog.loadTable(identifier);

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ public void testWriteCreateTableWithPartitionSpec() {
545545
}
546546

547547
@Test
548-
public void testWriteCreateTableWithTableProperties() {
548+
public void testWriteCreateTableWithTablePropertiesSpec() {
549549
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
550550
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
551551

@@ -595,4 +595,57 @@ public void testWriteCreateTableWithTableProperties() {
595595
assertEquals("5", table.properties().get("commit.retry.num-retries"));
596596
assertEquals("134217728", table.properties().get("read.split.target-size"));
597597
}
598+
599+
@Test
600+
public void testWriteCreateTableWithTableProperties() {
601+
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
602+
Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
603+
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
604+
PartitionSpec spec = PartitionSpec.unpartitioned();
605+
Map<String, String> tableProperties =
606+
ImmutableMap.of(
607+
"write.format.default", "orc",
608+
"commit.retry.num-retries", "5",
609+
"read.split.target-size", "134217728");
610+
611+
// Create the table with properties
612+
warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec, tableProperties);
613+
614+
List<Row> rows = new ArrayList<>();
615+
for (int i = 0; i < 10; i++) {
616+
Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
617+
rows.add(row);
618+
}
619+
620+
Map<String, Object> config =
621+
ImmutableMap.of(
622+
"table",
623+
identifier,
624+
"catalog_properties",
625+
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));
626+
627+
PCollection<Row> result =
628+
testPipeline
629+
.apply("Records To Add", Create.of(rows))
630+
.setRowSchema(schema)
631+
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
632+
.get(SNAPSHOTS_TAG);
633+
634+
PAssert.that(result)
635+
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
636+
testPipeline.run().waitUntilFinish();
637+
638+
// Read back and check records are correct
639+
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
640+
PCollection<Row> readRows =
641+
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
642+
PAssert.that(readRows).containsInAnyOrder(rows);
643+
p.run().waitUntilFinish();
644+
645+
Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
646+
// Assert that the table properties are set on the Iceberg table
647+
assertEquals("orc", table.properties().get("write.format.default"));
648+
assertEquals("5", table.properties().get("commit.retry.num-retries"));
649+
assertEquals("134217728", table.properties().get("read.split.target-size"));
650+
}
598651
}

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,22 @@ public DataFile writeRecords(
163163
}
164164

165165
public Table createTable(TableIdentifier tableId, Schema schema) {
166-
return createTable(tableId, schema, null);
166+
return createTable(tableId, schema, null, null);
167167
}
168168

169169
public Table createTable(
170170
TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) {
171171
someTableHasBeenCreated = true;
172-
return catalog.createTable(tableId, schema, partitionSpec);
172+
return catalog.createTable(tableId, schema, partitionSpec, null);
173+
}
174+
175+
public Table createTable(
176+
TableIdentifier tableId,
177+
Schema schema,
178+
@Nullable PartitionSpec partitionSpec,
179+
@Nullable Map<String, String> tableProperties) {
180+
someTableHasBeenCreated = true;
181+
return catalog.createTable(tableId, schema, partitionSpec, tableProperties);
173182
}
174183

175184
public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {

website/www/site/content/en/documentation/io/managed-io.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ For more information on partition transforms, please visit https://iceberg.apach
443443
<td>
444444
Table properties to set when creating the Iceberg table.
445445

446-
For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties
446+
For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties
447447
</td>
448448
</tr>
449449
</table>

0 commit comments

Comments
 (0)