Skip to content

Commit a316c3d

Browse files
authored
[lake] Record a file path storing log offsets in lake snapshot property (#2223)
1 parent 7fad656 commit a316c3d

File tree

42 files changed

+1734
-1274
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1734
-1274
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@
3636
@PublicEvolving
3737
public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable {
3838

39+
/**
40+
* The property key used to store the file path of lake table bucket offsets in snapshot
41+
* properties.
42+
*/
43+
String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";
44+
3945
/**
4046
* Converts a list of write results to a committable object.
4147
*

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -710,14 +710,14 @@ public static FsPath remoteLakeTableSnapshotDir(
710710
* <p>The path contract:
711711
*
712712
* <pre>
713-
* {$remoteLakeTableSnapshotMetadataDir}/metadata/{uuid}.manifest
713+
* {$remoteLakeTableSnapshotMetadataDir}/metadata/{UUID}.offsets
714714
* </pre>
715715
*/
716-
public static FsPath remoteLakeTableSnapshotManifestPath(
716+
public static FsPath remoteLakeTableSnapshotOffsetPath(
717717
String remoteDataDir, TablePath tablePath, long tableId) {
718718
return new FsPath(
719719
String.format(
720-
"%s/metadata/%s.manifest",
720+
"%s/metadata/%s.offsets",
721721
remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId),
722722
UUID.randomUUID()));
723723
}

fluss-common/src/main/java/org/apache/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 0 additions & 62 deletions
This file was deleted.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.utils.json;
20+
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Map;
24+
import java.util.Objects;
25+
26+
/**
27+
* Represents the offsets for all buckets of a table. This class stores the mapping from {@link
28+
* TableBucket} to their corresponding offsets.
29+
*
30+
* <p>This class is used to track the log end offsets for each bucket in a table. It supports both
31+
* non-partitioned tables (where buckets are identified only by bucket id) and partitioned tables
32+
* (where buckets are identified by partition id and bucket id).
33+
*
34+
* <p>The offsets map contains entries for each bucket that has a valid offset. Missing buckets are
35+
* not included in the map.
36+
*
37+
* @see TableBucketOffsetsJsonSerde for JSON serialization and deserialization.
38+
*/
39+
public class TableBucketOffsets {
40+
41+
/** The table ID that all buckets belong to. */
42+
private final long tableId;
43+
44+
/**
45+
* The mapping from {@link TableBucket} to their offsets. The map contains entries only for
46+
* buckets that have valid offsets.
47+
*/
48+
private final Map<TableBucket, Long> offsets;
49+
50+
/**
51+
* Creates a new {@link TableBucketOffsets} instance.
52+
*
53+
* @param tableId the table ID that all buckets belong to
54+
* @param offsets the mapping from {@link TableBucket} to their offsets
55+
*/
56+
public TableBucketOffsets(long tableId, Map<TableBucket, Long> offsets) {
57+
this.tableId = tableId;
58+
this.offsets = offsets;
59+
}
60+
61+
/**
62+
* Returns the table ID that all buckets belong to.
63+
*
64+
* @return the table ID
65+
*/
66+
public long getTableId() {
67+
return tableId;
68+
}
69+
70+
/**
71+
* Returns the mapping from {@link TableBucket} to their offsets.
72+
*
73+
* @return the offsets map
74+
*/
75+
public Map<TableBucket, Long> getOffsets() {
76+
return offsets;
77+
}
78+
79+
/**
80+
* Serialize to a JSON byte array.
81+
*
82+
* @see TableBucketOffsetsJsonSerde
83+
*/
84+
public byte[] toJsonBytes() {
85+
return JsonSerdeUtils.writeValueAsBytes(this, TableBucketOffsetsJsonSerde.INSTANCE);
86+
}
87+
88+
/**
89+
* Deserialize from JSON byte array to an instance of {@link TableBucketOffsets}.
90+
*
91+
* @see TableBucketOffsets
92+
*/
93+
public static TableBucketOffsets fromJsonBytes(byte[] json) {
94+
return JsonSerdeUtils.readValue(json, TableBucketOffsetsJsonSerde.INSTANCE);
95+
}
96+
97+
@Override
98+
public boolean equals(Object o) {
99+
if (this == o) {
100+
return true;
101+
}
102+
if (o == null || getClass() != o.getClass()) {
103+
return false;
104+
}
105+
TableBucketOffsets that = (TableBucketOffsets) o;
106+
return tableId == that.tableId && Objects.equals(offsets, that.offsets);
107+
}
108+
109+
@Override
110+
public int hashCode() {
111+
return Objects.hash(tableId, offsets);
112+
}
113+
114+
@Override
115+
public String toString() {
116+
return "TableBucketOffsets{" + "tableId=" + tableId + ", offsets=" + offsets + '}';
117+
}
118+
}

0 commit comments

Comments
 (0)