Skip to content

Commit 04e3acb

Browse files
authored
MINOR: Fix flaky test in ReplicaManagerTest (#21244)
Refer to #20082 (comment). Refactored the test to fix a race condition caused by dynamic Mockito stubbing during test execution. The previous implementation used `doReturn(false)` and `reset()` on a spy object while a background thread was running, causing a `ClassCastException`. This patch replaces that logic with a thread-safe `AtomicBoolean` and `doAnswer` approach to toggle the mock's behavior safely. ## Test Command ``` N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \ ; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done ``` ## Test Result ``` BUILD SUCCESSFUL in 12s 151 actionable tasks: 2 executed, 149 up-to-date Consider enabling configuration cache to speed up this build: https://docs.gradle.org/9.2.1/userguide/configuration_cache_enabling.html Completed run: 100 ``` Reviewers: Gaurav Narula <[email protected]>, Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
1 parent 4498998 commit 04e3acb

File tree

1 file changed

+12
-6
lines changed

1 file changed

+12
-6
lines changed

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ import java.io.{ByteArrayInputStream, File}
9494
import java.net.InetAddress
9595
import java.nio.file.{Files, Paths}
9696
import java.util
97-
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
97+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
9898
import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Future, TimeUnit}
9999
import java.util.function.{BiConsumer, Consumer}
100100
import java.util.stream.IntStream
@@ -5514,6 +5514,17 @@ class ReplicaManagerTest {
55145514

55155515
try {
55165516
val spiedPartition = spy(Partition(tpId, time, replicaManager))
5517+
5518+
// Prevent promotion of future replica
5519+
val blockPromotion = new AtomicBoolean(true)
5520+
doAnswer { invocation =>
5521+
if (blockPromotion.compareAndSet(true, false)) {
5522+
false
5523+
} else {
5524+
invocation.callRealMethod()
5525+
}
5526+
}.when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
5527+
55175528
replicaManager.addOnlinePartition(tp, spiedPartition)
55185529

55195530
val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, partitions = List(0, 1), List.empty, topic, topicIds(topic))
@@ -5526,9 +5537,6 @@ class ReplicaManagerTest {
55265537
val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == firstLogDir).head
55275538
replicaManager.alterReplicaLogDirs(Map(tp -> newReplicaFolder.getAbsolutePath))
55285539

5529-
// Prevent promotion of future replica
5530-
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
5531-
55325540
// Make sure the future log is created with the correct topic ID.
55335541
val futureLog = replicaManager.futureLocalLogOrException(tp)
55345542
assertEquals(Optional.of(topicId), futureLog.topicId)
@@ -5537,8 +5545,6 @@ class ReplicaManagerTest {
55375545
val finalReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(it => it == firstLogDir || it == newReplicaFolder).head
55385546
replicaManager.alterReplicaLogDirs(Map(tp -> finalReplicaFolder.getAbsolutePath))
55395547

5540-
reset(spiedPartition)
5541-
55425548
TestUtils.waitUntilTrue(() => {
55435549
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
55445550
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty

0 commit comments

Comments
 (0)