Skip to content

Commit 257f8d4

Browse files
ahuang98 authored and jsancio committed
KAFKA-18345; Prevent livelocked elections (apache#19658)
At the retry limit of binaryExponentialElectionBackoffMs, it becomes statistically likely that the exponential backoff returns electionBackoffMaxMs. This is an issue because multiple replicas can get stuck starting elections at the same cadence. This change fixes that by adding a random jitter to the max election backoff. Reviewers: José Armando García Sancio <[email protected]>, TaiJuWu <[email protected]>, Yung <[email protected]>
1 parent 6314965 commit 257f8d4

File tree

4 files changed

+123
-14
lines changed

4 files changed

+123
-14
lines changed

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@
164164
* are not necessarily offset-aligned.
165165
*/
166166
public final class KafkaRaftClient<T> implements RaftClient<T> {
167-
private static final int RETRY_BACKOFF_BASE_MS = 100;
167+
// visible for testing
168+
static final int RETRY_BACKOFF_BASE_MS = 50;
168169
private static final int MAX_NUMBER_OF_BATCHES = 10;
169170
public static final int MAX_FETCH_WAIT_MS = 500;
170171
public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
@@ -921,7 +922,12 @@ private boolean handleVoteResponse(
921922

922923
state.startBackingOff(
923924
currentTimeMs,
924-
binaryExponentialElectionBackoffMs(state.retries())
925+
RaftUtil.binaryExponentialElectionBackoffMs(
926+
quorumConfig.electionBackoffMaxMs(),
927+
RETRY_BACKOFF_BASE_MS,
928+
state.retries(),
929+
random
930+
)
925931
);
926932
}
927933
}
@@ -935,15 +941,6 @@ private boolean handleVoteResponse(
935941
}
936942
}
937943

938-
private int binaryExponentialElectionBackoffMs(int retries) {
939-
if (retries <= 0) {
940-
throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
941-
}
942-
// upper limit exponential co-efficients at 20 to avoid overflow
943-
return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)),
944-
quorumConfig.electionBackoffMaxMs());
945-
}
946-
947944
private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) {
948945
if (positionInSuccessors == 0) {
949946
return 0;
@@ -3029,7 +3026,12 @@ private long pollCandidate(long currentTimeMs) {
30293026
}
30303027
return state.remainingBackoffMs(currentTimeMs);
30313028
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
3032-
long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
3029+
long backoffDurationMs = RaftUtil.binaryExponentialElectionBackoffMs(
3030+
quorumConfig.electionBackoffMaxMs(),
3031+
RETRY_BACKOFF_BASE_MS,
3032+
state.retries(),
3033+
random
3034+
);
30333035
logger.info("Election has timed out, backing off for {}ms before becoming a candidate again",
30343036
backoffDurationMs);
30353037
state.startBackingOff(currentTimeMs, backoffDurationMs);

raft/src/main/java/org/apache/kafka/raft/RaftUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Collections;
4949
import java.util.List;
5050
import java.util.Optional;
51+
import java.util.Random;
5152
import java.util.function.Consumer;
5253
import java.util.function.UnaryOperator;
5354
import java.util.stream.Collectors;
@@ -762,4 +763,18 @@ static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
762763
data.topics().get(0).partitions().size() == 1 &&
763764
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
764765
}
766+
767+
static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) {
768+
if (retries <= 0) {
769+
throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
770+
}
771+
// Takes minimum of the following:
772+
// 1. exponential backoff calculation (maxes out at 102.5 seconds with backoffBaseMs of 50ms)
773+
// 2. configurable electionBackoffMaxMs + jitter
774+
// The jitter is added to prevent livelock of elections.
775+
return Math.min(
776+
backoffBaseMs * (1 + random.nextInt(2 << Math.min(10, retries - 1))),
777+
backoffMaxMs + random.nextInt(backoffBaseMs)
778+
);
779+
}
765780
}

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,7 +1083,7 @@ public void testEndQuorumIgnoredAsCandidateIfOlderEpoch(boolean withKip853Rpc) t
10831083
context.assertVotedCandidate(epoch, localId);
10841084

