Skip to content

Commit 9d7747f

Browse files
committed
[paimon] Fix union read paimon dv table issue
1 parent 38c2555 commit 9d7747f

File tree

46 files changed

+3598
-304
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+3598
-304
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -409,21 +409,67 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
409409
TableBucket bucket, long snapshotId);
410410

411411
/**
412-
* Get table lake snapshot info of the given table asynchronously.
412+
* Retrieves the absolute latest lake snapshot metadata for a table asynchronously.
413413
*
414-
* <p>It'll get the latest snapshot for all the buckets of the table.
414+
* <p>This returns the most recent snapshot regardless of its visibility or compaction status.
415+
* It includes the latest tiered offsets for all buckets.
415416
*
416-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
417+
* <p>Exceptions expected when calling {@code get()} on the returned future:
417418
*
418419
* <ul>
419-
* <li>{@link TableNotExistException} if the table does not exist.
420-
* <li>{@link LakeTableSnapshotNotExistException} if no any lake snapshot exist.
420+
* <li>{@link TableNotExistException}: If the table path is invalid.
421+
 * <li>{@link LakeTableSnapshotNotExistException}: If no snapshot exists.
421422
* </ul>
422423
*
423-
* @param tablePath the table path of the table.
424+
* @param tablePath The path of the target table.
425+
* @return A future returning the latest tiered snapshot.
424426
*/
425427
CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath);
426428

