Skip to content

Commit 5a4374a

Browse files
authored
[paimon] Fix union read paimon dv table issue (apache#2326)
1 parent 139035e commit 5a4374a

File tree

52 files changed

+3959
-308
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+3959
-308
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -438,21 +438,74 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
438438
KvSnapshotLease createKvSnapshotLease(String leaseId, long leaseDurationMs);
439439

440440
/**
441-
* Get table lake snapshot info of the given table asynchronously.
441+
* Retrieves the absolute latest lake snapshot metadata for a table asynchronously.
442442
*
443-
* <p>It'll get the latest snapshot for all the buckets of the table.
443+
* <p>This returns the most recent snapshot regardless of its visibility or compaction status.
444+
* It includes the latest tiered offsets for all buckets.
444445
*
445-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
446+
* <p><b>NOTE: This API is not intended for union reads and should be considered internal.</b>
447+
* For union read operations (e.g. Flink/Spark reading from Fluss + lake), use {@link
448+
* #getReadableLakeSnapshot(TablePath)} instead. Using this method for union reads can lead to
449+
* data loss when the latest tiered snapshot is not yet readable (e.g. Paimon DV tables with
450+
* un-compacted L0 data). This method remains for internal use cases such as tiering commit and
451+
* readable-offset resolution.
452+
*
453+
* <p>Exceptions expected when calling {@code get()} on the returned future:
446454
*
447455
* <ul>
448-
* <li>{@link TableNotExistException} if the table does not exist.
449-
* <li>{@link LakeTableSnapshotNotExistException} if no any lake snapshot exist.
456+
* <li>{@link TableNotExistException}: If the table does not exist.
457+
* <li>{@link LakeTableSnapshotNotExistException}: If no snapshots exist.
450458
* </ul>
451459
*
452-
* @param tablePath the table path of the table.
460+
* @param tablePath The path of the target table.
461+
* @return A future returning the latest tiered snapshot.
453462
*/
454463
CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath);
455464

