Skip to content

Commit ec7aeb4

Browse files
committed
Fix race condition for batches in the BatchIndividualRecordObservationTests
There is no guarantee the Apache Kafka would deliver an expected number of records into a single batch to process. * Rework the test configuration logic to count down every single record independently of the received batched
1 parent a8bf449 commit ec7aeb4

File tree

1 file changed

+4
-15
lines changed

1 file changed

+4
-15
lines changed

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/BatchIndividualRecordObservationTests.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,6 @@ void batchIndividualRecordObservationCreatesObservationPerRecord(@Autowired Batc
146146
.containsEntry("foo", "some foo value")
147147
.containsEntry("bar", "some bar value")
148148
.containsEntry("message-id", "msg-3");
149-
150-
assertThat(listener.processedRecords).hasSize(3);
151149
}
152150

153151
@Test
@@ -170,8 +168,6 @@ void batchIndividualRecordObservationDisabledCreatesNoIndividualObservations(
170168
assertThat(observationHandler.getStartedObservations())
171169
.as("No individual observations should be created when batch individual observation is disabled")
172170
.isZero();
173-
174-
assertThat(batchListener.processedRecords).hasSize(2);
175171
}
176172

177173
@Configuration
@@ -328,36 +324,29 @@ BatchListenerWithoutIndividualObservation batchListenerWithoutIndividualObservat
328324

329325
static class BatchListener {
330326

331-
final CountDownLatch latch = new CountDownLatch(1);
332-
333-
final List<String> processedRecords = new ArrayList<>();
327+
CountDownLatch latch = new CountDownLatch(3);
334328

335329
@KafkaListener(topics = BATCH_INDIVIDUAL_OBSERVATION_TOPIC,
336330
containerFactory = "observationListenerContainerFactory")
337331
public void listen(List<ConsumerRecord<Integer, String>> records) {
338-
339332
for (ConsumerRecord<Integer, String> record : records) {
340-
processedRecords.add(record.value());
333+
latch.countDown();
341334
}
342-
latch.countDown();
343335
}
344336

345337
}
346338

347339
static class BatchListenerWithoutIndividualObservation {
348340

349-
final CountDownLatch latch = new CountDownLatch(1);
350-
351-
final List<String> processedRecords = new ArrayList<>();
341+
CountDownLatch latch = new CountDownLatch(2);
352342

353343
@KafkaListener(topics = BATCH_ONLY_OBSERVATION_TOPIC,
354344
containerFactory = "batchOnlyObservationListenerContainerFactory")
355345

356346
public void listen(List<ConsumerRecord<Integer, String>> records) {
357347
for (ConsumerRecord<Integer, String> record : records) {
358-
processedRecords.add(record.value());
348+
latch.countDown();
359349
}
360-
latch.countDown();
361350
}
362351

363352
}

0 commit comments

Comments
 (0)