Skip to content

Commit 72f9e61

Browse files
committed
address baiye's comments
1 parent 2c2d22e commit 72f9e61

File tree

6 files changed

+276
-33
lines changed

6 files changed

+276
-33
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ public class MetricNames {
7373
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
7474
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
7575
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
76+
77+
/**
78+
* Total physical storage size of standby replica snapshots on this tablet server.
79+
*
80+
* <p>Standby replicas maintain downloaded KV snapshots for fast leader failover. This metric
81+
* tracks the total size of all KV snapshot data downloaded by standby replicas on this server.
82+
*/
7683
public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE = "standbySize";
7784

7885
// --------------------------------------------------------------------------------------------

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotManager.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public void becomeLeader() {
137137

138138
public void becomeFollower() {
139139
isLeader = false;
140+
// Clear standby download cache when leaving standby role
141+
clearStandbyDownloadCache();
140142
}
141143

142144
public void becomeStandby() {
@@ -153,6 +155,24 @@ public void becomeStandby() {
153155
return downloadedMiscFiles;
154156
}
155157

158+
/**
159+
* Clear the standby download cache.
160+
*
161+
* <p>This method should be called when a replica leaves the standby role (becomes a regular
162+
* follower or leader). It clears the cached state of downloaded SST files, misc files, and
163+
* snapshot size. This ensures that if the replica becomes standby again later, it will perform
164+
* a fresh download based on the actual local files, rather than reusing a stale cache that
165+
* references deleted files.
166+
*/
167+
private void clearStandbyDownloadCache() {
168+
downloadedSstFiles = null;
169+
downloadedMiscFiles = null;
170+
standbySnapshotSize = 0;
171+
LOG.info(
172+
"Cleared standby download cache for table bucket {}, will reload from local files on next standby promotion",
173+
tableBucket);
174+
}
175+
156176
/**
157177
* The guardedExecutor is an executor that is used to trigger snapshot uploads.
158178
*

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.fluss.server.zk.data.LeaderAndIsr;
8686
import org.apache.fluss.server.zk.data.ZkData;
8787
import org.apache.fluss.utils.CloseableRegistry;
88+
import org.apache.fluss.utils.FileUtils;
8889
import org.apache.fluss.utils.FlussPaths;
8990
import org.apache.fluss.utils.IOUtils;
9091
import org.apache.fluss.utils.MapUtils;
@@ -98,6 +99,7 @@
9899
import javax.annotation.concurrent.ThreadSafe;
99100

100101
import java.io.Closeable;
102+
import java.io.File;
101103
import java.io.IOException;
102104
import java.nio.file.Path;
103105
import java.util.ArrayList;
@@ -249,7 +251,8 @@ public Replica(
249251
this.kvSnapshotManager =
250252
KvSnapshotManager.create(
251253
tableBucket,
252-
kvManager.getOrCreateTabletDir(physicalPath, tableBucket),
254+
kvManager.getOrCreateTabletDir(
255+
physicalPath, tableBucket), // kv tablet dir always built.
253256
snapshotContext,
254257
clock);
255258
}
@@ -583,10 +586,14 @@ private void onBecomeNewFollower(int standbyReplica) {
583586
// to be new follower.
584587
kvSnapshotManager.becomeFollower();
585588
if (wasStandby || wasLeader) {
586-
// standby -> leader or leader -> leader
589+
// standby -> follower or leader -> follower
587590
dropKv();
588591
}
589592

593+
if (wasStandby) {
594+
isStandbyReplica = false;
595+
}
596+
590597
// follower -> follower: do nothing.
591598
}
592599
}
@@ -692,6 +699,25 @@ private void dropKv() {
692699
checkNotNull(kvManager);
693700
kvManager.dropKv(tableBucket);
694701
kvTablet = null;
702+
} else {
703+
// For standby replicas, kvTablet is null, so we need to manually delete the
704+
// downloaded snapshot files and the tablet directory.
705+
// For follower replicas, we need to delete the empty directory.
706+
checkNotNull(kvSnapshotManager);
707+
File tabletDir = kvSnapshotManager.getTabletDir();
708+
try {
709+
FileUtils.deleteDirectory(tabletDir);
710+
LOG.info(
711+
"Deleted standby replica snapshot directory {} for table bucket {}",
712+
tabletDir,
713+
tableBucket);
714+
} catch (IOException e) {
715+
LOG.warn(
716+
"Failed to delete standby replica snapshot directory {} for table bucket {}",
717+
tabletDir,
718+
tableBucket,
719+
e);
720+
}
695721
}
696722
}
697723

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -788,35 +788,48 @@ public void notifyRemoteLogOffsets(
788788
public void notifyKvSnapshotOffset(
789789
NotifyKvSnapshotOffsetData notifyKvSnapshotOffsetData,
790790
Consumer<NotifyKvSnapshotOffsetResponse> responseCallback) {
791-
inLock(
792-
replicaStateChangeLock,
793-
() -> {
794-
// check or apply coordinator epoch.
795-
validateAndApplyCoordinatorEpoch(
796-
notifyKvSnapshotOffsetData.getCoordinatorEpoch(),
797-
"notifyKvSnapshotOffset");
798-
// update the snapshot offset.
799-
TableBucket tb = notifyKvSnapshotOffsetData.getTableBucket();
800-
Replica replica = getReplicaOrException(tb);
801-
if (notifyKvSnapshotOffsetData.getSnapshotId() != null) {
802-
// try to download the snapshot for standby replica.
791+
TableBucket tb = notifyKvSnapshotOffsetData.getTableBucket();
792+
Long snapshotId = notifyKvSnapshotOffsetData.getSnapshotId();
793+
long minRetainOffset = notifyKvSnapshotOffsetData.getMinRetainOffset();
794+
795+
// Validate and get replica under lock
796+
Replica replica =
797+
inLock(
798+
replicaStateChangeLock,
799+
() -> {
800+
// check or apply coordinator epoch.
801+
validateAndApplyCoordinatorEpoch(
802+
notifyKvSnapshotOffsetData.getCoordinatorEpoch(),
803+
"notifyKvSnapshotOffset");
804+
return getReplicaOrException(tb);
805+
});
806+
807+
// Respond immediately to avoid blocking coordinator
808+
responseCallback.accept(new NotifyKvSnapshotOffsetResponse());
809+
810+
// Download snapshot asynchronously outside the lock to avoid blocking
811+
// leader/follower transitions and other state changes
812+
if (snapshotId != null) {
813+
ioExecutor.execute(
814+
() -> {
803815
try {
804-
replica.downloadSnapshot(notifyKvSnapshotOffsetData.getSnapshotId());
805-
updateMinRetainOffset(
806-
replica, notifyKvSnapshotOffsetData.getMinRetainOffset());
816+
replica.downloadSnapshot(snapshotId);
817+
updateMinRetainOffset(replica, minRetainOffset);
818+
LOG.info(
819+
"Successfully downloaded snapshot {} for standby replica {}.",
820+
snapshotId,
821+
tb);
807822
} catch (Exception e) {
808823
LOG.error(
809824
"Error downloading snapshot id {} for standby replica {}.",
810-
notifyKvSnapshotOffsetData.getSnapshotId(),
825+
snapshotId,
811826
tb,
812827
e);
813828
}
814-
} else {
815-
updateMinRetainOffset(
816-
replica, notifyKvSnapshotOffsetData.getMinRetainOffset());
817-
}
818-
responseCallback.accept(new NotifyKvSnapshotOffsetResponse());
819-
});
829+
});
830+
} else {
831+
updateMinRetainOffset(replica, minRetainOffset);
832+
}
820833
}
821834

822835
private void updateMinRetainOffset(Replica replica, long minRetainOffset) {

fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
3232
import org.apache.fluss.server.tablet.TabletServer;
3333
import org.apache.fluss.server.testutils.FlussClusterExtension;
34+
import org.apache.fluss.server.zk.data.LeaderAndIsr;
3435
import org.apache.fluss.utils.FlussPaths;
3536
import org.apache.fluss.utils.types.Tuple2;
3637

@@ -41,6 +42,7 @@
4142
import java.io.File;
4243
import java.time.Duration;
4344
import java.util.ArrayList;
45+
import java.util.Collections;
4446
import java.util.HashMap;
4547
import java.util.HashSet;
4648
import java.util.List;
@@ -435,6 +437,187 @@ void testStandbyIncrementalSnapshotDownload() throws Exception {
435437
assertThat(standbySnapshotManager.getStandbySnapshotSize()).isGreaterThan(0);
436438
}
437439

440+
@Test
441+
void testStandbyDemotionAndReStandby() throws Exception {
442+
TablePath tablePath = TablePath.of("test_db", "test_table_standby_demotion");
443+
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR_PK);
444+
TableBucket tb0 = new TableBucket(tableId, 0);
445+
446+
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
447+
int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
448+
449+
// put some data and wait for snapshot
450+
KvRecordBatch kvRecordBatch =
451+
genKvRecordBatch(
452+
Tuple2.of("k1", new Object[] {1, "k1"}),
453+
Tuple2.of("k2", new Object[] {2, "k2"}));
454+
PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, kvRecordBatch);
455+
TabletServerGateway leaderGateway =
456+
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
457+
leaderGateway.putKv(putKvRequest).get();
458+
459+
// wait for snapshot
460+
waitValue(
461+
() -> completedSnapshotHandleStore.get(tb0, 0),
462+
Duration.ofMinutes(2),
463+
"Fail to wait for snapshot 0");
464+
465+
// verify standby replica has downloaded snapshot
466+
int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
467+
TabletServer standbyTs = FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
468+
Replica standbyReplica = standbyTs.getReplicaManager().getReplicaOrException(tb0);
469+
assertThat(standbyReplica.isStandby()).isTrue();
470+
471+
KvSnapshotManager standbySnapshotManager = standbyReplica.getKvSnapshotManager();
472+
retry(
473+
Duration.ofMinutes(1),
474+
() -> assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty());
475+
476+
// get current leader and isr
477+
LeaderAndIsr leaderAndIsr = FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb0);
478+
List<Integer> replicas = new ArrayList<>(leaderAndIsr.isr());
479+
480+
// demote standby to regular follower by sending notifyLeaderAndIsr without standby
481+
LeaderAndIsr newLeaderAndIsr =
482+
new LeaderAndIsr(
483+
leaderAndIsr.leader(),
484+
leaderAndIsr.leaderEpoch() + 1,
485+
leaderAndIsr.isr(),
486+
Collections.emptyList(), // no standby replicas
487+
leaderAndIsr.coordinatorEpoch(),
488+
leaderAndIsr.bucketEpoch());
489+
490+
FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
491+
standbyServer, tablePath, tb0, newLeaderAndIsr, replicas);
492+
493+
// verify the replica is no longer standby
494+
retry(
495+
Duration.ofMinutes(1),
496+
() -> {
497+
Replica replica = standbyTs.getReplicaManager().getReplicaOrException(tb0);
498+
assertThat(replica.isStandby()).isFalse();
499+
// verify kv tablet is dropped when demoted from standby
500+
assertThat(replica.getKvTablet()).isNull();
501+
// verify standby download cache is cleared
502+
KvSnapshotManager snapshotManager = replica.getKvSnapshotManager();
503+
assertThat(snapshotManager.getDownloadedSstFiles()).isNull();
504+
assertThat(snapshotManager.getDownloadedMiscFiles()).isNull();
505+
assertThat(snapshotManager.getStandbySnapshotSize()).isEqualTo(0);
506+
});
507+
508+
// put more data and create another snapshot
509+
kvRecordBatch =
510+
genKvRecordBatch(
511+
Tuple2.of("k3", new Object[] {3, "k3"}),
512+
Tuple2.of("k4", new Object[] {4, "k4"}));
513+
putKvRequest = newPutKvRequest(tableId, 0, 1, kvRecordBatch);
514+
leaderGateway.putKv(putKvRequest).get();
515+
516+
waitValue(
517+
() -> completedSnapshotHandleStore.get(tb0, 1),
518+
Duration.ofMinutes(2),
519+
"Fail to wait for snapshot 1");
520+
521+
// re-promote the replica back to standby by sending notifyLeaderAndIsr with standby
522+
LeaderAndIsr reStandbyLeaderAndIsr =
523+
new LeaderAndIsr(
524+
newLeaderAndIsr.leader(),
525+
newLeaderAndIsr.leaderEpoch() + 1,
526+
newLeaderAndIsr.isr(),
527+
Collections.singletonList(standbyServer),
528+
newLeaderAndIsr.coordinatorEpoch(),
529+
newLeaderAndIsr.bucketEpoch()); // re-add as standby
530+
531+
FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
532+
standbyServer, tablePath, tb0, reStandbyLeaderAndIsr, replicas);
533+
534+
// verify the replica is standby again and downloads the latest snapshot
535+
retry(
536+
Duration.ofMinutes(1),
537+
() -> {
538+
Replica replica = standbyTs.getReplicaManager().getReplicaOrException(tb0);
539+
assertThat(replica.isStandby()).isTrue();
540+
KvSnapshotManager snapshotManager = replica.getKvSnapshotManager();
541+
assertThat(snapshotManager).isNotNull();
542+
assertThat(snapshotManager.getDownloadedSstFiles()).isNotEmpty();
543+
// verify it has the latest snapshot data
544+
assertThat(snapshotManager.getStandbySnapshotSize()).isGreaterThan(0);
545+
});
546+
}
547+
548+
@Test
549+
void testStandbySnapshotDownloadFailureAndRecovery() throws Exception {
550+
TablePath tablePath = TablePath.of("test_db", "test_table_download_failure");
551+
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR_PK);
552+
TableBucket tb0 = new TableBucket(tableId, 0);
553+
554+
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb0);
555+
int leaderServer = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb0);
556+
557+
// put data and create snapshot
558+
KvRecordBatch kvRecordBatch =
559+
genKvRecordBatch(
560+
Tuple2.of("k1", new Object[] {1, "k1"}),
561+
Tuple2.of("k2", new Object[] {2, "k2"}));
562+
PutKvRequest putKvRequest = newPutKvRequest(tableId, 0, -1, kvRecordBatch);
563+
TabletServerGateway leaderGateway =
564+
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
565+
leaderGateway.putKv(putKvRequest).get();
566+
567+
// wait for first snapshot
568+
CompletedSnapshot snapshot0 =
569+
waitValue(
570+
() -> completedSnapshotHandleStore.get(tb0, 0),
571+
Duration.ofMinutes(2),
572+
"Fail to wait for snapshot 0")
573+
.retrieveCompleteSnapshot();
574+
575+
// get standby server and verify it's marked as standby even if download fails initially
576+
int standbyServer = FLUSS_CLUSTER_EXTENSION.waitAndGetStandby(tb0);
577+
TabletServer standbyTs = FLUSS_CLUSTER_EXTENSION.getTabletServerById(standbyServer);
578+
Replica standbyReplica = standbyTs.getReplicaManager().getReplicaOrException(tb0);
579+
580+
// verify replica is marked as standby (even if download might fail)
581+
retry(Duration.ofMinutes(1), () -> assertThat(standbyReplica.isStandby()).isTrue());
582+
583+
// wait for standby to eventually download snapshot (may retry on failures)
584+
KvSnapshotManager standbySnapshotManager = standbyReplica.getKvSnapshotManager();
585+
assertThat(standbySnapshotManager).isNotNull();
586+
587+
// verify snapshot is eventually downloaded successfully
588+
retry(
589+
Duration.ofMinutes(1),
590+
() -> {
591+
assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty();
592+
assertThat(standbySnapshotManager.getStandbySnapshotSize())
593+
.isEqualTo(snapshot0.getSnapshotSize());
594+
});
595+
596+
// create another snapshot to verify recovery continues to work
597+
kvRecordBatch =
598+
genKvRecordBatch(
599+
Tuple2.of("k3", new Object[] {3, "k3"}),
600+
Tuple2.of("k4", new Object[] {4, "k4"}));
601+
putKvRequest = newPutKvRequest(tableId, 0, 1, kvRecordBatch);
602+
leaderGateway.putKv(putKvRequest).get();
603+
604+
CompletedSnapshot snapshot1 =
605+
waitValue(
606+
() -> completedSnapshotHandleStore.get(tb0, 1),
607+
Duration.ofMinutes(2),
608+
"Fail to wait for snapshot 1")
609+
.retrieveCompleteSnapshot();
610+
611+
// verify standby continues to download new snapshots after recovery
612+
retry(
613+
Duration.ofMinutes(1),
614+
() -> {
615+
assertThat(standbySnapshotManager.getDownloadedSstFiles()).isNotEmpty();
616+
assertThat(standbySnapshotManager.getStandbySnapshotSize())
617+
.isEqualTo(snapshot1.getSnapshotSize());
618+
});
619+
}
620+
438621
private void checkDirsDeleted(Set<File> bucketDirs, Map<Long, TablePath> tablePathMap) {
439622
for (File bucketDir : bucketDirs) {
440623
retry(Duration.ofMinutes(1), () -> assertThat(bucketDir.exists()).isFalse());

0 commit comments

Comments
 (0)