Skip to content

Commit f16d1f3

Browse files
KAFKA-19299: Fix race condition in RemoteIndexCacheTest (#19927)
This MR should be couple of race conditions in RemoteIndexCacheTest. 1. There was a race condition between cache-cleanup-thread and test thread, which wants to check that cache is gone. This was fixed with TestUtils#waitForCondition 2. After each test we check that there is not thread leak. This check wasn't working properly, because live of thread status is set by JVM level, we can only set interrupted status (using private native void interrupt0(); method under the hood), but we don't really know when JVM will change the live status of thread. To fix this I've refactored TestUtils#assertNoLeakedThreadsWithNameAndDaemonStatus method to use TestUtils#waitForCondition. This fix should also affect few other tests, which were flaky because of this check. See gradle run on [develocity](https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.storage.internals.log.RemoteIndexCacheTest&tests.sortField=FLAKY) After fix test were run 10000 times with repeated test annotation: `./gradlew clean storage:test --tests org.apache.kafka.storage.internals.log.RemoteIndexCacheTest.testCacheEntryIsDeletedOnRemoval` ... `Gradle Test Run :storage:test > Gradle Test Executor 20 > RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval() > repetition 9998 of 10000 PASSED` `Gradle Test Run :storage:test > Gradle Test Executor 20 > RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval() > repetition 9999 of 10000 PASSED` `Gradle Test Run :storage:test > Gradle Test Executor 20 > RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval() > repetition 10000 of 10000 PASSED` `BUILD SUCCESSFUL in 20m 9s` `148 actionable tasks: 148 executed` Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent da6a562 commit f16d1f3

File tree

3 files changed

+13
-15
lines changed

3 files changed

+13
-15
lines changed

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void setup(TestInfo testInfo) {
219219
}
220220

221221
@AfterEach
222-
public void detectLeaks() {
222+
public void detectLeaks() throws InterruptedException {
223223
// Assert no thread leakage of Kafka producer.
224224
TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, Boolean.TRUE);
225225
}

clients/src/test/java/org/apache/kafka/test/TestUtils.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import java.util.function.Supplier;
7575
import java.util.regex.Matcher;
7676
import java.util.regex.Pattern;
77-
import java.util.stream.Collectors;
7877

7978
import static java.util.Arrays.asList;
8079
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -161,22 +160,21 @@ public static MetadataSnapshot metadataSnapshotWith(final int nodes, final Map<S
161160
* Asserts that there are no leaked threads with a specified name prefix and daemon status.
162161
* This method checks all threads in the JVM, filters them by the provided thread name prefix
163162
* and daemon status, and verifies that no matching threads are alive.
164-
* If any matching threads are found, the test will fail.
163+
* Use the {@link #waitForCondition(TestCondition, String) waitForCondition} to retry the check at a regular interval
164+
* until either no matching threads are found or the timeout is exceeded.
165+
* If any matching, alive threads are found after the timeout has elapsed, the assertion will fail.
165166
*
166167
* @param threadName The prefix of the thread names to check. Only threads whose names
167168
* start with this prefix will be considered.
168169
* @param isDaemon The daemon status to check. Only threads with the specified
169170
* daemon status (either true for daemon threads or false for non-daemon threads)
170171
* will be considered.
171172
*
172-
* @throws AssertionError If any thread with the specified name prefix and daemon status is found and is alive.
173+
* @throws AssertionError If any thread with the specified name prefix and daemon status are found after the timeout.
173174
*/
174-
public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) {
175-
List<Thread> threads = Thread.getAllStackTraces().keySet().stream()
176-
.filter(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName))
177-
.collect(Collectors.toList());
178-
int threadCount = threads.size();
179-
assertEquals(0, threadCount);
175+
public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) throws InterruptedException {
176+
waitForCondition(() -> Thread.getAllStackTraces().keySet().stream()
177+
.noneMatch(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)), String.format("Thread leak detected: %s", threadName));
180178
}
181179

182180
/**

storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void setup() throws IOException, RemoteStorageException {
127127
}
128128

129129
@AfterEach
130-
public void cleanup() {
130+
public void cleanup() throws InterruptedException {
131131
reset(rsm);
132132
// the files created for the test will be deleted automatically on thread exit since we use temp dir
133133
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
@@ -344,13 +344,13 @@ public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedEx
344344
}, "Failed to delete index file");
345345

346346
// verify no index files on disk
347-
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(),
347+
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
348348
"Offset index file should not be present on disk at " + tpDir.toPath());
349-
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(),
349+
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
350350
"Txn index file should not be present on disk at " + tpDir.toPath());
351-
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(),
351+
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
352352
"Time index file should not be present on disk at " + tpDir.toPath());
353-
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(),
353+
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
354354
"Index file marked for deletion should not be present on disk at " + tpDir.toPath());
355355
}
356356

0 commit comments

Comments
 (0)