From a23a5a91fdc5eda3f7d092ecbac66d32056d8160 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 23 Oct 2025 11:33:43 -0400 Subject: [PATCH 1/3] fix(iceberg): prevent premature FileIO closure in RecordWriter Keep FileIO open for writer lifetime to avoid connection pool shutdown issues Add test to verify FileIO remains open until writer close --- .../beam/sdk/io/iceberg/RecordWriter.java | 50 +++++++-- .../io/iceberg/RecordWriterManagerTest.java | 105 ++++++++++++++++++ 2 files changed, 143 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index d4a61c6d3e17..d233b0ac05b5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -34,6 +34,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ class RecordWriter { private final Table table; private final String absoluteFilename; private final FileFormat fileFormat; + private @Nullable FileIO io; RecordWriter( Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey) @@ -72,12 +74,14 @@ class RecordWriter { } OutputFile outputFile; EncryptionKeyMetadata keyMetadata; - try (FileIO io = table.io()) { - OutputFile tmpFile = io.newOutputFile(absoluteFilename); - EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile); - outputFile = encryptedOutputFile.encryptingOutputFile(); - keyMetadata = encryptedOutputFile.keyMetadata(); - } + // Keep FileIO open for the lifetime of this writer to avoid + // premature shutdown of underlying client pools (e.g., S3), + // which manifests as "Connection pool shut down" (Issue #36438). + this.io = table.io(); + OutputFile tmpFile = io.newOutputFile(absoluteFilename); + EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile); + outputFile = encryptedOutputFile.encryptingOutputFile(); + keyMetadata = encryptedOutputFile.keyMetadata(); switch (fileFormat) { case AVRO: @@ -120,16 +124,38 @@ public void write(Record record) { } public void close() throws IOException { + IOException closeError = null; try { icebergDataWriter.close(); } catch (IOException e) { - throw new IOException( - String.format( - "Failed to close %s writer for table %s, path: %s", - fileFormat, table.name(), absoluteFilename), - e); + closeError = + new IOException( + String.format( + "Failed to close %s writer for table %s, path: %s", + fileFormat, table.name(), absoluteFilename), + e); + } finally { + // Always attempt to close FileIO and decrement metrics + if (io != null) { + try { + io.close(); + } catch (Exception ioCloseError) { + if (closeError != null) { + closeError.addSuppressed(ioCloseError); + } else { + closeError = new IOException("Failed to close FileIO", ioCloseError); + } + } finally { + io = null; + } + } + activeIcebergWriters.dec(); + } + + if (closeError != null) { + throw closeError; } - activeIcebergWriters.dec(); + DataFile dataFile = icebergDataWriter.toDataFile(); LOG.info( "Closed {} writer for table '{}' ({} records, {} bytes), path: {}", diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 36b74967f0b2..4746fc7b4bd9 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.Row; @@ -59,6 +60,10 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; @@ -77,6 +82,7 @@ import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; /** Test class for {@link RecordWriterManager}. */ @RunWith(JUnit4.class) @@ -938,4 +944,103 @@ public void testDefaultMetrics() throws IOException { } } } + + @Test + public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException { + TableIdentifier tableId = + TableIdentifier.of( + "default", + "table_" + + testName.getMethodName() + + "_" + + UUID.randomUUID().toString().replace("-", "").substring(0, 6)); + Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA); + + CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io()); + Table spyTable = Mockito.spy(table); + Mockito.doReturn(trackingFileIO).when(spyTable).io(); + + PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema()); + RecordWriter writer = + new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey); + + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + + writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + writer.close(); + + assertTrue("FileIO should be closed after writer close", trackingFileIO.closed); + } + + private static final class CloseTrackingFileIO implements FileIO { + private final FileIO delegate; + volatile boolean closed = false; + + CloseTrackingFileIO(FileIO delegate) { + this.delegate = delegate; + } + + @Override + public InputFile newInputFile(String path) { + return delegate.newInputFile(path); + } + + @Override + public OutputFile newOutputFile(String path) { + OutputFile underlying = delegate.newOutputFile(path); + return new CloseAwareOutputFile(underlying, this); + } + + @Override + public void deleteFile(String path) { + delegate.deleteFile(path); + } + + @Override + public Map properties() { + return delegate.properties(); + } + + @Override + public void close() { + closed = true; + delegate.close(); + } + } + + private static final class CloseAwareOutputFile implements OutputFile { + private final OutputFile delegate; + private final CloseTrackingFileIO io; + + CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) { + this.delegate = delegate; + this.io = io; + } + + @Override + public PositionOutputStream create() { + if (io.closed) { + throw new IllegalStateException("Connection pool shut down"); + } + return delegate.create(); + } + + @Override + public PositionOutputStream createOrOverwrite() { + if (io.closed) { + throw new IllegalStateException("Connection pool shut down"); + } + return delegate.createOrOverwrite(); + } + + @Override + public String location() { + return delegate.location(); + } + + @Override + public InputFile toInputFile() { + return delegate.toInputFile(); + } + } } From 9ac45c3319b01256c6728995deb1272aa3295c0b Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 22 Dec 2025 12:38:39 -0600 Subject: [PATCH 2/3] trigger ITs --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 37dd25bf9029..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3 + "modification": 1 } From fa4237c0f5204c99832c72b2fdafbd77abcfd31e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 22 Dec 2025 15:13:27 -0600 Subject: [PATCH 3/3] also apply in AppendFilesToTables --- .../beam/sdk/io/iceberg/AppendFilesToTables.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index 12888b4e4e06..1789932d69a7 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -189,14 +189,14 @@ private void appendManifestFiles(Table table, Iterable fileWrit ManifestWriter writer; try (FileIO io = table.io()) { writer = createManifestWriter(table.location(), uuid, spec, io); + for (DataFile file : files) { + writer.add(file); + committedDataFileByteSize.update(file.fileSizeInBytes()); + committedDataFileRecordCount.update(file.recordCount()); + } + writer.close(); + update.appendManifest(writer.toManifestFile()); } - for (DataFile file : files) { - writer.add(file); - committedDataFileByteSize.update(file.fileSizeInBytes()); - committedDataFileRecordCount.update(file.recordCount()); - } - writer.close(); - update.appendManifest(writer.toManifestFile()); } update.commit(); }