From cbfd38c60e34de92b8985f8d589648c66ab8298c Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Fri, 12 Dec 2025 22:38:18 +0000 Subject: [PATCH 1/3] Initial implementation --- .../sdk/io/iceberg/RecordWriterManager.java | 87 +++++++++++++------ .../io/iceberg/RecordWriterManagerTest.java | 64 +++++++++++++- 2 files changed, 124 insertions(+), 27 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 6ddd943eb198..21792855636f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -17,10 +17,19 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; import java.time.YearMonth; import java.time.ZoneOffset; @@ -35,13 +44,6 @@ import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionField; @@ -135,7 +137,8 @@ class DestinationState { RuntimeException rethrow = new RuntimeException( String.format( - "Encountered an error when closing data writer for table '%s', path: %s", + "Encountered an error when closing data writer for table '%s'," + + " path: %s", icebergDestination.getTableIdentifier(), recordWriter.path()), e); exceptions.add(rethrow); @@ -256,8 +259,40 @@ static String getPartitionDataPath( private final Map, List> totalSerializableDataFiles = Maps.newHashMap(); + static final class LastRefreshedTable { + final Table table; + volatile Instant lastRefreshTime; + static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(3); + + LastRefreshedTable(Table table, Instant lastRefreshTime) { + this.table = table; + this.lastRefreshTime = lastRefreshTime; + } + + /** + * Refreshes the table metadata if it is considered stale (older than 3 minutes). + * + *

<p>This method first performs a non-synchronized check on the table's freshness. This + * provides a lock-free fast path that avoids synchronization overhead in the common case where + * the table does not need to be refreshed. If the table might be stale, it then enters a + * synchronized block to ensure that only one thread performs the refresh operation. + */ + void refreshIfStale() { + // Fast path: Avoid entering the synchronized block if the table is not stale. + if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) { + return; + } + synchronized (this) { + if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) { + table.refresh(); + lastRefreshTime = Instant.now(); + } + } + } + } + @VisibleForTesting - static final Cache TABLE_CACHE = + static final Cache LAST_REFRESHED_TABLE_CACHE = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); private boolean isClosed = false; @@ -272,22 +307,22 @@ static String getPartitionDataPath( /** * Returns an Iceberg {@link Table}. * - *

<p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we - * attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to - * create it, inferring the table schema from the record schema. + *

<p>First attempts to fetch the table from the {@link #LAST_REFRESHED_TABLE_CACHE}. If it's not + * there, we attempt to load it using the Iceberg API. If the table doesn't exist at all, we + * attempt to create it, inferring the table schema from the record schema. * *

<p>Note that this is a best-effort operation that depends on the {@link Catalog} * implementation. Although it is expected, some implementations may not support creating a table * using the Iceberg API. */ - private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { + @VisibleForTesting + Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { TableIdentifier identifier = destination.getTableIdentifier(); - @Nullable Table table = TABLE_CACHE.getIfPresent(identifier); - if (table != null) { - // If fetching from cache, refresh the table to avoid working with stale metadata - // (e.g. partition spec) - table.refresh(); - return table; + @Nullable + LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); + if (lastRefreshedTable != null && lastRefreshedTable.table != null) { + lastRefreshedTable.refreshIfStale(); + return lastRefreshedTable.table; } Namespace namespace = identifier.namespace(); @@ -299,7 +334,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema ?
createConfig.getTableProperties() : Maps.newHashMap(); - synchronized (TABLE_CACHE) { + @Nullable Table table = null; + synchronized (LAST_REFRESHED_TABLE_CACHE) { // Create namespace if it does not exist yet if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) { SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; @@ -323,7 +359,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema try { table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties); LOG.info( - "Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}", + "Created Iceberg table '{}' with schema: {}\n" + + ", partition spec: {}, table properties: {}", identifier, tableSchema, partitionSpec, @@ -334,8 +371,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema } } } - - TABLE_CACHE.put(identifier, table); + lastRefreshedTable = new LastRefreshedTable(table, Instant.now()); + LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable); return table; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 36b74967f0b2..7bce0b16cb16 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -28,10 +28,16 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.net.URLEncoder; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; import 
java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -102,7 +108,7 @@ public void setUp() { windowedDestination = getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC); catalog = new HadoopCatalog(new Configuration(), warehouse.location); - RecordWriterManager.TABLE_CACHE.invalidateAll(); + RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll(); } private WindowedValue getWindowedDestination( @@ -451,10 +457,15 @@ public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException { assertThat(dataFile.path().toString(), containsString("bool=true")); // table is cached - assertEquals(1, RecordWriterManager.TABLE_CACHE.size()); + assertEquals(1, RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.size()); // update spec table.updateSpec().addField("id").removeField("bool").commit(); + // Make the cached table stale to force reloading its metadata. + RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.getIfPresent( + windowedDestination.getValue().getTableIdentifier()) + .lastRefreshTime = + Instant.EPOCH; // write a second data file // should refresh the table and use the new partition spec @@ -938,4 +949,53 @@ public void testDefaultMetrics() throws IOException { } } } + + @Test + public void testGetOrCreateTable_refreshLogic() { + Table mockTable = mock(Table.class); + TableIdentifier identifier = TableIdentifier.of("db", "table"); + IcebergDestination destination = + IcebergDestination.builder() + .setTableIdentifier(identifier) + .setFileFormat(FileFormat.PARQUET) + .setTableCreateConfig( + IcebergTableCreateConfig.builder() + .setPartitionFields(null) + .setSchema(BEAM_SCHEMA) + .build()) + .build(); + // The schema is only used if the table is created, so a null is fine for this + // test. + Schema beamSchema = null; + + // Instantiate a RecordWriterManager with a dummy catalog. 
+ RecordWriterManager writer = new RecordWriterManager(null, "p", 1L, 1); + + // Clean up cache before test + RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll(); + + // --- 1. Test the fast path (entry is not stale) --- + Instant freshTimestamp = Instant.now().minus(Duration.ofMinutes(1)); + RecordWriterManager.LastRefreshedTable freshEntry = + new RecordWriterManager.LastRefreshedTable(mockTable, freshTimestamp); + RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, freshEntry); + + // Access the table + writer.getOrCreateTable(destination, beamSchema); + + // Verify that refresh() was NOT called because the entry is fresh. + verify(mockTable, never()).refresh(); + + // --- 2. Test the stale path (entry is stale) --- + Instant staleTimestamp = Instant.now().minus(Duration.ofMinutes(5)); + RecordWriterManager.LastRefreshedTable staleEntry = + new RecordWriterManager.LastRefreshedTable(mockTable, staleTimestamp); + RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, staleEntry); + + // Access the table again + writer.getOrCreateTable(destination, beamSchema); + + // Verify that refresh() WAS called exactly once because the entry was stale. 
+ verify(mockTable, times(1)).refresh(); + } } From d4b0171e21a66480abd3a455268286bee3edfc2e Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Fri, 12 Dec 2025 22:56:42 +0000 Subject: [PATCH 2/3] Swap back to vendored guava --- .../sdk/io/iceberg/RecordWriterManager.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 21792855636f..cbd284a68d72 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -17,16 +17,9 @@ */ package org.apache.beam.sdk.io.iceberg; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Splitter; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -44,6 +37,13 @@ import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; 
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionField; From 8268afd8178ce59f95b21e2dd2647dec9fe5d743 Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Fri, 12 Dec 2025 23:59:12 +0000 Subject: [PATCH 3/3] Update staleness threshold to 2 minutes to match pr title --- .../org/apache/beam/sdk/io/iceberg/RecordWriterManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index cbd284a68d72..da62fb658846 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -262,7 +262,7 @@ static String getPartitionDataPath( static final class LastRefreshedTable { final Table table; volatile Instant lastRefreshTime; - static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(3); + static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2); LastRefreshedTable(Table table, Instant lastRefreshTime) { this.table = table; @@ -270,7 +270,7 @@ static final class LastRefreshedTable { } /** - * Refreshes the table metadata if it is considered stale (older than 3 minutes). + * Refreshes the table metadata if it is considered stale (older than 2 minutes). * *

<p>This method first performs a non-synchronized check on the table's freshness. This * provides a lock-free fast path that avoids synchronization overhead in the common case where