1616package com .rabbitmq .client .test .functional ;
1717
1818import com .rabbitmq .client .*;
19+ import com .rabbitmq .client .AMQP .BasicProperties ;
1920import com .rabbitmq .client .impl .CredentialsProvider ;
2021import com .rabbitmq .client .impl .NetworkConnection ;
2122import com .rabbitmq .client .impl .recovery .*;
3435import java .util .concurrent .TimeUnit ;
3536import java .util .concurrent .TimeoutException ;
3637import java .util .concurrent .atomic .AtomicInteger ;
37- import java .util .concurrent .atomic .AtomicLong ;
3838import java .util .concurrent .atomic .AtomicReference ;
3939
4040import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
@@ -168,11 +168,13 @@ public String getPassword() {
168168 final List <String > events = new CopyOnWriteArrayList <String >();
169169 final CountDownLatch latch = new CountDownLatch (2 ); // one when started, another when complete
170170 connection .addShutdownListener (new ShutdownListener () {
171+ @ Override
171172 public void shutdownCompleted (ShutdownSignalException cause ) {
172173 events .add ("shutdown hook 1" );
173174 }
174175 });
175176 connection .addShutdownListener (new ShutdownListener () {
177+ @ Override
176178 public void shutdownCompleted (ShutdownSignalException cause ) {
177179 events .add ("shutdown hook 2" );
178180 }
@@ -211,6 +213,7 @@ public void handleRecoveryStarted(Recoverable recoverable) {
211213 @ Test public void shutdownHooksRecoveryOnConnection () throws IOException , InterruptedException {
212214 final CountDownLatch latch = new CountDownLatch (2 );
213215 connection .addShutdownListener (new ShutdownListener () {
216+ @ Override
214217 public void shutdownCompleted (ShutdownSignalException cause ) {
215218 latch .countDown ();
216219 }
@@ -225,6 +228,7 @@ public void shutdownCompleted(ShutdownSignalException cause) {
225228 @ Test public void shutdownHooksRecoveryOnChannel () throws IOException , InterruptedException {
226229 final CountDownLatch latch = new CountDownLatch (3 );
227230 channel .addShutdownListener (new ShutdownListener () {
231+ @ Override
228232 public void shutdownCompleted (ShutdownSignalException cause ) {
229233 latch .countDown ();
230234 }
@@ -241,10 +245,12 @@ public void shutdownCompleted(ShutdownSignalException cause) {
241245 @ Test public void blockedListenerRecovery () throws IOException , InterruptedException {
242246 final CountDownLatch latch = new CountDownLatch (2 );
243247 connection .addBlockedListener (new BlockedListener () {
248+ @ Override
244249 public void handleBlocked (String reason ) throws IOException {
245250 latch .countDown ();
246251 }
247252
253+ @ Override
248254 public void handleUnblocked () throws IOException {
249255 latch .countDown ();
250256 }
@@ -270,6 +276,7 @@ public void handleUnblocked() throws IOException {
270276 @ Test public void returnListenerRecovery () throws IOException , InterruptedException {
271277 final CountDownLatch latch = new CountDownLatch (1 );
272278 channel .addReturnListener (new ReturnListener () {
279+ @ Override
273280 public void handleReturn (int replyCode , String replyText , String exchange ,
274281 String routingKey , AMQP .BasicProperties properties ,
275282 byte [] body ) throws IOException {
@@ -285,10 +292,12 @@ public void handleReturn(int replyCode, String replyText, String exchange,
285292 @ Test public void confirmListenerRecovery () throws IOException , InterruptedException , TimeoutException {
286293 final CountDownLatch latch = new CountDownLatch (1 );
287294 channel .addConfirmListener (new ConfirmListener () {
295+ @ Override
288296 public void handleAck (long deliveryTag , boolean multiple ) throws IOException {
289297 latch .countDown ();
290298 }
291299
300+ @ Override
292301 public void handleNack (long deliveryTag , boolean multiple ) throws IOException {
293302 latch .countDown ();
294303 }
@@ -693,9 +702,11 @@ public void consumerRecovered(String oldConsumerTag, String newConsumerTag) {
693702 final CountDownLatch latch = new CountDownLatch (2 );
694703 final CountDownLatch startLatch = new CountDownLatch (2 );
695704 final RecoveryListener listener = new RecoveryListener () {
705+ @ Override
696706 public void handleRecovery (Recoverable recoverable ) {
697707 latch .countDown ();
698708 }
709+ @ Override
699710 public void handleRecoveryStarted (Recoverable recoverable ) {
700711 startLatch .countDown ();
701712 }
@@ -795,33 +806,93 @@ public void handleDelivery(String consumerTag,
795806 connection .close ();
796807 }
797808 }
809+
810+ @ Test public void recoveryWithMultipleThreads () throws Exception {
811+ // test with 8 recovery threads
812+ ConnectionFactory connectionFactory = buildConnectionFactoryWithRecoveryEnabled (false , 8 );
813+ assertEquals (8 , connectionFactory .getTopologyRecoveryThreadCount ());
814+ RecoverableConnection testConnection = (RecoverableConnection ) connectionFactory .newConnection ();
815+ try {
816+ final List <Channel > channels = new ArrayList <Channel >();
817+ final List <String > exchanges = new ArrayList <String >();
818+ final List <String > queues = new ArrayList <String >();
819+ // create 16 channels
820+ final int channelCount = 16 ;
821+ final int queuesPerChannel = 20 ;
822+ final CountDownLatch latch = new CountDownLatch (channelCount * queuesPerChannel );
823+ for (int i =0 ; i < channelCount ; i ++) {
824+ final Channel testChannel = testConnection .createChannel ();
825+ String x = "tmp-x-topic-" + i ;
826+ exchanges .add (x );
827+ testChannel .exchangeDeclare (x , "topic" );
828+ // create 20 queues and bindings per channel
829+ for (int j =0 ; j < queuesPerChannel ; j ++) {
830+ String q = "tmp-q-" + i + "-" + j ;
831+ queues .add (q );
832+ testChannel .queueDeclare (q , false , false , true , null );
833+ testChannel .queueBind (q , x , "tmp-key-" + i + "-" + j );
834+ testChannel .basicConsume (q , new DefaultConsumer (testChannel ) {
835+ @ Override
836+ public void handleDelivery (String consumerTag , Envelope envelope , BasicProperties properties , byte [] body )
837+ throws IOException {
838+ testChannel .basicAck (envelope .getDeliveryTag (), false );
839+ latch .countDown ();
840+ }
841+ });
842+ }
843+ }
844+ // now do recovery
845+ closeAndWaitForRecovery (testConnection );
846+
847+ // verify channels & topology recovered by publishing a message to each
848+ for (int i =0 ; i < channelCount ; i ++) {
849+ Channel ch = channels .get (i );
850+ expectChannelRecovery (ch );
851+ // publish message to each queue/consumer
852+ for (int j =0 ; j < queuesPerChannel ; j ++) {
853+ ch .basicPublish ("tmp-x-topic-" + i , "tmp-key-" + i + "-" + j , null , "msg" .getBytes ());
854+ }
855+ }
856+ // verify all queues/consumers got it
857+ assertTrue (latch .await (30 , TimeUnit .SECONDS ));
858+
859+ // cleanup
860+ Channel cleanupChannel = testConnection .createChannel ();
861+ for (String q : queues )
862+ cleanupChannel .queueDelete (q );
863+ for (String x : exchanges )
864+ cleanupChannel .exchangeDelete (x );
865+ } finally {
866+ testConnection .close ();
867+ }
868+ }
798869
799870 private void assertConsumerCount (int exp , String q ) throws IOException {
800871 assertEquals (exp , channel .queueDeclarePassive (q ).getConsumerCount ());
801872 }
802873
803- private AMQP .Queue .DeclareOk declareClientNamedQueue (Channel ch , String q ) throws IOException {
874+ private static AMQP .Queue .DeclareOk declareClientNamedQueue (Channel ch , String q ) throws IOException {
804875 return ch .queueDeclare (q , true , false , false , null );
805876 }
806877
807- private AMQP .Queue .DeclareOk declareClientNamedAutoDeleteQueue (Channel ch , String q ) throws IOException {
878+ private static AMQP .Queue .DeclareOk declareClientNamedAutoDeleteQueue (Channel ch , String q ) throws IOException {
808879 return ch .queueDeclare (q , true , false , true , null );
809880 }
810881
811882
812- private void declareClientNamedQueueNoWait (Channel ch , String q ) throws IOException {
883+ private static void declareClientNamedQueueNoWait (Channel ch , String q ) throws IOException {
813884 ch .queueDeclareNoWait (q , true , false , false , null );
814885 }
815886
816- private AMQP .Exchange .DeclareOk declareExchange (Channel ch , String x ) throws IOException {
887+ private static AMQP .Exchange .DeclareOk declareExchange (Channel ch , String x ) throws IOException {
817888 return ch .exchangeDeclare (x , "fanout" , false );
818889 }
819890
820- private void declareExchangeNoWait (Channel ch , String x ) throws IOException {
891+ private static void declareExchangeNoWait (Channel ch , String x ) throws IOException {
821892 ch .exchangeDeclareNoWait (x , "fanout" , false , false , false , null );
822893 }
823894
824- private void expectQueueRecovery (Channel ch , String q ) throws IOException , InterruptedException , TimeoutException {
895+ private static void expectQueueRecovery (Channel ch , String q ) throws IOException , InterruptedException , TimeoutException {
825896 ch .confirmSelect ();
826897 ch .queuePurge (q );
827898 AMQP .Queue .DeclareOk ok1 = declareClientNamedQueue (ch , q );
@@ -832,7 +903,7 @@ private void expectQueueRecovery(Channel ch, String q) throws IOException, Inter
832903 assertEquals (1 , ok2 .getMessageCount ());
833904 }
834905
835- private void expectAutoDeleteQueueAndBindingRecovery (Channel ch , String x , String q ) throws IOException , InterruptedException ,
906+ private static void expectAutoDeleteQueueAndBindingRecovery (Channel ch , String x , String q ) throws IOException , InterruptedException ,
836907 TimeoutException {
837908 ch .confirmSelect ();
838909 ch .queuePurge (q );
@@ -845,7 +916,7 @@ private void expectAutoDeleteQueueAndBindingRecovery(Channel ch, String x, Strin
845916 assertEquals (1 , ok2 .getMessageCount ());
846917 }
847918
848- private void expectExchangeRecovery (Channel ch , String x ) throws IOException , InterruptedException , TimeoutException {
919+ private static void expectExchangeRecovery (Channel ch , String x ) throws IOException , InterruptedException , TimeoutException {
849920 ch .confirmSelect ();
850921 String q = ch .queueDeclare ().getQueue ();
851922 final String rk = "routing-key" ;
@@ -855,22 +926,25 @@ private void expectExchangeRecovery(Channel ch, String x) throws IOException, In
855926 ch .exchangeDeclarePassive (x );
856927 }
857928
858- private CountDownLatch prepareForRecovery (Connection conn ) {
929+ private static CountDownLatch prepareForRecovery (Connection conn ) {
859930 final CountDownLatch latch = new CountDownLatch (1 );
860931 ((AutorecoveringConnection )conn ).addRecoveryListener (new RecoveryListener () {
932+ @ Override
861933 public void handleRecovery (Recoverable recoverable ) {
862934 latch .countDown ();
863935 }
936+ @ Override
864937 public void handleRecoveryStarted (Recoverable recoverable ) {
865938 // No-op
866939 }
867940 });
868941 return latch ;
869942 }
870943
871- private CountDownLatch prepareForShutdown (Connection conn ) throws InterruptedException {
944+ private static CountDownLatch prepareForShutdown (Connection conn ) {
872945 final CountDownLatch latch = new CountDownLatch (1 );
873946 conn .addShutdownListener (new ShutdownListener () {
947+ @ Override
874948 public void shutdownCompleted (ShutdownSignalException cause ) {
875949 latch .countDown ();
876950 }
@@ -882,7 +956,7 @@ private void closeAndWaitForRecovery() throws IOException, InterruptedException
882956 closeAndWaitForRecovery ((AutorecoveringConnection )this .connection );
883957 }
884958
885- private void closeAndWaitForRecovery (RecoverableConnection connection ) throws IOException , InterruptedException {
959+ private static void closeAndWaitForRecovery (RecoverableConnection connection ) throws IOException , InterruptedException {
886960 CountDownLatch latch = prepareForRecovery (connection );
887961 Host .closeConnection ((NetworkConnection ) connection );
888962 wait (latch );
@@ -900,7 +974,7 @@ private void restartPrimaryAndWaitForRecovery(Connection connection) throws IOEx
900974 wait (latch );
901975 }
902976
903- private void expectChannelRecovery (Channel ch ) throws InterruptedException {
977+ private static void expectChannelRecovery (Channel ch ) {
904978 assertTrue (ch .isOpen ());
905979 }
906980
@@ -909,48 +983,53 @@ protected ConnectionFactory newConnectionFactory() {
909983 return buildConnectionFactoryWithRecoveryEnabled (false );
910984 }
911985
912- private RecoverableConnection newRecoveringConnection (boolean disableTopologyRecovery )
986+ private static RecoverableConnection newRecoveringConnection (boolean disableTopologyRecovery )
913987 throws IOException , TimeoutException {
914988 ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled (disableTopologyRecovery );
915989 return (AutorecoveringConnection ) cf .newConnection ();
916990 }
917991
918- private RecoverableConnection newRecoveringConnection (Address [] addresses )
992+ private static RecoverableConnection newRecoveringConnection (Address [] addresses )
919993 throws IOException , TimeoutException {
920994 ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled (false );
921995 // specifically use the Address[] overload
922996 return (AutorecoveringConnection ) cf .newConnection (addresses );
923997 }
924998
925- private RecoverableConnection newRecoveringConnection (boolean disableTopologyRecovery , List <Address > addresses )
999+ private static RecoverableConnection newRecoveringConnection (boolean disableTopologyRecovery , List <Address > addresses )
9261000 throws IOException , TimeoutException {
9271001 ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled (disableTopologyRecovery );
9281002 return (AutorecoveringConnection ) cf .newConnection (addresses );
9291003 }
9301004
931- private RecoverableConnection newRecoveringConnection (List <Address > addresses )
1005+ private static RecoverableConnection newRecoveringConnection (List <Address > addresses )
9321006 throws IOException , TimeoutException {
9331007 return newRecoveringConnection (false , addresses );
9341008 }
9351009
936- private RecoverableConnection newRecoveringConnection (boolean disableTopologyRecovery , String connectionName )
1010+ private static RecoverableConnection newRecoveringConnection (boolean disableTopologyRecovery , String connectionName )
9371011 throws IOException , TimeoutException {
9381012 ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled (disableTopologyRecovery );
9391013 return (RecoverableConnection ) cf .newConnection (connectionName );
9401014 }
9411015
942- private RecoverableConnection newRecoveringConnection (String connectionName )
1016+ private static RecoverableConnection newRecoveringConnection (String connectionName )
9431017 throws IOException , TimeoutException {
9441018 return newRecoveringConnection (false , connectionName );
9451019 }
1020+
1021+ private static ConnectionFactory buildConnectionFactoryWithRecoveryEnabled (boolean disableTopologyRecovery ) {
1022+ return buildConnectionFactoryWithRecoveryEnabled (disableTopologyRecovery , 1 );
1023+ }
9461024
947- private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled (boolean disableTopologyRecovery ) {
1025+ private static ConnectionFactory buildConnectionFactoryWithRecoveryEnabled (boolean disableTopologyRecovery , final int recoveryThreads ) {
9481026 ConnectionFactory cf = TestUtils .connectionFactory ();
9491027 cf .setNetworkRecoveryInterval (RECOVERY_INTERVAL );
9501028 cf .setAutomaticRecoveryEnabled (true );
9511029 if (disableTopologyRecovery ) {
9521030 cf .setTopologyRecoveryEnabled (false );
9531031 }
1032+ cf .setTopologyRecoveryThreadCount (recoveryThreads );
9541033 return cf ;
9551034 }
9561035
@@ -960,15 +1039,15 @@ private static void wait(CountDownLatch latch) throws InterruptedException {
9601039 assertTrue (latch .await (90 , TimeUnit .SECONDS ));
9611040 }
9621041
963- private void waitForConfirms (Channel ch ) throws InterruptedException , TimeoutException {
1042+ private static void waitForConfirms (Channel ch ) throws InterruptedException , TimeoutException {
9641043 ch .waitForConfirms (30 * 60 * 1000 );
9651044 }
9661045
967- private void assertRecordedQueues (Connection conn , int size ) {
1046+ private static void assertRecordedQueues (Connection conn , int size ) {
9681047 assertEquals (size , ((AutorecoveringConnection )conn ).getRecordedQueues ().size ());
9691048 }
9701049
971- private void assertRecordedExchanges (Connection conn , int size ) {
1050+ private static void assertRecordedExchanges (Connection conn , int size ) {
9721051 assertEquals (size , ((AutorecoveringConnection )conn ).getRecordedExchanges ().size ());
9731052 }
9741053}
0 commit comments