Skip to content

Commit 3a9772e

Browse files
committed
Check Netty event loop group is open before creating frame handler
No need to try to connect if the event loop group was shut down. This also triggers an infinite cycle of connection recovery in the following case: connection disconnected, recovery starts, event loop group closed, new connection attempt, Netty channel created and becomes inactive immediately, recovery restarts, etc. To avoid the recovery loop, stop recovery when the exception (IllegalStateException) is thrown. References #1663 (cherry picked from commit 5da2fc3)
1 parent 8e32775 commit 3a9772e

File tree

10 files changed

+101
-77
lines changed

10 files changed

+101
-77
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ private NettyFrameHandler(
200200
} else {
201201
this.eventLoopGroup = null;
202202
}
203+
204+
if (b.config().group() == null) {
205+
throw new IllegalStateException("The event loop group is not set");
206+
} else if (b.config().group().isShuttingDown()) {
207+
LOGGER.warn("The Netty loop group was shut down, it is not possible to connect or recover");
208+
throw new IllegalStateException("The event loop group was shut down");
209+
}
210+
203211
if (b.config().channelFactory() == null) {
204212
b.channel(NioSocketChannel.class);
205213
}
@@ -317,6 +325,10 @@ public void sendHeader() {
317325

318326
@Override
319327
public void initialize(AMQConnection connection) {
328+
LOGGER.debug(
329+
"Setting connection {} to AMQP handler {}",
330+
connection.getClientProvidedName(),
331+
this.handler.id);
320332
this.handler.connection = connection;
321333
}
322334

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -592,16 +592,16 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
592592
}
593593
LOGGER.debug("Connection {} has recovered", newConn);
594594
this.addAutomaticRecoveryListener(newConn);
595-
this.recoverShutdownListeners(newConn);
596-
this.recoverBlockedListeners(newConn);
597-
this.recoverChannels(newConn);
598-
// don't assign new delegate connection until channel recovery is complete
599-
this.delegate = newConn;
600-
if (this.params.isTopologyRecoveryEnabled()) {
601-
notifyTopologyRecoveryListenersStarted();
602-
recoverTopology(params.getTopologyRecoveryExecutor());
603-
}
604-
this.notifyRecoveryListenersComplete();
595+
this.recoverShutdownListeners(newConn);
596+
this.recoverBlockedListeners(newConn);
597+
this.recoverChannels(newConn);
598+
// don't assign new delegate connection until channel recovery is complete
599+
this.delegate = newConn;
600+
if (this.params.isTopologyRecoveryEnabled()) {
601+
notifyTopologyRecoveryListenersStarted();
602+
recoverTopology(params.getTopologyRecoveryExecutor());
603+
}
604+
this.notifyRecoveryListenersComplete();
605605
}
606606

607607
private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) {
@@ -625,25 +625,27 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
625625
attempts++;
626626
// No Sonar: no need to close this resource because we're the one that creates it
627627
// and hands it over to the user
628-
RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
629-
synchronized(recoveryLock) {
630-
if (!manuallyClosed) {
631-
// This is the standard case.
632-
return newConn;
633-
}
634-
}
635-
// This is the once in a blue moon case.
636-
// Application code just called close as the connection
637-
// was being re-established. So we attempt to close the newly created connection.
638-
newConn.abort();
639-
return null;
628+
RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
629+
synchronized(recoveryLock) {
630+
if (!manuallyClosed) {
631+
// This is the standard case.
632+
return newConn;
633+
}
634+
}
635+
// This is the once in a blue moon case.
636+
// Application code just called close as the connection
637+
// was being re-established. So we attempt to close the newly created connection.
638+
newConn.abort();
639+
return null;
640+
} catch (IllegalStateException e) {
641+
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
642+
return null;
640643
} catch (Exception e) {
641644
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
642645
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
643646
}
644647
}
645-
646-
return null;
648+
return null;
647649
}
648650

649651
private void recoverChannels(final RecoveryAwareAMQConnection newConn) {

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,8 @@ public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutExc
7171
conn.start();
7272
metricsCollector.newConnection(conn);
7373
return conn;
74-
} catch (IOException e) {
74+
} catch (IOException | TimeoutException e) {
7575
lastException = e;
76-
} catch (TimeoutException te) {
77-
lastException = te;
7876
}
7977
}
8078

