@@ -409,21 +409,67 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
TableBucket bucket, long snapshotId);

/**
- * Get table lake snapshot info of the given table asynchronously.
+ * Retrieves the absolute latest lake snapshot metadata for a table asynchronously.
*
- * <p>It'll get the latest snapshot for all the buckets of the table.
+ * <p>This returns the most recent snapshot regardless of its visibility or compaction status.
+ * It includes the latest tiered offsets for all buckets.
*
- * <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
+ * <p>Exceptions expected when calling {@code get()} on the returned future:
*
* <ul>
- * <li>{@link TableNotExistException} if the table does not exist.
- * <li>{@link LakeTableSnapshotNotExistException} if no any lake snapshot exist.
+ * <li>{@link TableNotExistException}: If the table path is invalid.
+ * <li>{@link LakeTableSnapshotNotExistException}: If no lake snapshot exists yet.
* </ul>
*
- * @param tablePath the table path of the table.
+ * @param tablePath The path of the target table.
+ * @return A future returning the latest tiered snapshot.
*/
CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath);

/**
* Retrieves a specific historical lake snapshot by its ID asynchronously.
*
* <p>It provides the tiered bucket offsets as they existed at the moment the specified snapshot
* was committed.
*
* <p>Exceptions expected when calling {@code get()} on the returned future:
*
* <ul>
* <li>{@link TableNotExistException}: If the table path is invalid.
* <li>{@link LakeTableSnapshotNotExistException}: If the specified snapshot ID does not exist
*     in Fluss.
* </ul>
*
* @param tablePath The path of the target table.
* @param snapshotId The unique identifier of the snapshot.
* @return A future returning the specific lake snapshot.
*/
CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId);

/**
* Retrieves the latest readable lake snapshot and its corresponding readable log offsets.
*
* <p>For Paimon DV tables, the tiered log offset may not be readable because the corresponding
* data might still be in the L0 layer, so using tiered offsets directly can lead to data loss.
* This method returns a readable snapshot (one whose L0 data has been compacted) and its
* corresponding readable offsets, which are safe log offsets that can be read without data loss.
*
* <p>For union read operations, use this method instead of {@link
* #getLatestLakeSnapshot(TablePath)} to avoid data loss.
*
* <p>Exceptions expected when calling {@code get()} on the returned future:
*
* <ul>
* <li>{@link TableNotExistException}: If the table path is invalid.
* <li>{@link LakeTableSnapshotNotExistException}: If no readable snapshot exists yet.
* </ul>
*
* @param tablePath The path of the target table.
* @return A future returning a {@link LakeSnapshot} containing the readable snapshot ID and
* readable log offsets for each bucket.
*/
CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath);

/**
* List offset for the specified buckets. This operation enables finding the beginning offset,
* end offset as well as the offset matching a timestamp in buckets.
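Taken together, the three lookups differ only in which snapshot they resolve: the absolute latest, a specific historical one, or the latest readable one. Below is a minimal usage sketch, not part of this PR: the bootstrap address, database/table names, and snapshot ID are placeholders, and the LakeSnapshot import path and getTableBucketsOffset() accessor are assumptions inferred from the converter code further down.

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.LakeSnapshot; // assumed location
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TablePath;

public class LakeSnapshotLookupExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("bootstrap.servers", "localhost:9123"); // placeholder address
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Admin admin = connection.getAdmin()) {
            TablePath tablePath = TablePath.of("my_db", "my_table"); // placeholder table

            // Absolute latest tiered snapshot; for Paimon DV tables its offsets
            // may point at L0 data that is not yet readable.
            LakeSnapshot latest = admin.getLatestLakeSnapshot(tablePath).get();

            // A specific historical snapshot by ID (42 is a placeholder).
            LakeSnapshot historical = admin.getLakeSnapshot(tablePath, 42L).get();

            // Latest readable snapshot; prefer this for union reads.
            LakeSnapshot readable = admin.getReadableLakeSnapshot(tablePath).get();
            // Bucket -> safe log offset map (accessor name assumed from the converter).
            readable.getTableBucketsOffset()
                    .forEach((bucket, offset) -> System.out.println(bucket + " -> " + offset));
        }
    }
}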
@@ -60,8 +60,8 @@
import org.apache.fluss.rpc.messages.DropTableRequest;
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
+ import org.apache.fluss.rpc.messages.GetLakeSnapshotRequest;
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
- import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
import org.apache.fluss.rpc.messages.ListAclsRequest;
@@ -383,13 +383,38 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(

@Override
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
- GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();
+ GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());

return readOnlyGateway
- .getLatestLakeSnapshot(request)
+ .getLakeSnapshot(request)
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
}

@Override
public CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId) {
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
request.setSnapshotId(snapshotId);

return readOnlyGateway
.getLakeSnapshot(request)
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
}

@Override
public CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath) {
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
request.setReadable(true);
return readOnlyGateway
.getLakeSnapshot(request)
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
}

@@ -42,8 +42,8 @@
import org.apache.fluss.rpc.messages.DropPartitionRequest;
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
+ import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
- import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
import org.apache.fluss.rpc.messages.LookupRequest;
@@ -210,7 +210,7 @@ public static KvSnapshotMetadata toKvSnapshotMetadata(GetKvSnapshotMetadataRespo
toFsPathAndFileName(response.getSnapshotFilesList()), response.getLogOffset());
}

- public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse response) {
+ public static LakeSnapshot toLakeTableSnapshotInfo(GetLakeSnapshotResponse response) {
long tableId = response.getTableId();
long snapshotId = response.getSnapshotId();
Map<TableBucket, Long> tableBucketsOffset =
@@ -51,4 +51,11 @@ public interface CommitterInitContext {
* @return the lake tiering config
*/
Configuration lakeTieringConfig();

/**
* Returns the Fluss config.
*
* @return the Fluss config
*/
Configuration flussConfig();
}
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.committer;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.metadata.TableBucket;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;

/**
* The result of a lake commit operation, containing the committed snapshot ID and the readable
* snapshot information.
*
* <p>For most implementations, the readable snapshot is the same as the committed snapshot, and the
* log end offsets are the same as the tiered offsets from TieringCommitOperator.
*
* <p>For Paimon DV tables, the readable snapshot will differ from the committed snapshot, and
* so will the log end offsets, depending on compaction status.
*
* @since 0.9
*/
@PublicEvolving
public class LakeCommitResult {

// -1 enforces keeping all previous snapshots
public static final Long KEEP_ALL = -1L;

// The snapshot ID that was just committed
private final long committedSnapshotId;

private final boolean committedIsReadable;

// The earliest snapshot ID to keep; null means no previous snapshot needs to be kept
@Nullable private final Long earliestSnapshotIDToKeep;

@Nullable private final ReadableSnapshot readableSnapshot;

private LakeCommitResult(
long committedSnapshotId,
boolean committedIsReadable,
@Nullable ReadableSnapshot readableSnapshot,
@Nullable Long earliestSnapshotIDToKeep) {
this.committedSnapshotId = committedSnapshotId;
this.committedIsReadable = committedIsReadable;
this.readableSnapshot = readableSnapshot;
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
}

public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
return new LakeCommitResult(committedSnapshotId, true, null, null);
}

public static LakeCommitResult unknownReadableSnapshot(long committedSnapshotId) {
return new LakeCommitResult(committedSnapshotId, false, null, KEEP_ALL);
}

public static LakeCommitResult withReadableSnapshot(
long committedSnapshotId,
// the readable snapshot id
long readableSnapshotId,
// the tiered log end offset for readable snapshot
Map<TableBucket, Long> tieredLogEndOffsets,
// the readable log end offset for readable snapshot
Map<TableBucket, Long> readableLogEndOffsets,
@Nullable Long earliestSnapshotIDToKeep) {
return new LakeCommitResult(
committedSnapshotId,
false,
new ReadableSnapshot(
readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets),
earliestSnapshotIDToKeep);
}

public long getCommittedSnapshotId() {
return committedSnapshotId;
}

public boolean committedIsReadable() {
return committedIsReadable;
}

@Nullable
public ReadableSnapshot getReadableSnapshot() {
return readableSnapshot;
}

/**
* Gets the earliest snapshot ID to keep.
*
* @return the earliest snapshot ID to keep
*/
@Nullable
public Long getEarliestSnapshotIDToKeep() {
return earliestSnapshotIDToKeep;
}

/**
* The readable snapshot of a lake table: the readable snapshot ID together with the tiered and
* readable log end offsets for each bucket.
*/
public static class ReadableSnapshot {
private final long readableSnapshotId;
private final Map<TableBucket, Long> tieredLogEndOffsets;
private final Map<TableBucket, Long> readableLogEndOffsets;

public ReadableSnapshot(
long readableSnapshotId,
Map<TableBucket, Long> tieredLogEndOffsets,
Map<TableBucket, Long> readableLogEndOffsets) {
this.readableSnapshotId = readableSnapshotId;
this.tieredLogEndOffsets = tieredLogEndOffsets;
this.readableLogEndOffsets = readableLogEndOffsets;
}

public long getReadableSnapshotId() {
return readableSnapshotId;
}

public Map<TableBucket, Long> getTieredLogEndOffsets() {
return tieredLogEndOffsets;
}

public Map<TableBucket, Long> getReadableLogEndOffsets() {
return readableLogEndOffsets;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReadableSnapshot that = (ReadableSnapshot) o;
return readableSnapshotId == that.readableSnapshotId
&& Objects.equals(tieredLogEndOffsets, that.tieredLogEndOffsets)
&& Objects.equals(readableLogEndOffsets, that.readableLogEndOffsets);
}

@Override
public int hashCode() {
return Objects.hash(readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets);
}
}
}
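
A short, hypothetical sketch of when each factory method above applies; the helper class and its method names are illustrative, not part of the PR.

import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.metadata.TableBucket;

import java.util.Map;

/** Illustrative chooser among the three LakeCommitResult variants. */
public class CommitResultExamples {

    // Most lake formats: the committed snapshot is immediately readable.
    static LakeCommitResult immediatelyReadable(long snapshotId) {
        return LakeCommitResult.committedIsReadable(snapshotId);
    }

    // The readable snapshot is not yet known (e.g., compaction still pending);
    // KEEP_ALL is applied internally so earlier snapshots are not expired.
    static LakeCommitResult pendingCompaction(long snapshotId) {
        return LakeCommitResult.unknownReadableSnapshot(snapshotId);
    }

    // Paimon DV-style tables: an older snapshot is the readable one, carrying its
    // own tiered and readable log end offsets per bucket. Snapshots are retained
    // back to the readable one so readers can still load it.
    static LakeCommitResult dvTable(
            long committedId,
            long readableId,
            Map<TableBucket, Long> tieredOffsets,
            Map<TableBucket, Long> readableOffsets) {
        return LakeCommitResult.withReadableSnapshot(
                committedId, readableId, tieredOffsets, readableOffsets, readableId);
    }
}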
@@ -56,10 +56,13 @@ public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable
*
* @param committable the committable object
* @param snapshotProperties the properties that lake supported to store in snapshot
- * @return the committed snapshot ID
+ * @return the commit result, which always includes the committed snapshot ID and may
+ * additionally carry distinct readable snapshot information when the tiered offsets do not yet
+ * represent a consistent readable state (e.g., for Paimon DV tables, where tiered log records
+ * may still be in level 0 and thus not yet be readable).
* @throws IOException if an I/O error occurs
*/
- long commit(CommittableT committable, Map<String, String> snapshotProperties)
+ LakeCommitResult commit(CommittableT committable, Map<String, String> snapshotProperties)
throws IOException;

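On the consuming side, callers need to decide which snapshot readers may safely use. A hedged sketch of that decision follows; the helper class and the -1 sentinel are illustrative, not the PR's actual coordinator logic.

import org.apache.fluss.lake.committer.LakeCommitResult;

public final class CommitResultInspector {
    private CommitResultInspector() {}

    /** Resolves the snapshot ID readers may safely consume, or -1 if not yet known. */
    public static long resolveReadableSnapshotId(LakeCommitResult result) {
        if (result.committedIsReadable()) {
            return result.getCommittedSnapshotId();
        }
        LakeCommitResult.ReadableSnapshot readable = result.getReadableSnapshot();
        // Null means the readable snapshot is unknown (see unknownReadableSnapshot);
        // callers should keep serving the previously known readable snapshot.
        return readable != null ? readable.getReadableSnapshotId() : -1L;
    }
}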
@@ -87,7 +87,7 @@ public LakeSplitGenerator(
public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
LakeSnapshot lakeSnapshotInfo;
try {
- lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+ lakeSnapshotInfo = flussAdmin.getReadableLakeSnapshot(tableInfo.getTablePath()).get();
} catch (Exception exception) {
if (ExceptionUtils.stripExecutionException(exception)
instanceof LakeTableSnapshotNotExistException) {
@@ -119,7 +119,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
} else {
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
- lakeSplits.values().iterator().next();
+ lakeSplits.isEmpty() ? null : lakeSplits.values().iterator().next();
// non-partitioned table
return generateNoPartitionedTableSplit(
nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
@@ -307,7 +307,7 @@ private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
}

private List<SourceSplitBase> generateNoPartitionedTableSplit(
- Map<Integer, List<LakeSplit>> lakeSplits,
+ @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
boolean isLogTable,
Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
// iterate all bucket