Skip to content

Commit 42fb185

Browse files
committed
[kv] PrimaryKey table support standby replica to reduce recovery time
1 parent 8eaad3b commit 42fb185

File tree

53 files changed

+2057
-769
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2057
-769
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class MetricNames {
7373
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
7474
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
7575
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
76+
public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE = "standbySize";
7677

7778
// --------------------------------------------------------------------------------------------
7879
// metrics for user

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ message NotifyKvSnapshotOffsetRequest {
335335
required int32 bucket_id = 3;
336336
required int32 coordinator_epoch = 4;
337337
required int64 min_retain_offset = 5;
338+
optional int64 snapshot_id = 6;
338339
}
339340

340341
message NotifyKvSnapshotOffsetResponse {
@@ -796,6 +797,7 @@ message PbAdjustIsrReqForBucket {
796797
repeated int32 new_isr = 4 [packed = true];
797798
required int32 coordinator_epoch = 5;
798799
required int32 bucket_epoch = 6;
800+
repeated int32 standby_replicas = 7 [packed = true];
799801
}
800802

801803
message PbAdjustIsrRespForTable {
@@ -813,6 +815,7 @@ message PbAdjustIsrRespForBucket {
813815
repeated int32 isr = 7 [packed = true];
814816
optional int32 bucket_epoch = 8;
815817
optional int32 coordinator_epoch = 9;
818+
repeated int32 standby_replicas = 10 [packed = true];
816819
}
817820

818821
message PbListOffsetsRespForBucket {
@@ -833,6 +836,7 @@ message PbNotifyLeaderAndIsrReqForBucket {
833836
repeated int32 replicas = 5 [packed = true];
834837
repeated int32 isr = 6 [packed = true];
835838
required int32 bucket_epoch = 7;
839+
repeated int32 standby_replicas = 8 [packed = true];
836840
}
837841

838842
message PbNotifyLeaderAndIsrRespForBucket {

fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T
173173
* @param tableBucket the table bucket
174174
* @return the tablet directory
175175
*/
176-
protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
176+
public File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
177177
File tabletDir = getTabletDir(tablePath, tableBucket);
178178
if (tabletDir.exists()) {
179179
return tabletDir;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1570,6 +1570,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
15701570
// TODO: reject the request if there is a replica in ISR is not online,
15711571
// see KIP-841.
15721572
tryAdjustLeaderAndIsr.isr(),
1573+
tryAdjustLeaderAndIsr.standbyReplicas(),
15731574
coordinatorContext.getCoordinatorEpoch(),
15741575
currentLeaderAndIsr.bucketEpoch() + 1);
15751576
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
@@ -1692,7 +1693,9 @@ private void tryProcessCommitKvSnapshot(
16921693
completedSnapshotStore.add(completedSnapshot);
16931694
coordinatorEventManager.put(
16941695
new NotifyKvSnapshotOffsetEvent(
1695-
tb, completedSnapshot.getLogOffset()));
1696+
tb,
1697+
completedSnapshot.getLogOffset(),
1698+
completedSnapshot.getSnapshotID()));
16961699
callback.complete(new CommitKvSnapshotResponse());
16971700
} catch (Exception e) {
16981701
callback.completeExceptionally(e);
@@ -1702,7 +1705,6 @@ private void tryProcessCommitKvSnapshot(
17021705

17031706
private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
17041707
TableBucket tb = event.getTableBucket();
1705-
long logOffset = event.getLogOffset();
17061708
coordinatorRequestBatch.newBatch();
17071709
coordinatorContext
17081710
.getBucketLeaderAndIsr(tb)
@@ -1713,7 +1715,8 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
17131715
coordinatorContext.getFollowers(
17141716
tb, leaderAndIsr.leader()),
17151717
tb,
1716-
logOffset));
1718+
event.getLogOffset(),
1719+
event.getSnapshotId()));
17171720
coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
17181721
coordinatorContext.getCoordinatorEpoch());
17191722
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,18 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
364364
}
365365

