Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.fluss.lake.iceberg.tiering;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
Expand All @@ -28,6 +30,7 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
Expand Down Expand Up @@ -61,12 +64,20 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I

private final Catalog icebergCatalog;
private final Table icebergTable;
private final boolean isAutoSnapshotExpiration;
private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();

public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider, TablePath tablePath)
public IcebergLakeCommitter(
IcebergCatalogProvider icebergCatalogProvider,
CommitterInitContext committerInitContext)
throws IOException {
this.icebergCatalog = icebergCatalogProvider.get();
this.icebergTable = getTable(tablePath);
this.icebergTable = getTable(committerInitContext.tablePath());
this.isAutoSnapshotExpiration =
committerInitContext.tableInfo().getTableConfig().isDataLakeAutoExpireSnapshot()
|| committerInitContext
.lakeTieringConfig()
.get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT);
// register iceberg listener
Listeners.register(new IcebergSnapshotCreateListener(), CreateSnapshotEvent.class);
}
Expand Down Expand Up @@ -141,6 +152,12 @@ public LakeCommitResult commit(
snapshotId = rewriteCommitSnapshotId;
}
}

// Expire old snapshots if auto-expire-snapshot is enabled
if (isAutoSnapshotExpiration) {
expireSnapshots();
}

// Iceberg does not provide cumulative table stats API yet; leave stats as -1 (unknown).
return LakeCommitResult.committedIsReadable(snapshotId);
} catch (Exception e) {
Expand Down Expand Up @@ -262,6 +279,22 @@ public void close() throws Exception {
}
}

