Skip to content

Commit f807533

Browse files
authored
Refresh Iceberg Table metadata every 2 minutes instead of before each intermediate file written (#37102)
* Initial implementation * Swap back to vendored guava * Update staleness threshold to 2 minutes to match pr title
1 parent 547ab60 commit f807533

File tree

2 files changed

+115
-18
lines changed

2 files changed

+115
-18
lines changed

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2222

2323
import java.io.IOException;
24+
import java.time.Duration;
25+
import java.time.Instant;
2426
import java.time.LocalDateTime;
2527
import java.time.YearMonth;
2628
import java.time.ZoneOffset;
@@ -135,7 +137,8 @@ class DestinationState {
135137
RuntimeException rethrow =
136138
new RuntimeException(
137139
String.format(
138-
"Encountered an error when closing data writer for table '%s', path: %s",
140+
"Encountered an error when closing data writer for table '%s',"
141+
+ " path: %s",
139142
icebergDestination.getTableIdentifier(), recordWriter.path()),
140143
e);
141144
exceptions.add(rethrow);
@@ -256,8 +259,40 @@ static String getPartitionDataPath(
256259
private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
257260
totalSerializableDataFiles = Maps.newHashMap();
258261

262+
static final class LastRefreshedTable {
263+
final Table table;
264+
volatile Instant lastRefreshTime;
265+
static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2);
266+
267+
LastRefreshedTable(Table table, Instant lastRefreshTime) {
268+
this.table = table;
269+
this.lastRefreshTime = lastRefreshTime;
270+
}
271+
272+
/**
273+
* Refreshes the table metadata if it is considered stale (older than 2 minutes).
274+
*
275+
* <p>This method first performs a non-synchronized check on the table's freshness. This
276+
* provides a lock-free fast path that avoids synchronization overhead in the common case where
277+
* the table does not need to be refreshed. If the table might be stale, it then enters a
278+
* synchronized block to ensure that only one thread performs the refresh operation.
279+
*/
280+
void refreshIfStale() {
281+
// Fast path: Avoid entering the synchronized block if the table is not stale.
282+
if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) {
283+
return;
284+
}
285+
synchronized (this) {
286+
if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) {
287+
table.refresh();
288+
lastRefreshTime = Instant.now();
289+
}
290+
}
291+
}
292+
}
293+
259294
@VisibleForTesting
260-
static final Cache<TableIdentifier, Table> TABLE_CACHE =
295+
static final Cache<TableIdentifier, LastRefreshedTable> LAST_REFRESHED_TABLE_CACHE =
261296
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
262297

263298
private boolean isClosed = false;
@@ -272,22 +307,22 @@ static String getPartitionDataPath(
272307
/**
273308
* Returns an Iceberg {@link Table}.
274309
*
275-
* <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
276-
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
277-
* create it, inferring the table schema from the record schema.
310+
* <p>First attempts to fetch the table from the {@link #LAST_REFRESHED_TABLE_CACHE}. If it's not
311+
* there, we attempt to load it using the Iceberg API. If the table doesn't exist at all, we
312+
* attempt to create it, inferring the table schema from the record schema.
278313
*
279314
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
280315
* implementation. Although it is expected, some implementations may not support creating a table
281316
* using the Iceberg API.
282317
*/
283-
private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
318+
@VisibleForTesting
319+
Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
284320
TableIdentifier identifier = destination.getTableIdentifier();
285-
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
286-
if (table != null) {
287-
// If fetching from cache, refresh the table to avoid working with stale metadata
288-
// (e.g. partition spec)
289-
table.refresh();
290-
return table;
321+
@Nullable
322+
LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
323+
if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
324+
lastRefreshedTable.refreshIfStale();
325+
return lastRefreshedTable.table;
291326
}
292327

293328
Namespace namespace = identifier.namespace();
@@ -299,7 +334,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
299334
? createConfig.getTableProperties()
300335
: Maps.newHashMap();
301336

302-
synchronized (TABLE_CACHE) {
337+
@Nullable Table table = null;
338+
synchronized (LAST_REFRESHED_TABLE_CACHE) {
303339
// Create namespace if it does not exist yet
304340
if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
305341
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
@@ -323,7 +359,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
323359
try {
324360
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
325361
LOG.info(
326-
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
362+
"Created Iceberg table '{}' with schema: {}\n"
363+
+ ", partition spec: {}, table properties: {}",
327364
identifier,
328365
tableSchema,
329366
partitionSpec,
@@ -334,8 +371,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
334371
}
335372
}
336373
}
337-
338-
TABLE_CACHE.put(identifier, table);
374+
lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
375+
LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
339376
return table;
340377
}
341378

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@
2828
import static org.junit.Assert.assertNotNull;
2929
import static org.junit.Assert.assertThrows;
3030
import static org.junit.Assert.assertTrue;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.never;
33+
import static org.mockito.Mockito.times;
34+
import static org.mockito.Mockito.verify;
3135

3236
import java.io.IOException;
3337
import java.net.URLEncoder;
3438
import java.nio.ByteBuffer;
39+
import java.time.Duration;
40+
import java.time.Instant;
3541
import java.time.LocalDate;
3642
import java.time.LocalDateTime;
3743
import java.time.LocalTime;
@@ -102,7 +108,7 @@ public void setUp() {
102108
windowedDestination =
103109
getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC);
104110
catalog = new HadoopCatalog(new Configuration(), warehouse.location);
105-
RecordWriterManager.TABLE_CACHE.invalidateAll();
111+
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
106112
}
107113

