Skip to content

Commit ca27526

Browse files
ahmedabu98liferoad
andauthored
Iceberg fileio close (#37168)
* fix(iceberg): prevent premature FileIO closure in RecordWriter Keep FileIO open for writer lifetime to avoid connection pool shutdown issues Add test to verify FileIO remains open until writer close * trigger ITs * also apply in AppendFilesToTables --------- Co-authored-by: liferoad <[email protected]>
1 parent c511260 commit ca27526

File tree

4 files changed

+151
-20
lines changed

4 files changed

+151
-20
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 1
44
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,14 @@ private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWrit
189189
ManifestWriter<DataFile> writer;
190190
try (FileIO io = table.io()) {
191191
writer = createManifestWriter(table.location(), uuid, spec, io);
192+
for (DataFile file : files) {
193+
writer.add(file);
194+
committedDataFileByteSize.update(file.fileSizeInBytes());
195+
committedDataFileRecordCount.update(file.recordCount());
196+
}
197+
writer.close();
198+
update.appendManifest(writer.toManifestFile());
192199
}
193-
for (DataFile file : files) {
194-
writer.add(file);
195-
committedDataFileByteSize.update(file.fileSizeInBytes());
196-
committedDataFileRecordCount.update(file.recordCount());
197-
}
198-
writer.close();
199-
update.appendManifest(writer.toManifestFile());
200200
}
201201
update.commit();
202202
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iceberg.io.FileIO;
3535
import org.apache.iceberg.io.OutputFile;
3636
import org.apache.iceberg.parquet.Parquet;
37+
import org.checkerframework.checker.nullness.qual.Nullable;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -46,6 +47,7 @@ class RecordWriter {
4647
private final Table table;
4748
private final String absoluteFilename;
4849
private final FileFormat fileFormat;
50+
private @Nullable FileIO io;
4951

5052
RecordWriter(
5153
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -72,12 +74,14 @@ class RecordWriter {
7274
}
7375
OutputFile outputFile;
7476
EncryptionKeyMetadata keyMetadata;
75-
try (FileIO io = table.io()) {
76-
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
77-
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
78-
outputFile = encryptedOutputFile.encryptingOutputFile();
79-
keyMetadata = encryptedOutputFile.keyMetadata();
80-
}
77+
// Keep FileIO open for the lifetime of this writer to avoid
78+
// premature shutdown of underlying client pools (e.g., S3),
79+
// which manifests as "Connection pool shut down" (Issue #36438).
80+
this.io = table.io();
81+
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
82+
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
83+
outputFile = encryptedOutputFile.encryptingOutputFile();
84+
keyMetadata = encryptedOutputFile.keyMetadata();
8185

8286
switch (fileFormat) {
8387
case AVRO:
@@ -120,16 +124,38 @@ public void write(Record record) {
120124
}
121125

122126
public void close() throws IOException {
127+
IOException closeError = null;
123128
try {
124129
icebergDataWriter.close();
125130
} catch (IOException e) {
126-
throw new IOException(
127-
String.format(
128-
"Failed to close %s writer for table %s, path: %s",
129-
fileFormat, table.name(), absoluteFilename),
130-
e);
131+
closeError =
132+
new IOException(
133+
String.format(
134+
"Failed to close %s writer for table %s, path: %s",
135+
fileFormat, table.name(), absoluteFilename),
136+
e);
137+
} finally {
138+
// Always attempt to close FileIO and decrement metrics
139+
if (io != null) {
140+
try {
141+
io.close();
142+
} catch (Exception ioCloseError) {
143+
if (closeError != null) {
144+
closeError.addSuppressed(ioCloseError);
145+
} else {
146+
closeError = new IOException("Failed to close FileIO", ioCloseError);
147+
}
148+
} finally {
149+
io = null;
150+
}
151+
}
152+
activeIcebergWriters.dec();
153+
}
154+
155+
if (closeError != null) {
156+
throw closeError;
131157
}
132-
activeIcebergWriters.dec();
158+
133159
DataFile dataFile = icebergDataWriter.toDataFile();
134160
LOG.info(
135161
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.HashMap;
4646
import java.util.List;
4747
import java.util.Map;
48+
import java.util.UUID;
4849
import org.apache.beam.sdk.schemas.Schema;
4950
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
5051
import org.apache.beam.sdk.values.Row;
@@ -65,6 +66,10 @@
6566
import org.apache.iceberg.catalog.Namespace;
6667
import org.apache.iceberg.catalog.TableIdentifier;
6768
import org.apache.iceberg.hadoop.HadoopCatalog;
69+
import org.apache.iceberg.io.FileIO;
70+
import org.apache.iceberg.io.InputFile;
71+
import org.apache.iceberg.io.OutputFile;
72+
import org.apache.iceberg.io.PositionOutputStream;
6873
import org.apache.iceberg.transforms.Transform;
6974
import org.apache.iceberg.types.Conversions;
7075
import org.apache.iceberg.types.Type;
@@ -83,6 +88,7 @@
8388
import org.junit.rules.TestName;
8489
import org.junit.runner.RunWith;
8590
import org.junit.runners.JUnit4;
91+
import org.mockito.Mockito;
8692

8793
/** Test class for {@link RecordWriterManager}. */
8894
@RunWith(JUnit4.class)
@@ -950,6 +956,105 @@ public void testDefaultMetrics() throws IOException {
950956
}
951957
}
952958

959+
@Test
960+
public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
961+
TableIdentifier tableId =
962+
TableIdentifier.of(
963+
"default",
964+
"table_"
965+
+ testName.getMethodName()
966+
+ "_"
967+
+ UUID.randomUUID().toString().replace("-", "").substring(0, 6));
968+
Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA);
969+
970+
CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io());
971+
Table spyTable = Mockito.spy(table);
972+
Mockito.doReturn(trackingFileIO).when(spyTable).io();
973+
974+
PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema());
975+
RecordWriter writer =
976+
new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey);
977+
978+
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
979+
980+
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
981+
writer.close();
982+
983+
assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
984+
}
985+
986+
private static final class CloseTrackingFileIO implements FileIO {
987+
private final FileIO delegate;
988+
volatile boolean closed = false;
989+
990+
CloseTrackingFileIO(FileIO delegate) {
991+
this.delegate = delegate;
992+
}
993+
994+
@Override
995+
public InputFile newInputFile(String path) {
996+
return delegate.newInputFile(path);
997+
}
998+
999+
@Override
1000+
public OutputFile newOutputFile(String path) {
1001+
OutputFile underlying = delegate.newOutputFile(path);
1002+
return new CloseAwareOutputFile(underlying, this);
1003+
}
1004+
1005+
@Override
1006+
public void deleteFile(String path) {
1007+
delegate.deleteFile(path);
1008+
}
1009+
1010+
@Override
1011+
public Map<String, String> properties() {
1012+
return delegate.properties();
1013+
}
1014+
1015+
@Override
1016+
public void close() {
1017+
closed = true;
1018+
delegate.close();
1019+
}
1020+
}
1021+
1022+
private static final class CloseAwareOutputFile implements OutputFile {
1023+
private final OutputFile delegate;
1024+
private final CloseTrackingFileIO io;
1025+
1026+
CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) {
1027+
this.delegate = delegate;
1028+
this.io = io;
1029+
}
1030+
1031+
@Override
1032+
public PositionOutputStream create() {
1033+
if (io.closed) {
1034+
throw new IllegalStateException("Connection pool shut down");
1035+
}
1036+
return delegate.create();
1037+
}
1038+
1039+
@Override
1040+
public PositionOutputStream createOrOverwrite() {
1041+
if (io.closed) {
1042+
throw new IllegalStateException("Connection pool shut down");
1043+
}
1044+
return delegate.createOrOverwrite();
1045+
}
1046+
1047+
@Override
1048+
public String location() {
1049+
return delegate.location();
1050+
}
1051+
1052+
@Override
1053+
public InputFile toInputFile() {
1054+
return delegate.toInputFile();
1055+
}
1056+
}
1057+
9531058
@Test
9541059
public void testGetOrCreateTable_refreshLogic() {
9551060
Table mockTable = mock(Table.class);

0 commit comments

Comments
 (0)