Skip to content

Commit 1283edf

Browse files
garyrussellartembilan
authored andcommitted
Close auto-recovering connection
When a connection is auto-recovered, the `RabbitAdmin` is not invoked to re-declare auto-delete queues because the connection listeners are not invoked. Close an auto-recovered connection before it is recovered. Tested with a stand-alone spring-cloud-bus application. **Cherry-pick to 2.0.x, 1.7.x** # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java
1 parent 8340b71 commit 1283edf

File tree

2 files changed

+32
-114
lines changed

2 files changed

+32
-114
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,27 @@ protected final Connection createBareConnection() {
374374
}
375375
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, connectionName);
376376
}
377-
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
377+
final Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
378+
if (rabbitConnection instanceof AutorecoveringConnection) {
379+
((AutorecoveringConnection) rabbitConnection).addRecoveryListener(new RecoveryListener() {
380+
381+
@Override
382+
public void handleRecoveryStarted(Recoverable recoverable) {
383+
handleRecovery(recoverable);
384+
}
385+
386+
@Override
387+
public void handleRecovery(Recoverable recoverable) {
388+
try {
389+
connection.close();
390+
}
391+
catch (Exception e) {
392+
AbstractConnectionFactory.this.logger.error("Failed to close auto-recover connection", e);
393+
}
394+
}
395+
396+
});
397+
}
378398
if (this.logger.isInfoEnabled()) {
379399
this.logger.info("Created new connection: " + connectionName + "/" + connection);
380400
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java

Lines changed: 11 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,12 @@
2525
import static org.junit.Assert.assertThat;
2626
import static org.junit.Assert.assertTrue;
2727
import static org.junit.Assert.fail;
28-
import static org.mockito.BDDMockito.willReturn;
2928
import static org.mockito.Matchers.anyString;
3029
import static org.mockito.Mockito.mock;
3130
import static org.mockito.Mockito.never;
3231
import static org.mockito.Mockito.spy;
3332
import static org.mockito.Mockito.verify;
3433

35-
import java.io.IOException;
3634
import java.net.ServerSocket;
3735
import java.net.Socket;
3836
import java.util.ArrayList;
@@ -58,7 +56,6 @@
5856
import org.junit.rules.ExpectedException;
5957

6058
import org.springframework.amqp.AmqpAuthenticationException;
61-
import org.springframework.amqp.AmqpException;
6259
import org.springframework.amqp.AmqpIOException;
6360
import org.springframework.amqp.AmqpResourceNotAvailableException;
6461
import org.springframework.amqp.AmqpTimeoutException;
@@ -76,12 +73,8 @@
7673

7774
import com.rabbitmq.client.Channel;
7875
import com.rabbitmq.client.DefaultConsumer;
79-
import com.rabbitmq.client.Recoverable;
80-
import com.rabbitmq.client.RecoveryListener;
8176
import com.rabbitmq.client.ShutdownListener;
8277
import com.rabbitmq.client.ShutdownSignalException;
83-
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
84-
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
8578

8679
/**
8780
* @author Dave Syer
@@ -171,13 +164,15 @@ public void testCachedConnectionsChannelLimit() throws Exception {
171164
channels.add(connections.get(0).createChannel(false));
172165
fail("Exception expected");
173166
}
174-
catch (AmqpTimeoutException e) { }
167+
catch (AmqpTimeoutException e) {
168+
}
175169
channels.add(connections.get(1).createChannel(false));
176170
try {
177171
channels.add(connections.get(1).createChannel(false));
178172
fail("Exception expected");
179173
}
180-
catch (AmqpTimeoutException e) { }
174+
catch (AmqpTimeoutException e) {
175+
}
181176
channels.get(0).close();
182177
channels.get(1).close();
183178
channels.add(connections.get(0).createChannel(false));
@@ -324,6 +319,7 @@ public void testMixTransactionalAndNonTransactional() throws Exception {
324319
exception.expect(AmqpIOException.class);
325320

326321
template2.execute(new ChannelCallback<Void>() {
322+
327323
@Override
328324
public Void doInRabbit(Channel channel) throws Exception {
329325
// Should be an exception because the channel is not transactional
@@ -346,9 +342,11 @@ public void testHardErrorAndReconnectNoAuto() throws Exception {
346342
final CountDownLatch latch = new CountDownLatch(1);
347343
try {
348344
template.execute(new ChannelCallback<Object>() {
345+
349346
@Override
350347
public Object doInRabbit(Channel channel) throws Exception {
351348
channel.getConnection().addShutdownListener(new ShutdownListener() {
349+
352350
@Override
353351
public void shutdownCompleted(ShutdownSignalException cause) {
354352
logger.info("Error", cause);
@@ -377,108 +375,6 @@ public void shutdownCompleted(ShutdownSignalException cause) {
377375
assertEquals(null, result);
378376
}
379377

380-
@Test
381-
public void testHardErrorAndReconnectAuto() throws Exception {
382-
this.connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
383-
Log cfLogger = spyOnLogger(this.connectionFactory);
384-
willReturn(true).given(cfLogger).isDebugEnabled();
385-
RabbitTemplate template = new RabbitTemplate(connectionFactory);
386-
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
387-
Queue queue = new Queue(CF_INTEGRATION_TEST_QUEUE);
388-
admin.declareQueue(queue);
389-
final String route = queue.getName();
390-
391-
final CountDownLatch latch = new CountDownLatch(1);
392-
final CountDownLatch recoveryLatch = new CountDownLatch(1);
393-
final RecoveryListener channelRecoveryListener = new RecoveryListener() {
394-
395-
@Override
396-
public void handleRecoveryStarted(Recoverable recoverable) {
397-
if (logger.isDebugEnabled()) {
398-
logger.debug("Channel recovery started: " + asString(recoverable));
399-
}
400-
}
401-
402-
@Override
403-
public void handleRecovery(Recoverable recoverable) {
404-
try {
405-
((Channel) recoverable).basicCancel("testHardErrorAndReconnect");
406-
}
407-
catch (IOException e) {
408-
}
409-
if (logger.isDebugEnabled()) {
410-
logger.debug("Channel recovery complete: " + asString(recoverable));
411-
}
412-
}
413-
414-
private String asString(Recoverable recoverable) {
415-
// TODO: https://github.com/rabbitmq/rabbitmq-java-client/issues/217
416-
return ((AutorecoveringChannel) recoverable).getDelegate().toString();
417-
}
418-
419-
};
420-
final RecoveryListener connectionRecoveryListener = new RecoveryListener() {
421-
422-
@Override
423-
public void handleRecoveryStarted(Recoverable recoverable) {
424-
if (logger.isDebugEnabled()) {
425-
logger.debug("Connection recovery started: " + recoverable);
426-
}
427-
}
428-
429-
@Override
430-
public void handleRecovery(Recoverable recoverable) {
431-
if (logger.isDebugEnabled()) {
432-
logger.debug("Connection recovery complete: " + recoverable);
433-
}
434-
recoveryLatch.countDown();
435-
}
436-
437-
};
438-
Object connection = ((ConnectionProxy) this.connectionFactory.createConnection()).getTargetConnection();
439-
connection = TestUtils.getPropertyValue(connection, "delegate");
440-
if (connection instanceof AutorecoveringConnection) {
441-
((AutorecoveringConnection) connection).addRecoveryListener(connectionRecoveryListener);
442-
}
443-
try {
444-
template.execute(channel -> {
445-
channel.getConnection().addShutdownListener(cause -> {
446-
logger.info("Error", cause);
447-
latch.countDown();
448-
// This will be thrown on the Connection thread just before it dies, so basically ignored
449-
throw new RuntimeException(cause);
450-
});
451-
Channel targetChannel = ((ChannelProxy) channel).getTargetChannel();
452-
if (targetChannel instanceof AutorecoveringChannel) {
453-
((AutorecoveringChannel) targetChannel).addRecoveryListener(channelRecoveryListener);
454-
}
455-
else {
456-
fail("Expected an AutorecoveringChannel");
457-
}
458-
String tag = channel.basicConsume(route, false, "testHardErrorAndReconnect",
459-
new DefaultConsumer(channel));
460-
// Consume twice with the same tag is a hard error (connection will be reset)
461-
String result = channel.basicConsume(route, false, tag, new DefaultConsumer(channel));
462-
fail("Expected IOException, got: " + result);
463-
return null;
464-
});
465-
fail("Expected AmqpIOException");
466-
}
467-
catch (AmqpException e) {
468-
// expected
469-
}
470-
assertTrue(recoveryLatch.await(10, TimeUnit.SECONDS));
471-
if (logger.isDebugEnabled()) {
472-
logger.debug("Resuming test after recovery complete");
473-
}
474-
template.convertAndSend(route, "message");
475-
assertTrue(latch.await(10, TimeUnit.SECONDS));
476-
String result = (String) template.receiveAndConvert(route);
477-
assertEquals("message", result);
478-
result = (String) template.receiveAndConvert(route);
479-
assertEquals(null, result);
480-
}
481-
482378
@Test
483379
public void testConnectionCloseLog() {
484380
Log logger = spy(TestUtils.getPropertyValue(this.connectionFactory, "logger", Log.class));
@@ -539,7 +435,8 @@ public void hangOnClose() throws Exception {
539435
socket.close();
540436
proxy.close();
541437
}
542-
catch (Exception ee) { }
438+
catch (Exception ee) {
439+
}
543440
}
544441
}
545442
});
@@ -555,7 +452,8 @@ public void hangOnClose() throws Exception {
555452
socket.close();
556453
proxy.close();
557454
}
558-
catch (Exception ee) { }
455+
catch (Exception ee) {
456+
}
559457
}
560458
}
561459
socket.close();

0 commit comments

Comments
 (0)