diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e7..b73af5e61a43 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 4
+  "modification": 1
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 12888b4e4e06..1789932d69a7 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -189,14 +189,14 @@ private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWrit
       ManifestWriter<DataFile> writer;
       try (FileIO io = table.io()) {
         writer = createManifestWriter(table.location(), uuid, spec, io);
+        for (DataFile file : files) {
+          writer.add(file);
+          committedDataFileByteSize.update(file.fileSizeInBytes());
+          committedDataFileRecordCount.update(file.recordCount());
+        }
+        writer.close();
+        update.appendManifest(writer.toManifestFile());
       }
-      for (DataFile file : files) {
-        writer.add(file);
-        committedDataFileByteSize.update(file.fileSizeInBytes());
-        committedDataFileRecordCount.update(file.recordCount());
-      }
-      writer.close();
-      update.appendManifest(writer.toManifestFile());
     }
     update.commit();
   }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index d4a61c6d3e17..d233b0ac05b5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +47,7 @@ class RecordWriter {
   private final Table table;
   private final String absoluteFilename;
   private final FileFormat fileFormat;
+  private @Nullable FileIO io;
 
   RecordWriter(
       Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -72,12 +74,14 @@ class RecordWriter {
     }
     OutputFile outputFile;
     EncryptionKeyMetadata keyMetadata;
-    try (FileIO io = table.io()) {
-      OutputFile tmpFile = io.newOutputFile(absoluteFilename);
-      EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
-      outputFile = encryptedOutputFile.encryptingOutputFile();
-      keyMetadata = encryptedOutputFile.keyMetadata();
-    }
+    // Keep FileIO open for the lifetime of this writer to avoid
+    // premature shutdown of underlying client pools (e.g., S3),
+    // which manifests as "Connection pool shut down" (Issue #36438).
+    this.io = table.io();
+    OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+    EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
+    outputFile = encryptedOutputFile.encryptingOutputFile();
+    keyMetadata = encryptedOutputFile.keyMetadata();
 
     switch (fileFormat) {
       case AVRO:
@@ -120,16 +124,38 @@ public void write(Record record) {
   }
 
   public void close() throws IOException {
+    IOException closeError = null;
     try {
       icebergDataWriter.close();
     } catch (IOException e) {
-      throw new IOException(
-          String.format(
-              "Failed to close %s writer for table %s, path: %s",
-              fileFormat, table.name(), absoluteFilename),
-          e);
+      closeError =
+          new IOException(
+              String.format(
+                  "Failed to close %s writer for table %s, path: %s",
+                  fileFormat, table.name(), absoluteFilename),
+              e);
+    } finally {
+      // Always attempt to close FileIO and decrement metrics
+      if (io != null) {
+        try {
+          io.close();
+        } catch (Exception ioCloseError) {
+          if (closeError != null) {
+            closeError.addSuppressed(ioCloseError);
+          } else {
+            closeError = new IOException("Failed to close FileIO", ioCloseError);
+          }
+        } finally {
+          io = null;
+        }
+      }
+      activeIcebergWriters.dec();
+    }
+
+    if (closeError != null) {
+      throw closeError;
     }
-    activeIcebergWriters.dec();
+
     DataFile dataFile = icebergDataWriter.toDataFile();
     LOG.info(
         "Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 7bce0b16cb16..375d90737117 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -45,6 +45,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.values.Row;
@@ -65,6 +66,10 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -83,6 +88,7 @@
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
 
 /** Test class for {@link RecordWriterManager}. */
 @RunWith(JUnit4.class)
@@ -950,6 +956,105 @@ public void testDefaultMetrics() throws IOException {
     }
   }
 
+  @Test
+  public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
+    TableIdentifier tableId =
+        TableIdentifier.of(
+            "default",
+            "table_"
+                + testName.getMethodName()
+                + "_"
+                + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io());
+    Table spyTable = Mockito.spy(table);
+    Mockito.doReturn(trackingFileIO).when(spyTable).io();
+
+    PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema());
+    RecordWriter writer =
+        new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
+    writer.close();
+
+    assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
+  }
+
+  private static final class CloseTrackingFileIO implements FileIO {
+    private final FileIO delegate;
+    volatile boolean closed = false;
+
+    CloseTrackingFileIO(FileIO delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return delegate.newInputFile(path);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      OutputFile underlying = delegate.newOutputFile(path);
+      return new CloseAwareOutputFile(underlying, this);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      delegate.deleteFile(path);
+    }
+
+    @Override
+    public Map<String, String> properties() {
+      return delegate.properties();
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+      delegate.close();
+    }
+  }
+
+  private static final class CloseAwareOutputFile implements OutputFile {
+    private final OutputFile delegate;
+    private final CloseTrackingFileIO io;
+
+    CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) {
+      this.delegate = delegate;
+      this.io = io;
+    }
+
+    @Override
+    public PositionOutputStream create() {
+      if (io.closed) {
+        throw new IllegalStateException("Connection pool shut down");
+      }
+      return delegate.create();
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite() {
+      if (io.closed) {
+        throw new IllegalStateException("Connection pool shut down");
+      }
+      return delegate.createOrOverwrite();
+    }
+
+    @Override
+    public String location() {
+      return delegate.location();
+    }
+
+    @Override
+    public InputFile toInputFile() {
+      return delegate.toInputFile();
+    }
+  }
+
   @Test
   public void testGetOrCreateTable_refreshLogic() {
     Table mockTable = mock(Table.class);
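
Note on the pattern (a sketch, not part of the patch): the RecordWriter change moves FileIO from a try-with-resources block, which closed it as soon as the output file was created, to a field that stays open until close(). That means close() must release two resources, the Iceberg data writer and the FileIO, without letting a failure in one mask a failure in the other. Below is a minimal standalone Java sketch of that close() discipline; LifecycleSketch, pooledClient, and dataWriter are hypothetical names standing in for RecordWriter, its FileIO, and its data writer.

import java.io.Closeable;
import java.io.IOException;

final class LifecycleSketch implements Closeable {
  // Held for the lifetime of this object. Closing it early (e.g., inside the
  // constructor via try-with-resources) would shut down shared client pools
  // while the data writer still needs them, which is the "Connection pool
  // shut down" failure mode the patch addresses.
  private Closeable pooledClient;
  private final Closeable dataWriter;

  LifecycleSketch(Closeable pooledClient, Closeable dataWriter) {
    this.pooledClient = pooledClient;
    this.dataWriter = dataWriter;
  }

  @Override
  public void close() throws IOException {
    IOException closeError = null;
    try {
      // May flush buffered bytes through pooledClient, so the client must
      // still be open at this point.
      dataWriter.close();
    } catch (IOException e) {
      closeError = e;
    } finally {
      // Always release the client, even if the writer failed to close.
      if (pooledClient != null) {
        try {
          pooledClient.close();
        } catch (Exception e) {
          if (closeError != null) {
            closeError.addSuppressed(e); // keep the primary failure visible
          } else {
            closeError = new IOException("Failed to close client", e);
          }
        } finally {
          pooledClient = null; // mirrors the patch's io = null
        }
      }
    }
    if (closeError != null) {
      throw closeError;
    }
  }
}

Nulling the field in the inner finally matches the patch's io = null and keeps a repeated close() call from touching an already-closed FileIO.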