2121import static com .rabbitmq .client .amqp .Management .ExchangeType .FANOUT ;
2222import static com .rabbitmq .client .amqp .Management .QueueType .*;
2323import static com .rabbitmq .client .amqp .Management .QueueType .STREAM ;
24+ import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
2425import static com .rabbitmq .client .amqp .impl .TestConditions .BrokerVersion .RABBITMQ_4_0_3 ;
2526import static com .rabbitmq .client .amqp .impl .TestUtils .*;
2627import static java .nio .charset .StandardCharsets .*;
@@ -64,7 +65,7 @@ void queueInfoTest() {
6465 management .queue (name ).quorum ().queue ().declare ();
6566
6667 Management .QueueInfo queueInfo = management .queueInfo (name );
67- Assertions . assertThat (queueInfo )
68+ assertThat (queueInfo )
6869 .hasName (name )
6970 .is (QUORUM )
7071 .isDurable ()
@@ -100,10 +101,10 @@ void queueDeclareDeletePublishConsume(String subject) {
100101 acceptedCallback (confirmSync ));
101102 });
102103
103- Assertions . assertThat (confirmSync ).completes ();
104+ assertThat (confirmSync ).completes ();
104105
105106 Management .QueueInfo queueInfo = connection .management ().queueInfo (name );
106- Assertions . assertThat (queueInfo ).hasName (name ).hasNoConsumers ().hasMessageCount (messageCount );
107+ assertThat (queueInfo ).hasName (name ).hasNoConsumers ().hasMessageCount (messageCount );
107108
108109 AtomicReference <String > receivedSubject = new AtomicReference <>();
109110 Sync consumeSync = TestUtils .sync (messageCount );
@@ -119,11 +120,13 @@ void queueDeclareDeletePublishConsume(String subject) {
119120 })
120121 .build ();
121122
122- Assertions .assertThat (consumeSync ).completes ();
123- assertThat (receivedSubject ).doesNotHaveNullValue ().hasValue (subject );
123+ assertThat (consumeSync ).completes ();
124+ org .assertj .core .api .Assertions .assertThat (receivedSubject )
125+ .doesNotHaveNullValue ()
126+ .hasValue (subject );
124127
125128 queueInfo = connection .management ().queueInfo (name );
126- Assertions . assertThat (queueInfo ).hasConsumerCount (1 ).isEmpty ();
129+ assertThat (queueInfo ).hasConsumerCount (1 ).isEmpty ();
127130
128131 consumer .close ();
129132 publisher .close ();
@@ -181,7 +184,7 @@ void binding(String prefix, boolean addBindingArguments, TestInfo info) {
181184 range (0 , messageCount ).forEach (ignored -> publish .accept (publisher1 ));
182185 range (0 , messageCount ).forEach (ignored -> publish .accept (publisher2 ));
183186
184- Assertions . assertThat (confirmSync ).completes ();
187+ assertThat (confirmSync ).completes ();
185188
186189 Sync consumeSync = sync (messageCount * 2 );
187190 com .rabbitmq .client .amqp .Consumer consumer =
@@ -194,7 +197,7 @@ void binding(String prefix, boolean addBindingArguments, TestInfo info) {
194197 consumeSync .down ();
195198 })
196199 .build ();
197- Assertions . assertThat (consumeSync ).completes ();
200+ assertThat (consumeSync ).completes ();
198201 publisher1 .close ();
199202 publisher2 .close ();
200203 consumer .close ();
@@ -239,8 +242,10 @@ void sameTypeMessagesInQueue() {
239242 publisher .publish (publisher .message ("one" .getBytes (UTF_8 )), ctx -> {});
240243 publisher .publish (publisher .message ("two" .getBytes (UTF_8 )), ctx -> {});
241244
242- com .rabbitmq .client .amqp .impl .Assertions .assertThat (consumeLatch ).completes ();
243- assertThat (messageBodies ).hasSize (2 ).containsOnly ("one" .getBytes (UTF_8 ), "two" .getBytes (UTF_8 ));
245+ assertThat (consumeLatch ).completes ();
246+ org .assertj .core .api .Assertions .assertThat (messageBodies )
247+ .hasSize (2 )
248+ .containsOnly ("one" .getBytes (UTF_8 ), "two" .getBytes (UTF_8 ));
244249 }
245250
246251 @ Test
@@ -252,7 +257,7 @@ void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() {
252257 Publisher .Callback callback = ctx -> publishLatch .countDown ();
253258 range (0 , messageCount ).forEach (ignored -> publisher .publish (publisher .message (), callback ));
254259
255- com . rabbitmq . client . amqp . impl . Assertions . assertThat (publishLatch ).completes ();
260+ assertThat (publishLatch ).completes ();
256261
257262 int initialCredits = 10 ;
258263 Set <com .rabbitmq .client .amqp .Consumer .Context > messageContexts = ConcurrentHashMap .newKeySet ();
@@ -266,10 +271,10 @@ void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() {
266271
267272 waitAtMost (() -> messageContexts .size () == initialCredits );
268273
269- Assertions .assertThat (connection .management ().queueInfo (q ))
270- .hasMessageCount (messageCount - initialCredits );
274+ assertThat (connection .management ().queueInfo (q )).hasMessageCount (messageCount - initialCredits );
271275
272- assertThat (Cli .queueInfo (q ).unackedMessageCount ()).isEqualTo (initialCredits );
276+ org .assertj .core .api .Assertions .assertThat (Cli .queueInfo (q ).unackedMessageCount ())
277+ .isEqualTo (initialCredits );
273278
274279 consumer .pause ();
275280 new ArrayList <>(messageContexts ).forEach (com .rabbitmq .client .amqp .Consumer .Context ::accept );
@@ -306,7 +311,7 @@ void publisherSendingShouldThrowWhenExchangeHasBeenDeleted() {
306311 connection .management ().binding ().sourceExchange (name ).destinationQueue (q ).bind ();
307312 Sync sync = sync ();
308313 publisher .publish (publisher .message (), acceptedCallback (sync ));
309- Assertions . assertThat (sync ).completes ();
314+ assertThat (sync ).completes ();
310315 } finally {
311316 connection .management ().exchangeDeletion ().delete (name );
312317 }
@@ -321,11 +326,11 @@ void publisherSendingShouldThrowWhenExchangeHasBeenDeleted() {
321326 return true ;
322327 }
323328 });
324- Assertions . assertThat (closedSync ).completes ();
329+ assertThat (closedSync ).completes ();
325330 of (exception .get (), closedException .get ())
326331 .forEach (
327332 e ->
328- assertThat (e )
333+ org . assertj . core . api . Assertions . assertThat (e )
329334 .isInstanceOf (AmqpException .AmqpEntityDoesNotExistException .class )
330335 .hasMessageContaining (name )
331336 .hasMessageContaining (ExceptionUtils .ERROR_NOT_FOUND ));
@@ -354,7 +359,7 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
354359 try {
355360 Sync sync = sync ();
356361 publisher .publish (publisher .message (), acceptedCallback (sync ));
357- Assertions . assertThat (sync ).completes ();
362+ assertThat (sync ).completes ();
358363 } finally {
359364 connection .management ().queueDeletion ().delete (name );
360365 }
@@ -369,11 +374,11 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
369374 return true ;
370375 }
371376 });
372- Assertions . assertThat (closedSync ).completes ();
377+ assertThat (closedSync ).completes ();
373378 of (exception .get (), closedException .get ())
374379 .forEach (
375380 e ->
376- assertThat (e )
381+ org . assertj . core . api . Assertions . assertThat (e )
377382 .isInstanceOf (AmqpException .AmqpEntityDoesNotExistException .class )
378383 .hasMessageContaining (ExceptionUtils .ERROR_RESOURCE_DELETED ));
379384 }
@@ -397,8 +402,8 @@ void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToPropert
397402 Sync acceptedSync = sync ();
398403 publisher .publish (
399404 publisher .message ().toAddress ().queue (name ).message (), ctx -> acceptedSync .down ());
400- Assertions . assertThat (acceptedSync ).completes ();
401- Assertions . assertThat (consumedSync ).completes ();
405+ assertThat (acceptedSync ).completes ();
406+ assertThat (consumedSync ).completes ();
402407
403408 acceptedSync .reset ();
404409 consumedSync .reset ();
@@ -407,13 +412,13 @@ void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToPropert
407412 publisher .publish (
408413 publisher .message ().toAddress ().exchange (doesNotExist ).message (),
409414 ctx -> rejectedSync .down ());
410- Assertions . assertThat (rejectedSync ).completes ();
415+ assertThat (rejectedSync ).completes ();
411416
412- Assertions . assertThat (consumedSync ).hasNotCompleted ();
417+ assertThat (consumedSync ).hasNotCompleted ();
413418 publisher .publish (
414419 publisher .message ().toAddress ().queue (name ).message (), ctx -> acceptedSync .down ());
415- Assertions . assertThat (acceptedSync ).completes ();
416- Assertions . assertThat (consumedSync ).completes ();
420+ assertThat (acceptedSync ).completes ();
421+ assertThat (consumedSync ).completes ();
417422 }
418423
419424 @ Test
@@ -455,10 +460,10 @@ void consumerShouldGetClosedWhenQueueIsDeleted() {
455460 .build ();
456461 Publisher publisher = connection .publisherBuilder ().queue (name ).build ();
457462 publisher .publish (publisher .message (), ctx -> {});
458- Assertions . assertThat (consumeSync ).completes ();
463+ assertThat (consumeSync ).completes ();
459464 connection .management ().queueDeletion ().delete (name );
460- Assertions . assertThat (closedSync ).completes ();
461- assertThat (exception .get ())
465+ assertThat (closedSync ).completes ();
466+ org . assertj . core . api . Assertions . assertThat (exception .get ())
462467 .isInstanceOf (AmqpException .AmqpEntityDoesNotExistException .class )
463468 .hasMessageContaining (ExceptionUtils .ERROR_RESOURCE_DELETED );
464469 }
@@ -472,7 +477,7 @@ void consumerPauseThenClose() {
472477 Sync publishSync = sync (messageCount );
473478 range (0 , messageCount )
474479 .forEach (ignored -> publisher .publish (publisher .message (), ctx -> publishSync .down ()));
475- Assertions . assertThat (publishSync ).completes ();
480+ assertThat (publishSync ).completes ();
476481
477482 AtomicInteger receivedCount = new AtomicInteger (0 );
478483 Set <com .rabbitmq .client .amqp .Consumer .Context > unsettledMessages =
@@ -493,13 +498,13 @@ void consumerPauseThenClose() {
493498 .build ();
494499
495500 int unsettledMessageCount = waitUntilStable (unsettledMessages ::size );
496- assertThat (unsettledMessageCount ).isNotZero ();
501+ org . assertj . core . api . Assertions . assertThat (unsettledMessageCount ).isNotZero ();
497502 consumer .pause ();
498503 int receivedCountAfterPausing = receivedCount .get ();
499504 unsettledMessages .forEach (com .rabbitmq .client .amqp .Consumer .Context ::accept );
500505 consumer .close ();
501- assertThat (receivedCount ).hasValue (receivedCountAfterPausing );
502- Assertions . assertThat (connection .management ().queueInfo (name ))
506+ org . assertj . core . api . Assertions . assertThat (receivedCount ).hasValue (receivedCountAfterPausing );
507+ assertThat (connection .management ().queueInfo (name ))
503508 .hasMessageCount (messageCount - receivedCount .get ());
504509 }
505510
@@ -512,7 +517,7 @@ void consumerGracefulShutdownExample() {
512517 Sync publishSync = sync (messageCount );
513518 range (0 , messageCount )
514519 .forEach (ignored -> publisher .publish (publisher .message (), ctx -> publishSync .down ()));
515- Assertions . assertThat (publishSync ).completes ();
520+ assertThat (publishSync ).completes ();
516521
517522 AtomicInteger receivedCount = new AtomicInteger (0 );
518523 Random random = new Random ();
@@ -534,7 +539,7 @@ void consumerGracefulShutdownExample() {
534539 consumer .pause ();
535540 waitAtMost (() -> consumer .unsettledMessageCount () == 0 );
536541 consumer .close ();
537- Assertions . assertThat (connection .management ().queueInfo (name ))
542+ assertThat (connection .management ().queueInfo (name ))
538543 .hasMessageCount (messageCount - receivedCount .get ());
539544 }
540545
@@ -548,7 +553,7 @@ void consumerUnsettledMessagesGoBackToQueueAfterClosing() {
548553 Sync publishSync = sync (messageCount );
549554 range (0 , messageCount )
550555 .forEach (ignored -> publisher .publish (publisher .message (), ctx -> publishSync .down ()));
551- Assertions . assertThat (publishSync ).completes ();
556+ assertThat (publishSync ).completes ();
552557
553558 AtomicInteger receivedCount = new AtomicInteger (0 );
554559 com .rabbitmq .client .amqp .Consumer consumer =
@@ -567,7 +572,7 @@ void consumerUnsettledMessagesGoBackToQueueAfterClosing() {
567572
568573 waitAtMost (() -> receivedCount .get () > settledCount );
569574 consumer .close ();
570- Assertions . assertThat (connection .management ().queueInfo (name ))
575+ assertThat (connection .management ().queueInfo (name ))
571576 .hasMessageCount (messageCount - settledCount );
572577 }
573578
@@ -612,17 +617,17 @@ void consumerWithHigherPriorityShouldGetMessagesFirst() {
612617
613618 publish .run ();
614619
615- Assertions . assertThat (consumeSync ).completes ();
616- assertThat (lowCount ).hasValue (0 );
617- assertThat (highCount ).hasValue (messageCount );
620+ assertThat (consumeSync ).completes ();
621+ org . assertj . core . api . Assertions . assertThat (lowCount ).hasValue (0 );
622+ org . assertj . core . api . Assertions . assertThat (highCount ).hasValue (messageCount );
618623
619624 highPriorityConsumer .close ();
620625
621626 consumeSync .reset (messageCount );
622627 publish .run ();
623- Assertions . assertThat (consumeSync ).completes ();
624- assertThat (lowCount ).hasValue (messageCount );
625- assertThat (highCount ).hasValue (messageCount );
628+ assertThat (consumeSync ).completes ();
629+ org . assertj . core . api . Assertions . assertThat (lowCount ).hasValue (messageCount );
630+ org . assertj . core . api . Assertions . assertThat (highCount ).hasValue (messageCount );
626631
627632 lowPriorityConsumer .close ();
628633 }
@@ -649,7 +654,7 @@ void redeclareExchangesWithDifferentArguments() {
649654 management .exchange (name ).type (FANOUT ).declare ();
650655 fail ("Declaring an existing exchange with different arguments should trigger an exception" );
651656 } catch (AmqpException e ) {
652- assertThat (e ).hasMessageContaining ("409" );
657+ org . assertj . core . api . Assertions . assertThat (e ).hasMessageContaining ("409" );
653658 // OK
654659 } finally {
655660 management .exchangeDeletion ().delete (name );
@@ -670,7 +675,7 @@ void declareQueueWithUnsupportedArgument() {
670675 operation .accept (management );
671676 fail ("Creating a queue with unsupported arguments should trigger an exception" );
672677 } catch (AmqpException e ) {
673- assertThat (e ).hasMessageContaining ("409" );
678+ org . assertj . core . api . Assertions . assertThat (e ).hasMessageContaining ("409" );
674679 }
675680 });
676681 }
@@ -696,7 +701,7 @@ void publishedMessageShouldBeRejectedWhenQueueLimitIsReached(TestInfo info) {
696701 Publisher publisher = connection .publisherBuilder ().queue (q ).build ();
697702 IntStream .range (0 , maxLength + 1 )
698703 .forEach (ignored -> publisher .publish (publisher .message (), callback ));
699- Assertions . assertThat (rejectedLatch ).completes ();
704+ assertThat (rejectedLatch ).completes ();
700705 } finally {
701706 management .queueDeletion ().delete (q );
702707 }
0 commit comments