429+
/**
430+
* Retrieves a specific historical lake snapshot by its ID asynchronously.
431+
*
432+
* <p>It provides the tiered bucket offsets as they existed at the moment the specified snapshot
433+
* was committed.
434+
*
435+
* <p>Exceptions expected when calling {@code get()} on the returned future:
436+
*
437+
* <ul>
438+
* <li>{@link TableNotExistException}: If the table path is invalid.
439+
* <li>{@link LakeTableSnapshotNotExistException}: If the specified snapshot ID is missing in
440+
 *       Fluss.
441+
* </ul>
442+
*
443+
* @param tablePath The path of the target table.
444+
* @param snapshotId The unique identifier of the snapshot.
445+
* @return A future returning the specific lake snapshot.
446+
*/
447+
CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId);
448+
449+
/**
450+
* Retrieves the latest readable lake snapshot and its corresponding readable log offsets.
451+
*
452+
* <p>For Paimon DV tables, the tiered log offset may not be readable because the corresponding
453+
* data might be in the L0 layer. Using tiered offset directly can lead to data loss. This
454+
* method returns a readable snapshot (where L0 data has been compacted) and its corresponding
455+
* readable offsets, which represent safe log offsets that can be read without data loss.
456+
*
457+
* <p>For union read operations, use this method instead of {@link
458+
* #getLatestLakeSnapshot(TablePath)} to ensure data safety and avoid data loss.
459+
*
460+
* <p>Exceptions expected when calling {@code get()} on the returned future:
461+
*
462+
* <ul>
463+
* <li>{@link TableNotExistException}: If the table path is invalid.
464+
* <li>{@link LakeTableSnapshotNotExistException}: If no readable snapshot exists yet.
465+
* </ul>
466+
*
467+
* @param tablePath The path of the target table.
468+
* @return A future returning a {@link LakeSnapshot} containing the readable snapshot ID and
469+
* readable log offsets for each bucket.
470+
*/
471+
CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath);
472+
427473
/**
428474
* List offset for the specified buckets. This operation enables to find the beginning offset,
429475
* end offset as well as the offset matching a timestamp in buckets.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@
6060
import org.apache.fluss.rpc.messages.DropTableRequest;
6161
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
6262
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
63+
import org.apache.fluss.rpc.messages.GetLakeSnapshotRequest;
6364
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
64-
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
6565
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
6666
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
6767
import org.apache.fluss.rpc.messages.ListAclsRequest;
@@ -383,13 +383,38 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
383383

384384
@Override
385385
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
386-
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();
386+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
387387
request.setTablePath()
388388
.setDatabaseName(tablePath.getDatabaseName())
389389
.setTableName(tablePath.getTableName());
390390

391391
return readOnlyGateway
392-
.getLatestLakeSnapshot(request)
392+
.getLakeSnapshot(request)
393+
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
394+
}
395+
396+
@Override
397+
public CompletableFuture<LakeSnapshot> getLakeSnapshot(TablePath tablePath, long snapshotId) {
398+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
399+
request.setTablePath()
400+
.setDatabaseName(tablePath.getDatabaseName())
401+
.setTableName(tablePath.getTableName());
402+
request.setSnapshotId(snapshotId);
403+
404+
return readOnlyGateway
405+
.getLakeSnapshot(request)
406+
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
407+
}
408+
409+
@Override
410+
public CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePath) {
411+
GetLakeSnapshotRequest request = new GetLakeSnapshotRequest();
412+
request.setTablePath()
413+
.setDatabaseName(tablePath.getDatabaseName())
414+
.setTableName(tablePath.getTableName());
415+
request.setReadable(true);
416+
return readOnlyGateway
417+
.getLakeSnapshot(request)
393418
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
394419
}
395420

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242
import org.apache.fluss.rpc.messages.DropPartitionRequest;
4343
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
4444
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
45+
import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
4546
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
46-
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
4747
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
4848
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
4949
import org.apache.fluss.rpc.messages.LookupRequest;
@@ -210,7 +210,7 @@ public static KvSnapshotMetadata toKvSnapshotMetadata(GetKvSnapshotMetadataRespo
210210
toFsPathAndFileName(response.getSnapshotFilesList()), response.getLogOffset());
211211
}
212212

213-
public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse response) {
213+
public static LakeSnapshot toLakeTableSnapshotInfo(GetLakeSnapshotResponse response) {
214214
long tableId = response.getTableId();
215215
long snapshotId = response.getSnapshotId();
216216
Map<TableBucket, Long> tableBucketsOffset =

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,11 @@ public interface CommitterInitContext {
5151
* @return the lake tiering config
5252
*/
5353
Configuration lakeTieringConfig();
54+
55+
/**
56+
* Returns the fluss config.
57+
*
58+
* @return the fluss config
59+
*/
60+
Configuration flussConfig();
5461
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.committer;
20+
21+
import org.apache.fluss.annotation.PublicEvolving;
22+
import org.apache.fluss.metadata.TableBucket;
23+
24+
import javax.annotation.Nullable;
25+
26+
import java.util.Map;
27+
import java.util.Objects;
28+
29+
/**
30+
* The result of a lake commit operation, containing the committed snapshot ID and the readable
31+
* snapshot information.
32+
*
33+
* <p>For most implementations, the readable snapshot is the same as the committed snapshot, and the
34+
* log end offsets are the same as the tiered offsets from TieringCommitOperator.
35+
*
36+
* <p>For Paimon DV tables, the readable snapshot will be different from the committed snapshot, and
37+
* the log end offsets will be different as well (based on compaction status).
38+
*
39+
* @since 0.9
40+
*/
41+
@PublicEvolving
42+
public class LakeCommitResult {
43+
44+
// -1 to enforce to keep all previous snapshots
45+
public static final Long KEEP_ALL = -1L;
46+
47+
// The snapshot ID that was just committed
48+
private final long committedSnapshotId;
49+
50+
private final boolean committedIsReadable;
51+
52+
// The earliest snapshot ID to keep, null means not to keep any previous snapshot
53+
@Nullable private final Long earliestSnapshotIDToKeep;
54+
55+
@Nullable private final ReadableSnapshot readableSnapshot;
56+
57+
private LakeCommitResult(
58+
long committedSnapshotId,
59+
boolean committedIsReadable,
60+
@Nullable ReadableSnapshot readableSnapshot,
61+
@Nullable Long earliestSnapshotIDToKeep) {
62+
this.committedSnapshotId = committedSnapshotId;
63+
this.committedIsReadable = committedIsReadable;
64+
this.readableSnapshot = readableSnapshot;
65+
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
66+
}
67+
68+
public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
69+
return new LakeCommitResult(committedSnapshotId, true, null, null);
70+
}
71+
72+
public static LakeCommitResult unknownReadableSnapshot(long committedSnapshotId) {
73+
return new LakeCommitResult(committedSnapshotId, false, null, KEEP_ALL);
74+
}
75+
76+
public static LakeCommitResult withReadableSnapshot(
77+
long committedSnapshotId,
78+
// the readable snapshot id
79+
long readableSnapshotId,
80+
// the tiered log end offset for readable snapshot
81+
Map<TableBucket, Long> tieredLogEndOffsets,
82+
// the readable log end offset for readable snapshot
83+
Map<TableBucket, Long> readableLogEndOffsets,
84+
@Nullable Long earliestSnapshotIDToKeep) {
85+
return new LakeCommitResult(
86+
committedSnapshotId,
87+
false,
88+
new ReadableSnapshot(
89+
readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets),
90+
earliestSnapshotIDToKeep);
91+
}
92+
93+
public long getCommittedSnapshotId() {
94+
return committedSnapshotId;
95+
}
96+
97+
public boolean committedIsReadable() {
98+
return committedIsReadable;
99+
}
100+
101+
@Nullable
102+
public ReadableSnapshot getReadableSnapshot() {
103+
return readableSnapshot;
104+
}
105+
106+
/**
107+
* Gets the earliest snapshot ID to keep.
108+
*
109+
* @return the earliest snapshot ID to keep
110+
*/
111+
@Nullable
112+
public Long getEarliestSnapshotIDToKeep() {
113+
return earliestSnapshotIDToKeep;
114+
}
115+
116+
/** todo. */
117+
public static class ReadableSnapshot {
118+
private final long readableSnapshotId;
119+
private final Map<TableBucket, Long> tieredLogEndOffsets;
120+
private final Map<TableBucket, Long> readableLogEndOffsets;
121+
122+
public ReadableSnapshot(
123+
Long readableSnapshotId,
124+
Map<TableBucket, Long> tieredLogEndOffsets,
125+
Map<TableBucket, Long> readableLogEndOffsets) {
126+
this.readableSnapshotId = readableSnapshotId;
127+
this.tieredLogEndOffsets = tieredLogEndOffsets;
128+
this.readableLogEndOffsets = readableLogEndOffsets;
129+
}
130+
131+
public Long getReadableSnapshotId() {
132+
return readableSnapshotId;
133+
}
134+
135+
public Map<TableBucket, Long> getTieredLogEndOffsets() {
136+
return tieredLogEndOffsets;
137+
}
138+
139+
public Map<TableBucket, Long> getReadableLogEndOffsets() {
140+
return readableLogEndOffsets;
141+
}
142+
143+
@Override
144+
public boolean equals(Object o) {
145+
if (this == o) {
146+
return true;
147+
}
148+
if (o == null || getClass() != o.getClass()) {
149+
return false;
150+
}
151+
ReadableSnapshot that = (ReadableSnapshot) o;
152+
return readableSnapshotId == that.readableSnapshotId
153+
&& Objects.equals(tieredLogEndOffsets, that.tieredLogEndOffsets)
154+
&& Objects.equals(readableLogEndOffsets, that.readableLogEndOffsets);
155+
}
156+
157+
@Override
158+
public int hashCode() {
159+
return Objects.hash(readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets);
160+
}
161+
}
162+
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable
5656
*
5757
* @param committable the committable object
5858
* @param snapshotProperties the properties that lake supported to store in snapshot
59-
* @return the committed snapshot ID
59+
* @return the commit result, which always includes the latest committed snapshot ID and may
60+
* optionally contain distinct readable snapshot information if the physical tiered offsets
61+
* do not yet represent a consistent readable state (e.g., in Paimon DV tables where the
62+
 *     tiered log records may still be in level0, which is not readable).
