Skip to content

Commit 5da2fc3

Browse files
committed
Check Netty event loop group is open before creating frame handler
No need to try to connect if the event loop group was shut down. This also triggers an infinite cycle of connection recovery in the following case: connection disconnected, recovery starts, event loop group closed, new connection attempt, Netty channel created and becomes inactive immediately, recovery restarts, etc. To avoid the recovery loop, stop recovery when the exception (IllegalStateException) is thrown. References #1663
1 parent 6d98b74 commit 5da2fc3

File tree

10 files changed

+101
-77
lines changed

10 files changed

+101
-77
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ private NettyFrameHandler(
200200
} else {
201201
this.eventLoopGroup = null;
202202
}
203+
204+
if (b.config().group() == null) {
205+
throw new IllegalStateException("The event loop group is not set");
206+
} else if (b.config().group().isShuttingDown()) {
207+
LOGGER.warn("The Netty loop group was shut down, it is not possible to connect or recover");
208+
throw new IllegalStateException("The event loop group was shut down");
209+
}
210+
203211
if (b.config().channelFactory() == null) {
204212
b.channel(NioSocketChannel.class);
205213
}
@@ -317,6 +325,10 @@ public void sendHeader() {
317325

318326
@Override
319327
public void initialize(AMQConnection connection) {
328+
LOGGER.debug(
329+
"Setting connection {} to AMQP handler {}",
330+
connection.getClientProvidedName(),
331+
this.handler.id);
320332
this.handler.connection = connection;
321333
}
322334

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -591,16 +591,16 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
591591
}
592592
LOGGER.debug("Connection {} has recovered", newConn);
593593
this.addAutomaticRecoveryListener(newConn);
594-
this.recoverShutdownListeners(newConn);
595-
this.recoverBlockedListeners(newConn);
596-
this.recoverChannels(newConn);
597-
// don't assign new delegate connection until channel recovery is complete
598-
this.delegate = newConn;
599-
if (this.params.isTopologyRecoveryEnabled()) {
600-
notifyTopologyRecoveryListenersStarted();
601-
recoverTopology(params.getTopologyRecoveryExecutor());
602-
}
603-
this.notifyRecoveryListenersComplete();
594+
this.recoverShutdownListeners(newConn);
595+
this.recoverBlockedListeners(newConn);
596+
this.recoverChannels(newConn);
597+
// don't assign new delegate connection until channel recovery is complete
598+
this.delegate = newConn;
599+
if (this.params.isTopologyRecoveryEnabled()) {
600+
notifyTopologyRecoveryListenersStarted();
601+
recoverTopology(params.getTopologyRecoveryExecutor());
602+
}
603+
this.notifyRecoveryListenersComplete();
604604
}
605605

606606
private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) {
@@ -624,25 +624,27 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
624624
attempts++;
625625
// No Sonar: no need to close this resource because we're the one that creates it
626626
// and hands it over to the user
627-
RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
628-
synchronized(recoveryLock) {
629-
if (!manuallyClosed) {
630-
// This is the standard case.
631-
return newConn;
632-
}
633-
}
634-
// This is the once in a blue moon case.
635-
// Application code just called close as the connection
636-
// was being re-established. So we attempt to close the newly created connection.
637-
newConn.abort();
638-
return null;
627+
RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
628+
synchronized(recoveryLock) {
629+
if (!manuallyClosed) {
630+
// This is the standard case.
631+
return newConn;
632+
}
633+
}
634+
// This is the once in a blue moon case.
635+
// Application code just called close as the connection
636+
// was being re-established. So we attempt to close the newly created connection.
637+
newConn.abort();
638+
return null;
639+
} catch (IllegalStateException e) {
640+
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
641+
return null;
639642
} catch (Exception e) {
640643
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
641644
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
642645
}
643646
}
644-
645-
return null;
647+
return null;
646648
}
647649

648650
private void recoverChannels(final RecoveryAwareAMQConnection newConn) {

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,8 @@ public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutExc
7171
conn.start();
7272
metricsCollector.newConnection(conn);
7373
return conn;
74-
} catch (IOException e) {
74+
} catch (IOException | TimeoutException e) {
7575
lastException = e;
76-
} catch (TimeoutException te) {
77-
lastException = te;
7876
}
7977
}
8078