465+
/**
466+
* Retrieves a specific historical lake snapshot by its ID asynchronously.
467+
*
468+
* <p>It provides the tiered bucket offsets as they existed at the moment the specified snapshot
469+
* was committed.
470+
*
471+
* <p>Exceptions expected when calling {@code get()} on the returned future:
472+
*
473+
* <ul>
474+
* <li>{@link TableNotExistException}: If the table does not exist.
475+
* <li>{@link LakeTableSnapshotNotExistException}: If the specified snapshot ID is missing in
476+
Fluss.
477+
* </ul>
478+
*
479+
* @param tablePath The path of the target table.
480+
* @param snapshotId The unique identifier of the snapshot.
481+
* @return A future returning the specific lake snapshot.
482+
*/
483+
CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId);
484+
485+
/**
486+
* Retrieves the latest readable lake snapshot and its corresponding readable log offsets.
487+
*
488+
* <p>For Paimon DV tables, the tiered log offset may not be readable because the corresponding
489+
* data might be in the L0 layer. Using tiered offset directly can lead to data loss. This
490+
* method returns a readable snapshot (where L0 data has been compacted) and its corresponding
491+
* readable offsets, which represent safe log offsets that can be read without data loss.
492+
*
493+
* <p>For union read operations, use this method instead of {@link
494+
* #getLatestLakeSnapshot(TablePath)} to ensure data safety and avoid data loss.
495+
*
496+
* <p>Exceptions expected when calling {@code get()} on the returned future:
497+
*
498+
* <ul>
499+
* <li>{@link TableNotExistException}: If the table does not exist.
500+
* <li>{@link LakeTableSnapshotNotExistException}: If no readable snapshot exists yet.
501+
* </ul>
502+
*
503+
* @param tablePath The path of the target table.
504+
* @return A future returning a {@link LakeSnapshot} containing the readable snapshot ID and
505+
* readable log offsets for each bucket.
506+
*/
507+
CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath);
508+
456509
/**
457510
* List offset for the specified buckets. This operation enables to find the beginning offset,
458511
* end offset as well as the offset matching a timestamp in buckets.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
import org.apache.fluss.rpc.messages.DropTableRequest;
6666
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
6767
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
68+
import org.apache.fluss.rpc.messages.GetLakeSnapshotRequest;
6869
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
69-
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
7070
import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
7171
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
7272
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
@@ -407,13 +407,38 @@ public KvSnapshotLease createKvSnapshotLease(String leaseId, long leaseDurationM
407407

408408
@Override
409409
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
410-
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();
410+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
411411
request.setTablePath()
412412
.setDatabaseName(tablePath.getDatabaseName())
413413
.setTableName(tablePath.getTableName());
414414

415415
return readOnlyGateway
416-
.getLatestLakeSnapshot(request)
416+
.getLakeSnapshot(request)
417+
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
418+
}
419+
420+
@Override
421+
public CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId) {
422+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
423+
request.setTablePath()
424+
.setDatabaseName(tablePath.getDatabaseName())
425+
.setTableName(tablePath.getTableName());
426+
request.setSnapshotId(snapshotId);
427+
428+
return readOnlyGateway
429+
.getLakeSnapshot(request)
430+
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
431+
}
432+
433+
@Override
434+
public CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath) {
435+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
436+
request.setTablePath()
437+
.setDatabaseName(tablePath.getDatabaseName())
438+
.setTableName(tablePath.getTableName());
439+
request.setReadable(true);
440+
return readOnlyGateway
441+
.getLakeSnapshot(request)
417442
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
418443
}
419444

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@
5151
import org.apache.fluss.rpc.messages.DropPartitionRequest;
5252
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
5353
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
54+
import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
5455
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
55-
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
5656
import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
5757
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
5858
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
@@ -258,7 +258,7 @@ public static KvSnapshotMetadata toKvSnapshotMetadata(GetKvSnapshotMetadataRespo
258258
toFsPathAndFileName(response.getSnapshotFilesList()), response.getLogOffset());
259259
}
260260

261-
public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse response) {
261+
public static LakeSnapshot toLakeTableSnapshotInfo(GetLakeSnapshotResponse response) {
262262
long tableId = response.getTableId();
263263
long snapshotId = response.getSnapshotId();
264264
Map<TableBucket, Long> tableBucketsOffset =

fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ void testDescribeTableOperation() throws Exception {
458458
assertThatThrownBy(() -> guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get())
459459
.rootCause()
460460
.isInstanceOf(LakeTableSnapshotNotExistException.class)
461-
.hasMessageContaining("Lake table snapshot not exist for table");
461+
.hasMessageContaining("Lake table snapshot doesn't exist for table");
462462
}
463463

464464
@ParameterizedTest

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,12 @@ public interface CommitterInitContext {
5151
* @return the lake tiering config
5252
*/
5353
Configuration lakeTieringConfig();
54+
55+
/**
56+
* Returns the Fluss client configuration. This configuration can be used to build a Fluss
57+
* client, such as a Connection.
58+
*
59+
* @return the Fluss client configuration
60+
*/
61+
Configuration flussClientConfig();
5462
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.committer;
20+
21+
import org.apache.fluss.annotation.PublicEvolving;
22+
import org.apache.fluss.metadata.TableBucket;
23+
24+
import javax.annotation.Nullable;
25+
26+
import java.util.Map;
27+
import java.util.Objects;
28+
29+
/**
30+
* The result of a lake commit operation, containing the committed snapshot ID and the readable
31+
* snapshot information.
32+
*
33+
* <p>For most implementations, the readable snapshot is the same as the committed snapshot, and the
34+
* readable log offsets are the same as the tiered offsets from TieringCommitOperator.
35+
*
36+
* <p>For Paimon DV tables, the readable snapshot will be different from the committed snapshot, and
37+
* the log end offsets will be different as well (based on compaction status).
38+
*
39+
* @since 0.9
40+
*/
41+
@PublicEvolving
42+
public class LakeCommitResult {
43+
44+
/** Keep only the latest snapshot; discard all previous ones. */
45+
public static final Long KEEP_LATEST = null;
46+
47+
/** Keep all previous snapshots (infinite retention). */
48+
public static final Long KEEP_ALL_PREVIOUS = -1L;
49+
50+
// The snapshot ID that was just committed
51+
private final long committedSnapshotId;
52+
53+
private final boolean committedIsReadable;
54+
55+
// The earliest snapshot ID to keep. KEEP_LATEST (null) means keep only the latest (discard all
56+
// previous). KEEP_ALL_PREVIOUS (-1) means keep all previous snapshots (infinite retention).
57+
@Nullable private final Long earliestSnapshotIDToKeep;
58+
59+
// the readable snapshot, null if
60+
// 1: the readable snapshot is unknown,
61+
// 2: committedIsReadable is true, i.e. the committed snapshot is itself already readable
62+
@Nullable private final ReadableSnapshot readableSnapshot;
63+
64+
private LakeCommitResult(
65+
long committedSnapshotId,
66+
boolean committedIsReadable,
67+
@Nullable ReadableSnapshot readableSnapshot,
68+
@Nullable Long earliestSnapshotIDToKeep) {
69+
this.committedSnapshotId = committedSnapshotId;
70+
this.committedIsReadable = committedIsReadable;
71+
this.readableSnapshot = readableSnapshot;
72+
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
73+
}
74+
75+
public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
76+
return new LakeCommitResult(committedSnapshotId, true, null, KEEP_LATEST);
77+
}
78+
79+
public static LakeCommitResult unknownReadableSnapshot(long committedSnapshotId) {
80+
return new LakeCommitResult(committedSnapshotId, false, null, KEEP_ALL_PREVIOUS);
81+
}
82+
83+
public static LakeCommitResult withReadableSnapshot(
84+
long committedSnapshotId,
85+
// the readable snapshot id
86+
long readableSnapshotId,
87+
// the tiered log end offset for readable snapshot
88+
Map<TableBucket, Long> tieredLogEndOffsets,
89+
// the readable log end offset for readable snapshot
90+
Map<TableBucket, Long> readableLogEndOffsets,
91+
@Nullable Long earliestSnapshotIDToKeep) {
92+
return new LakeCommitResult(
93+
committedSnapshotId,
94+
false,
95+
new ReadableSnapshot(
96+
readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets),
97+
earliestSnapshotIDToKeep);
98+
}
99+
100+
public long getCommittedSnapshotId() {
101+
return committedSnapshotId;
102+
}
103+
104+
public boolean committedIsReadable() {
105+
return committedIsReadable;
106+
}
107+
108+
@Nullable
109+
public ReadableSnapshot getReadableSnapshot() {
110+
return readableSnapshot;
111+
}
112+
113+
/**
114+
* Gets the earliest snapshot ID to keep.
115+
*
116+
* @return the earliest snapshot ID to keep
117+
*/
118+
@Nullable
119+
public Long getEarliestSnapshotIDToKeep() {
120+
return earliestSnapshotIDToKeep;
121+
}
122+
123+
/**
124+
* Represents the information about a snapshot that is considered "readable".
125+
*
126+
* <p>In lake storage, a snapshot might be physically committed but not yet fully consistent for
127+
* reading (e.g., due to data being invisible in level0 for Paimon DV tables). This class tracks both
128+
* the tiered offsets (what's been uploaded) and the readable offsets (what can be safely
129+
* queried).
130+
*/
131+
public static class ReadableSnapshot {
132+
133+
private final long readableSnapshotId;
134+
/**
135+
* The log end offsets that have been tiered to the lake storage for this snapshot. These
136+
* represent the physical data boundaries in the lake.
137+
*/
138+
private final Map<TableBucket, Long> tieredLogEndOffsets;
139+
140+
/**
141+
* The log end offsets that are actually visible/readable for this snapshot. For some table
142+
* types (like Paimon DV), this might lag behind {@link #tieredLogEndOffsets} until specific
143+
* compaction tasks complete.
144+
*/
145+
private final Map<TableBucket, Long> readableLogEndOffsets;
146+
147+
public ReadableSnapshot(
148+
Long readableSnapshotId,
149+
Map<TableBucket, Long> tieredLogEndOffsets,
150+
Map<TableBucket, Long> readableLogEndOffsets) {
151+
this.readableSnapshotId = readableSnapshotId;
152+
this.tieredLogEndOffsets = tieredLogEndOffsets;
153+
this.readableLogEndOffsets = readableLogEndOffsets;
154+
}
155+
156+
public Long getReadableSnapshotId() {
157+
return readableSnapshotId;
158+
}
159+
160+
public Map<TableBucket, Long> getTieredLogEndOffsets() {
161+
return tieredLogEndOffsets;
162+
}
163+
164+
public Map<TableBucket, Long> getReadableLogEndOffsets() {
165+
return readableLogEndOffsets;
166+
}
167+
168+
@Override
169+
public boolean equals(Object o) {
170+
if (this == o) {
171+
return true;
172+
}
173+
if (o == null || getClass() != o.getClass()) {
174+
return false;
175+
}
176+
ReadableSnapshot that = (ReadableSnapshot) o;
177+
return readableSnapshotId == that.readableSnapshotId
178+
&& Objects.equals(tieredLogEndOffsets, that.tieredLogEndOffsets)
179+
&& Objects.equals(readableLogEndOffsets, that.readableLogEndOffsets);
180+
}
181+
182+
@Override
183+
public int hashCode() {
184+
return Objects.hash(readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets);
185+
}
186+
}
187+
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable
5656
*
5757
* @param committable the committable object
5858
* @param snapshotProperties the properties that lake supported to store in snapshot
59-
* @return the committed snapshot ID
59+
* @return the commit result, which always includes the latest committed snapshot ID and may
60+
* optionally contain distinct readable snapshot information if the physical tiered offsets
61+
* do not yet represent a consistent readable state (e.g., in Paimon DV tables where the
62+
* tiered log records may still be in level0, which is not readable).
6063
* @throws IOException if an I/O error occurs
6164
*/
62-
long commit(CommittableT committable, Map<String, String> snapshotProperties)
65+
LakeCommitResult commit(CommittableT committable, Map<String, String> snapshotProperties)
6366
throws IOException;
6467

6568
/**

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public LakeSplitGenerator(
8787
public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
8888
LakeSnapshot lakeSnapshotInfo;
8989
try {
90-
lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
90+
lakeSnapshotInfo = flussAdmin.getReadableLakeSnapshot(tableInfo.getTablePath()).get();
9191
} catch (Exception exception) {
9292
if (ExceptionUtils.stripExecutionException(exception)
9393
instanceof LakeTableSnapshotNotExistException) {

0 commit comments

Comments
 (0)