Skip to content

Commit 98c1640

Browse files
committed
Fix Flaky Reactor Template Test
Wait for send offsets to complete.
1 parent 3203058 commit 98c1640

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
import reactor.core.publisher.Flux;
5454
import reactor.core.publisher.Mono;
55+
import reactor.core.scheduler.Schedulers;
5556
import reactor.kafka.receiver.ReceiverOptions;
5657
import reactor.kafka.receiver.ReceiverRecord;
5758
import reactor.kafka.sender.SenderOptions;
@@ -298,8 +299,7 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
298299
StepVerifier.create(
299300
reactiveKafkaConsumerTemplate
300301
.receiveExactlyOnce(this.reactiveKafkaProducerTemplate.transactionManager())
301-
.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, false))
302-
.delayElements(Duration.ofMillis(100)))
302+
.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux)))
303303
.assertNext(senderResult -> {
304304
assertThat(senderResult.correlationMetadata().intValue()).isEqualTo(DEFAULT_KEY);
305305
assertThat(senderResult.recordMetadata().offset()).isGreaterThan(0);
@@ -319,6 +319,16 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
319319
.verify(DEFAULT_VERIFY_TIMEOUT);
320320
}
321321

322+
private Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, String>> fluxConsumerRecord) {
323+
return reactiveKafkaProducerTemplate
324+
.send(fluxConsumerRecord.map(this::toSenderRecord))
325+
.publishOn(Schedulers.boundedElastic())
326+
.map(sr -> {
327+
reactiveKafkaProducerTemplate.transactionManager().commit().block();
328+
return sr;
329+
});
330+
}
331+
322332
private Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, String>> fluxConsumerRecord,
323333
boolean failCommit) {
324334

0 commit comments

Comments
 (0)