src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,19 @@ public void afterAll(ExtensionContext context) {
143143
.getRoot()
144144
.getStore(ExtensionContext.Namespace.GLOBAL)
145145
.getOrComputeIfAbsent(ExecutorServiceCloseableResourceWrapper.class);
146-
wrapper.executorService.submit(
147-
() -> {
148-
try {
149-
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
150-
} catch (InterruptedException e) {
151-
Thread.currentThread().interrupt();
152-
} catch (Exception e) {
153-
LOGGER.warn("Error while asynchronously closing Netty event loop group", e);
154-
}
155-
});
146+
147+
wrapper
148+
.executorService
149+
.submit(
150+
() -> {
151+
try {
152+
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
153+
} catch (InterruptedException e) {
154+
Thread.currentThread().interrupt();
155+
} catch (Exception e) {
156+
LOGGER.warn("Error while asynchronously closing Netty event loop group", e);
157+
}
158+
});
156159
}
157160
}
158161

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected void bareRestart()
125125
public void openConnection()
126126
throws IOException, TimeoutException {
127127
if (connection == null) {
128-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
128+
connection = connectionFactory.newConnection(generateConnectionName());
129129
}
130130
}
131131

@@ -327,6 +327,11 @@ protected String generateExchangeName() {
327327
this.testInfo.getTestMethod().get().getName());
328328
}
329329

330+
protected String generateConnectionName() {
331+
return name("conn", this.testInfo.getTestClass().get(),
332+
this.testInfo.getTestMethod().get().getName());
333+
}
334+
330335
private static String name(String prefix, Class<?> testClass, String testMethodName) {
331336
String uuid = UUID.randomUUID().toString();
332337
return String.format(

src/test/java/com/rabbitmq/client/test/ClientTestSuite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
ValueWriterTest.class,
7474
BlockedConnectionTest.class,
7575
NettyTest.class,
76+
IoDeadlockOnConnectionClosing.class,
7677
ProtocolVersionMismatch.class
7778
})
7879
public class ClientTestSuite {

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class ConnectionRecovery extends BrokerTestCase {
5757

5858
@Test public void namedConnectionRecovery()
5959
throws IOException, InterruptedException, TimeoutException {
60-
String connectionName = "custom-name";
60+
String connectionName = generateConnectionName();
6161
RecoverableConnection c = newRecoveringConnection(connectionName);
6262
try {
6363
assertThat(c.isOpen()).isTrue();
@@ -151,7 +151,7 @@ public String getPassword() {
151151
return password;
152152
}
153153
});
154-
RecoverableConnection c = (RecoverableConnection) cf.newConnection(UUID.randomUUID().toString());
154+
RecoverableConnection c = (RecoverableConnection) cf.newConnection(generateConnectionName());
155155
try {
156156
assertThat(c.isOpen()).isTrue();
157157
assertThat(usernameRequested.get()).isEqualTo(1);
@@ -787,13 +787,14 @@ public void handleDelivery(String consumerTag,
787787
@Test public void recoveryWithExponentialBackoffDelayHandler() throws Exception {
788788
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
789789
connectionFactory.setRecoveryDelayHandler(new RecoveryDelayHandler.ExponentialBackoffDelayHandler());
790-
Connection testConnection = connectionFactory.newConnection(UUID.randomUUID().toString());
790+
String connName = generateConnectionName();
791+
Connection testConnection = connectionFactory.newConnection(connName);
791792
try {
792793
assertThat(testConnection.isOpen()).isTrue();
793794
TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection);
794795
assertThat(testConnection.isOpen()).isTrue();
795796
} finally {
796-
connection.close();
797+
testConnection.close();
797798
}
798799
}
799800

@@ -807,7 +808,7 @@ public void handleDelivery(String consumerTag,
807808
connectionFactory.setTopologyRecoveryExecutor(executor);
808809
assertThat(connectionFactory.getTopologyRecoveryExecutor()).isEqualTo(executor);
809810
RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection(
810-
UUID.randomUUID().toString()
811+
generateConnectionName()
811812
);
812813
try {
813814
final List<Channel> channels = new ArrayList<Channel>();
@@ -970,26 +971,26 @@ protected ConnectionFactory newConnectionFactory() {
970971
return buildConnectionFactoryWithRecoveryEnabled(false);
971972
}
972973

973-
private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
974+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
974975
throws IOException, TimeoutException {
975976
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
976-
return (AutorecoveringConnection) cf.newConnection(UUID.randomUUID().toString());
977+
return (AutorecoveringConnection) cf.newConnection(generateConnectionName());
977978
}
978979

979-
private static RecoverableConnection newRecoveringConnection(Address[] addresses)
980+
private RecoverableConnection newRecoveringConnection(Address[] addresses)
980981
throws IOException, TimeoutException {
981982
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false);
982983
// specifically use the Address[] overload
983-
return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString());
984+
return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName());
984985
}
985986

986-
private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
987+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
987988
throws IOException, TimeoutException {
988989
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
989-
return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString());
990+
return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName());
990991
}
991992

