Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -135,7 +137,8 @@ class DestinationState {
RuntimeException rethrow =
new RuntimeException(
String.format(
"Encountered an error when closing data writer for table '%s', path: %s",
"Encountered an error when closing data writer for table '%s',"
+ " path: %s",
icebergDestination.getTableIdentifier(), recordWriter.path()),
e);
exceptions.add(rethrow);
Expand Down Expand Up @@ -256,8 +259,40 @@ static String getPartitionDataPath(
private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();

static final class LastRefreshedTable {
final Table table;
volatile Instant lastRefreshTime;
static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2);

LastRefreshedTable(Table table, Instant lastRefreshTime) {
this.table = table;
this.lastRefreshTime = lastRefreshTime;
}

/**
* Refreshes the table metadata if it is considered stale (older than 2 minutes).
*
* <p>This method first performs a non-synchronized check on the table's freshness. This
* provides a lock-free fast path that avoids synchronization overhead in the common case where
* the table does not need to be refreshed. If the table might be stale, it then enters a
* synchronized block to ensure that only one thread performs the refresh operation.
*/
void refreshIfStale() {
// Fast path: Avoid entering the synchronized block if the table is not stale.
if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) {
return;
}
synchronized (this) {
if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) {
table.refresh();
lastRefreshTime = Instant.now();
}
}
}
}

@VisibleForTesting
static final Cache<TableIdentifier, Table> TABLE_CACHE =
static final Cache<TableIdentifier, LastRefreshedTable> LAST_REFRESHED_TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;
Expand All @@ -272,22 +307,22 @@ static String getPartitionDataPath(
/**
* Returns an Iceberg {@link Table}.
*
* <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
* create it, inferring the table schema from the record schema.
* <p>First attempts to fetch the table from the {@link #LAST_REFRESHED_TABLE_CACHE}. If it's not
* there, we attempt to load it using the Iceberg API. If the table doesn't exist at all, we
* attempt to create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
* implementation. Although it is expected, some implementations may not support creating a table
* using the Iceberg API.
*/
private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
@VisibleForTesting
Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
TableIdentifier identifier = destination.getTableIdentifier();
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table != null) {
// If fetching from cache, refresh the table to avoid working with stale metadata
// (e.g. partition spec)
table.refresh();
return table;
@Nullable
LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
lastRefreshedTable.refreshIfStale();
return lastRefreshedTable.table;
}

Namespace namespace = identifier.namespace();
Expand All @@ -299,7 +334,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
? createConfig.getTableProperties()
: Maps.newHashMap();

synchronized (TABLE_CACHE) {
@Nullable Table table = null;
synchronized (LAST_REFRESHED_TABLE_CACHE) {
// Create namespace if it does not exist yet
if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
Expand All @@ -323,7 +359,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
try {
table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
LOG.info(
"Created Iceberg table '{}' with schema: {}\n, partition spec: {}, table properties: {}",
"Created Iceberg table '{}' with schema: {}\n"
+ ", partition spec: {}, table properties: {}",
identifier,
tableSchema,
partitionSpec,
Expand All @@ -334,8 +371,8 @@ private Table getOrCreateTable(IcebergDestination destination, Schema dataSchema
}
}
}

TABLE_CACHE.put(identifier, table);
lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
return table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -102,7 +108,7 @@ public void setUp() {
windowedDestination =
getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC);
catalog = new HadoopCatalog(new Configuration(), warehouse.location);
RecordWriterManager.TABLE_CACHE.invalidateAll();
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
}

private WindowedValue<IcebergDestination> getWindowedDestination(
Expand Down Expand Up @@ -451,10 +457,15 @@ public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException {
assertThat(dataFile.path().toString(), containsString("bool=true"));

// table is cached
assertEquals(1, RecordWriterManager.TABLE_CACHE.size());
assertEquals(1, RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.size());

// update spec
table.updateSpec().addField("id").removeField("bool").commit();
// Make the cached table stale to force reloading its metadata.
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.getIfPresent(
windowedDestination.getValue().getTableIdentifier())
.lastRefreshTime =
Instant.EPOCH;

// write a second data file
// should refresh the table and use the new partition spec
Expand Down Expand Up @@ -938,4 +949,53 @@ public void testDefaultMetrics() throws IOException {
}
}
}

@Test
public void testGetOrCreateTable_refreshLogic() {
Table mockTable = mock(Table.class);
TableIdentifier identifier = TableIdentifier.of("db", "table");
IcebergDestination destination =
IcebergDestination.builder()
.setTableIdentifier(identifier)
.setFileFormat(FileFormat.PARQUET)
.setTableCreateConfig(
IcebergTableCreateConfig.builder()
.setPartitionFields(null)
.setSchema(BEAM_SCHEMA)
.build())
.build();
// The schema is only used if the table is created, so a null is fine for this
// test.
Schema beamSchema = null;

// Instantiate a RecordWriterManager with a dummy catalog.
RecordWriterManager writer = new RecordWriterManager(null, "p", 1L, 1);

// Clean up cache before test
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();

// --- 1. Test the fast path (entry is not stale) ---
Instant freshTimestamp = Instant.now().minus(Duration.ofMinutes(1));
RecordWriterManager.LastRefreshedTable freshEntry =
new RecordWriterManager.LastRefreshedTable(mockTable, freshTimestamp);
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, freshEntry);

// Access the table
writer.getOrCreateTable(destination, beamSchema);

// Verify that refresh() was NOT called because the entry is fresh.
verify(mockTable, never()).refresh();

// --- 2. Test the stale path (entry is stale) ---
Instant staleTimestamp = Instant.now().minus(Duration.ofMinutes(5));
RecordWriterManager.LastRefreshedTable staleEntry =
new RecordWriterManager.LastRefreshedTable(mockTable, staleTimestamp);
RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, staleEntry);

// Access the table again
writer.getOrCreateTable(destination, beamSchema);

// Verify that refresh() WAS called exactly once because the entry was stale.
verify(mockTable, times(1)).refresh();
}
}
Loading