/**
* Expires old snapshots from the Iceberg table. Uses Iceberg's built-in snapshot expiration
* which respects table properties like {@code history.expire.max-snapshot-age-ms} and {@code
* history.expire.min-snapshots-to-keep}.
*/
private void expireSnapshots() {
try {
ExpireSnapshots expireSnapshots =
icebergTable.expireSnapshots().cleanExpiredFiles(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this expire all non latest snapshots? If so, would this cause recovery failure in the case when tiering service failover and restarts from latest Flink checkpoint and couldn't find a deleted iceberg snapshot?

expireSnapshots.commit();
LOG.debug("Successfully expired old snapshots for Iceberg table.");
} catch (Exception e) {
LOG.warn("Failed to expire snapshots for Iceberg table, will retry on next commit.", e);
}
}

private Table getTable(TablePath tablePath) throws IOException {
try {
TableIdentifier tableId = toIceberg(tablePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public SimpleVersionedSerializer<IcebergWriteResult> getWriteResultSerializer()
@Override
public LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitter(
CommitterInitContext committerInitContext) throws IOException {
return new IcebergLakeCommitter(icebergCatalogProvider, committerInitContext.tablePath());
return new IcebergLakeCommitter(icebergCatalogProvider, committerInitContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand Down Expand Up @@ -110,6 +111,15 @@ private static Stream<Arguments> tieringWriteArgs() {
Arguments.of(false, false));
}

private static Stream<Arguments> snapshotExpireArgs() {
return Stream.of(
// isTableAutoExpireSnapshot, isLakeTieringExpireSnapshot
Arguments.of(true, true),
Arguments.of(true, false),
Arguments.of(false, true),
Arguments.of(false, false));
}

@ParameterizedTest
@MethodSource("tieringWriteArgs")
void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable)
Expand Down Expand Up @@ -186,7 +196,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable

// second, commit data
try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
createLakeCommitter(tablePath, tableInfo)) {
createLakeCommitter(tablePath, tableInfo, new Configuration())) {
// serialize/deserialize committable
IcebergCommittable icebergCommittable =
lakeCommitter.toCommittable(icebergWriteResults);
Expand Down Expand Up @@ -249,7 +259,8 @@ public TableInfo tableInfo() {
}

private LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitter(
TablePath tablePath, TableInfo tableInfo) throws IOException {
TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig)
throws IOException {
return icebergLakeTieringFactory.createLakeCommitter(
new CommitterInitContext() {
@Override
Expand All @@ -264,7 +275,7 @@ public TableInfo tableInfo() {

@Override
public Configuration lakeTieringConfig() {
return new Configuration();
return lakeTieringConfig;
}

@Override
Expand All @@ -274,6 +285,108 @@ public Configuration flussClientConfig() {
});
}

@ParameterizedTest
@MethodSource("snapshotExpireArgs")
void testSnapshotExpiration(
boolean isTableAutoExpireSnapshot, boolean isLakeTieringExpireSnapshot)
throws Exception {
int bucketNum = 3;
TablePath tablePath =
TablePath.of(
"iceberg",
String.format(
"test_snapshot_expire_%s_%s",
isTableAutoExpireSnapshot, isLakeTieringExpireSnapshot));

// Create Iceberg table with snapshot retention properties
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.MIN_SNAPSHOTS_TO_KEEP, "1");
tableProperties.put(TableProperties.MAX_SNAPSHOT_AGE_MS, "1000");
createTable(tablePath, false, false, tableProperties);

TableDescriptor descriptor =
TableDescriptor.builder()
.schema(
org.apache.fluss.metadata.Schema.newBuilder()
.column("c1", DataTypes.INT())
.column("c2", DataTypes.STRING())
.column("c3", DataTypes.STRING())
.build())
.distributedBy(bucketNum)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
.property(
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT,
isTableAutoExpireSnapshot)
.build();
TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);

Configuration lakeTieringConfig = new Configuration();
lakeTieringConfig.set(
ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, isLakeTieringExpireSnapshot);

// Write data multiple times to generate snapshots
for (int round = 0; round < 5; round++) {
writeData(tablePath, tableInfo, lakeTieringConfig, bucketNum);
// Delay to ensure snapshots are older than MAX_SNAPSHOT_AGE_MS=1000ms with margin
Thread.sleep(2000);
}

// Verify snapshot count
Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
int snapshotCount = 0;
for (Snapshot ignored : icebergTable.snapshots()) {
snapshotCount++;
}

if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) {
// if auto snapshot expiration is enabled, old snapshots should be expired
// With MIN_SNAPSHOTS_TO_KEEP=1, only 1 snapshot should be retained
assertThat(snapshotCount).isEqualTo(1);
} else {
// if auto snapshot expiration is disabled, all snapshots should be retained
assertThat(snapshotCount).isEqualTo(5);
}
}

private void writeData(
TablePath tablePath,
TableInfo tableInfo,
Configuration lakeTieringConfig,
int bucketNum)
throws Exception {
List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer =
icebergLakeTieringFactory.getWriteResultSerializer();
SimpleVersionedSerializer<IcebergCommittable> committableSerializer =
icebergLakeTieringFactory.getCommittableSerializer();

for (int bucket = 0; bucket < bucketNum; bucket++) {
try (LakeWriter<IcebergWriteResult> writer =
createLakeWriter(tablePath, bucket, null, null, tableInfo)) {
Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
genLogTableRecords(null, bucket, 3);
for (LogRecord record : writeAndExpectRecords.f0) {
writer.write(record);
}
IcebergWriteResult result = writer.complete();
byte[] serialized = writeResultSerializer.serialize(result);
icebergWriteResults.add(
writeResultSerializer.deserialize(
writeResultSerializer.getVersion(), serialized));
}
}

try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
createLakeCommitter(tablePath, tableInfo, lakeTieringConfig)) {
IcebergCommittable committable = lakeCommitter.toCommittable(icebergWriteResults);
byte[] serialized = committableSerializer.serialize(committable);
committable =
committableSerializer.deserialize(
committableSerializer.getVersion(), serialized);
lakeCommitter.commit(committable, Collections.emptyMap());
}
}

private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
@Nullable String partition, int bucket, int numRecords) {
List<LogRecord> logRecords = new ArrayList<>();
Expand Down Expand Up @@ -359,6 +472,14 @@ private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeTyp

    /** Creates an Iceberg test table with no extra table properties. */
    private void createTable(
            TablePath tablePath, boolean isPrimaryTable, boolean isPartitionedTable) {
        createTable(tablePath, isPrimaryTable, isPartitionedTable, Collections.emptyMap());
    }

private void createTable(
TablePath tablePath,
boolean isPrimaryTable,
boolean isPartitionedTable,
Map<String, String> tableProperties) {
Namespace namespace = Namespace.of(tablePath.getDatabaseName());
if (icebergCatalog instanceof SupportsNamespaces) {
SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
Expand Down Expand Up @@ -404,7 +525,7 @@ private void createTable(

TableIdentifier tableId =
TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
icebergCatalog.createTable(tableId, schema, partitionSpec);
icebergCatalog.createTable(tableId, schema, partitionSpec, tableProperties);
}

private CloseableIterator<Record> getIcebergRows(
Expand Down