992-
private static RecoverableConnection newRecoveringConnection(List<Address> addresses)
993+
private RecoverableConnection newRecoveringConnection(List<Address> addresses)
993994
throws IOException, TimeoutException {
994995
return newRecoveringConnection(false, addresses);
995996
}

src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
public class ExceptionHandling {
4141
private ConnectionFactory newConnectionFactory(ExceptionHandler eh) {
4242
ConnectionFactory cf = TestUtils.connectionFactory();
43+
cf.setNetworkRecoveryInterval(2000);
4344
cf.setExceptionHandler(eh);
4445
return cf;
4546
}
@@ -74,21 +75,22 @@ protected void testConsumerHandleConsumerException(ExceptionHandler eh, CountDow
7475
throws InterruptedException, TimeoutException, IOException {
7576
ConnectionFactory cf = newConnectionFactory(eh);
7677
assertEquals(cf.getExceptionHandler(), eh);
77-
Connection conn = cf.newConnection();
78-
assertEquals(conn.getExceptionHandler(), eh);
79-
Channel ch = conn.createChannel();
80-
String q = ch.queueDeclare().getQueue();
81-
ch.basicConsume(q, new DefaultConsumer(ch) {
82-
@Override
83-
public void handleDelivery(String consumerTag, Envelope envelope,
84-
AMQP.BasicProperties properties, byte[] body) throws IOException {
85-
throw new RuntimeException("exception expected here, don't freak out");
86-
}
87-
});
88-
ch.basicPublish("", q, null, "".getBytes());
89-
wait(latch);
78+
try (Connection conn = cf.newConnection()) {
79+
assertEquals(conn.getExceptionHandler(), eh);
80+
Channel ch = conn.createChannel();
81+
String q = ch.queueDeclare().getQueue();
82+
ch.basicConsume(q, new DefaultConsumer(ch) {
83+
@Override
84+
public void handleDelivery(String consumerTag, Envelope envelope,
85+
AMQP.BasicProperties properties, byte[] body) throws IOException {
86+
throw new RuntimeException("exception expected here, don't freak out");
87+
}
88+
});
89+
ch.basicPublish("", q, null, "".getBytes());
90+
wait(latch);
9091

91-
assertEquals(!expectChannelClose, ch.isOpen());
92+
assertEquals(!expectChannelClose, ch.isOpen());
93+
}
9294
}
9395

9496
@Test public void nullExceptionHandler() {

src/test/java/com/rabbitmq/client/test/functional/Metrics.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim
8282
Connection connection1 = null;
8383
Connection connection2 = null;
8484
try {
85-
connection1 = connectionFactory.newConnection();
85+
connection1 = connectionFactory.newConnection(generateConnectionName());
8686
assertThat(metrics.getConnections().getCount()).isEqualTo(1L);
8787

8888
connection1.createChannel();
@@ -102,7 +102,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim
102102
channel.basicGet(QUEUE, true);
103103
assertThat(metrics.getConsumedMessages().getCount()).isEqualTo(2L);
104104

105-
connection2 = connectionFactory.newConnection();
105+
connection2 = connectionFactory.newConnection(generateConnectionName());
106106
assertThat(metrics.getConnections().getCount()).isEqualTo(2L);
107107

108108
connection2.createChannel();
@@ -142,7 +142,7 @@ public void metricsPublisherUnrouted(ConnectionFactory connectionFactory) throws
142142
connectionFactory.setMetricsCollector(metrics);
143143
Connection connection = null;
144144
try {
145-
connection = connectionFactory.newConnection();
145+
connection = connectionFactory.newConnection(generateConnectionName());
146146
Channel channel = connection.createChannel();
147147
channel.confirmSelect();
148148
assertThat(metrics.getPublishUnroutedMessages().getCount()).isEqualTo(0L);
@@ -168,7 +168,7 @@ public void metricsPublisherAck(ConnectionFactory connectionFactory) throws IOEx
168168
connectionFactory.setMetricsCollector(metrics);
169169
Connection connection = null;
170170
try {
171-
connection = connectionFactory.newConnection();
171+
connection = connectionFactory.newConnection(generateConnectionName());
172172
Channel channel = connection.createChannel();
173173
channel.confirmSelect();
174174
assertThat(metrics.getPublishAcknowledgedMessages().getCount()).isEqualTo(0L);
@@ -196,7 +196,7 @@ public void metricsAck(ConnectionFactory connectionFactory) throws IOException,
196196

197197
Connection connection = null;
198198
try {
199-
connection = connectionFactory.newConnection();
199+
connection = connectionFactory.newConnection(generateConnectionName());
200200
Channel channel1 = connection.createChannel();
201201
Channel channel2 = connection.createChannel();
202202

@@ -264,7 +264,7 @@ public void metricsReject(ConnectionFactory connectionFactory) throws IOExceptio
264264

265265
Connection connection = null;
266266
try {
267-
connection = connectionFactory.newConnection();
267+
connection = connectionFactory.newConnection(generateConnectionName());
268268
Channel channel = connection.createChannel();
269269

270270
sendMessage(channel);
@@ -304,7 +304,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
304304
try {
305305
Channel [] channels = new Channel[nbChannels];
306306
for(int i = 0; i < nbConnections; i++) {
307-
connections[i] = connectionFactory.newConnection();
307+
connections[i] = connectionFactory.newConnection(generateConnectionName());
308308
for(int j = 0; j < nbChannelsPerConnection; j++) {
309309
Channel channel = connections[i].createChannel();
310310
channel.basicQos(1);
@@ -347,7 +347,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
347347
executorService.shutdownNow();
348348

349349
executorService = Executors.newFixedThreadPool(nbTasks);
350-
tasks = new ArrayList<Callable<Void>>();
350+
tasks = new ArrayList<>();
351351
for(int i = 0; i < nbTasks; i++) {
352352
Channel channelForConsuming = channels[i];
353353
tasks.add(random.nextBoolean() ?
@@ -376,7 +376,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
376376
executorService.shutdownNow();
377377

378378
executorService = Executors.newFixedThreadPool(nbTasks);
379-
tasks = new ArrayList<Callable<Void>>();
379+
tasks = new ArrayList<>();
380380
for(int i = 0; i < nbTasks; i++) {
381381
Channel channelForConsuming = channels[i];
382382
tasks.add(random.nextBoolean() ?
@@ -405,7 +405,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
405405

406406
Connection connection = null;
407407
try {
408-
connection = connectionFactory.newConnection();
408+
connection = connectionFactory.newConnection(generateConnectionName());
409409
Channel channel = connection.createChannel();
410410

411411
assertThat(metrics.getConnections().getCount()).isEqualTo(1L);
@@ -429,7 +429,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
429429

430430
Connection connection = null;
431431
try {
432-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
432+
connection = connectionFactory.newConnection(generateConnectionName());
433433

434434
Collection<?> shutdownHooks = getShutdownHooks(connection);
435435
assertThat(shutdownHooks.size()).isEqualTo(0);
@@ -459,7 +459,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
459459

460460
Connection connection = null;
461461
try {
462-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
462+
connection = connectionFactory.newConnection(generateConnectionName());
463463

464464
Channel channel1 = connection.createChannel();
465465
AtomicInteger ackedMessages = new AtomicInteger(0);

0 commit comments

Comments
 (0)