Skip to content

Commit 671e1b1

Browse files
committed
Wait until expected number of message is reached in test
Instead of waiting for a stable value.
1 parent 0f1ec55 commit 671e1b1

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
2727
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0;
2828
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
29+
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
2930
import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable;
3031
import static java.nio.charset.StandardCharsets.UTF_8;
3132
import static java.util.stream.IntStream.range;
@@ -590,13 +591,15 @@ void publish(int messageCount, String filterValue) {
590591
void publish(int messageCount, UnaryOperator<Message> messageLogic) {
591592
try (Publisher publisher = connection.publisherBuilder().queue(name).build()) {
592593
Sync publishSync = sync(messageCount);
593-
Publisher.Callback callback = ctx -> {
594-
if (ctx.status() == ACCEPTED) {
595-
publishSync.down();
596-
} else {
597-
LOGGER.warn("Outbound message not accepted by the broker, status is {}", ctx.status());
598-
}
599-
};
594+
Publisher.Callback callback =
595+
ctx -> {
596+
if (ctx.status() == ACCEPTED) {
597+
publishSync.down();
598+
} else {
599+
LOGGER.warn(
600+
"Outbound message not accepted by the broker, status is {}", ctx.status());
601+
}
602+
};
600603
range(0, messageCount)
601604
.forEach(ignored -> publisher.publish(messageLogic.apply(publisher.message()), callback));
602605
assertThat(publishSync).completes();
@@ -625,7 +628,7 @@ List<Message> consume(
625628

626629
try (Consumer ignored = builder.build()) {
627630
assertThat(consumedSync).completes();
628-
waitUntilStable(receivedMessageCount::get, Duration.ofMillis(50));
631+
waitAtMost(() -> receivedMessageCount.get() >= expectedMessageCount);
629632
assertThat(receivedMessageCount).hasValue(expectedMessageCount);
630633
}
631634

0 commit comments

Comments
 (0)