Skip to content

Commit 2335851

Browse files
committed
[paimon] Fix union read paimon dv table issue
1 parent 8b11e00 commit 2335851

File tree

48 files changed

+3912
-303
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+3912
-303
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -413,21 +413,67 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
413413
TableBucket bucket, long snapshotId);
414414

415415
/**
416-
* Get table lake snapshot info of the given table asynchronously.
416+
* Retrieves the absolute latest lake snapshot metadata for a table asynchronously.
417417
*
418-
* <p>It'll get the latest snapshot for all the buckets of the table.
418+
* <p>This returns the most recent snapshot regardless of its visibility or compaction status.
419+
* It includes the latest tiered offsets for all buckets.
419420
*
420-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
421+
* <p>Exceptions expected when calling {@code get()} on the returned future:
421422
*
422423
* <ul>
423-
* <li>{@link TableNotExistException} if the table does not exist.
424-
* <li>{@link LakeTableSnapshotNotExistException} if no any lake snapshot exist.
424+
* <li>{@link TableNotExistException}: If the table does not exist.
425+
*     <li>{@link LakeTableSnapshotNotExistException}: If no snapshot exists.
425426
* </ul>
426427
*
427-
* @param tablePath the table path of the table.
428+
* @param tablePath The path of the target table.
429+
* @return A future returning the latest tiered snapshot.
428430
*/
429431
CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath);
430432