366366
public void addNotifyKvSnapshotOffsetRequestForTabletServers(
367-
List<Integer> tabletServers, TableBucket tableBucket, long minRetainOffset) {
367+
List<Integer> tabletServers,
368+
TableBucket tableBucket,
369+
long minRetainOffset,
370+
long snapshotId) {
368371
tabletServers.stream()
369372
.filter(s -> s >= 0)
370373
.forEach(
371374
id ->
372375
notifyKvSnapshotOffsetRequestMap.put(
373376
id,
374377
makeNotifyKvSnapshotOffsetRequest(
375-
tableBucket, minRetainOffset)));
378+
tableBucket, minRetainOffset, snapshotId)));
376379
}
377380

378381
public void addNotifyLakeTableOffsetRequestForTableServers(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {
2525

2626
private final TableBucket tableBucket;
2727
private final long logOffset;
28+
private final long snapshotId;
2829

29-
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset) {
30+
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset, long snapshotId) {
3031
this.tableBucket = tableBucket;
3132
this.logOffset = logOffset;
33+
this.snapshotId = snapshotId;
3234
}
3335

3436
public TableBucket getTableBucket() {
@@ -38,4 +40,8 @@ public TableBucket getTableBucket() {
3840
public long getLogOffset() {
3941
return logOffset;
4042
}
43+
44+
public long getSnapshotId() {
45+
return snapshotId;
46+
}
4147
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElection.java

Lines changed: 165 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.server.zk.data.LeaderAndIsr;
2222

2323
import java.util.ArrayList;
24+
import java.util.Collections;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Optional;
@@ -41,23 +42,25 @@ public static class DefaultLeaderElection extends ReplicaLeaderElection {
4142
* @param assignments the assignments
4243
* @param aliveReplicas the alive replicas
4344
* @param leaderAndIsr the original leaderAndIsr
45+
* @param isPrimaryKeyTable whether this table bucket is primary key table
4446
* @return the election result
4547
*/
4648
public Optional<ElectionResult> leaderElection(
47-
List<Integer> assignments, List<Integer> aliveReplicas, LeaderAndIsr leaderAndIsr) {
48-
// currently, we always use the first replica in assignment, which also in aliveReplicas
49-
// and
50-
// isr as the leader replica.
49+
List<Integer> assignments,
50+
List<Integer> aliveReplicas,
51+
LeaderAndIsr leaderAndIsr,
52+
boolean isPrimaryKeyTable) {
5153
List<Integer> isr = leaderAndIsr.isr();
52-
for (int assignment : assignments) {
53-
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
54-
return Optional.of(
55-
new TableBucketStateMachine.ElectionResult(
56-
aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
57-
}
58-
}
59-
60-
return Optional.empty();
54+
// First we will filter out the assignment list to only contain the alive replicas and
55+
// isr.
56+
List<Integer> availableReplicas =
57+
assignments.stream()
58+
.filter(
59+
replica ->
60+
aliveReplicas.contains(replica)
61+
&& isr.contains(replica))
62+
.collect(Collectors.toList());
63+
return electLeader(availableReplicas, aliveReplicas, leaderAndIsr, isPrimaryKeyTable);
6164
}
6265
}
6366

@@ -70,32 +73,66 @@ public static class ControlledShutdownLeaderElection extends ReplicaLeaderElecti
7073
* @param aliveReplicas the alive replicas
7174
* @param leaderAndIsr the original leaderAndIsr
7275
* @param shutdownTabletServers the shutdown tabletServers
76+
* @param isPrimaryKeyTable whether this table bucket is primary key table
7377
* @return the election result
7478
*/
7579
public Optional<ElectionResult> leaderElection(
7680
List<Integer> assignments,
7781
List<Integer> aliveReplicas,
7882
LeaderAndIsr leaderAndIsr,
79-
Set<Integer> shutdownTabletServers) {
83+
Set<Integer> shutdownTabletServers,
84+
boolean isPrimaryKeyTable) {
8085
List<Integer> originIsr = leaderAndIsr.isr();
8186
Set<Integer> isrSet = new HashSet<>(originIsr);
82-
for (Integer id : assignments) {
83-
if (aliveReplicas.contains(id)
84-
&& isrSet.contains(id)
85-
&& !shutdownTabletServers.contains(id)) {
86-
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
87-
newAliveReplicas.removeAll(shutdownTabletServers);
88-
List<Integer> newIsr =
89-
originIsr.stream()
90-
.filter(replica -> !shutdownTabletServers.contains(replica))
91-
.collect(Collectors.toList());
92-
return Optional.of(
93-
new ElectionResult(
94-
new ArrayList<>(newAliveReplicas),
95-
leaderAndIsr.newLeaderAndIsr(id, newIsr)));
96-
}
87+
// Filter out available replicas: alive, in ISR, and not shutting down.
88+
List<Integer> availableReplicas =
89+
assignments.stream()
90+
.filter(
91+
replica ->
92+
aliveReplicas.contains(replica)
93+
&& isrSet.contains(replica)
94+
&& !shutdownTabletServers.contains(replica))
95+
.collect(Collectors.toList());
96+
97+
if (availableReplicas.isEmpty()) {
98+
return Optional.empty();
9799
}
98-
return Optional.empty();
100+
101+
// For log table, simply use the first available replica as leader.
102+
if (!isPrimaryKeyTable) {
103+
List<Integer> newIsr =
104+
originIsr.stream()
105+
.filter(replica -> !shutdownTabletServers.contains(replica))
106+
.collect(Collectors.toList());
107+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
108+
newAliveReplicas.removeAll(shutdownTabletServers);
109+
return Optional.of(
110+
new ElectionResult(
111+
new ArrayList<>(newAliveReplicas),
112+
leaderAndIsr.newLeaderAndIsr(
113+
availableReplicas.get(0),
114+
newIsr,
115+
Collections.emptyList())));
116+
}
117+
118+
// For PK table, elect leader and standby.
119+
LeaderAndStandby leaderAndStandby =
120+
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
121+
122+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
123+
newAliveReplicas.removeAll(shutdownTabletServers);
124+
List<Integer> newIsr =
125+
originIsr.stream()
126+
.filter(replica -> !shutdownTabletServers.contains(replica))
127+
.collect(Collectors.toList());
128+
129+
return Optional.of(
130+
new ElectionResult(
131+
new ArrayList<>(newAliveReplicas),
132+
leaderAndIsr.newLeaderAndIsr(
133+
leaderAndStandby.leader,
134+
newIsr,
135+
leaderAndStandby.standbyReplicas)));
99136
}
100137
}
101138

@@ -108,20 +145,109 @@ public ReassignmentLeaderElection(List<Integer> newReplicas) {
108145
}
109146

110147
public Optional<ElectionResult> leaderElection(
111-
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr) {
112-
// currently, we always use the first replica in targetReplicas, which also in
148+
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr, boolean isPrimaryKeyTable) {
149+
// Currently, we always use the first replica in targetReplicas, which also in
113150
// liveReplicas and isr as the leader replica. For bucket reassignment, the first
114151
// replica is the target leader replica.
115152
List<Integer> isr = leaderAndIsr.isr();
116-
for (int assignment : newReplicas) {
117-
if (liveReplicas.contains(assignment) && isr.contains(assignment)) {
118-
return Optional.of(
119-
new ElectionResult(
120-
liveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
121-
}
122-
}
153+
List<Integer> availableReplicas =
154+
newReplicas.stream()
155+
.filter(
156+
replica ->
157+
liveReplicas.contains(replica) && isr.contains(replica))
158+
.collect(Collectors.toList());
159+
return electLeader(availableReplicas, liveReplicas, leaderAndIsr, isPrimaryKeyTable);
160+
}
161+
}
162+
163+
// ------------------------------------------------------------------------
164+
// Common election logic
165+
// ------------------------------------------------------------------------
123166

167+
private static Optional<ElectionResult> electLeader(
168+
List<Integer> availableReplicas,
169+
List<Integer> aliveReplicas,
170+
LeaderAndIsr leaderAndIsr,
171+
boolean isPrimaryKeyTable) {
172+
if (availableReplicas.isEmpty()) {
124173
return Optional.empty();
125174
}
175+
176+
List<Integer> isr = leaderAndIsr.isr();
177+
178+
// For log table, simply use the first available replica as leader.
179+
if (!isPrimaryKeyTable) {
180+
return Optional.of(
181+
new ElectionResult(
182+
aliveReplicas,
183+
leaderAndIsr.newLeaderAndIsr(
184+
availableReplicas.get(0), isr, Collections.emptyList())));
185+
}
186+
187+
// For PK table, elect leader and standby.
188+
LeaderAndStandby leaderAndStandby =
189+
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
190+
return Optional.of(
191+
new ElectionResult(
192+
aliveReplicas,
193+
leaderAndIsr.newLeaderAndIsr(
194+
leaderAndStandby.leader, isr, leaderAndStandby.standbyReplicas)));
195+
}
196+
197+
/**
198+
* Elect leader and standby for PK table.
199+
*
200+
* <p>Election strategy:
201+
*
202+
* <ul>
203+
* <li>If current standby exists and is available, promote it to leader
204+
* <li>Otherwise, use the first available replica as leader
205+
* <li>Select new standby from remaining available replicas (if any)
206+
* </ul>
207+
*/
208+
private static LeaderAndStandby electLeaderAndStandbyForPkTable(
209+
List<Integer> availableReplicas, LeaderAndIsr leaderAndIsr) {
210+
int currentStandby = getCurrentStandby(leaderAndIsr);
211+
int newLeader;
212+
213+
if (currentStandby != -1 && availableReplicas.contains(currentStandby)) {
214+
// Promote current standby to leader.
215+
newLeader = currentStandby;
216+
} else {
217+
// Use first available replica as leader.
218+
newLeader = availableReplicas.get(0);
219+
}
220+
221+
// Find new standby from remaining replicas.
222+
List<Integer> standbyReplicas = findNewStandby(availableReplicas, newLeader);
223+
return new LeaderAndStandby(newLeader, standbyReplicas);
224+
}
225+
226+
/** Get current standby replica ID, returns -1 if no standby exists. */
227+
private static int getCurrentStandby(LeaderAndIsr leaderAndIsr) {
228+
return leaderAndIsr.standbyReplicas().isEmpty()
229+
? -1
230+
: leaderAndIsr.standbyReplicas().get(0);
231+
}
232+
233+
/** Find new standby from available replicas, excluding the leader. */
234+
private static List<Integer> findNewStandby(
235+
List<Integer> availableReplicas, int excludeLeader) {
236+
return availableReplicas.stream()
237+
.filter(replica -> replica != excludeLeader)
238+
.findFirst()
239+
.map(Collections::singletonList)
240+
.orElse(Collections.emptyList());
241+
}
242+
243+
/** Internal class to hold leader and standby election result. */
244+
private static class LeaderAndStandby {
245+
final int leader;
246+
final List<Integer> standbyReplicas;
247+
248+
LeaderAndStandby(int leader, List<Integer> standbyReplicas) {
249+
this.leader = leader;
250+
this.standbyReplicas = standbyReplicas;
251+
}
126252
}
127253
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
461461
.collect(Collectors.toList());
462462
LeaderAndIsr adjustLeaderAndIsr =
463463
newLeader == LeaderAndIsr.NO_LEADER
464-
? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
464+
? leaderAndIsr.newLeaderAndIsr(
465+
newLeader, newIsr, leaderAndIsr.standbyReplicas())
465466
: leaderAndIsr.newLeaderAndIsr(newIsr);
466467
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
467468
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);

0 commit comments

Comments
 (0)