10851085
// After backoff, we will become a candidate again
1086-
context.time.sleep(context.electionBackoffMaxMs);
1086+
context.time.sleep(context.electionBackoffMaxMs + jitterMs);
10871087
context.client.poll();
10881088
context.assertVotedCandidate(epoch + 1, localId);
10891089
}
@@ -1681,6 +1681,7 @@ public void testRetryElection(boolean withKip853Rpc) throws Exception {
16811681
context.time.sleep(2L * context.electionTimeoutMs());
16821682
context.pollUntilRequest();
16831683
context.assertVotedCandidate(epoch, localId);
1684+
CandidateState candidate = context.client.quorum().candidateStateOrThrow();
16841685

16851686
// Quorum size is two. If the other member rejects, then we need to schedule a revote.
16861687
RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 0L, 1);
@@ -1691,13 +1692,18 @@ public void testRetryElection(boolean withKip853Rpc) throws Exception {
16911692
);
16921693

16931694
context.client.poll();
1695+
assertTrue(candidate.isBackingOff());
1696+
assertEquals(
1697+
context.electionBackoffMaxMs + exponentialFactor,
1698+
candidate.remainingBackoffMs(context.time.milliseconds())
1699+
);
16941700

16951701
// All nodes have rejected our candidacy, but we should still remember that we had voted
16961702
context.assertVotedCandidate(epoch, localId);
16971703

16981704
// Even though our candidacy was rejected, we will backoff for jitter period
16991705
// before we bump the epoch and start a new election.
1700-
context.time.sleep(context.electionBackoffMaxMs - 1);
1706+
context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1);
17011707
context.client.poll();
17021708
context.assertVotedCandidate(epoch, localId);
17031709

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.raft;
18+
19+
import org.junit.jupiter.params.ParameterizedTest;
20+
import org.junit.jupiter.params.provider.ValueSource;
21+
import org.mockito.ArgumentCaptor;
22+
import org.mockito.Mockito;
23+
24+
import java.util.List;
25+
import java.util.Random;
26+
27+
import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
28+
import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
32+
public class RaftUtilTest {
33+
@ParameterizedTest
34+
@ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
35+
public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) {
36+
Random mockedRandom = Mockito.mock(Random.class);
37+
int electionBackoffMaxMs = 1000;
38+
39+
// test the bound of the method's first call to random.nextInt
40+
binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, mockedRandom);
41+
ArgumentCaptor<Integer> nextIntCaptor = ArgumentCaptor.forClass(Integer.class);
42+
Mockito.verify(mockedRandom, Mockito.times(2)).nextInt(nextIntCaptor.capture());
43+
List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
44+
int actualBound = allCapturedBounds.get(0);
45+
int expectedBound = (int) (2 * Math.pow(2, retries - 1));
46+
// after the 10th retry, the bound of the first call to random.nextInt will remain capped to
47+
// (RETRY_BACKOFF_BASE_MS * 2 << 10)=2048 to prevent overflow
48+
if (retries > 10) {
49+
expectedBound = 2048;
50+
}
51+
assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries);
52+
}
53+
54+
// test that the return value of the method is capped to QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter
55+
// any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS) - 1 = 20 will result in this cap
56+
@ParameterizedTest
57+
@ValueSource(ints = {1, 2, 19, 20, 21, 2048})
58+
public void testExponentialElectionBackoffMsIsCapped(int exponential) {
59+
Random mockedRandom = Mockito.mock(Random.class);
60+
int electionBackoffMaxMs = 1000;
61+
// this is the max bound of the method's first call to random.nextInt
62+
int firstNextIntMaxBound = 2048;
63+
64+
int jitterMs = 50;
65+
Mockito.when(mockedRandom.nextInt(firstNextIntMaxBound)).thenReturn(exponential);
66+
Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);
67+
68+
int returnedBackoffMs = binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 11, mockedRandom);
69+
70+
// verify nextInt was called on both expected bounds
71+
ArgumentCaptor<Integer> nextIntCaptor = ArgumentCaptor.forClass(Integer.class);
72+
Mockito.verify(mockedRandom, Mockito.times(2)).nextInt(nextIntCaptor.capture());
73+
List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
74+
assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0));
75+
assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1));
76+
77+
// finally verify the backoff returned is capped to electionBackoffMaxMs + jitterMs
78+
int backoffValueCap = electionBackoffMaxMs + jitterMs;
79+
if (exponential < 20) {
80+
assertEquals(RETRY_BACKOFF_BASE_MS * (exponential + 1), returnedBackoffMs);
81+
assertTrue(returnedBackoffMs < backoffValueCap);
82+
} else {
83+
assertEquals(backoffValueCap, returnedBackoffMs);
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)