433+
/**
434+
* Retrieves a specific historical lake snapshot by its ID asynchronously.
435+
*
436+
* <p>It provides the tiered bucket offsets as they existed at the moment the specified snapshot
437+
* was committed.
438+
*
439+
* <p>Exceptions expected when calling {@code get()} on the returned future:
440+
*
441+
* <ul>
442+
* <li>{@link TableNotExistException}: If the table does not exist.
443+
* <li>{@link LakeTableSnapshotNotExistException}: If the specified snapshot ID is missing in
444+
*         Fluss.
445+
* </ul>
446+
*
447+
* @param tablePath The path of the target table.
448+
* @param snapshotId The unique identifier of the snapshot.
449+
* @return A future returning the specific lake snapshot.
450+
*/
451+
CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId);
452+
453+
/**
454+
* Retrieves the latest readable lake snapshot and its corresponding readable log offsets.
455+
*
456+
* <p>For Paimon DV tables, the tiered log offset may not be readable because the corresponding
457+
* data might be in the L0 layer. Using tiered offset directly can lead to data loss. This
458+
* method returns a readable snapshot (where L0 data has been compacted) and its corresponding
459+
* readable offsets, which represent safe log offsets that can be read without data loss.
460+
*
461+
* <p>For union read operations, use this method instead of {@link
462+
* #getLatestLakeSnapshot(TablePath)} to ensure data safety and avoid data loss.
463+
*
464+
* <p>Exceptions expected when calling {@code get()} on the returned future:
465+
*
466+
* <ul>
467+
* <li>{@link TableNotExistException}: If the table does not exist.
468+
* <li>{@link LakeTableSnapshotNotExistException}: If no readable snapshot exists yet.
469+
* </ul>
470+
*
471+
* @param tablePath The path of the target table.
472+
* @return A future returning a {@link LakeSnapshot} containing the readable snapshot ID and
473+
* readable log offsets for each bucket.
474+
*/
475+
CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath);
476+
431477
/**
432478
* List offset for the specified buckets. This operation enables to find the beginning offset,
433479
* end offset as well as the offset matching a timestamp in buckets.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@
6363
import org.apache.fluss.rpc.messages.DropTableRequest;
6464
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
6565
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
66+
import org.apache.fluss.rpc.messages.GetLakeSnapshotRequest;
6667
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
67-
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
6868
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
6969
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
7070
import org.apache.fluss.rpc.messages.ListAclsRequest;
@@ -390,13 +390,38 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
390390

391391
@Override
392392
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
393-
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();
393+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
394394
request.setTablePath()
395395
.setDatabaseName(tablePath.getDatabaseName())
396396
.setTableName(tablePath.getTableName());
397397

398398
return readOnlyGateway
399-
.getLatestLakeSnapshot(request)
399+
.getLakeSnapshot(request)
400+
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
401+
}
402+
403+
@Override
404+
public CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId) {
405+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
406+
request.setTablePath()
407+
.setDatabaseName(tablePath.getDatabaseName())
408+
.setTableName(tablePath.getTableName());
409+
request.setSnapshotId(snapshotId);
410+
411+
return readOnlyGateway
412+
.getLakeSnapshot(request)
413+
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
414+
}
415+
416+
@Override
417+
public CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath) {
418+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
419+
request.setTablePath()
420+
.setDatabaseName(tablePath.getDatabaseName())
421+
.setTableName(tablePath.getTableName());
422+
request.setReadable(true);
423+
return readOnlyGateway
424+
.getLakeSnapshot(request)
400425
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
401426
}
402427

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@
4646
import org.apache.fluss.rpc.messages.DropPartitionRequest;
4747
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
4848
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
49+
import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
4950
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
50-
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
5151
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
5252
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
5353
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
@@ -221,7 +221,7 @@ public static KvSnapshotMetadata toKvSnapshotMetadata(GetKvSnapshotMetadataRespo
221221
toFsPathAndFileName(response.getSnapshotFilesList()), response.getLogOffset());
222222
}
223223

224-
public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse response) {
224+
public static LakeSnapshot toLakeTableSnapshotInfo(GetLakeSnapshotResponse response) {
225225
long tableId = response.getTableId();
226226
long snapshotId = response.getSnapshotId();
227227
Map<TableBucket, Long> tableBucketsOffset =

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,11 @@ public interface CommitterInitContext {
5151
* @return the lake tiering config
5252
*/
5353
Configuration lakeTieringConfig();
54+
55+
/**
56+
* Returns the fluss config.
57+
*
58+
* @return the fluss config
59+
*/
60+
Configuration flussConfig();
5461
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.committer;
20+
21+
import org.apache.fluss.annotation.PublicEvolving;
22+
import org.apache.fluss.metadata.TableBucket;
23+
24+
import javax.annotation.Nullable;
25+
26+
import java.util.Map;
27+
import java.util.Objects;
28+
29+
/**
30+
* The result of a lake commit operation, containing the committed snapshot ID and the readable
31+
* snapshot information.
32+
*
33+
* <p>For most implementations, the readable snapshot is the same as the committed snapshot, and the
34+
* readable log offsets are the same as the tiered offsets from TieringCommitOperator.
35+
*
36+
* <p>For Paimon DV tables, the readable snapshot will be different from the committed snapshot, and
37+
* the log end offsets will be different as well (based on compaction status).
38+
*
39+
* @since 0.9
40+
*/
41+
@PublicEvolving
42+
public class LakeCommitResult {
43+
44+
// -1 to enforce to keep all previous snapshots
45+
public static final Long KEEP_ALL_PREVIOUS = -1L;
46+
47+
// The snapshot ID that was just committed
48+
private final long committedSnapshotId;
49+
50+
private final boolean committedIsReadable;
51+
52+
// The earliest snapshot ID to keep, null means not to keep any previous snapshot
53+
@Nullable private final Long earliestSnapshotIDToKeep;
54+
55+
// the readable snapshot, null if
56+
// 1: the readable snapshot is unknown,
57+
// 2: committedIsReadable is true, committed snapshot is just also readable
58+
@Nullable private final ReadableSnapshot readableSnapshot;
59+
60+
private LakeCommitResult(
61+
long committedSnapshotId,
62+
boolean committedIsReadable,
63+
@Nullable ReadableSnapshot readableSnapshot,
64+
@Nullable Long earliestSnapshotIDToKeep) {
65+
this.committedSnapshotId = committedSnapshotId;
66+
this.committedIsReadable = committedIsReadable;
67+
this.readableSnapshot = readableSnapshot;
68+
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
69+
}
70+
71+
public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
72+
return new LakeCommitResult(committedSnapshotId, true, null, null);
73+
}
74+
75+
public static LakeCommitResult unknownReadableSnapshot(long committedSnapshotId) {
76+
return new LakeCommitResult(committedSnapshotId, false, null, KEEP_ALL_PREVIOUS);
77+
}
78+
79+
public static LakeCommitResult withReadableSnapshot(
80+
long committedSnapshotId,
81+
// the readable snapshot id
82+
long readableSnapshotId,
83+
// the tiered log end offset for readable snapshot
84+
Map<TableBucket, Long> tieredLogEndOffsets,
85+
// the readable log end offset for readable snapshot
86+
Map<TableBucket, Long> readableLogEndOffsets,
87+
@Nullable Long earliestSnapshotIDToKeep) {
88+
return new LakeCommitResult(
89+
committedSnapshotId,
90+
false,
91+
new ReadableSnapshot(
92+
readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets),
93+
earliestSnapshotIDToKeep);
94+
}
95+
96+
public long getCommittedSnapshotId() {
97+
return committedSnapshotId;
98+
}
99+
100+
public boolean committedIsReadable() {
101+
return committedIsReadable;
102+
}
103+
104+
@Nullable
105+
public ReadableSnapshot getReadableSnapshot() {
106+
return readableSnapshot;
107+
}
108+
109+
/**
110+
* Gets the earliest snapshot ID to keep.
111+
*
112+
* @return the earliest snapshot ID to keep
113+
*/
114+
@Nullable
115+
public Long getEarliestSnapshotIDToKeep() {
116+
return earliestSnapshotIDToKeep;
117+
}
118+
119+
/**
120+
* Represents the information about a snapshot that is considered "readable".
121+
*
122+
* <p>In lake storage, a snapshot might be physically committed but not yet fully consistent for
123+
* reading (e.g., due to data invisible in level0 for Paimon DV tables). This class tracks both
124+
* the tiered offsets (what's been uploaded) and the readable offsets (what can be safely
125+
* queried).
126+
*/
127+
public static class ReadableSnapshot {
128+
129+
private final long readableSnapshotId;
130+
/**
131+
* The log end offsets that have been tiered to the lake storage for this snapshot. These
132+
* represent the physical data boundaries in the lake.
133+
*/
134+
private final Map<TableBucket, Long> tieredLogEndOffsets;
135+
136+
/**
137+
* The log end offsets that are actually visible/readable for this snapshot. For some table
138+
* types (like Paimon DV), this might lag behind {@link #tieredLogEndOffsets} until specific
139+
* compaction tasks complete.
140+
*/
141+
private final Map<TableBucket, Long> readableLogEndOffsets;
142+
143+
public ReadableSnapshot(
144+
Long readableSnapshotId,
145+
Map<TableBucket, Long> tieredLogEndOffsets,
146+
Map<TableBucket, Long> readableLogEndOffsets) {
147+
this.readableSnapshotId = readableSnapshotId;
148+
this.tieredLogEndOffsets = tieredLogEndOffsets;
149+
this.readableLogEndOffsets = readableLogEndOffsets;
150+
}
151+
152+
public Long getReadableSnapshotId() {
153+
return readableSnapshotId;
154+
}
155+
156+
public Map<TableBucket, Long> getTieredLogEndOffsets() {
157+
return tieredLogEndOffsets;
158+
}
159+
160+
public Map<TableBucket, Long> getReadableLogEndOffsets() {
161+
return readableLogEndOffsets;
162+
}
163+
164+
@Override
165+
public boolean equals(Object o) {
166+
if (this == o) {
167+
return true;
168+
}
169+
if (o == null || getClass() != o.getClass()) {
170+
return false;
171+
}
172+
ReadableSnapshot that = (ReadableSnapshot) o;
173+
return readableSnapshotId == that.readableSnapshotId
174+
&& Objects.equals(tieredLogEndOffsets, that.tieredLogEndOffsets)
175+
&& Objects.equals(readableLogEndOffsets, that.readableLogEndOffsets);
176+
}
177+
178+
@Override
179+
public int hashCode() {
180+
return Objects.hash(readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets);
181+
}
182+
}
183+
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable
5656
*
5757
* @param committable the committable object
5858
* @param snapshotProperties the properties that lake supported to store in snapshot
59-
* @return the committed snapshot ID
59+
* @return the commit result, which always includes the latest committed snapshot ID and may
60+
* optionally contain distinct readable snapshot information if the physical tiered offsets
61+
* do not yet represent a consistent readable state (e.g., in Paimon DV tables where the
62+
*     tiered log records may still be in level0, which is not readable).
6063
* @throws IOException if an I/O error occurs
6164
*/
62-
long commit(CommittableT committable, Map<String, String> snapshotProperties)
65+
LakeCommitResult commit(CommittableT committable, Map<String, String> snapshotProperties)
6366
throws IOException;
6467

6568
/**

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public LakeSplitGenerator(
8787
public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
8888
LakeSnapshot lakeSnapshotInfo;
8989
try {
90-
lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
90+
lakeSnapshotInfo = flussAdmin.getReadableLakeSnapshot(tableInfo.getTablePath()).get();
9191
} catch (Exception exception) {
9292
if (ExceptionUtils.stripExecutionException(exception)
9393
instanceof LakeTableSnapshotNotExistException) {
@@ -119,7 +119,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
119119
lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
120120
} else {
121121
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
122-
lakeSplits.values().iterator().next();
122+
lakeSplits.isEmpty() ? null : lakeSplits.values().iterator().next();
123123
// non-partitioned table
124124
return generateNoPartitionedTableSplit(
125125
nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
@@ -307,7 +307,7 @@ private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
307307
}
308308

309309
private List<SourceSplitBase> generateNoPartitionedTableSplit(
310-
Map<Integer, List<LakeSplit>> lakeSplits,
310+
@Nullable Map<Integer, List<LakeSplit>> lakeSplits,
311311
boolean isLogTable,
312312
Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
313313
// iterate all bucket

0 commit comments

Comments
 (0)