@@ -718,6 +718,43 @@ void testWithConsumerStartWhenNotActive() {
718718 assertThat (start .getCount ()).isEqualTo (0L );
719719 }
720720
721+ @ Test
722+ @ SuppressWarnings ("unchecked" )
723+ void testShutdownWithPendingReplies () {
724+ ConnectionFactory connectionFactory = mock (ConnectionFactory .class );
725+ Connection connection = mock (Connection .class );
726+ Channel channel = mock (Channel .class );
727+ given (connectionFactory .createConnection ()).willReturn (connection );
728+ given (connection .createChannel (false )).willReturn (channel );
729+ given (channel .isOpen ()).willReturn (true );
730+
731+ RabbitTemplate template = new RabbitTemplate (connectionFactory );
732+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer (connectionFactory );
733+ container .setQueueNames ("foo" );
734+ container .setMessageListener (mock (MessageListener .class ));
735+
736+ long shutdownTimeout = 2000L ;
737+ long checkInterval = 500L ;
738+ container .setShutdownTimeout (shutdownTimeout );
739+ container .setPendingReplyCheckInterval (checkInterval );
740+ container .setPendingReplyProvider (template ::getPendingReplyCount );
741+
742+ Map <String , Object > replyHolder = (Map <String , Object >) ReflectionTestUtils .getField (template , "replyHolder" );
743+ assertThat (replyHolder ).isNotNull ();
744+ replyHolder .put ("foo" , new CompletableFuture <Message >());
745+
746+ assertThat (template .getPendingReplyCount ()).isEqualTo (1 );
747+
748+ container .start ();
749+
750+ long startTime = System .currentTimeMillis ();
751+ container .stop ();
752+ long stopDuration = System .currentTimeMillis () - startTime ;
753+
754+ assertThat (stopDuration ).isGreaterThanOrEqualTo (shutdownTimeout - 500 );
755+ assertThat (template .getPendingReplyCount ()).isEqualTo (1 );
756+ }
757+
721758 private Answer <Object > messageToConsumer (final Channel mockChannel , final SimpleMessageListenerContainer container ,
722759 final boolean cancel , final CountDownLatch latch ) {
723760 return invocation -> {
@@ -801,40 +838,4 @@ public void execute(Runnable task) {
801838
802839 }
803840
804- @ Test
805- @ SuppressWarnings ("unchecked" )
806- void testShutdownWithPendingReplies () {
807- ConnectionFactory connectionFactory = mock (ConnectionFactory .class );
808- Connection connection = mock (Connection .class );
809- Channel channel = mock (Channel .class );
810- given (connectionFactory .createConnection ()).willReturn (connection );
811- given (connection .createChannel (false )).willReturn (channel );
812- given (channel .isOpen ()).willReturn (true );
813-
814- RabbitTemplate template = new RabbitTemplate (connectionFactory );
815- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer (connectionFactory );
816- container .setQueueNames ("foo" );
817- container .setMessageListener (mock (MessageListener .class ));
818-
819- long shutdownTimeout = 2000L ;
820- long checkInterval = 500L ;
821- container .setShutdownTimeout (shutdownTimeout );
822- container .setPendingReplyCheckInterval (checkInterval );
823- container .setPendingReplyProvider (template ::getPendingReplyCount );
824-
825- Map <String , Object > replyHolder = (Map <String , Object >) ReflectionTestUtils .getField (template , "replyHolder" );
826- assertThat (replyHolder ).isNotNull ();
827- replyHolder .put ("foo" , new CompletableFuture <Message >());
828-
829- assertThat (template .getPendingReplyCount ()).isEqualTo (1 );
830-
831- container .start ();
832-
833- long startTime = System .currentTimeMillis ();
834- container .stop ();
835- long stopDuration = System .currentTimeMillis () - startTime ;
836-
837- assertThat (stopDuration ).isGreaterThanOrEqualTo (shutdownTimeout - 500 );
838- assertThat (template .getPendingReplyCount ()).isEqualTo (1 );
839- }
840841}
0 commit comments