diff --git a/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java b/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java index d4b5a345..b7287c95 100644 --- a/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java +++ b/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -368,7 +368,11 @@ public void sendChainFailure() { StepVerifier.create(chain.then()) .expectError(InvalidTopicException.class) .verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT)); - assertEquals(maxInflight, producer.sendCount.get()); + // We have to wait until all the tasks are finished in order to validate the sendCount, + // otherwise we're risking a race condition. The sender's internal scheduler is not something we can control + // at this time since it's created internally. + await().atMost(Duration.ofMillis(DEFAULT_TEST_TIMEOUT)).pollInterval(Duration.ofMillis(20)) + .untilAsserted(() -> assertEquals(maxInflight, producer.sendCount.get())); } /**