1919
2020import static com .rabbitmq .client .amqp .Management .ExchangeType .DIRECT ;
2121import static com .rabbitmq .client .amqp .Management .ExchangeType .FANOUT ;
22+ import static com .rabbitmq .client .amqp .Publisher .Status .ACCEPTED ;
2223import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
2324import static com .rabbitmq .client .amqp .impl .TestUtils .waitAtMost ;
2425import static java .time .Duration .ofMillis ;
3536import com .rabbitmq .client .amqp .Management ;
3637import com .rabbitmq .client .amqp .Publisher ;
3738import com .rabbitmq .client .amqp .Resource ;
39+ import com .rabbitmq .client .amqp .impl .TestUtils .Sync ;
3840import java .util .List ;
3941import java .util .Set ;
4042import java .util .concurrent .*;
4446import org .junit .jupiter .api .*;
4547import org .junit .jupiter .params .ParameterizedTest ;
4648import org .junit .jupiter .params .provider .ValueSource ;
49+ import org .slf4j .Logger ;
50+ import org .slf4j .LoggerFactory ;
4751
4852@ TestUtils .DisabledIfRabbitMqCtlNotSet
4953@ AmqpTestInfrastructure
5054public class TopologyRecoveryTest {
5155
56+ private static final Logger LOGGER = LoggerFactory .getLogger (TopologyRecoveryTest .class );
57+
5258 static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy .fixed (ofMillis (100 ));
5359 static Environment environment ;
5460 TestInfo testInfo ;
5561 String connectionName ;
56- TestUtils . Sync recoveredSync ;
62+ Sync recoveredSync ;
5763 AtomicInteger connectionAttemptCount ;
5864
5965 @ BeforeEach
@@ -247,7 +253,7 @@ void autoDeleteExchangeAndExclusiveQueueShouldBeRedeclared(boolean isolateResour
247253 connection .management ().queue (q ).autoDelete (true ).exclusive (true ).declare ();
248254 connection .management ().binding ().sourceExchange (e ).key ("foo" ).destinationQueue (q ).bind ();
249255
250- TestUtils . Sync consumeSync = TestUtils .sync ();
256+ Sync consumeSync = TestUtils .sync ();
251257 Publisher publisher = connection .publisherBuilder ().exchange (e ).key ("foo" ).build ();
252258 connection
253259 .consumerBuilder ()
@@ -288,7 +294,7 @@ void autoDeleteExchangeAndExclusiveQueueWithE2eBindingShouldBeRedeclared(
288294 connection .management ().binding ().sourceExchange (e1 ).destinationExchange (e2 ).bind ();
289295 connection .management ().binding ().sourceExchange (e2 ).destinationQueue (q ).bind ();
290296
291- TestUtils . Sync consumeSync = TestUtils .sync ();
297+ Sync consumeSync = TestUtils .sync ();
292298 Publisher publisher = connection .publisherBuilder ().exchange (e1 ).build ();
293299 connection
294300 .consumerBuilder ()
@@ -326,7 +332,7 @@ void deletedQueueBindingIsNotRecovered(boolean isolateResources) {
326332 connection .management ().queue (q ).declare ();
327333 connection .management ().binding ().sourceExchange (e ).destinationQueue (q ).bind ();
328334
329- TestUtils . Sync consumeSync = TestUtils .sync ();
335+ Sync consumeSync = TestUtils .sync ();
330336 Publisher publisher = connection .publisherBuilder ().exchange (e ).build ();
331337 Consumer consumer =
332338 connection
@@ -382,7 +388,7 @@ void deletedExchangeBindingIsNotRecovered(boolean isolateResources) {
382388 connection .management ().binding ().sourceExchange (e1 ).destinationExchange (e2 ).bind ();
383389 connection .management ().binding ().sourceExchange (e2 ).destinationQueue (q ).bind ();
384390
385- TestUtils . Sync consumeSync = TestUtils .sync ();
391+ Sync consumeSync = TestUtils .sync ();
386392 Publisher publisher = connection .publisherBuilder ().exchange (e1 ).build ();
387393 Consumer consumer =
388394 connection
@@ -486,7 +492,7 @@ void recoverConsumers(boolean isolateResources) {
486492 connection .management ().queue (q ).declare ();
487493 connection .management ().binding ().sourceExchange (e ).destinationQueue (q ).bind ();
488494 int consumerCount = 60 ;
489- TestUtils . Sync consumeSync = TestUtils .sync (consumerCount );
495+ Sync consumeSync = TestUtils .sync (consumerCount );
490496 Consumer .MessageHandler handler =
491497 (ctx , m ) -> {
492498 consumeSync .down ();
@@ -526,7 +532,7 @@ void recoverPublisherConsumerSeveralTimes(boolean isolateResources) {
526532 connection .management ().binding ().sourceExchange (e ).destinationQueue (q ).bind ();
527533
528534 Publisher publisher = connection .publisherBuilder ().exchange (e ).build ();
529- TestUtils . Sync consumeSync = TestUtils .sync ();
535+ Sync consumeSync = TestUtils .sync ();
530536 connection
531537 .consumerBuilder ()
532538 .queue (q )
@@ -614,7 +620,7 @@ void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) {
614620
615621 String queueName = queueInfo .name ();
616622
617- TestUtils . Sync consumeSync = TestUtils .sync ();
623+ Sync consumeSync = TestUtils .sync ();
618624 Publisher publisher = connection .publisherBuilder ().queue (queueName ).build ();
619625 connection
620626 .consumerBuilder ()
@@ -638,31 +644,43 @@ void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) {
638644 }
639645
640646 @ Test
641- void shouldRecoverEvenIfManagementIsClosed () {
647+ void shouldRecoverEvenIfManagementIsClosed (TestInfo info ) {
642648 try (Connection connection = connection ()) {
643649 Management management = connection .management ();
644650 Management .QueueInfo queueInfo = management .queue ().exclusive (true ).declare ();
645651 Publisher publisher = connection .publisherBuilder ().queue (queueInfo .name ()).build ();
646- publisher .publish (publisher .message (), ctx -> {});
647- TestUtils .Sync consumeSync = TestUtils .sync ();
652+ Sync publishSync = TestUtils .sync ();
653+ Publisher .Callback callback =
654+ ctx -> {
655+ if (ctx .status () == ACCEPTED ) {
656+ publishSync .down ();
657+ } else {
658+ LOGGER .warn (
659+ "Unexpected status: {} ({})" , ctx .status (), info .getTestMethod ().get ().getName ());
660+ }
661+ };
662+ publisher .publish (publisher .message (), callback );
663+ assertThat (publishSync ).completes ();
664+ Sync consumeSync = TestUtils .sync ();
648665 connection
649666 .consumerBuilder ()
650667 .queue (queueInfo .name ())
651668 .messageHandler (
652669 (ctx , message ) -> {
653- ctx .accept ();
654670 consumeSync .down ();
671+ ctx .accept ();
655672 })
656673 .build ();
657674
658675 assertThat (consumeSync ).completes ();
659676 waitAtMost (() -> management .queueInfo (queueInfo .name ()).messageCount () == 0 );
660677 management .close ();
661678
679+ publishSync .reset ();
662680 consumeSync .reset ();
663681
664682 closeConnectionAndWaitForRecovery ();
665- publisher .publish (publisher .message (), ctx -> {} );
683+ publisher .publish (publisher .message (), callback );
666684 assertThat (consumeSync ).completes ();
667685 assertThatThrownBy (() -> management .queueInfo (queueInfo .name ()))
668686 .isInstanceOf (AmqpResourceClosedException .class );
@@ -710,7 +728,7 @@ Connection connection() {
710728 Connection connection (
711729 String name ,
712730 boolean isolateResources ,
713- TestUtils . Sync recoveredSync ,
731+ Sync recoveredSync ,
714732 java .util .function .Consumer <AmqpConnectionBuilder > builderCallback ) {
715733 AmqpConnectionBuilder builder = (AmqpConnectionBuilder ) environment .connectionBuilder ();
716734 builder
0 commit comments