2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 2
 }
1 change: 1 addition & 0 deletions CHANGES.md
@@ -75,6 +75,7 @@
 
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/35397)).
+* [IcebergIO] Create tables with specified table properties ([#35496](https://github.com/apache/beam/pull/35496)).
 * Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)).
   Beam now supports Milvus enrichment handler capabilities for vector, keyword,
   and hybrid search operations.
@@ -84,17 +84,26 @@ public org.apache.iceberg.catalog.Catalog catalog() {
   }
 
   public void createTable(
-      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
+      String tableIdentifier,
+      Schema tableSchema,
+      @Nullable List<String> partitionFields,
+      @Nullable Map<String, String> tableProperties) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
     try {
-      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+      catalog()
+          .createTable(
+              icebergIdentifier,
+              icebergSchema,
+              icebergSpec,
+              tableProperties == null ? Maps.newHashMap() : tableProperties);
       LOG.info(
-          "Created table '{}' with schema: {}\n, partition spec: {}",
+          "Created table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
           icebergIdentifier,
           icebergSchema,
-          icebergSpec);
+          icebergSpec,
+          tableProperties);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(e);
     }
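For context, the underlying Iceberg API this change targets is the four-argument `Catalog.createTable` overload, which applies properties at table-creation time. A minimal sketch (not part of this diff; the table identifier and property value are illustrative):

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class CreateTableSketch {
  // Iceberg's Catalog.createTable(TableIdentifier, Schema, PartitionSpec, Map) is the
  // overload the new code path calls; the properties are set when the table is created.
  static Table createWithProperties(Catalog catalog, Schema schema) {
    Map<String, String> properties =
        ImmutableMap.of("write.format.default", "orc"); // a real Iceberg table property key
    return catalog.createTable(
        TableIdentifier.parse("default.example"), // hypothetical identifier
        schema,
        PartitionSpec.unpartitioned(),
        properties);
  }
}
```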
IcebergTableCreateConfig.java
@@ -19,6 +19,7 @@
 
 import com.google.auto.value.AutoValue;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.iceberg.PartitionSpec;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +41,9 @@ public PartitionSpec getPartitionSpec() {
   @Pure
   public abstract @Nullable List<String> getPartitionFields();
 
+  @Pure
+  public abstract @Nullable Map<String, String> getTableProperties();
+
   @Pure
   public static Builder builder() {
     return new AutoValue_IcebergTableCreateConfig.Builder();
@@ -51,6 +55,8 @@ public abstract static class Builder {
 
     public abstract Builder setPartitionFields(@Nullable List<String> partitionFields);
 
+    public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);
+
     @Pure
     public abstract IcebergTableCreateConfig build();
   }
@@ -124,6 +124,12 @@ public static Builder builder() {
           + "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
   public abstract @Nullable List<String> getPartitionFields();
 
+  @SchemaFieldDescription(
+      "Iceberg table properties to be set on the table when it is created.\n"
+          + "For more information on table properties,"
+          + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
+  public abstract @Nullable Map<String, String> getTableProperties();
+
   @AutoValue.Builder
   public abstract static class Builder {
     public abstract Builder setTable(String table);
@@ -144,6 +150,8 @@ public abstract static class Builder {
 
     public abstract Builder setPartitionFields(List<String> partitionFields);
 
+    public abstract Builder setTableProperties(Map<String, String> tableProperties);
+
     public abstract Configuration build();
   }
 
@@ -209,6 +217,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                 FileFormat.PARQUET.toString(),
                 rows.getSchema(),
                 configuration.getPartitionFields(),
+                configuration.getTableProperties(),
                 configuration.getDrop(),
                 configuration.getKeep(),
                 configuration.getOnly()));
PortableIcebergDestinations.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.RowFilter;
 import org.apache.beam.sdk.util.RowStringInterpolator;
@@ -33,17 +34,20 @@ class PortableIcebergDestinations implements DynamicDestinations {
   private final String fileFormat;
 
   private final @Nullable List<String> partitionFields;
+  private final @Nullable Map<String, String> tableProperties;
 
   public PortableIcebergDestinations(
       String destinationTemplate,
       String fileFormat,
       Schema inputSchema,
       @Nullable List<String> partitionFields,
+      @Nullable Map<String, String> tableProperties,
       @Nullable List<String> fieldsToDrop,
       @Nullable List<String> fieldsToKeep,
       @Nullable String onlyField) {
     this.interpolator = new RowStringInterpolator(destinationTemplate, inputSchema);
     this.partitionFields = partitionFields;
+    this.tableProperties = tableProperties;
     RowFilter rf = new RowFilter(inputSchema);
 
     if (fieldsToDrop != null) {
@@ -82,6 +86,7 @@ public IcebergDestination instantiateDestination(String dest) {
             IcebergTableCreateConfig.builder()
                 .setSchema(getDataSchema())
                 .setPartitionFields(partitionFields)
+                .setTableProperties(tableProperties)
                 .build())
         .setFileFormat(FileFormat.fromString(fileFormat))
         .build();
@@ -293,6 +293,10 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
     @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
     PartitionSpec partitionSpec =
         createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
+    Map<String, String> tableProperties =
+        createConfig != null && createConfig.getTableProperties() != null
+            ? createConfig.getTableProperties()
+            : Maps.newHashMap();
 
     synchronized (TABLE_CACHE) {
       // Create namespace if it does not exist yet
@@ -316,12 +320,13 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
       } catch (NoSuchTableException e) { // Otherwise, create the table
         org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
         try {
-          table = catalog.createTable(identifier, tableSchema, partitionSpec);
+          table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
           LOG.info(
-              "Created Iceberg table '{}' with schema: {}\n, partition spec: {}",
+              "Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
              identifier,
              tableSchema,
-              partitionSpec);
+              partitionSpec,
+              tableProperties);
         } catch (AlreadyExistsException ignored) {
           // race condition: another worker already created this table
           table = catalog.loadTable(identifier);
@@ -543,4 +543,109 @@ public void testWriteCreateTableWithPartitionSpec() {
     Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
     assertEquals(expectedSpec, table.spec());
   }
+
+  @Test
+  public void testWriteCreateTableWithTablePropertiesSpec() {
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
+
+    // Use real Iceberg table property keys
+    Map<String, String> tableProperties =
+        ImmutableMap.of(
+            "write.format.default", "orc",
+            "commit.retry.num-retries", "5",
+            "read.split.target-size", "134217728");
+
+    Map<String, Object> config =
+        ImmutableMap.of(
+            "table",
+            identifier,
+            "catalog_properties",
+            ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location),
+            "table_properties",
+            tableProperties);
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
+      rows.add(row);
+    }
+
+    PCollection<Row> result =
+        testPipeline
+            .apply("Records To Add", Create.of(rows))
+            .setRowSchema(schema)
+            .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+            .get(SNAPSHOTS_TAG);
+
+    PAssert.that(result)
+        .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+    testPipeline.run().waitUntilFinish();
+
+    // Read back and check records are correct
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    PCollection<Row> readRows =
+        p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(readRows).containsInAnyOrder(rows);
+    p.run().waitUntilFinish();
+
+    Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
+    // Assert that the table properties are set on the Iceberg table
+    assertEquals("orc", table.properties().get("write.format.default"));
+    assertEquals("5", table.properties().get("commit.retry.num-retries"));
+    assertEquals("134217728", table.properties().get("read.split.target-size"));
+  }
+
+  @Test
+  public void testWriteCreateTableWithTableProperties() {
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> tableProperties =
+        ImmutableMap.of(
+            "write.format.default", "orc",
+            "commit.retry.num-retries", "5",
+            "read.split.target-size", "134217728");
+
+    // Create the table with properties
+    warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec, tableProperties);
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Row row = Row.withSchema(schema).addValues("str_" + i, i).build();
+      rows.add(row);
+    }
+
+    Map<String, Object> config =
+        ImmutableMap.of(
+            "table",
+            identifier,
+            "catalog_properties",
+            ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));
+
+    PCollection<Row> result =
+        testPipeline
+            .apply("Records To Add", Create.of(rows))
+            .setRowSchema(schema)
+            .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+            .get(SNAPSHOTS_TAG);
+
+    PAssert.that(result)
+        .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+    testPipeline.run().waitUntilFinish();
+
+    // Read back and check records are correct
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    PCollection<Row> readRows =
+        p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(readRows).containsInAnyOrder(rows);
+    p.run().waitUntilFinish();
+
+    Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
+    // Assert that the table properties are set on the Iceberg table
+    assertEquals("orc", table.properties().get("write.format.default"));
+    assertEquals("5", table.properties().get("commit.retry.num-retries"));
+    assertEquals("134217728", table.properties().get("read.split.target-size"));
+  }
 }
@@ -163,13 +163,22 @@ public DataFile writeRecords(
   }
 
   public Table createTable(TableIdentifier tableId, Schema schema) {
-    return createTable(tableId, schema, null);
+    return createTable(tableId, schema, null, null);
   }
 
   public Table createTable(
       TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) {
     someTableHasBeenCreated = true;
-    return catalog.createTable(tableId, schema, partitionSpec);
+    return catalog.createTable(tableId, schema, partitionSpec, null);
   }
 
+  public Table createTable(
+      TableIdentifier tableId,
+      Schema schema,
+      @Nullable PartitionSpec partitionSpec,
+      @Nullable Map<String, String> tableProperties) {
+    someTableHasBeenCreated = true;
+    return catalog.createTable(tableId, schema, partitionSpec, tableProperties);
+  }
+
   public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {
@@ -668,6 +668,11 @@ public void testStreamingWrite() throws IOException {
     Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
     config.put("triggering_frequency_seconds", 4);
     config.put("partition_fields", Arrays.asList("bool_field", "modulo_5"));
+    // Add table properties for testing
+    Map<String, String> tableProperties = new HashMap<>();
+    tableProperties.put("write.format.default", "parquet");
+    tableProperties.put("commit.retry.num-retries", "3");
+    config.put("table_properties", tableProperties);
 
     // create elements from longs in range [0, 1000)
     PCollection<Row> input =
@@ -687,6 +692,8 @@
     List<Record> returnedRecords = readRecords(table);
     assertThat(
         returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
+    assertEquals("parquet", table.properties().get("write.format.default"));
+    assertEquals("3", table.properties().get("commit.retry.num-retries"));
   }
 
   @Test
@@ -971,4 +978,25 @@ public void runReadBetween(boolean useSnapshotBoundary, boolean streaming) throw
     PAssert.that(rows).containsInAnyOrder(expectedRows);
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  public void testWriteWithTableProperties() throws IOException {
+    Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+    Map<String, String> tableProperties = new HashMap<>();
+    tableProperties.put("write.format.default", "parquet");
+    tableProperties.put("commit.retry.num-retries", "3");
+    config.put("table_properties", tableProperties);
+    PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
+    input.apply(Managed.write(ICEBERG).withConfig(config));
+    pipeline.run().waitUntilFinish();
+
+    Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
+    // Read back and check records are correct
+    List<Record> returnedRecords = readRecords(table);
+    assertThat(
+        returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
+    // Assert that the table properties are set on the Iceberg table
+    assertEquals("parquet", table.properties().get("write.format.default"));
+    assertEquals("3", table.properties().get("commit.retry.num-retries"));
+  }
 }
15 changes: 15 additions & 0 deletions website/www/site/content/en/documentation/io/managed-io.md
@@ -98,6 +98,8 @@ and Beam SQL is invoked via the Managed API under the hood.
         only (<code style="color: green">str</code>)<br>
         partition_fields (<code>list[<span style="color: green;">str</span>]</code>)<br>
         triggering_frequency_seconds (<code style="color: #f54251">int32</code>)<br>
+        table_properties (<code>map[<span style="color: green;">str</span>, <span style="color: green;
+        ">str</span>]</code>)<br>
       </td>
     </tr>
     <tr>
@@ -431,6 +433,19 @@ For more information on partition transforms, please visit https://iceberg.apach
       For a streaming pipeline, sets the frequency at which snapshots are produced.
       </td>
     </tr>
+    <tr>
+      <td>
+        table_properties
+      </td>
+      <td>
+        <code>map[<span style="color: green;">str</span>, <span style="color: green;">str</span>]</code>
+      </td>
+      <td>
+        Table properties to set when the Iceberg table is created.
+
+        For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.
+      </td>
+    </tr>
   </table>
 </div>
 
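As a usage sketch of the new option end to end (modeled on the tests in this PR; the table name and warehouse location below are placeholder assumptions), `table_properties` is passed like any other Managed Iceberg config key:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

class TablePropertiesUsageSketch {
  static void write(PCollection<Row> rows) {
    Map<String, Object> config =
        ImmutableMap.of(
            "table", "default.my_table", // placeholder table name
            "catalog_properties",
                ImmutableMap.of("type", "hadoop", "warehouse", "/tmp/warehouse"), // placeholder
            "table_properties",
                // real Iceberg property keys, mirroring the tests in this PR
                ImmutableMap.of(
                    "write.format.default", "orc",
                    "commit.retry.num-retries", "5"));
    // If the destination table does not exist yet, it is created with these properties.
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}
```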