108114
private WindowedValue<IcebergDestination> getWindowedDestination(
@@ -451,10 +457,15 @@ public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException {
451457
assertThat(dataFile.path().toString(), containsString("bool=true"));
452458

453459
// table is cached
454-
assertEquals(1, RecordWriterManager.TABLE_CACHE.size());
460+
assertEquals(1, RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.size());
455461

456462
// update spec
457463
table.updateSpec().addField("id").removeField("bool").commit();
464+
// Make the cached table stale to force reloading its metadata.
465+
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.getIfPresent(
466+
windowedDestination.getValue().getTableIdentifier())
467+
.lastRefreshTime =
468+
Instant.EPOCH;
458469

459470
// write a second data file
460471
// should refresh the table and use the new partition spec
@@ -938,4 +949,53 @@ public void testDefaultMetrics() throws IOException {
938949
}
939950
}
940951
}
952+
953+
@Test
954+
public void testGetOrCreateTable_refreshLogic() {
955+
Table mockTable = mock(Table.class);
956+
TableIdentifier identifier = TableIdentifier.of("db", "table");
957+
IcebergDestination destination =
958+
IcebergDestination.builder()
959+
.setTableIdentifier(identifier)
960+
.setFileFormat(FileFormat.PARQUET)
961+
.setTableCreateConfig(
962+
IcebergTableCreateConfig.builder()
963+
.setPartitionFields(null)
964+
.setSchema(BEAM_SCHEMA)
965+
.build())
966+
.build();
967+
// The schema is only used if the table is created, so a null is fine for this
968+
// test.
969+
Schema beamSchema = null;
970+
971+
// Instantiate a RecordWriterManager with a dummy catalog.
972+
RecordWriterManager writer = new RecordWriterManager(null, "p", 1L, 1);
973+
974+
// Clean up cache before test
975+
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
976+
977+
// --- 1. Test the fast path (entry is not stale) ---
978+
Instant freshTimestamp = Instant.now().minus(Duration.ofMinutes(1));
979+
RecordWriterManager.LastRefreshedTable freshEntry =
980+
new RecordWriterManager.LastRefreshedTable(mockTable, freshTimestamp);
981+
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, freshEntry);
982+
983+
// Access the table
984+
writer.getOrCreateTable(destination, beamSchema);
985+
986+
// Verify that refresh() was NOT called because the entry is fresh.
987+
verify(mockTable, never()).refresh();
988+
989+
// --- 2. Test the stale path (entry is stale) ---
990+
Instant staleTimestamp = Instant.now().minus(Duration.ofMinutes(5));
991+
RecordWriterManager.LastRefreshedTable staleEntry =
992+
new RecordWriterManager.LastRefreshedTable(mockTable, staleTimestamp);
993+
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, staleEntry);
994+
995+
// Access the table again
996+
writer.getOrCreateTable(destination, beamSchema);
997+
998+
// Verify that refresh() WAS called exactly once because the entry was stale.
999+
verify(mockTable, times(1)).refresh();
1000+
}
9411001
}

0 commit comments

Comments
 (0)