Skip to content

Commit ee16bba

Browse files
committed
[kv] Support download latest kv snapshot from remote while becoming standby replica
1 parent cdcf9d4 commit ee16bba

30 files changed

+2860
-1009
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ public class MetricNames {
8484
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
8585
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
8686

87+
/**
88+
* Total physical storage size of standby replica snapshots on this tablet server.
89+
*
90+
* <p>Standby replicas maintain downloaded KV snapshots for fast leader failover. This metric
91+
* tracks the total size of all KV snapshot data downloaded by standby replicas on this server.
92+
*/
93+
public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE = "standbySize";
94+
8795
// --------------------------------------------------------------------------------------------
8896
// metrics for user
8997
// --------------------------------------------------------------------------------------------

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ message NotifyKvSnapshotOffsetRequest {
365365
required int32 bucket_id = 3;
366366
required int32 coordinator_epoch = 4;
367367
required int64 min_retain_offset = 5;
368+
optional int64 snapshot_id = 6;
368369
}
369370

370371
message NotifyKvSnapshotOffsetResponse {

fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T
173173
* @param tableBucket the table bucket
174174
* @return the tablet directory
175175
*/
176-
protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
176+
public File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
177177
File tabletDir = getTabletDir(tablePath, tableBucket);
178178
if (tabletDir.exists()) {
179179
return tabletDir;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,7 +1741,9 @@ private void tryProcessCommitKvSnapshot(
17411741
completedSnapshotStore.add(completedSnapshot);
17421742
coordinatorEventManager.put(
17431743
new NotifyKvSnapshotOffsetEvent(
1744-
tb, completedSnapshot.getLogOffset()));
1744+
tb,
1745+
completedSnapshot.getLogOffset(),
1746+
completedSnapshot.getSnapshotID()));
17451747
callback.complete(new CommitKvSnapshotResponse());
17461748
} catch (Exception e) {
17471749
callback.completeExceptionally(e);
@@ -1751,7 +1753,6 @@ private void tryProcessCommitKvSnapshot(
17511753

17521754
private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
17531755
TableBucket tb = event.getTableBucket();
1754-
long logOffset = event.getLogOffset();
17551756
coordinatorRequestBatch.newBatch();
17561757
coordinatorContext
17571758
.getBucketLeaderAndIsr(tb)
@@ -1762,7 +1763,8 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
17621763
coordinatorContext.getFollowers(
17631764
tb, leaderAndIsr.leader()),
17641765
tb,
1765-
logOffset));
1766+
event.getLogOffset(),
1767+
event.getSnapshotId()));
17661768
coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
17671769
coordinatorContext.getCoordinatorEpoch());
17681770
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,15 +372,18 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
372372
}
373373

374374
public void addNotifyKvSnapshotOffsetRequestForTabletServers(
375-
List<Integer> tabletServers, TableBucket tableBucket, long minRetainOffset) {
375+
List<Integer> tabletServers,
376+
TableBucket tableBucket,
377+
long minRetainOffset,
378+
long snapshotId) {
376379
tabletServers.stream()
377380
.filter(s -> s >= 0)
378381
.forEach(
379382
id ->
380383
notifyKvSnapshotOffsetRequestMap.put(
381384
id,
382385
makeNotifyKvSnapshotOffsetRequest(
383-
tableBucket, minRetainOffset)));
386+
tableBucket, minRetainOffset, snapshotId)));
384387
}
385388

