Skip to content

Commit 2ec18a6

Browse files
committed
[kv] PrimaryKey table support standby replica to reduce recovery time
111
1 parent 62db319 commit 2ec18a6

File tree

53 files changed

+2114
-788
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2114
-788
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class MetricNames {
7373
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
7474
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
7575
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
76+
public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE = "standbySize";
7677

7778
// --------------------------------------------------------------------------------------------
7879
// metrics for user

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ message NotifyKvSnapshotOffsetRequest {
335335
required int32 bucket_id = 3;
336336
required int32 coordinator_epoch = 4;
337337
required int64 min_retain_offset = 5;
338+
optional int64 snapshot_id = 6;
338339
}
339340

340341
message NotifyKvSnapshotOffsetResponse {
@@ -802,6 +803,7 @@ message PbAdjustIsrReqForBucket {
802803
repeated int32 new_isr = 4 [packed = true];
803804
required int32 coordinator_epoch = 5;
804805
required int32 bucket_epoch = 6;
806+
repeated int32 standby_replicas = 7 [packed = true];
805807
}
806808

807809
message PbAdjustIsrRespForTable {
@@ -819,6 +821,7 @@ message PbAdjustIsrRespForBucket {
819821
repeated int32 isr = 7 [packed = true];
820822
optional int32 bucket_epoch = 8;
821823
optional int32 coordinator_epoch = 9;
824+
repeated int32 standby_replicas = 10 [packed = true];
822825
}
823826

824827
message PbListOffsetsRespForBucket {
@@ -839,6 +842,7 @@ message PbNotifyLeaderAndIsrReqForBucket {
839842
repeated int32 replicas = 5 [packed = true];
840843
repeated int32 isr = 6 [packed = true];
841844
required int32 bucket_epoch = 7;
845+
repeated int32 standby_replicas = 8 [packed = true];
842846
}
843847

844848
message PbNotifyLeaderAndIsrRespForBucket {

fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T
173173
* @param tableBucket the table bucket
174174
* @return the tablet directory
175175
*/
176-
protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
176+
public File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
177177
File tabletDir = getTabletDir(tablePath, tableBucket);
178178
if (tabletDir.exists()) {
179179
return tabletDir;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1632,6 +1632,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16321632
// TODO: reject the request if there is a replica in ISR is not online,
16331633
// see KIP-841.
16341634
tryAdjustLeaderAndIsr.isr(),
1635+
tryAdjustLeaderAndIsr.standbyReplicas(),
16351636
coordinatorContext.getCoordinatorEpoch(),
16361637
currentLeaderAndIsr.bucketEpoch() + 1);
16371638
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
@@ -1754,7 +1755,9 @@ private void tryProcessCommitKvSnapshot(
17541755
completedSnapshotStore.add(completedSnapshot);
17551756
coordinatorEventManager.put(
17561757
new NotifyKvSnapshotOffsetEvent(
1757-
tb, completedSnapshot.getLogOffset()));
1758+
tb,
1759+
completedSnapshot.getLogOffset(),
1760+
completedSnapshot.getSnapshotID()));
17581761
callback.complete(new CommitKvSnapshotResponse());
17591762
} catch (Exception e) {
17601763
callback.completeExceptionally(e);
@@ -1764,7 +1767,6 @@ private void tryProcessCommitKvSnapshot(
17641767

17651768
private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
17661769
TableBucket tb = event.getTableBucket();
1767-
long logOffset = event.getLogOffset();
17681770
coordinatorRequestBatch.newBatch();
17691771
coordinatorContext
17701772
.getBucketLeaderAndIsr(tb)
@@ -1775,7 +1777,8 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
17751777
coordinatorContext.getFollowers(
17761778
tb, leaderAndIsr.leader()),
17771779
tb,
1778-
logOffset));
1780+
event.getLogOffset(),
1781+
event.getSnapshotId()));
17791782
coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
17801783
coordinatorContext.getCoordinatorEpoch());
17811784
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,18 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
365365
}
366366

