Skip to content

Commit 6842f3a

Browse files
Add Encryption When Writing to Iceberg Tables in RecordWriter.java (#34021)
1 parent 332fe03 commit 6842f3a

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.iceberg.catalog.Catalog;
3030
import org.apache.iceberg.data.Record;
3131
import org.apache.iceberg.data.parquet.GenericParquetWriter;
32+
import org.apache.iceberg.encryption.EncryptedOutputFile;
33+
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
3234
import org.apache.iceberg.io.DataWriter;
3335
import org.apache.iceberg.io.FileIO;
3436
import org.apache.iceberg.io.OutputFile;
@@ -60,7 +62,6 @@ class RecordWriter {
6062
throws IOException {
6163
this.table = table;
6264
this.fileFormat = fileFormat;
63-
6465
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
6566

6667
if (table.spec().isUnpartitioned()) {
@@ -72,8 +73,12 @@ class RecordWriter {
7273
table.locationProvider().newDataLocation(table.spec(), partitionKey, filename));
7374
}
7475
OutputFile outputFile;
76+
EncryptionKeyMetadata keyMetadata;
7577
try (FileIO io = table.io()) {
76-
outputFile = io.newOutputFile(absoluteFilename);
78+
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
79+
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
80+
outputFile = encryptedOutputFile.encryptingOutputFile();
81+
keyMetadata = encryptedOutputFile.keyMetadata();
7782
}
7883

7984
switch (fileFormat) {
@@ -85,6 +90,7 @@ class RecordWriter {
8590
.withSpec(table.spec())
8691
.withPartition(partitionKey)
8792
.metricsConfig(metricsConfig)
93+
.withKeyMetadata(keyMetadata)
8894
.overwrite()
8995
.build();
9096
break;
@@ -96,6 +102,7 @@ class RecordWriter {
96102
.withSpec(table.spec())
97103
.withPartition(partitionKey)
98104
.metricsConfig(metricsConfig)
105+
.withKeyMetadata(keyMetadata)
99106
.overwrite()
100107
.build();
101108
break;
@@ -114,6 +121,7 @@ class RecordWriter {
114121
}
115122

116123
public void write(Record record) {
124+
117125
icebergDataWriter.write(record);
118126
}
119127

0 commit comments

Comments
 (0)