|
21 | 21 | import static com.rabbitmq.client.amqp.ConsumerBuilder.StreamOffsetSpecification.LAST; |
22 | 22 | import static com.rabbitmq.client.amqp.ConsumerBuilder.StreamOffsetSpecification.NEXT; |
23 | 23 | import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; |
| 24 | +import static com.rabbitmq.client.amqp.Publisher.Status.ACCEPTED; |
24 | 25 | import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; |
25 | 26 | import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0; |
26 | 27 | import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0; |
|
61 | 62 | import org.junit.jupiter.api.BeforeEach; |
62 | 63 | import org.junit.jupiter.api.Test; |
63 | 64 | import org.junit.jupiter.api.TestInfo; |
| 65 | +import org.slf4j.Logger; |
| 66 | +import org.slf4j.LoggerFactory; |
64 | 67 |
|
65 | 68 | @AmqpTestInfrastructure |
66 | 69 | public class SourceFiltersTest { |
67 | 70 |
|
| 71 | + private static final Logger LOGGER = LoggerFactory.getLogger(SourceFiltersTest.class); |
| 72 | + |
68 | 73 | Connection connection; |
69 | 74 | String name; |
70 | 75 | ArrayArbitrary<Byte, byte[]> binaryArbitrary; |
@@ -585,7 +590,13 @@ void publish(int messageCount, String filterValue) { |
585 | 590 | void publish(int messageCount, UnaryOperator<Message> messageLogic) { |
586 | 591 | try (Publisher publisher = connection.publisherBuilder().queue(name).build()) { |
587 | 592 | Sync publishSync = sync(messageCount); |
588 | | - Publisher.Callback callback = ctx -> publishSync.down(); |
| 593 | + Publisher.Callback callback = ctx -> { |
| 594 | + if (ctx.status() == ACCEPTED) { |
| 595 | + publishSync.down(); |
| 596 | + } else { |
| 597 | + LOGGER.warn("Outbound message not accepted by the broker, status is {}", ctx.status()); |
| 598 | + } |
| 599 | + }; |
589 | 600 | range(0, messageCount) |
590 | 601 | .forEach(ignored -> publisher.publish(messageLogic.apply(publisher.message()), callback)); |
591 | 602 | assertThat(publishSync).completes(); |
|
0 commit comments