367367
public void addNotifyKvSnapshotOffsetRequestForTabletServers(
368-
List<Integer> tabletServers, TableBucket tableBucket, long minRetainOffset) {
368+
List<Integer> tabletServers,
369+
TableBucket tableBucket,
370+
long minRetainOffset,
371+
long snapshotId) {
369372
tabletServers.stream()
370373
.filter(s -> s >= 0)
371374
.forEach(
372375
id ->
373376
notifyKvSnapshotOffsetRequestMap.put(
374377
id,
375378
makeNotifyKvSnapshotOffsetRequest(
376-
tableBucket, minRetainOffset)));
379+
tableBucket, minRetainOffset, snapshotId)));
377380
}
378381

379382
public void addNotifyLakeTableOffsetRequestForTableServers(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {
2525

2626
private final TableBucket tableBucket;
2727
private final long logOffset;
28+
private final long snapshotId;
2829

29-
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset) {
30+
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset, long snapshotId) {
3031
this.tableBucket = tableBucket;
3132
this.logOffset = logOffset;
33+
this.snapshotId = snapshotId;
3234
}
3335

3436
public TableBucket getTableBucket() {
@@ -38,4 +40,8 @@ public TableBucket getTableBucket() {
3840
public long getLogOffset() {
3941
return logOffset;
4042
}
43+
44+
public long getSnapshotId() {
45+
return snapshotId;
46+
}
4147
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElection.java

Lines changed: 165 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.server.zk.data.LeaderAndIsr;
2222

2323
import java.util.ArrayList;
24+
import java.util.Collections;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Optional;
@@ -41,23 +42,25 @@ public static class DefaultLeaderElection extends ReplicaLeaderElection {
4142
* @param assignments the assignments
4243
* @param aliveReplicas the alive replicas
4344
* @param leaderAndIsr the original leaderAndIsr
45+
* @param isPrimaryKeyTable whether this table bucket is primary key table
4446
* @return the election result
4547
*/
4648
public Optional<ElectionResult> leaderElection(
47-
List<Integer> assignments, List<Integer> aliveReplicas, LeaderAndIsr leaderAndIsr) {
48-
// currently, we always use the first replica in assignment, which also in aliveReplicas
49-
// and
50-
// isr as the leader replica.
49+
List<Integer> assignments,
50+
List<Integer> aliveReplicas,
51+
LeaderAndIsr leaderAndIsr,
52+
boolean isPrimaryKeyTable) {
5153
List<Integer> isr = leaderAndIsr.isr();
52-
for (int assignment : assignments) {
53-
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
54-
return Optional.of(
55-
new TableBucketStateMachine.ElectionResult(
56-
aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
57-
}
58-
}
59-
60-
return Optional.empty();
54+
// First we will filter out the assignment list to only contain the alive replicas and
55+
// isr.
56+
List<Integer> availableReplicas =
57+
assignments.stream()
58+
.filter(
59+
replica ->
60+
aliveReplicas.contains(replica)
61+
&& isr.contains(replica))
62+
.collect(Collectors.toList());
63+
return electLeader(availableReplicas, aliveReplicas, leaderAndIsr, isPrimaryKeyTable);
6164
}
6265
}
6366

@@ -70,32 +73,66 @@ public static class ControlledShutdownLeaderElection extends ReplicaLeaderElecti
7073
* @param aliveReplicas the alive replicas
7174
* @param leaderAndIsr the original leaderAndIsr
7275
* @param shutdownTabletServers the shutdown tabletServers
76+
* @param isPrimaryKeyTable whether this table bucket is primary key table
7377
* @return the election result
7478
*/
7579
public Optional<ElectionResult> leaderElection(
7680
List<Integer> assignments,
7781
List<Integer> aliveReplicas,
7882
LeaderAndIsr leaderAndIsr,
79-
Set<Integer> shutdownTabletServers) {
83+
Set<Integer> shutdownTabletServers,
84+
boolean isPrimaryKeyTable) {
8085
List<Integer> originIsr = leaderAndIsr.isr();
8186
Set<Integer> isrSet = new HashSet<>(originIsr);
82-
for (Integer id : assignments) {
83-
if (aliveReplicas.contains(id)
84-
&& isrSet.contains(id)
85-
&& !shutdownTabletServers.contains(id)) {
86-
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
87-
newAliveReplicas.removeAll(shutdownTabletServers);
88-
List<Integer> newIsr =
89-
originIsr.stream()
90-
.filter(replica -> !shutdownTabletServers.contains(replica))
91-
.collect(Collectors.toList());
92-
return Optional.of(
93-
new ElectionResult(
94-
new ArrayList<>(newAliveReplicas),
95-
leaderAndIsr.newLeaderAndIsr(id, newIsr)));
96-
}
87+
// Filter out available replicas: alive, in ISR, and not shutting down.
88+
List<Integer> availableReplicas =
89+
assignments.stream()
90+
.filter(
91+
replica ->
92+
aliveReplicas.contains(replica)
93+
&& isrSet.contains(replica)
94+
&& !shutdownTabletServers.contains(replica))
95+
.collect(Collectors.toList());
96+
97+
if (availableReplicas.isEmpty()) {
98+
return Optional.empty();
9799
}
98-
return Optional.empty();
100+
101+
// For log table, simply use the first available replica as leader.
102+
if (!isPrimaryKeyTable) {
103+
List<Integer> newIsr =
104+
originIsr.stream()
105+
.filter(replica -> !shutdownTabletServers.contains(replica))
106+
.collect(Collectors.toList());
107+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
108+
newAliveReplicas.removeAll(shutdownTabletServers);
109+
return Optional.of(
110+
new ElectionResult(
111+
new ArrayList<>(newAliveReplicas),
112+
leaderAndIsr.newLeaderAndIsr(
113+
availableReplicas.get(0),
114+
newIsr,
115+
Collections.emptyList())));
116+
}
117+
118+
// For PK table, elect leader and standby.
119+
LeaderAndStandby leaderAndStandby =
120+
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
121+
122+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
123+
newAliveReplicas.removeAll(shutdownTabletServers);
124+
List<Integer> newIsr =
125+
originIsr.stream()
126+
.filter(replica -> !shutdownTabletServers.contains(replica))
127+
.collect(Collectors.toList());
128+
129+
return Optional.of(
130+
new ElectionResult(
131+
new ArrayList<>(newAliveReplicas),
132+
leaderAndIsr.newLeaderAndIsr(
133+
leaderAndStandby.leader,
134+
newIsr,
135+
leaderAndStandby.standbyReplicas)));
99136
}
100137
}
101138

@@ -108,20 +145,109 @@ public ReassignmentLeaderElection(List<Integer> newReplicas) {
108145
}
109146

110147
public Optional<ElectionResult> leaderElection(
111-
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr) {
112-
// currently, we always use the first replica in targetReplicas, which also in
148+
List<Integer> liveReplicas, LeaderAndIsr leaderAndIsr, boolean isPrimaryKeyTable) {
149+
// Currently, we always use the first replica in targetReplicas, which also in
113150
// liveReplicas and isr as the leader replica. For bucket reassignment, the first
114151
// replica is the target leader replica.
115152
List<Integer> isr = leaderAndIsr.isr();
116-
for (int assignment : newReplicas) {
117-
if (liveReplicas.contains(assignment) && isr.contains(assignment)) {
118-
return Optional.of(
119-
new ElectionResult(
120-
liveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
121-
}
122-
}
153+
List<Integer> availableReplicas =
154+
newReplicas.stream()
155+
.filter(
156+
replica ->
157+
liveReplicas.contains(replica) && isr.contains(replica))
158+
.collect(Collectors.toList());
159+
return electLeader(availableReplicas, liveReplicas, leaderAndIsr, isPrimaryKeyTable);
160+
}
161+
}
162+
163+
// ------------------------------------------------------------------------
164+
// Common election logic
165+
// ------------------------------------------------------------------------
123166

167+
private static Optional<ElectionResult> electLeader(
168+
List<Integer> availableReplicas,
169+
List<Integer> aliveReplicas,
170+
LeaderAndIsr leaderAndIsr,
171+
boolean isPrimaryKeyTable) {
172+
if (availableReplicas.isEmpty()) {
124173
return Optional.empty();
125174
}
175+
176+
List<Integer> isr = leaderAndIsr.isr();
177+
178+
// For log table, simply use the first available replica as leader.
179+
if (!isPrimaryKeyTable) {
180+
return Optional.of(
181+
new ElectionResult(
182+
aliveReplicas,
183+
leaderAndIsr.newLeaderAndIsr(
184+
availableReplicas.get(0), isr, Collections.emptyList())));
185+
}
186+
187+
// For PK table, elect leader and standby.
188+
LeaderAndStandby leaderAndStandby =
189+
electLeaderAndStandbyForPkTable(availableReplicas, leaderAndIsr);
190+
return Optional.of(
191+
new ElectionResult(
192+
aliveReplicas,
193+
leaderAndIsr.newLeaderAndIsr(
194+
leaderAndStandby.leader, isr, leaderAndStandby.standbyReplicas)));
195+
}
196+
197+
/**
198+
* Elect leader and standby for PK table.
199+
*
200+
* <p>Election strategy:
201+
*
202+
* <ul>
203+
* <li>If current standby exists and is available, promote it to leader
204+
* <li>Otherwise, use the first available replica as leader
205+
* <li>Select new standby from remaining available replicas (if any)
206+
* </ul>
207+
*/
208+
private static LeaderAndStandby electLeaderAndStandbyForPkTable(
209+
List<Integer> availableReplicas, LeaderAndIsr leaderAndIsr) {
210+
int currentStandby = getCurrentStandby(leaderAndIsr);
211+
int newLeader;
212+
213+
if (currentStandby != -1 && availableReplicas.contains(currentStandby)) {
214+
// Promote current standby to leader.
215+
newLeader = currentStandby;
216+
} else {
217+
// Use first available replica as leader.
218+
newLeader = availableReplicas.get(0);
219+
}
220+
221+
// Find new standby from remaining replicas.
222+
List<Integer> standbyReplicas = findNewStandby(availableReplicas, newLeader);
223+
return new LeaderAndStandby(newLeader, standbyReplicas);
224+
}
225+
226+
/** Get current standby replica ID, returns -1 if no standby exists. */
227+
private static int getCurrentStandby(LeaderAndIsr leaderAndIsr) {
228+
return leaderAndIsr.standbyReplicas().isEmpty()
229+
? -1
230+
: leaderAndIsr.standbyReplicas().get(0);
231+
}
232+
233+
/** Find new standby from available replicas, excluding the leader. */
234+
private static List<Integer> findNewStandby(
235+
List<Integer> availableReplicas, int excludeLeader) {
236+
return availableReplicas.stream()
237+
.filter(replica -> replica != excludeLeader)
238+
.findFirst()
239+
.map(Collections::singletonList)
240+
.orElse(Collections.emptyList());
241+
}
242+
243+
/** Internal class to hold leader and standby election result. */
244+
private static class LeaderAndStandby {
245+
final int leader;
246+
final List<Integer> standbyReplicas;
247+
248+
LeaderAndStandby(int leader, List<Integer> standbyReplicas) {
249+
this.leader = leader;
250+
this.standbyReplicas = standbyReplicas;
251+
}
126252
}
127253
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
461461
.collect(Collectors.toList());
462462
LeaderAndIsr adjustLeaderAndIsr =
463463
newLeader == LeaderAndIsr.NO_LEADER
464-
? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
464+
? leaderAndIsr.newLeaderAndIsr(
465+
newLeader, newIsr, leaderAndIsr.standbyReplicas())
465466
: leaderAndIsr.newLeaderAndIsr(newIsr);
466467
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
467468
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);

0 commit comments

Comments
 (0)