Skip to content

Commit f831072

Browse files
committed
GH-2222: Fix Race in Test
1 parent 051ceb5 commit f831072

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2544,17 +2544,25 @@ public void rePausePartitionAfterRebalance() throws Exception {
25442544
TopicPartition tp1 = new TopicPartition("foo", 1);
25452545
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
25462546
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2547-
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2547+
final CountDownLatch suspendConsumerThread = new CountDownLatch(1);
25482548
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2549+
Thread testThread = Thread.currentThread();
2550+
AtomicBoolean paused = new AtomicBoolean();
25492551
willAnswer(i -> {
25502552
pausedParts.clear();
25512553
pausedParts.addAll(i.getArgument(0));
2552-
pauseLatch1.countDown();
2553-
pauseLatch2.countDown();
2554+
if (!Thread.currentThread().equals(testThread)) {
2555+
paused.set(true);
2556+
}
25542557
return null;
25552558
}).given(consumer).pause(any());
25562559
given(consumer.paused()).willReturn(pausedParts);
25572560
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2561+
if (paused.get()) {
2562+
pauseLatch1.countDown();
2563+
// hold up the consumer thread while we revoke/assign partitions on the test thread
2564+
suspendConsumerThread.await(10, TimeUnit.SECONDS);
2565+
}
25582566
Thread.sleep(50);
25592567
return ConsumerRecords.empty();
25602568
});
@@ -2585,15 +2593,14 @@ public void rePausePartitionAfterRebalance() throws Exception {
25852593
.contains(tp0, tp1);
25862594
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
25872595
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2588-
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
25892596
assertThat(pausedParts).hasSize(1)
25902597
.contains(tp0);
25912598
assertThat(container).extracting("listenerConsumer")
25922599
.extracting("pausedPartitions")
25932600
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
25942601
.hasSize(1)
2595-
.containsExactlyInAnyOrder(tp0);
2596-
2602+
.contains(tp0);
2603+
suspendConsumerThread.countDown();
25972604
container.stop();
25982605
}
25992606

0 commit comments

Comments
 (0)