src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,19 @@ public void afterAll(ExtensionContext context) {
143143
.getRoot()
144144
.getStore(ExtensionContext.Namespace.GLOBAL)
145145
.getOrComputeIfAbsent(ExecutorServiceCloseableResourceWrapper.class);
146-
wrapper.executorService.submit(
147-
() -> {
148-
try {
149-
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
150-
} catch (InterruptedException e) {
151-
Thread.currentThread().interrupt();
152-
} catch (Exception e) {
153-
LOGGER.warn("Error while asynchronously closing Netty event loop group", e);
154-
}
155-
});
146+
147+
wrapper
148+
.executorService
149+
.submit(
150+
() -> {
151+
try {
152+
eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS);
153+
} catch (InterruptedException e) {
154+
Thread.currentThread().interrupt();
155+
} catch (Exception e) {
156+
LOGGER.warn("Error while asynchronously closing Netty event loop group", e);
157+
}
158+
});
156159
}
157160
}
158161

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected void bareRestart()
133133
public void openConnection()
134134
throws IOException, TimeoutException {
135135
if (connection == null) {
136-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
136+
connection = connectionFactory.newConnection(generateConnectionName());
137137
}
138138
}
139139

@@ -335,6 +335,11 @@ protected String generateExchangeName() {
335335
this.testInfo.getTestMethod().get().getName());
336336
}
337337

338+
protected String generateConnectionName() {
339+
return name("conn", this.testInfo.getTestClass().get(),
340+
this.testInfo.getTestMethod().get().getName());
341+
}
342+
338343
private static String name(String prefix, Class<?> testClass, String testMethodName) {
339344
String uuid = UUID.randomUUID().toString();
340345
return String.format(

src/test/java/com/rabbitmq/client/test/ClientTestSuite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
ValueWriterTest.class,
8787
BlockedConnectionTest.class,
8888
NettyTest.class,
89+
IoDeadlockOnConnectionClosing.class,
8990
ProtocolVersionMismatch.class
9091
})
9192
public class ClientTestSuite {

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class ConnectionRecovery extends BrokerTestCase {
5757

5858
@Test public void namedConnectionRecovery()
5959
throws IOException, InterruptedException, TimeoutException {
60-
String connectionName = "custom-name";
60+
String connectionName = generateConnectionName();
6161
RecoverableConnection c = newRecoveringConnection(connectionName);
6262
try {
6363
assertThat(c.isOpen()).isTrue();
@@ -151,7 +151,7 @@ public String getPassword() {
151151
return password;
152152
}
153153
});
154-
RecoverableConnection c = (RecoverableConnection) cf.newConnection(UUID.randomUUID().toString());
154+
RecoverableConnection c = (RecoverableConnection) cf.newConnection(generateConnectionName());
155155
try {
156156
assertThat(c.isOpen()).isTrue();
157157
assertThat(usernameRequested.get()).isEqualTo(1);
@@ -787,13 +787,14 @@ public void handleDelivery(String consumerTag,
787787
@Test public void recoveryWithExponentialBackoffDelayHandler() throws Exception {
788788
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
789789
connectionFactory.setRecoveryDelayHandler(new RecoveryDelayHandler.ExponentialBackoffDelayHandler());
790-
Connection testConnection = connectionFactory.newConnection(UUID.randomUUID().toString());
790+
String connName = generateConnectionName();
791+
Connection testConnection = connectionFactory.newConnection(connName);
791792
try {
792793
assertThat(testConnection.isOpen()).isTrue();
793794
TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection);
794795
assertThat(testConnection.isOpen()).isTrue();
795796
} finally {
796-
connection.close();
797+
testConnection.close();
797798
}
798799
}
799800

@@ -807,7 +808,7 @@ public void handleDelivery(String consumerTag,
807808
connectionFactory.setTopologyRecoveryExecutor(executor);
808809
assertThat(connectionFactory.getTopologyRecoveryExecutor()).isEqualTo(executor);
809810
RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection(
810-
UUID.randomUUID().toString()
811+
generateConnectionName()
811812
);
812813
try {
813814
final List<Channel> channels = new ArrayList<Channel>();
@@ -970,26 +971,26 @@ protected ConnectionFactory newConnectionFactory() {
970971
return buildConnectionFactoryWithRecoveryEnabled(false);
971972
}
972973

973-
private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
974+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
974975
throws IOException, TimeoutException {
975976
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
976-
return (AutorecoveringConnection) cf.newConnection(UUID.randomUUID().toString());
977+
return (AutorecoveringConnection) cf.newConnection(generateConnectionName());
977978
}
978979

979-
private static RecoverableConnection newRecoveringConnection(Address[] addresses)
980+
private RecoverableConnection newRecoveringConnection(Address[] addresses)
980981
throws IOException, TimeoutException {
981982
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false);
982983
// specifically use the Address[] overload
983-
return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString());
984+
return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName());
984985
}
985986

986-
private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
987+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
987988
throws IOException, TimeoutException {
988989
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
989-
return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString());
990+
return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName());
990991
}
991992