386389
public void addNotifyLakeTableOffsetRequestForTableServers(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {
2525

2626
private final TableBucket tableBucket;
2727
private final long logOffset;
28+
private final long snapshotId;
2829

29-
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset) {
30+
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset, long snapshotId) {
3031
this.tableBucket = tableBucket;
3132
this.logOffset = logOffset;
33+
this.snapshotId = snapshotId;
3234
}
3335

3436
public TableBucket getTableBucket() {
@@ -38,4 +40,8 @@ public TableBucket getTableBucket() {
3840
public long getLogOffset() {
3941
return logOffset;
4042
}
43+
44+
public long getSnapshotId() {
45+
return snapshotId;
46+
}
4147
}

fluss-server/src/main/java/org/apache/fluss/server/entity/NotifyKvSnapshotOffsetData.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,24 @@
2020
import org.apache.fluss.metadata.TableBucket;
2121
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
2222

23+
import javax.annotation.Nullable;
24+
2325
/** The data for request {@link NotifyRemoteLogOffsetsRequest}. */
2426
public class NotifyKvSnapshotOffsetData {
2527
private final TableBucket tableBucket;
2628
private final long minRetainOffset;
2729
private final int coordinatorEpoch;
30+
private final @Nullable Long snapshotId;
2831

2932
public NotifyKvSnapshotOffsetData(
30-
TableBucket tableBucket, long minRetainOffset, int coordinatorEpoch) {
33+
TableBucket tableBucket,
34+
long minRetainOffset,
35+
int coordinatorEpoch,
36+
@Nullable Long snapshotId) {
3137
this.tableBucket = tableBucket;
3238
this.minRetainOffset = minRetainOffset;
3339
this.coordinatorEpoch = coordinatorEpoch;
40+
this.snapshotId = snapshotId;
3441
}
3542

3643
public TableBucket getTableBucket() {
@@ -45,6 +52,11 @@ public int getCoordinatorEpoch() {
4552
return coordinatorEpoch;
4653
}
4754

55+
@Nullable
56+
public Long getSnapshotId() {
57+
return snapshotId;
58+
}
59+
4860
@Override
4961
public String toString() {
5062
return "NotifyRemoteLogOffsetsData{"
@@ -54,6 +66,8 @@ public String toString() {
5466
+ minRetainOffset
5567
+ ", coordinatorEpoch="
5668
+ coordinatorEpoch
69+
+ ", snapshotId="
70+
+ snapshotId
5771
+ '}';
5872
}
5973
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -307,39 +307,50 @@ public Optional<KvTablet> getKv(TableBucket tableBucket) {
307307
return Optional.ofNullable(currentKvs.get(tableBucket));
308308
}
309309

310-
public void dropKv(TableBucket tableBucket) {
311-
KvTablet dropKvTablet =
310+
public void closeOrDropKv(TableBucket tableBucket, boolean needDrop) {
311+
KvTablet removeTablet =
312312
inLock(tabletCreationOrDeletionLock, () -> currentKvs.remove(tableBucket));
313313

314-
if (dropKvTablet != null) {
315-
TablePath tablePath = dropKvTablet.getTablePath();
314+
if (removeTablet != null) {
315+
TablePath tablePath = removeTablet.getTablePath();
316316
try {
317-
dropKvTablet.drop();
318-
if (dropKvTablet.getPartitionName() == null) {
319-
LOG.info(
320-
"Deleted kv bucket {} for table {} in file path {}.",
321-
tableBucket.getBucket(),
322-
tablePath,
323-
dropKvTablet.getKvTabletDir().getAbsolutePath());
317+
if (needDrop) {
318+
removeTablet.drop();
319+
if (removeTablet.getPartitionName() == null) {
320+
LOG.info(
321+
"Deleted kv bucket {} for table {} in file path {}.",
322+
tableBucket.getBucket(),
323+
tablePath,
324+
removeTablet.getKvTabletDir().getAbsolutePath());
325+
} else {
326+
LOG.info(
327+
"Deleted kv bucket {} for the partition {} of table {} in file path {}.",
328+
tableBucket.getBucket(),
329+
removeTablet.getPartitionName(),
330+
tablePath,
331+
removeTablet.getKvTabletDir().getAbsolutePath());
332+
}
324333
} else {
334+
removeTablet.close();
325335
LOG.info(
326-
"Deleted kv bucket {} for the partition {} of table {} in file path {}.",
327-
tableBucket.getBucket(),
328-
dropKvTablet.getPartitionName(),
329-
tablePath,
330-
dropKvTablet.getKvTabletDir().getAbsolutePath());
336+
"Closed kvTablet and released RocksDB lock for bucket {} without deleting files",
337+
tableBucket);
331338
}
332339
} catch (Exception e) {
333340
throw new KvStorageException(
334341
String.format(
335-
"Exception while deleting kv for table %s, bucket %s in dir %s.",
342+
"Exception while deleting or closing kv for table %s, bucket %s in dir %s, needDrop: %s.",
336343
tablePath,
337344
tableBucket.getBucket(),
338-
dropKvTablet.getKvTabletDir().getAbsolutePath()),
345+
removeTablet.getKvTabletDir().getAbsolutePath(),
346+
needDrop),
339347
e);
340348
}
341349
} else {
342-
LOG.warn("Fail to delete kv bucket {}.", tableBucket.getBucket());
350+
LOG.warn(
351+
"Fail to delete or close kv bucket {}, needDrop: {}.",
352+
tableBucket.getBucket(),
353+
needDrop);
343354
}
344355
}
345356

fluss-server/src/main/java/org/apache/fluss/server/kv/KvSnapshotResource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.kv;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.config.ConfigOptions;
2122
import org.apache.fluss.config.Configuration;
2223
import org.apache.fluss.server.kv.snapshot.KvSnapshotDataDownloader;
@@ -52,7 +53,8 @@ public class KvSnapshotResource {
5253
/** A downloader to download snapshot data. */
5354
private final KvSnapshotDataDownloader kvSnapshotDataDownloader;
5455

55-
private KvSnapshotResource(
56+
@VisibleForTesting
57+
public KvSnapshotResource(
5658
ScheduledExecutorService kvSnapshotScheduler,
5759
KvSnapshotDataUploader kvSnapshotDataUploader,
5860
KvSnapshotDataDownloader kvSnapshotDataDownloader,

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,17 @@ public CompletedSnapshot(
114114
this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0, null, null);
115115
}
116116

117+
public CompletedSnapshot getIncrementalSnapshot(KvSnapshotHandle newKvSnapshotHandle) {
118+
return new CompletedSnapshot(
119+
tableBucket,
120+
snapshotID,
121+
snapshotLocation,
122+
newKvSnapshotHandle,
123+
logOffset,
124+
rowCount,
125+
autoIncIDRanges);
126+
}
127+
117128
public long getSnapshotID() {
118129
return snapshotID;
119130
}
@@ -184,6 +195,10 @@ private void disposeSnapshotStorage() throws IOException {
184195
}
185196
}
186197

198+
public CompletedSnapshotHandle getCompletedSnapshotHandle() {
199+
return new CompletedSnapshotHandle(snapshotID, getMetadataFilePath(), logOffset);
200+
}
201+
187202
/**
188203
* Return the metadata file path that stores all the information that describes the snapshot.
189204
*/

0 commit comments

Comments
 (0)