Skip to content

Commit 02c102f

Browse files
committed
KAFKA-19294: Fix BrokerLifecycleManager RPC timeouts (apache#19745)
Previously, we could wait for up to half of the broker session timeout for an RPC to complete, and then delay by up to half of the broker session timeout. When taken together, these two delays could lead to brokers erroneously missing heartbeats. This change removes exponential backoff for heartbeats sent from the broker to the controller. The load caused by heartbeats is not heavy, and controllers can easily time out heartbeats when the queue length is too long. Additionally, we now set the maximum RPC time to the length of the broker period. This minimizes the impact of heavy load. Reviewers: José Armando García Sancio <[email protected]>, David Arthur <[email protected]> Conflicts: - BrokerLifecycleManager.scala: fix minor conflict when removing ExponentialBackoff object
1 parent 713db44 commit 02c102f

File tree

2 files changed

+4
-19
lines changed

2 files changed

+4
-19
lines changed

core/src/main/scala/kafka/server/BrokerLifecycleManager.scala

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.kafka.common.protocol.Errors
2828
import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
2929
import org.apache.kafka.metadata.{BrokerState, VersionRange}
3030
import org.apache.kafka.queue.EventQueue.DeadlineFunction
31-
import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
31+
import org.apache.kafka.common.utils.{LogContext, Time}
3232
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
3333
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
3434

@@ -93,18 +93,6 @@ class BrokerLifecycleManager(
9393
private val initialTimeoutNs =
9494
MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs.longValue())
9595

96-
/**
97-
* The exponential backoff to use for resending communication.
98-
*/
99-
private val resendExponentialBackoff =
100-
new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.02)
101-
102-
/**
103-
* The number of times we've tried and failed to communicate. This variable can only be
104-
* read or written from the BrokerToControllerRequestThread.
105-
*/
106-
private var failedAttempts = 0L
107-
10896
/**
10997
* The broker incarnation ID. This ID uniquely identifies each time we start the broker
11098
*/
@@ -449,7 +437,6 @@ class BrokerLifecycleManager(
449437
val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
450438
val errorCode = Errors.forCode(message.data().errorCode())
451439
if (errorCode == Errors.NONE) {
452-
failedAttempts = 0
453440
_brokerEpoch = message.data().brokerEpoch()
454441
registered = true
455442
initialRegistrationSucceeded = true
@@ -523,7 +510,6 @@ class BrokerLifecycleManager(
523510
val errorCode = Errors.forCode(message.data().errorCode())
524511
if (errorCode == Errors.NONE) {
525512
val responseData = message.data()
526-
failedAttempts = 0
527513
currentOfflineDirs.foreach(cur => offlineDirs.put(cur, true))
528514
_state match {
529515
case BrokerState.STARTING =>
@@ -586,10 +572,9 @@ class BrokerLifecycleManager(
586572
}
587573

588574
private def scheduleNextCommunicationAfterFailure(): Unit = {
589-
val delayMs = resendExponentialBackoff.backoff(failedAttempts)
590-
failedAttempts = failedAttempts + 1
591575
nextSchedulingShouldBeImmediate = false // never immediately reschedule after a failure
592-
scheduleNextCommunication(NANOSECONDS.convert(delayMs, MILLISECONDS))
576+
scheduleNextCommunication(NANOSECONDS.convert(
577+
config.brokerHeartbeatIntervalMs.longValue() , MILLISECONDS))
593578
}
594579

595580
private def scheduleNextCommunicationAfterSuccess(): Unit = {

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ class BrokerServer(
365365
config,
366366
"heartbeat",
367367
s"broker-${config.nodeId}-",
368-
config.brokerSessionTimeoutMs / 2 // KAFKA-14392
368+
config.brokerHeartbeatIntervalMs
369369
)
370370
lifecycleManager.start(
371371
() => sharedServer.loader.lastAppliedOffset(),

0 commit comments

Comments
 (0)