1717
1818import com .rabbitmq .client .ConnectionFactory ;
1919import com .rabbitmq .client .DefaultConsumer ;
20+ import com .rabbitmq .client .Envelope ;
21+ import com .rabbitmq .client .Recoverable ;
22+ import com .rabbitmq .client .RecoveryListener ;
23+ import com .rabbitmq .client .AMQP .BasicProperties ;
24+ import com .rabbitmq .client .impl .recovery .AutorecoveringConnection ;
25+ import com .rabbitmq .client .impl .recovery .RecordedBinding ;
26+ import com .rabbitmq .client .impl .recovery .RecordedConsumer ;
2027import com .rabbitmq .client .test .BrokerTestCase ;
2128import com .rabbitmq .client .test .TestUtils ;
29+ import com .rabbitmq .tools .Host ;
30+ import org .junit .After ;
2231import org .junit .Test ;
23-
32+ import java . io . IOException ;
2433import java .util .HashMap ;
34+ import java .util .UUID ;
35+ import java .util .concurrent .CountDownLatch ;
36+ import java .util .concurrent .TimeUnit ;
37+ import java .util .function .Consumer ;
2538
2639import static com .rabbitmq .client .impl .recovery .TopologyRecoveryRetryLogic .RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER ;
2740import static com .rabbitmq .client .test .TestUtils .closeAllConnectionsAndWaitForRecovery ;
3245 */
3346public class TopologyRecoveryRetry extends BrokerTestCase {
3447
48+ private volatile Consumer <Integer > backoffConsumer ;
49+
50+ @ After
51+ public void cleanup () {
52+ backoffConsumer = null ;
53+ }
54+
3555 @ Test
3656 public void topologyRecoveryRetry () throws Exception {
3757 int nbQueues = 200 ;
@@ -40,18 +60,149 @@ public void topologyRecoveryRetry() throws Exception {
4060 String queue = prefix + i ;
4161 channel .queueDeclare (queue , false , false , true , new HashMap <>());
4262 channel .queueBind (queue , "amq.direct" , queue );
63+ channel .queueBind (queue , "amq.direct" , queue + "2" );
4364 channel .basicConsume (queue , true , new DefaultConsumer (channel ));
4465 }
4566
4667 closeAllConnectionsAndWaitForRecovery (this .connection );
4768
4869 assertTrue (channel .isOpen ());
4970 }
71+
72+ @ Test
73+ public void topologyRecoveryBindingFailure () throws Exception {
74+ final String queue = "topology-recovery-retry-binding-failure" + System .currentTimeMillis ();
75+ channel .queueDeclare (queue , false , false , true , new HashMap <>());
76+ channel .queueBind (queue , "amq.topic" , "topic1" );
77+ channel .queueBind (queue , "amq.topic" , "topic2" );
78+ final CountDownLatch messagesReceivedLatch = new CountDownLatch (2 );
79+ channel .basicConsume (queue , true , new DefaultConsumer (channel ) {
80+ @ Override
81+ public void handleDelivery (String consumerTag , Envelope envelope , BasicProperties properties , byte [] body ) throws IOException {
82+ System .out .println ("Got message=" + new String (body ));
83+ messagesReceivedLatch .countDown ();
84+ }
85+ });
86+ final CountDownLatch recoveryLatch = new CountDownLatch (1 );
87+ ((AutorecoveringConnection )connection ).addRecoveryListener (new RecoveryListener () {
88+ @ Override
89+ public void handleRecoveryStarted (Recoverable recoverable ) {
90+ // no-op
91+ }
92+ @ Override
93+ public void handleRecovery (Recoverable recoverable ) {
94+ recoveryLatch .countDown ();
95+ }
96+ });
97+
98+ // we want recovery to fail when recovering the 2nd binding
99+ // give the 2nd recorded binding a bad queue name so it fails
100+ final RecordedBinding binding2 = ((AutorecoveringConnection )connection ).getRecordedBindings ().get (1 );
101+ binding2 .destination (UUID .randomUUID ().toString ());
102+
103+ // use the backoffConsumer to know that it has failed
104+ // then delete the real queue & fix the recorded binding
105+ // it should fail once more because queue is gone, and then succeed
106+ final CountDownLatch backoffLatch = new CountDownLatch (1 );
107+ backoffConsumer = attempt -> {
108+ if (attempt == 1 ) {
109+ binding2 .destination (queue );
110+ try {
111+ Host .rabbitmqctl ("delete_queue " + queue );
112+ Thread .sleep (2000 );
113+ } catch (Exception e ) {
114+ e .printStackTrace ();
115+ }
116+ }
117+ backoffLatch .countDown ();
118+ };
119+
120+ // close connection
121+ Host .closeAllConnections ();
122+
123+ // assert backoff was called
124+ assertTrue (backoffLatch .await (90 , TimeUnit .SECONDS ));
125+ // wait for full recovery
126+ assertTrue (recoveryLatch .await (90 , TimeUnit .SECONDS ));
127+
128+ // publish messages to verify both bindings were recovered
129+ basicPublishVolatile ("test1" .getBytes (), "amq.topic" , "topic1" );
130+ basicPublishVolatile ("test2" .getBytes (), "amq.topic" , "topic2" );
131+
132+ assertTrue (messagesReceivedLatch .await (10 , TimeUnit .SECONDS ));
133+ }
134+
135+ @ Test
136+ public void topologyRecoveryConsumerFailure () throws Exception {
137+ final String queue = "topology-recovery-retry-consumer-failure" + System .currentTimeMillis ();
138+ channel .queueDeclare (queue , false , false , true , new HashMap <>());
139+ channel .queueBind (queue , "amq.topic" , "topic1" );
140+ channel .queueBind (queue , "amq.topic" , "topic2" );
141+ final CountDownLatch messagesReceivedLatch = new CountDownLatch (2 );
142+ channel .basicConsume (queue , true , new DefaultConsumer (channel ) {
143+ @ Override
144+ public void handleDelivery (String consumerTag , Envelope envelope , BasicProperties properties , byte [] body ) throws IOException {
145+ System .out .println ("Got message=" + new String (body ));
146+ messagesReceivedLatch .countDown ();
147+ }
148+ });
149+ final CountDownLatch recoveryLatch = new CountDownLatch (1 );
150+ ((AutorecoveringConnection )connection ).addRecoveryListener (new RecoveryListener () {
151+ @ Override
152+ public void handleRecoveryStarted (Recoverable recoverable ) {
153+ // no-op
154+ }
155+ @ Override
156+ public void handleRecovery (Recoverable recoverable ) {
157+ recoveryLatch .countDown ();
158+ }
159+ });
160+
161+ // we want recovery to fail when recovering the consumer
162+ // give the recorded consumer a bad queue name so it fails
163+ final RecordedConsumer consumer = ((AutorecoveringConnection )connection ).getRecordedConsumers ().values ().iterator ().next ();
164+ consumer .setQueue (UUID .randomUUID ().toString ());
165+
166+ // use the backoffConsumer to know that it has failed
167+ // then delete the real queue & fix the recorded consumer
168+ // it should fail once more because queue is gone, and then succeed
169+ final CountDownLatch backoffLatch = new CountDownLatch (1 );
170+ backoffConsumer = attempt -> {
171+ if (attempt == 1 ) {
172+ consumer .setQueue (queue );
173+ try {
174+ Host .rabbitmqctl ("delete_queue " + queue );
175+ Thread .sleep (2000 );
176+ } catch (Exception e ) {
177+ e .printStackTrace ();
178+ }
179+ }
180+ backoffLatch .countDown ();
181+ };
182+
183+ // close connection
184+ Host .closeAllConnections ();
185+
186+ // assert backoff was called
187+ assertTrue (backoffLatch .await (90 , TimeUnit .SECONDS ));
188+ // wait for full recovery
189+ assertTrue (recoveryLatch .await (90 , TimeUnit .SECONDS ));
190+
191+ // publish messages to verify both bindings & consumer were recovered
192+ basicPublishVolatile ("test1" .getBytes (), "amq.topic" , "topic1" );
193+ basicPublishVolatile ("test2" .getBytes (), "amq.topic" , "topic2" );
194+
195+ assertTrue (messagesReceivedLatch .await (10 , TimeUnit .SECONDS ));
196+ }
50197
51198 @ Override
52199 protected ConnectionFactory newConnectionFactory () {
53200 ConnectionFactory connectionFactory = TestUtils .connectionFactory ();
54- connectionFactory .setTopologyRecoveryRetryHandler (RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER .build ());
201+ connectionFactory .setTopologyRecoveryRetryHandler (RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER .backoffPolicy (attempt -> {
202+ if (backoffConsumer != null ) {
203+ backoffConsumer .accept (attempt );
204+ }
205+ }).build ());
55206 connectionFactory .setNetworkRecoveryInterval (1000 );
56207 return connectionFactory ;
57208 }
0 commit comments