Skip to content

Commit 93b57cf

Browse files
committed
KAFKA-19905: Fix tight reconnection loop during shutdown
This patch fixes a tight reconnection loop that may happen during shutdown:
1. Nodes 1 and 2 (brokers) request controlled shutdown.
2. The controller grants the shutdown.
3. The controller itself shuts down (RaftManager shutdown).
4. Nodes 1 and 2 continue trying to heartbeat to the now-dead controller.
5. They get stuck in this reconnection loop because the NodeToControllerRequestThread is still running and has not been shut down properly.

The fix constructs NodeToControllerRequestThread with isInterruptible=true so that shutdown() can interrupt the thread.

Signed-off-by: Federico Valeri <[email protected]>
1 parent d04f171 commit 93b57cf

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ class NodeToControllerRequestThread(
220220
initialNetworkClient,
221221
Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt,
222222
time,
223-
false
223+
true
224224
) with Logging {
225225

226226
this.logIdent = logPrefix

core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,35 @@ class NodeToControllerRequestThreadTest {
456456
assertEquals(0, testRequestThread.queueSize)
457457
}
458458

459+
@Test
460+
def testThreadInterruptibleDuringShutdown(): Unit = {
461+
val time = new MockTime()
462+
val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
463+
464+
val metadata = mock(classOf[Metadata])
465+
val mockClient = new MockClient(time, metadata)
466+
467+
val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
468+
val controller = new Node(2, "host", 1234)
469+
470+
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(controller)))
471+
472+
val testRequestThread = new NodeToControllerRequestThread(
473+
mockClient, new ManualMetadataUpdater(),
474+
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
475+
476+
// Start the thread
477+
testRequestThread.start()
478+
TestUtils.waitUntilTrue(() => testRequestThread.isStarted, "Thread did not start", 5000)
479+
480+
// Shutdown the thread (should interrupt it since isInterruptible=true)
481+
testRequestThread.shutdown()
482+
483+
// Verify shutdown completed successfully
484+
assertTrue(testRequestThread.isShutdownComplete())
485+
assertFalse(testRequestThread.isRunning())
486+
}
487+
459488
private def pollUntil(
460489
requestThread: NodeToControllerRequestThread,
461490
condition: () => Boolean,

0 commit comments

Comments
 (0)