992-
private static RecoverableConnection newRecoveringConnection(List<Address> addresses)
993+
private RecoverableConnection newRecoveringConnection(List<Address> addresses)
993994
throws IOException, TimeoutException {
994995
return newRecoveringConnection(false, addresses);
995996
}

src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
public class ExceptionHandling {
4141
private ConnectionFactory newConnectionFactory(ExceptionHandler eh) {
4242
ConnectionFactory cf = TestUtils.connectionFactory();
43+
cf.setNetworkRecoveryInterval(2000);
4344
cf.setExceptionHandler(eh);
4445
return cf;
4546
}
@@ -74,21 +75,22 @@ protected void testConsumerHandleConsumerException(ExceptionHandler eh, CountDow
7475
throws InterruptedException, TimeoutException, IOException {
7576
ConnectionFactory cf = newConnectionFactory(eh);
7677
assertEquals(cf.getExceptionHandler(), eh);
77-
Connection conn = cf.newConnection();
78-
assertEquals(conn.getExceptionHandler(), eh);
79-
Channel ch = conn.createChannel();
80-
String q = ch.queueDeclare().getQueue();
81-
ch.basicConsume(q, new DefaultConsumer(ch) {
82-
@Override
83-
public void handleDelivery(String consumerTag, Envelope envelope,
84-
AMQP.BasicProperties properties, byte[] body) throws IOException {
85-
throw new RuntimeException("exception expected here, don't freak out");
86-
}
87-
});
88-
ch.basicPublish("", q, null, "".getBytes());
89-
wait(latch);
78+
try (Connection conn = cf.newConnection()) {
79+
assertEquals(conn.getExceptionHandler(), eh);
80+
Channel ch = conn.createChannel();
81+
String q = ch.queueDeclare().getQueue();
82+
ch.basicConsume(q, new DefaultConsumer(ch) {
83+
@Override
84+
public void handleDelivery(String consumerTag, Envelope envelope,
85+
AMQP.BasicProperties properties, byte[] body) throws IOException {
86+
throw new RuntimeException("exception expected here, don't freak out");
87+
}
88+
});
89+
ch.basicPublish("", q, null, "".getBytes());
90+
wait(latch);
9091

91-
assertEquals(!expectChannelClose, ch.isOpen());
92+
assertEquals(!expectChannelClose, ch.isOpen());
93+
}
9294
}
9395

9496
@Test public void nullExceptionHandler() {

src/test/java/com/rabbitmq/client/test/functional/Metrics.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim
8282
Connection connection1 = null;
8383
Connection connection2 = null;
8484
try {
85-
connection1 = connectionFactory.newConnection();
85+
connection1 = connectionFactory.newConnection(generateConnectionName());
8686
assertThat(metrics.getConnections().getCount()).isEqualTo(1L);
8787

8888
connection1.createChannel();
@@ -102,7 +102,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim
102102
channel.basicGet(QUEUE, true);
103103
assertThat(metrics.getConsumedMessages().getCount()).isEqualTo(2L);
104104

105-
connection2 = connectionFactory.newConnection();
105+
connection2 = connectionFactory.newConnection(generateConnectionName());
106106
assertThat(metrics.getConnections().getCount()).isEqualTo(2L);
107107

108108
connection2.createChannel();
@@ -142,7 +142,7 @@ public void metricsPublisherUnrouted(ConnectionFactory connectionFactory) throws
142142
connectionFactory.setMetricsCollector(metrics);
143143
Connection connection = null;
144144
try {
145-
connection = connectionFactory.newConnection();
145+
connection = connectionFactory.newConnection(generateConnectionName());
146146
Channel channel = connection.createChannel();
147147
channel.confirmSelect();
148148
assertThat(metrics.getPublishUnroutedMessages().getCount()).isEqualTo(0L);
@@ -168,7 +168,7 @@ public void metricsPublisherAck(ConnectionFactory connectionFactory) throws IOEx
168168
connectionFactory.setMetricsCollector(metrics);
169169
Connection connection = null;
170170
try {
171-
connection = connectionFactory.newConnection();
171+
connection = connectionFactory.newConnection(generateConnectionName());
172172
Channel channel = connection.createChannel();
173173
channel.confirmSelect();
174174
assertThat(metrics.getPublishAcknowledgedMessages().getCount()).isEqualTo(0L);
@@ -191,7 +191,7 @@ public void metricsAck(ConnectionFactory connectionFactory) throws IOException,
191191

192192
Connection connection = null;
193193
try {
194-
connection = connectionFactory.newConnection();
194+
connection = connectionFactory.newConnection(generateConnectionName());
195195
Channel channel1 = connection.createChannel();
196196
Channel channel2 = connection.createChannel();
197197

@@ -259,7 +259,7 @@ public void metricsReject(ConnectionFactory connectionFactory) throws IOExceptio
259259

260260
Connection connection = null;
261261
try {
262-
connection = connectionFactory.newConnection();
262+
connection = connectionFactory.newConnection(generateConnectionName());
263263
Channel channel = connection.createChannel();
264264

265265
sendMessage(channel);
@@ -299,7 +299,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
299299
try {
300300
Channel [] channels = new Channel[nbChannels];
301301
for(int i = 0; i < nbConnections; i++) {
302-
connections[i] = connectionFactory.newConnection();
302+
connections[i] = connectionFactory.newConnection(generateConnectionName());
303303
for(int j = 0; j < nbChannelsPerConnection; j++) {
304304
Channel channel = connections[i].createChannel();
305305
channel.basicQos(1);
@@ -342,7 +342,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
342342
executorService.shutdownNow();
343343

344344
executorService = Executors.newFixedThreadPool(nbTasks);
345-
tasks = new ArrayList<Callable<Void>>();
345+
tasks = new ArrayList<>();
346346
for(int i = 0; i < nbTasks; i++) {
347347
Channel channelForConsuming = channels[i];
348348
tasks.add(random.nextBoolean() ?
@@ -371,7 +371,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF
371371
executorService.shutdownNow();
372372

373373
executorService = Executors.newFixedThreadPool(nbTasks);
374-
tasks = new ArrayList<Callable<Void>>();
374+
tasks = new ArrayList<>();
375375
for(int i = 0; i < nbTasks; i++) {
376376
Channel channelForConsuming = channels[i];
377377
tasks.add(random.nextBoolean() ?
@@ -400,7 +400,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
400400

401401
Connection connection = null;
402402
try {
403-
connection = connectionFactory.newConnection();
403+
connection = connectionFactory.newConnection(generateConnectionName());
404404
Channel channel = connection.createChannel();
405405

406406
assertThat(metrics.getConnections().getCount()).isEqualTo(1L);
@@ -424,7 +424,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
424424

425425
Connection connection = null;
426426
try {
427-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
427+
connection = connectionFactory.newConnection(generateConnectionName());
428428

429429
Collection<?> shutdownHooks = getShutdownHooks(connection);
430430
assertThat(shutdownHooks.size()).isEqualTo(0);
@@ -454,7 +454,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti
454454

455455
Connection connection = null;
456456
try {
457-
connection = connectionFactory.newConnection(UUID.randomUUID().toString());
457+
connection = connectionFactory.newConnection(generateConnectionName());
458458

459459
Channel channel1 = connection.createChannel();
460460
AtomicInteger ackedMessages = new AtomicInteger(0);

0 commit comments

Comments
 (0)