Skip to content

Commit 18b61fd

Browse files
garyrussellartembilan
authored andcommitted
Fix RejectedExecutionException during close
Resolves #994 `CCF.destroy()` abruptly shuts down the `channelsExecutor` (if present). This can cause `RejectedExecutionException` because the publisher callback channel uses the same executor to schedule nacks if necessary. - Use an `ActiveObjectCounter` to keep track of in-flight async closes - Wait until there are not active objects before calling `shutDown()` (instead of `shutDownNow()`) - Wait until all queued tasks are complete - Add `PublisherCallbackChannelFactory` to make testing possible **cherry-pick to 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java
1 parent 48d5e90 commit 18b61fd

File tree

4 files changed

+174
-29
lines changed

4 files changed

+174
-29
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.amqp.AmqpApplicationContextClosedException;
5151
import org.springframework.amqp.AmqpException;
5252
import org.springframework.amqp.AmqpTimeoutException;
53+
import org.springframework.amqp.rabbit.listener.ActiveObjectCounter;
5354
import org.springframework.amqp.support.ConditionalExceptionLogger;
5455
import org.springframework.beans.factory.InitializingBean;
5556
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -105,6 +106,8 @@ public class CachingConnectionFactory extends AbstractConnectionFactory
105106

106107
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
107108

