Skip to content

Commit 163c3f1

Browse files
committed
KAFKA-19905: Fix tight reconnection loop during shutdown
This patch fixes a tight reconnection loop that may happen during shutdown. 1. Node 1 and 2 (brokers) request controlled shutdown 2. Controller grants the shutdown 3. Controller itself shuts down (RaftManager shutdown) 4. Node 1 and 2 continue trying to heartbeat to the now-dead controller 5. They get stuck in this reconnection loop because the NodeToControllerRequestThread is still running and hasn't been shut down properly Signed-off-by: Federico Valeri <[email protected]>
1 parent 8b119e5 commit 163c3f1

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,10 @@ class NodeToControllerRequestThread(
317317
}
318318

319319
override def doWork(): Unit = {
320-
val controllerInformation = controllerNodeProvider.getControllerInfo()
321320
if (activeControllerAddress().isDefined) {
322321
super.pollOnce(Long.MaxValue)
323-
} else {
322+
} else if (isRunning()) {
323+
val controllerInformation = controllerNodeProvider.getControllerInfo()
324324
debug("Controller isn't cached, looking for local metadata changes")
325325
controllerInformation.node match {
326326
case Some(controllerNode) =>

core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,40 @@ class NodeToControllerRequestThreadTest {
456456
assertEquals(0, testRequestThread.queueSize)
457457
}
458458

459+
@Test
460+
def testNoControllerReconnectAfterShutdown(): Unit = {
461+
val time = new MockTime()
462+
val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
463+
464+
val metadata = mock(classOf[Metadata])
465+
val mockClient = new MockClient(time, metadata)
466+
467+
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
468+
val controller = new Node(2, "host", 1234)
469+
470+
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(controller)))
471+
472+
val testRequestThread = new NodeToControllerRequestThread(
473+
mockClient, new ManualMetadataUpdater(),
474+
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
475+
testRequestThread.started = true
476+
477+
// Initiate shutdown
478+
testRequestThread.initiateShutdown()
479+
assertFalse(testRequestThread.isRunning())
480+
481+
// Track initial invocation count
482+
val initialGetControllerInfoCalls = mockingDetails(controllerNodeProvider).getInvocations.size()
483+
484+
// Call doWork after shutdown
485+
// It should not try to get controller info since isRunning() is false
486+
testRequestThread.doWork()
487+
488+
// Verify that getControllerInfo was not called again
489+
val finalGetControllerInfoCalls = mockingDetails(controllerNodeProvider).getInvocations.size()
490+
assertEquals(initialGetControllerInfoCalls, finalGetControllerInfoCalls)
491+
}
492+
459493
private def pollUntil(
460494
requestThread: NodeToControllerRequestThread,
461495
condition: () => Boolean,

0 commit comments

Comments
 (0)