Skip to content

Commit 5a3ddc4

Browse files
authored
[Iceberg] cleanup FileIO resources (#33509)
* cleanup FileIO resources * trigger integration tests * cleanup
1 parent b4c3a4f commit 5a3ddc4

File tree

3 files changed

+11
-7
lines changed

3 files changed

+11
-7
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 4
3+
"modification": 1
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.iceberg.Table;
4949
import org.apache.iceberg.catalog.Catalog;
5050
import org.apache.iceberg.io.FileIO;
51-
import org.apache.iceberg.io.OutputFile;
5251
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
5352
import org.slf4j.Logger;
5453
import org.slf4j.LoggerFactory;
@@ -187,8 +186,10 @@ private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWrit
187186
int specId = entry.getKey();
188187
List<DataFile> files = entry.getValue();
189188
PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
190-
ManifestWriter<DataFile> writer =
191-
createManifestWriter(table.location(), uuid, spec, table.io());
189+
ManifestWriter<DataFile> writer;
190+
try (FileIO io = table.io()) {
191+
writer = createManifestWriter(table.location(), uuid, spec, io);
192+
}
192193
for (DataFile file : files) {
193194
writer.add(file);
194195
committedDataFileByteSize.update(file.fileSizeInBytes());
@@ -207,8 +208,7 @@ private ManifestWriter<DataFile> createManifestWriter(
207208
String.format(
208209
"%s/metadata/%s-%s-%s.manifest",
209210
tableLocation, manifestFilePrefix, uuid, spec.specId()));
210-
OutputFile outputFile = io.newOutputFile(location);
211-
return ManifestFiles.write(spec, outputFile);
211+
return ManifestFiles.write(spec, io.newOutputFile(location));
212212
}
213213
}
214214
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iceberg.data.Record;
3030
import org.apache.iceberg.data.parquet.GenericParquetWriter;
3131
import org.apache.iceberg.io.DataWriter;
32+
import org.apache.iceberg.io.FileIO;
3233
import org.apache.iceberg.io.OutputFile;
3334
import org.apache.iceberg.parquet.Parquet;
3435
import org.slf4j.Logger;
@@ -66,7 +67,10 @@ class RecordWriter {
6667
fileFormat.addExtension(
6768
table.locationProvider().newDataLocation(table.spec(), partitionKey, filename));
6869
}
69-
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);
70+
OutputFile outputFile;
71+
try (FileIO io = table.io()) {
72+
outputFile = io.newOutputFile(absoluteFilename);
73+
}
7074

7175
switch (fileFormat) {
7276
case AVRO:

0 commit comments

Comments
 (0)