109+
private static final int CHANNEL_EXEC_SHUTDOWN_TIMEOUT = 30;
110+
108111
/**
109112
* Create a unique ID for the pool.
110113
*/
@@ -159,6 +162,8 @@ public enum CacheMode {
159162
/** Synchronization monitor for the shared Connection. */
160163
private final Object connectionMonitor = new Object();
161164

165+
private final ActiveObjectCounter<Channel> inFlightAsyncCloses = new ActiveObjectCounter<>();
166+
162167
private long channelCheckoutTimeout = 0;
163168

164169
private CacheMode cacheMode = CacheMode.CHANNEL;
@@ -177,6 +182,8 @@ public enum CacheMode {
177182

178183
private ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
179184

185+
private PublisherCallbackChannelFactory publisherChannelFactory = PublisherCallbackChannelImpl.factory();
186+
180187
private volatile boolean active = true;
181188

182189
private volatile boolean initialized;
@@ -430,6 +437,16 @@ public void setCloseExceptionLogger(ConditionalExceptionLogger closeExceptionLog
430437
}
431438
}
432439

440+
/**
441+
* Set the factory to use to create {@link PublisherCallbackChannel} instances.
442+
* @param publisherChannelFactory the factory.
443+
* @since 2.1.6
444+
*/
445+
public void setPublisherChannelFactory(PublisherCallbackChannelFactory publisherChannelFactory) {
446+
Assert.notNull(publisherChannelFactory, "'publisherChannelFactory' cannot be null");
447+
this.publisherChannelFactory = publisherChannelFactory;
448+
}
449+
433450
@Override
434451
public void afterPropertiesSet() {
435452
this.initialized = true;
@@ -651,8 +668,8 @@ else if (this.cacheMode == CacheMode.CONNECTION) {
651668
return null; // NOSONAR doCreate will throw an exception
652669
}
653670

654-
private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
655-
Channel channel = connection.createBareChannel(transactional);
671+
private Channel doCreateBareChannel(ChannelCachingConnectionProxy conn, boolean transactional) {
672+
Channel channel = conn.createBareChannel(transactional);
656673
if (this.publisherConfirms || this.simplePublisherConfirms) {
657674
try {
658675
channel.confirmSelect();
@@ -663,7 +680,7 @@ private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, bo
663680
}
664681
if ((this.publisherConfirms || this.publisherReturns)
665682
&& !(channel instanceof PublisherCallbackChannelImpl)) {
666-
channel = new PublisherCallbackChannelImpl(channel, getChannelsExecutor());
683+
channel = this.publisherChannelFactory.createChannel(channel, getChannelsExecutor());
667684
}
668685
if (channel != null) {
669686
channel.addShutdownListener(this);
@@ -819,7 +836,18 @@ public final void destroy() {
819836
if (getContextStopped()) {
820837
this.stopped = true;
821838
if (this.channelsExecutor != null) {
822-
this.channelsExecutor.shutdownNow();
839+
try {
840+
if (!this.inFlightAsyncCloses.await(CHANNEL_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
841+
this.logger.warn("Async closes are still in-flight: " + this.inFlightAsyncCloses.getCount());
842+
}
843+
this.channelsExecutor.shutdown();
844+
if (!this.channelsExecutor.awaitTermination(CHANNEL_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
845+
this.logger.warn("Channel executor failed to shut down");
846+
}
847+
}
848+
catch (@SuppressWarnings("unused") InterruptedException e) {
849+
Thread.currentThread().interrupt();
850+
}
823851
}
824852
}
825853
}
@@ -1276,37 +1304,46 @@ private void physicalClose() throws IOException, TimeoutException {
12761304
private void asyncClose() {
12771305
ExecutorService executorService = getChannelsExecutor();
12781306
final Channel channel = CachedChannelInvocationHandler.this.target;
1279-
executorService.execute(() -> {
1280-
try {
1281-
if (CachingConnectionFactory.this.publisherConfirms) {
1282-
channel.waitForConfirmsOrDie(ASYNC_CLOSE_TIMEOUT);
1283-
}
1284-
else {
1285-
Thread.sleep(ASYNC_CLOSE_TIMEOUT);
1286-
}
1287-
}
1288-
catch (InterruptedException e1) {
1289-
Thread.currentThread().interrupt();
1290-
}
1291-
catch (Exception e2) {
1292-
}
1293-
finally {
1307+
CachingConnectionFactory.this.inFlightAsyncCloses.add(channel);
1308+
try {
1309+
executorService.execute(() -> {
12941310
try {
1295-
channel.close();
1296-
}
1297-
catch (IOException e3) {
1311+
if (CachingConnectionFactory.this.publisherConfirms) {
1312+
channel.waitForConfirmsOrDie(ASYNC_CLOSE_TIMEOUT);
1313+
}
1314+
else {
1315+
Thread.sleep(ASYNC_CLOSE_TIMEOUT);
1316+
}
12981317
}
1299-
catch (AlreadyClosedException e4) {
1318+
catch (InterruptedException e1) {
1319+
Thread.currentThread().interrupt();
13001320
}
1301-
catch (TimeoutException e5) {
1321+
catch (Exception e2) {
13021322
}
1303-
catch (ShutdownSignalException e6) {
1304-
if (!RabbitUtils.isNormalShutdown(e6)) {
1305-
logger.debug("Unexpected exception on deferred close", e6);
1323+
finally {
1324+
try {
1325+
channel.close();
1326+
}
1327+
catch (IOException e3) {
1328+
}
1329+
catch (AlreadyClosedException e4) {
1330+
}
1331+
catch (TimeoutException e5) {
1332+
}
1333+
catch (ShutdownSignalException e6) {
1334+
if (!RabbitUtils.isNormalShutdown(e6)) {
1335+
logger.debug("Unexpected exception on deferred close", e6);
1336+
}
1337+
}
1338+
finally {
1339+
CachingConnectionFactory.this.inFlightAsyncCloses.release(channel);
13061340
}
13071341
}
1308-
}
1309-
});
1342+
});
1343+
}
1344+
catch (@SuppressWarnings("unused") RuntimeException e) {
1345+
CachingConnectionFactory.this.inFlightAsyncCloses.release(channel);
1346+
}
13101347
}
13111348

13121349
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import java.util.concurrent.ExecutorService;
20+
21+
import com.rabbitmq.client.Channel;
22+
23+
/**
24+
* A factory for {@link PublisherCallbackChannel}s.
25+
*
26+
* @author Gary Russell
27+
* @since 2.1.6
28+
*
29+
*/
30+
@FunctionalInterface
31+
public interface PublisherCallbackChannelFactory {
32+
33+
/**
34+
* Create a {@link PublisherCallbackChannel} instance based on the provided delegate
35+
* and executor.
36+
* @param delegate the delegate channel.
37+
* @param executor the executor.
38+
* @return the channel.
39+
*/
40+
PublisherCallbackChannel createChannel(Channel delegate, ExecutorService executor);
41+
42+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,4 +1125,8 @@ public String toString() {
11251125
return "PublisherCallbackChannelImpl: " + this.delegate.toString();
11261126
}
11271127

1128+
public static PublisherCallbackChannelFactory factory() {
1129+
return (channel, exec) -> new PublisherCallbackChannelImpl(channel, exec);
1130+
}
1131+
11281132
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.mockito.ArgumentMatchers.anyInt;
2929
import static org.mockito.ArgumentMatchers.anyString;
3030
import static org.mockito.ArgumentMatchers.isNull;
31+
import static org.mockito.BDDMockito.given;
3132
import static org.mockito.BDDMockito.willAnswer;
3233
import static org.mockito.Mockito.atLeastOnce;
3334
import static org.mockito.Mockito.doAnswer;
@@ -36,6 +37,7 @@
3637
import static org.mockito.Mockito.inOrder;
3738
import static org.mockito.Mockito.mock;
3839
import static org.mockito.Mockito.never;
40+
import static org.mockito.Mockito.spy;
3941
import static org.mockito.Mockito.times;
4042
import static org.mockito.Mockito.verify;
4143
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -54,6 +56,7 @@
5456
import java.util.concurrent.CountDownLatch;
5557
import java.util.concurrent.ExecutorService;
5658
import java.util.concurrent.Executors;
59+
import java.util.concurrent.RejectedExecutionException;
5760
import java.util.concurrent.Semaphore;
5861
import java.util.concurrent.TimeUnit;
5962
import java.util.concurrent.TimeoutException;
@@ -72,6 +75,8 @@
7275
import org.springframework.amqp.AmqpTimeoutException;
7376
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
7477
import org.springframework.amqp.utils.test.TestUtils;
78+
import org.springframework.context.ApplicationContext;
79+
import org.springframework.context.event.ContextClosedEvent;
7580
import org.springframework.test.util.ReflectionTestUtils;
7681

7782
import com.rabbitmq.client.Address;
@@ -1655,4 +1660,61 @@ public void testReturnsNormalCloseDeferredClose() throws Exception {
16551660
Thread.sleep(6000);
16561661
}
16571662

1663+
@Test
1664+
public void testOrderlyShutDown() throws Exception {
1665+
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
1666+
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
1667+
Channel mockChannel = mock(Channel.class);
1668+
1669+
given(mockConnectionFactory.newConnection((ExecutorService) isNull(), anyString())).willReturn(mockConnection);
1670+
given(mockConnection.createChannel()).willReturn(mockChannel);
1671+
given(mockChannel.isOpen()).willReturn(true);
1672+
given(mockConnection.isOpen()).willReturn(true);
1673+
1674+
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
1675+
ccf.setPublisherConfirms(true);
1676+
ApplicationContext ac = mock(ApplicationContext.class);
1677+
ccf.setApplicationContext(ac);
1678+
PublisherCallbackChannel pcc = mock(PublisherCallbackChannel.class);
1679+
given(pcc.isOpen()).willReturn(true);
1680+
AtomicReference<ExecutorService> executor = new AtomicReference<>();
1681+
AtomicBoolean rejected = new AtomicBoolean(true);
1682+
CountDownLatch closeLatch = new CountDownLatch(1);
1683+
ccf.setPublisherChannelFactory((channel, exec) -> {
1684+
executor.set(spy(exec));
1685+
return pcc;
1686+
});
1687+
willAnswer(invoc -> {
1688+
try {
1689+
executor.get().execute(() -> {
1690+
});
1691+
rejected.set(false);
1692+
}
1693+
catch (@SuppressWarnings("unused") RejectedExecutionException e) {
1694+
rejected.set(true);
1695+
}
1696+
closeLatch.countDown();
1697+
return null;
1698+
}).given(pcc).close();
1699+
Channel channel = ccf.createConnection().createChannel(false);
1700+
ExecutorService closeExec = Executors.newSingleThreadExecutor();
1701+
CountDownLatch asyncClosingLatch = new CountDownLatch(1);
1702+
closeExec.execute(() -> {
1703+
RabbitUtils.setPhysicalCloseRequired(channel, true);
1704+
try {
1705+
channel.close();
1706+
asyncClosingLatch.countDown();
1707+
}
1708+
catch (@SuppressWarnings("unused") IOException | TimeoutException e) {
1709+
// ignore
1710+
}
1711+
});
1712+
assertThat(asyncClosingLatch.await(10, TimeUnit.SECONDS)).isTrue();
1713+
ccf.onApplicationEvent(new ContextClosedEvent(ac));
1714+
ccf.destroy();
1715+
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1716+
assertThat(rejected.get()).isFalse();
1717+
closeExec.shutdownNow();
1718+
}
1719+
16581720
}

0 commit comments

Comments
 (0)