Skip to content

Commit 15fb95b

Browse files
committed
[kv] Support elect standby replicas for primary key table
1 parent bafba42 commit 15fb95b

33 files changed

+831
-131
lines changed

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,7 @@ message PbAdjustIsrReqForBucket {
900900
repeated int32 new_isr = 4 [packed = true];
901901
required int32 coordinator_epoch = 5;
902902
required int32 bucket_epoch = 6;
903+
repeated int32 standby_replicas = 7 [packed = true];
903904
}
904905

905906
message PbAdjustIsrRespForTable {
@@ -917,6 +918,7 @@ message PbAdjustIsrRespForBucket {
917918
repeated int32 isr = 7 [packed = true];
918919
optional int32 bucket_epoch = 8;
919920
optional int32 coordinator_epoch = 9;
921+
repeated int32 standby_replicas = 10 [packed = true];
920922
}
921923

922924
message PbListOffsetsRespForBucket {
@@ -937,6 +939,7 @@ message PbNotifyLeaderAndIsrReqForBucket {
937939
repeated int32 replicas = 5 [packed = true];
938940
repeated int32 isr = 6 [packed = true];
939941
required int32 bucket_epoch = 7;
942+
repeated int32 standby_replicas = 8 [packed = true];
940943
}
941944

942945
message PbNotifyLeaderAndIsrRespForBucket {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16181618
// TODO: reject the request if there is a replica in ISR is not online,
16191619
// see KIP-841.
16201620
tryAdjustLeaderAndIsr.isr(),
1621+
tryAdjustLeaderAndIsr.standbyReplicas(),
16211622
coordinatorContext.getCoordinatorEpoch(),
16221623
currentLeaderAndIsr.bucketEpoch() + 1);
16231624
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElection.java

Lines changed: 165 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.server.zk.data.LeaderAndIsr;
2222

2323
import java.util.ArrayList;
24+
import java.util.Collections;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Optional;
@@ -41,23 +42,25 @@ public static class DefaultLeaderElection extends ReplicaLeaderElection {
4142
* @param assignments the assignments
4243
* @param aliveReplicas the alive replicas
4344
* @param leaderAndIsr the original leaderAndIsr
45+
* @param isPrimaryKeyTable whether this table bucket is primary key table
4446
* @return the election result
4547
*/
4648
public Optional<ElectionResult> leaderElection(
47-
List<Integer> assignments, List<Integer> aliveReplicas, LeaderAndIsr leaderAndIsr) {
48-
// currently, we always use the first replica in assignment, which also in aliveReplicas
49-
// and
50-
// isr as the leader replica.
49+
List<Integer> assignments,
50+
List<Integer> aliveReplicas,
51+
LeaderAndIsr leaderAndIsr,
52+
boolean isPrimaryKeyTable) {
5153
List<Integer> isr = leaderAndIsr.isr();
52-
for (int assignment : assignments) {
53-
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
54-
return Optional.of(
55-
new TableBucketStateMachine.ElectionResult(
56-
aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
57-
}
58-
}
59-
60-
return Optional.empty();
54+
// First we will filter out the assignment list to only contain the alive replicas and
55+
// isr.
56+
List<Integer> availableReplicas =
57+
assignments.stream()
58+
.filter(
59+
replica ->
60+
aliveReplicas.contains(replica)
61+
&& isr.contains(replica))
62+
.collect(Collectors.toList());
63+
return electLeader(availableReplicas, aliveReplicas, leaderAndIsr, isPrimaryKeyTable);
6164
}
6265
}
6366

@@ -70,32 +73,66 @@ public static class ControlledShutdownLeaderElection extends ReplicaLeaderElecti
7073
* @param aliveReplicas the alive replicas
7174
* @param leaderAndIsr the original leaderAndIsr
7275
* @param shutdownTabletServers the shutdown tabletServers
76+
* @param isPrimaryKeyTable whether this table bucket is primary key table
7377
* @return the election result
7478
*/
7579
public Optional<ElectionResult> leaderElection(
7680
List<Integer> assignments,
7781
List<Integer> aliveReplicas,
7882
LeaderAndIsr leaderAndIsr,
79-
Set<Integer> shutdownTabletServers) {
83+
Set<Integer> shutdownTabletServers,
84+
boolean isPrimaryKeyTable) {
8085
List<Integer> originIsr = leaderAndIsr.isr();
8186
Set<Integer> isrSet = new HashSet<>(originIsr);
82-
for (Integer id : assignments) {
83-
if (aliveReplicas.contains(id)
84-
&& isrSet.contains(id)
85-
&& !shutdownTabletServers.contains(id)) {
86-
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
87-
newAliveReplicas.removeAll(shutdownTabletServers);
88-
List<Integer> newIsr =
89-
originIsr.stream()
90-
.filter(replica -> !shutdownTabletServers.contains(replica))
91-
.collect(Collectors.toList());
92-
return Optional.of(
93-
new ElectionResult(
94-
new ArrayList<>(newAliveReplicas),
95-
leaderAndIsr.newLeaderAndIsr(id, newIsr)));
96-
}
87+
// Filter out available replicas: alive, in ISR, and not shutting down.
88+
List<Integer> availableReplicas =
89+
assignments.stream()
90+
.filter(
91+
replica ->
92+
aliveReplicas.contains(replica)
93+
&& isrSet.contains(replica)
94+
&& !shutdownTabletServers.contains(replica))
95+
.collect(Collectors.toList());
96+
97+
if (availableReplicas.isEmpty()) {
98+
return Optional.empty();
9799
}
98-
return Optional.empty();
100+
101+
// For log table, simply use the first available replica as leader.
102+
if (!isPrimaryKeyTable) {
103+
List<Integer> newIsr =
104+
originIsr.stream()
105+
.filter(replica -> !shutdownTabletServers.contains(replica))
106+
.collect(Collectors.toList());
107+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
108+
newAliveReplicas.removeAll(shutdownTabletServers);
109+
return Optional.of(
110+
new ElectionResult(
111+
new ArrayList<>(newAliveReplicas),
112+
leaderAndIsr.newLeaderAndIsr(
113+
availableReplicas.get(0),
114+
newIsr,
115+
Collections.emptyList())));
116+
}
117+
118+
// For PK table, elect leader and standby.
119+
LeaderAndStandby leaderAndStandby =
120+
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
121+
122+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
123+
newAliveReplicas.removeAll(shutdownTabletServers);
124+
List<Integer> newIsr =
125+
originIsr.stream()
126+
.filter(replica -> !shutdownTabletServers.contains(replica))
127+
.collect(Collectors.toList());
128+
129+
return Optional.of(
130+
new ElectionResult(
131+
new ArrayList<>(newAliveReplicas),
132+
leaderAndIsr.newLeaderAndIsr(
133+
leaderAndStandby.leader,
134+
newIsr,
135+
leaderAndStandby.standbyReplicas)));
99136
}
100137
}
101138

@@ -108,20 +145,109 @@ public ReassignmentLeaderElection(List<Integer> newReplicas) {
108145
}
109146

110147
public Optional<ElectionResult> leaderElection(
111-
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr) {
112-
// currently, we always use the first replica in targetReplicas, which also in
148+
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr, boolean isPrimaryKeyTable) {
149+
// Currently, we always use the first replica in targetReplicas, which also in
113150
// liveReplicas and isr as the leader replica. For bucket reassignment, the first
114151
// replica is the target leader replica.
115152
List<Integer> isr = leaderAndIsr.isr();
116-
for (int assignment : newReplicas) {
117-
if (liveReplicas.contains(assignment) && isr.contains(assignment)) {
118-
return Optional.of(
119-
new ElectionResult(
120-
liveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
121-
}
122-
}
153+
List<Integer> availableReplicas =
154+
newReplicas.stream()
155+
.filter(
156+
replica ->
157+
liveReplicas.contains(replica) && isr.contains(replica))
158+
.collect(Collectors.toList());
159+
return electLeader(availableReplicas, liveReplicas, leaderAndIsr, isPrimaryKeyTable);
160+
}
161+
}
162+
163+
// ------------------------------------------------------------------------
164+
// Common election logic
165+
// ------------------------------------------------------------------------
123166

167+
private static Optional<ElectionResult> electLeader(
168+
List<Integer> availableReplicas,
169+
List<Integer> aliveReplicas,
170+
LeaderAndIsr leaderAndIsr,
171+
boolean isPrimaryKeyTable) {
172+
if (availableReplicas.isEmpty()) {
124173
return Optional.empty();
125174
}
175+
176+
List<Integer> isr = leaderAndIsr.isr();
177+
178+
// For log table, simply use the first available replica as leader.
179+
if (!isPrimaryKeyTable) {
180+
return Optional.of(
181+
new ElectionResult(
182+
aliveReplicas,
183+
leaderAndIsr.newLeaderAndIsr(
184+
availableReplicas.get(0), isr, Collections.emptyList())));
185+
}
186+
187+
// For PK table, elect leader and standby.
188+
LeaderAndStandby leaderAndStandby =
189+
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
190+
return Optional.of(
191+
new ElectionResult(
192+
aliveReplicas,
193+
leaderAndIsr.newLeaderAndIsr(
194+
leaderAndStandby.leader, isr, leaderAndStandby.standbyReplicas)));
195+
}
196+
197+
/**
198+
* Elect leader and standby for PK table.
199+
*
200+
* <p>Election strategy:
201+
*
202+
* <ul>
203+
* <li>If current standby exists and is available, promote it to leader
204+
* <li>Otherwise, use the first available replica as leader
205+
* <li>Select new standby from remaining available replicas (if any)
206+
* </ul>
207+
*/
208+
private static LeaderAndStandby electLeaderAndStandbyForPkTable(
209+
List<Integer> availableReplicas, LeaderAndIsr leaderAndIsr) {
210+
int currentStandby = getCurrentStandby(leaderAndIsr);
211+
int newLeader;
212+
213+
if (currentStandby != -1 && availableReplicas.contains(currentStandby)) {
214+
// Promote current standby to leader.
215+
newLeader = currentStandby;
216+
} else {
217+
// Use first available replica as leader.
218+
newLeader = availableReplicas.get(0);
219+
}
220+
221+
// Find new standby from remaining replicas.
222+
List<Integer> standbyReplicas = findNewStandby(availableReplicas, newLeader);
223+
return new LeaderAndStandby(newLeader, standbyReplicas);
224+
}
225+
226+
/** Get current standby replica ID, returns -1 if no standby exists. */
227+
private static int getCurrentStandby(LeaderAndIsr leaderAndIsr) {
228+
return leaderAndIsr.standbyReplicas().isEmpty()
229+
? -1
230+
: leaderAndIsr.standbyReplicas().get(0);
231+
}
232+
233+
/** Find new standby from available replicas, excluding the leader. */
234+
private static List<Integer> findNewStandby(
235+
List<Integer> availableReplicas, int excludeLeader) {
236+
return availableReplicas.stream()
237+
.filter(replica -> replica != excludeLeader)
238+
.findFirst()
239+
.map(Collections::singletonList)
240+
.orElse(Collections.emptyList());
241+
}
242+
243+
/** Internal class to hold leader and standby election result. */
244+
private static class LeaderAndStandby {
245+
final int leader;
246+
final List<Integer> standbyReplicas;
247+
248+
LeaderAndStandby(int leader, List<Integer> standbyReplicas) {
249+
this.leader = leader;
250+
this.standbyReplicas = standbyReplicas;
251+
}
126252
}
127253
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
481481
.collect(Collectors.toList());
482482
LeaderAndIsr adjustLeaderAndIsr =
483483
newLeader == LeaderAndIsr.NO_LEADER
484-
? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
484+
? leaderAndIsr.newLeaderAndIsr(
485+
newLeader, newIsr, leaderAndIsr.standbyReplicas())
485486
: leaderAndIsr.newLeaderAndIsr(newIsr);
486487
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
487488
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,12 @@ private Optional<ElectionResult> doInitElectionForBucket(
439439
// servers as inSyncReplica set.
440440
Optional<ElectionResult> resultOpt =
441441
initReplicaLeaderElection(
442-
assignedServers, liveServers, coordinatorContext.getCoordinatorEpoch());
442+
assignedServers,
443+
liveServers,
444+
coordinatorContext.getCoordinatorEpoch(),
445+
coordinatorContext
446+
.getTableInfoById(tableBucket.getTableId())
447+
.hasPrimaryKey());
443448
if (!resultOpt.isPresent()) {
444449
LOG.error(
445450
"The leader election for table bucket {} is empty.",
@@ -613,10 +618,12 @@ private Optional<ElectionResult> electLeader(
613618
}
614619

615620
Optional<ElectionResult> resultOpt = Optional.empty();
621+
boolean isPkTable =
622+
coordinatorContext.getTableInfoById(tableBucket.getTableId()).hasPrimaryKey();
616623
if (electionStrategy instanceof DefaultLeaderElection) {
617624
resultOpt =
618625
((DefaultLeaderElection) electionStrategy)
619-
.leaderElection(assignment, liveReplicas, leaderAndIsr);
626+
.leaderElection(assignment, liveReplicas, leaderAndIsr, isPkTable);
620627
} else if (electionStrategy instanceof ControlledShutdownLeaderElection) {
621628
Set<Integer> shuttingDownTabletServers = coordinatorContext.shuttingDownTabletServers();
622629
resultOpt =
@@ -625,11 +632,12 @@ private Optional<ElectionResult> electLeader(
625632
assignment,
626633
liveReplicas,
627634
leaderAndIsr,
628-
shuttingDownTabletServers);
635+
shuttingDownTabletServers,
636+
isPkTable);
629637
} else if (electionStrategy instanceof ReassignmentLeaderElection) {
630638
resultOpt =
631639
((ReassignmentLeaderElection) electionStrategy)
632-
.leaderElection(liveReplicas, leaderAndIsr);
640+
.leaderElection(liveReplicas, leaderAndIsr, isPkTable);
633641
}
634642

635643
if (!resultOpt.isPresent()) {
@@ -670,23 +678,40 @@ public LeaderAndIsr getLeaderAndIsr() {
670678
* @param assignments the assignments
671679
* @param aliveReplicas the alive replicas
672680
* @param coordinatorEpoch the coordinator epoch
681+
* @param isPrimaryKeyTable whether this table bucket is primary key table
673682
* @return the election result
674683
*/
675684
@VisibleForTesting
676685
public static Optional<ElectionResult> initReplicaLeaderElection(
677-
List<Integer> assignments, List<Integer> aliveReplicas, int coordinatorEpoch) {
678-
// currently, we always use the first replica in assignment, which also in aliveReplicas and
679-
// isr as the leader replica.
680-
for (int assignment : assignments) {
681-
if (aliveReplicas.contains(assignment)) {
682-
return Optional.of(
683-
new ElectionResult(
684-
aliveReplicas,
685-
new LeaderAndIsr(
686-
assignment, 0, aliveReplicas, coordinatorEpoch, 0)));
686+
List<Integer> assignments,
687+
List<Integer> aliveReplicas,
688+
int coordinatorEpoch,
689+
boolean isPrimaryKeyTable) {
690+
// First we will filter out the assignment list to only contain the alive replicas.
691+
List<Integer> availableReplicas =
692+
assignments.stream().filter(aliveReplicas::contains).collect(Collectors.toList());
693+
694+
// If the assignment list is empty, we return empty.
695+
if (availableReplicas.isEmpty()) {
696+
return Optional.empty();
697+
}
698+
699+
// Then we will use the first replica in assignment as the leader replica.
700+
int leader = availableReplicas.get(0);
701+
702+
// If this table is primaryKey table, we will use the second replica in assignment as the
703+
// standby if exists.
704+
List<Integer> standbyReplica = new ArrayList<>();
705+
if (isPrimaryKeyTable) {
706+
if (availableReplicas.size() > 1) {
707+
standbyReplica.add(availableReplicas.get(1));
687708
}
688709
}
689710

690-
return Optional.empty();
711+
return Optional.of(
712+
new ElectionResult(
713+
aliveReplicas,
714+
new LeaderAndIsr(
715+
leader, 0, aliveReplicas, standbyReplica, coordinatorEpoch, 0)));
691716
}
692717
}

fluss-server/src/main/java/org/apache/fluss/server/entity/NotifyLeaderAndIsrData.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,12 @@ public int[] getIsrArray() {
8585
public LeaderAndIsr getLeaderAndIsr() {
8686
return leaderAndIsr;
8787
}
88+
89+
public List<Integer> getStandbyReplicas() {
90+
return leaderAndIsr.standbyReplicas();
91+
}
92+
93+
public int[] getStandbyReplicasArray() {
94+
return leaderAndIsr.standbyReplicas().stream().mapToInt(Integer::intValue).toArray();
95+
}
8896
}

0 commit comments

Comments
 (0)