6063
* @throws IOException if an I/O error occurs
6164
*/
62-
long commit(CommittableT committable, Map<String, String> snapshotProperties)
65+
LakeCommitResult commit(CommittableT committable, Map<String, String> snapshotProperties)
6366
throws IOException;
6467

6568
/**

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public LakeSplitGenerator(
8787
public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
8888
LakeSnapshot lakeSnapshotInfo;
8989
try {
90-
lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
90+
lakeSnapshotInfo = flussAdmin.getReadableLakeSnapshot(tableInfo.getTablePath()).get();
9191
} catch (Exception exception) {
9292
if (ExceptionUtils.stripExecutionException(exception)
9393
instanceof LakeTableSnapshotNotExistException) {
@@ -119,7 +119,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
119119
lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
120120
} else {
121121
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
122-
lakeSplits.values().iterator().next();
122+
lakeSplits.isEmpty() ? null : lakeSplits.values().iterator().next();
123123
// non-partitioned table
124124
return generateNoPartitionedTableSplit(
125125
nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
@@ -307,7 +307,7 @@ private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
307307
}
308308

309309
private List<SourceSplitBase> generateNoPartitionedTableSplit(
310-
Map<Integer, List<LakeSplit>> lakeSplits,
310+
@Nullable Map<Integer, List<LakeSplit>> lakeSplits,
311311
boolean isLogTable,
312312
Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
313313
// iterate all bucket

0 commit comments

Comments
 (0)