diff --git a/pom.xml b/pom.xml index bdc40b4c9..2d4497644 100644 --- a/pom.xml +++ b/pom.xml @@ -774,8 +774,10 @@ + src/main/java/com/rabbitmq/AmqpClientTestExtension.java src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java + src/main/java/com/rabbitmq/client/impl/Environment.java src/main/java/com/rabbitmq/client/observation/**/*.java src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java src/test/java/com/rabbitmq/client/test/NettyTest.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index ad587697f..4dc19ba58 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -1063,6 +1063,10 @@ public ConnectionFactory setCredentialsRefreshService( protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException { if (netty) { if (this.frameHandlerFactory == null) { + Predicate recoveryCondition = + this.connectionRecoveryTriggeringCondition == null + ? AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION + : this.connectionRecoveryTriggeringCondition; this.frameHandlerFactory = new NettyFrameHandlerFactory( this.nettyConf.eventLoopGroup, @@ -1072,7 +1076,9 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO this.nettyConf.enqueuingTimeout, connectionTimeout, socketConf, - maxInboundMessageBodySize); + maxInboundMessageBodySize, + this.automaticRecovery, + recoveryCondition); } return this.frameHandlerFactory; } else { diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 8f7c9b332..185072b8f 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -442,6 +442,7 @@ private ChannelState(Channel channel) { * * @deprecated Use {@link #markRejectedMessage(boolean)} instead */ + @Deprecated protected abstract void markRejectedMessage(); /** diff --git a/src/main/java/com/rabbitmq/client/impl/Environment.java b/src/main/java/com/rabbitmq/client/impl/Environment.java index 4475ae2eb..7ff0be958 100644 --- a/src/main/java/com/rabbitmq/client/impl/Environment.java +++ b/src/main/java/com/rabbitmq/client/impl/Environment.java @@ -1,4 +1,5 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -12,40 +13,45 @@ // // If you have any questions regarding licensing, please contact us at // info@rabbitmq.com. - package com.rabbitmq.client.impl; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** - * Infers information about the execution environment, e.g. - * security permissions. - * Package-protected API. + * Infers information about the execution environment, e.g. security permissions. Package-protected + * API. */ public class Environment { - /** - * This method is deprecated and subject to removal in the next major release. - * - * There is no replacement for this method, as it used to use the - * {@link SecurityManager}, which is itself deprecated and subject to removal. - * @deprecated - * @return always returns true - */ - @Deprecated - public static boolean isAllowedToModifyThreads() { - return true; - } + /** + * This method is deprecated and subject to removal in the next major release. + * + *

There is no replacement for this method, as it used to use the {@link SecurityManager}, + * which is itself deprecated and subject to removal. + * + * @deprecated + * @return always returns true + */ + @Deprecated + public static boolean isAllowedToModifyThreads() { + return true; + } + + static Thread newThread(Runnable runnable, String name) { + return newThread(Executors.defaultThreadFactory(), runnable, name); + } - public static Thread newThread(ThreadFactory factory, Runnable runnable, String name) { - Thread t = factory.newThread(runnable); - t.setName(name); - return t; - } + public static Thread newThread(ThreadFactory factory, Runnable runnable, String name) { + Thread t = factory.newThread(runnable); + t.setName(name); + return t; + } - public static Thread newThread(ThreadFactory factory, Runnable runnable, String name, boolean isDaemon) { - Thread t = newThread(factory, runnable, name); - t.setDaemon(isDaemon); - return t; - } + public static Thread newThread( + ThreadFactory factory, Runnable runnable, String name, boolean isDaemon) { + Thread t = newThread(factory, runnable, name); + t.setDaemon(isDaemon); + return t; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index 263a0c01c..abac9151a 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -23,6 +23,7 @@ import com.rabbitmq.client.Address; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MalformedFrameException; +import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.SocketConfigurator; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; @@ -51,16 +52,16 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.time.Duration; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import javax.net.ssl.SSLHandshakeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,7 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory private final Consumer channelCustomizer; private final Consumer bootstrapCustomizer; private final Duration enqueuingTimeout; + private final Predicate willRecover; public NettyFrameHandlerFactory( EventLoopGroup eventLoopGroup, @@ -82,7 +84,9 @@ public NettyFrameHandlerFactory( Duration enqueuingTimeout, int connectionTimeout, SocketConfigurator configurator, - int maxInboundMessageBodySize) { + int maxInboundMessageBodySize, + boolean automaticRecovery, + Predicate recoveryCondition) { super(connectionTimeout, configurator, sslContextFactory != null, maxInboundMessageBodySize); this.eventLoopGroup = eventLoopGroup; this.sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory; @@ -90,6 +94,20 @@ public NettyFrameHandlerFactory( this.bootstrapCustomizer = bootstrapCustomizer == null ? Utils.noOpConsumer() : bootstrapCustomizer; this.enqueuingTimeout = enqueuingTimeout; + this.willRecover = + sse -> { + if (!automaticRecovery) { + return false; + } else { + try { + return recoveryCondition.test(sse); + } catch (Exception e) { + // we assume it will recover, so we take the safe path to dispatch the closing + // it avoids the risk of deadlock + return true; + } + } + }; } private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGroup) { @@ -133,6 +151,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti sslContext, this.eventLoopGroup, this.enqueuingTimeout, + this.willRecover, this.channelCustomizer, this.bootstrapCustomizer); } @@ -163,6 +182,7 @@ private NettyFrameHandler( SslContext sslContext, EventLoopGroup elg, Duration enqueuingTimeout, + Predicate willRecover, Consumer channelCustomizer, Consumer bootstrapCustomizer) throws IOException { @@ -180,6 +200,14 @@ private NettyFrameHandler( } else { this.eventLoopGroup = null; } + + if (b.config().group() == null) { + throw new IllegalStateException("The event loop group is not set"); + } else if (b.config().group().isShuttingDown()) { + LOGGER.warn("The Netty loop group was shut down, it is not possible to connect or recover"); + throw new IllegalStateException("The event loop group was shut down"); + } + if (b.config().channelFactory() == null) { b.channel(NioSocketChannel.class); } @@ -195,7 +223,8 @@ private NettyFrameHandler( int lengthFieldOffset = 3; int lengthFieldLength = 4; int lengthAdjustement = 1; - AmqpHandler amqpHandler = new AmqpHandler(maxInboundMessageBodySize, this::close); + AmqpHandler amqpHandler = + new AmqpHandler(maxInboundMessageBodySize, this::close, willRecover); int port = ConnectionFactory.portOrDefault(addr.getPort(), sslContext != null); b.handler( new ChannelInitializer() { @@ -296,6 +325,10 @@ public void sendHeader() { @Override public void initialize(AMQConnection connection) { + LOGGER.debug( + "Setting connection {} to AMQP handler {}", + connection.getClientProvidedName(), + this.handler.id); this.handler.connection = connection; } @@ -333,7 +366,6 @@ public void writeFrame(Frame frame) throws IOException { if (canWriteNow) { this.doWriteFrame(frame); } else { - this.handler.logEvents(); throw new IOException("Frame enqueuing failed"); } } catch (InterruptedException e) { @@ -404,14 +436,30 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter { private final int maxPayloadSize; private final Runnable closeSequence; + private final Predicate willRecover; private volatile AMQConnection connection; + private volatile Channel ch; private final AtomicBoolean writable = new AtomicBoolean(true); private final AtomicReference writableLatch = new AtomicReference<>(new CountDownLatch(1)); - - private AmqpHandler(int maxPayloadSize, Runnable closeSequence) { + private final AtomicBoolean shutdownDispatched = new AtomicBoolean(false); + private static final AtomicInteger SEQUENCE = new AtomicInteger(0); + private final String id; + + private AmqpHandler( + int maxPayloadSize, + Runnable closeSequence, + Predicate willRecover) { this.maxPayloadSize = maxPayloadSize; this.closeSequence = closeSequence; + this.willRecover = willRecover; + this.id = "amqp-handler-" + SEQUENCE.getAndIncrement(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + this.ch = ctx.channel(); + super.channelActive(ctx); } @Override @@ -444,49 +492,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (noProblem && (!this.connection.isRunning() || this.connection.hasBrokerInitiatedShutdown())) { // looks like the frame was Close-Ok or Close - ctx.executor().submit(() -> this.connection.doFinalShutdown()); + this.dispatchShutdownToConnection(() -> this.connection.doFinalShutdown()); } } finally { m.release(); } } - private static class Event { - private final long time; - private final String label; - - public Event(long time, String label) { - this.time = time; - this.label = label; - } - - @Override - public String toString() { - return this.label + " " + this.time; - } - } - - private static final int MAX_EVENTS = 100; - private final Queue events = new ConcurrentLinkedQueue<>(); - - private void logEvents() { - if (this.events.size() > 0) { - long start = this.events.peek().time; - LOGGER.info("channel writability history:"); - events.forEach(e -> LOGGER.info("{}: {}", (e.time - start) / 1_000_000, e.label)); - } - } - @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { boolean canWrite = ctx.channel().isWritable(); - Event event = new Event(System.nanoTime(), Boolean.toString(canWrite)); - if (this.events.size() >= MAX_EVENTS) { - this.events.poll(); - this.events.offer(event); - } - this.events.add(event); - if (this.writable.compareAndSet(!canWrite, canWrite)) { if (canWrite) { CountDownLatch latch = writableLatch.getAndSet(new CountDownLatch(1)); @@ -502,12 +517,13 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio public void channelInactive(ChannelHandlerContext ctx) { if (needToDispatchIoError()) { AMQConnection c = this.connection; + LOGGER.debug("Dispatching shutdown when channel became inactive ({})", this.id); if (c.isOpen()) { // it is likely to be an IO exception - c.handleIoError(null); + this.dispatchShutdownToConnection(() -> c.handleIoError(null)); } else { // just in case, the call is idempotent anyway - c.doFinalShutdown(); + this.dispatchShutdownToConnection(c::doFinalShutdown); } } } @@ -533,7 +549,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc this.connection.getAddress().getHostName(), this.connection.getPort()); if (needToDispatchIoError()) { - this.connection.handleHeartbeatFailure(); + this.dispatchShutdownToConnection(() -> this.connection.handleHeartbeatFailure()); } } else if (e.state() == IdleState.WRITER_IDLE) { this.connection.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0)); @@ -545,7 +561,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc private void handleIoError(Throwable cause) { if (needToDispatchIoError()) { - this.connection.handleIoError(cause); + this.dispatchShutdownToConnection(() -> this.connection.handleIoError(cause)); } else { this.closeSequence.run(); } @@ -563,6 +579,32 @@ private boolean isWritable() { private CountDownLatch writableLatch() { return this.writableLatch.get(); } + + protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) { + if (this.shutdownDispatched.compareAndSet(false, true)) { + String name = "rabbitmq-connection-shutdown-" + this.id; + AMQConnection c = this.connection; + if (c == null || ch == null) { + // not enough information, we dispatch in separate thread + Environment.newThread(connectionShutdownRunnable, name).start(); + } else { + if (ch.eventLoop().inEventLoop()) { + if (this.willRecover.test(c.getCloseReason()) || ch.eventLoop().isShuttingDown()) { + // the connection will recover, we don't want this to happen in the event loop, + // it could cause a deadlock, so using a separate thread + // name = name + "-" + c; + Environment.newThread(connectionShutdownRunnable, name).start(); + } else { + // no recovery, it is safe to dispatch in the event loop + ch.eventLoop().submit(connectionShutdownRunnable); + } + } else { + // not in the event loop, we can run it in the same thread + connectionShutdownRunnable.run(); + } + } + } + } } private static final class ProtocolVersionMismatchHandler extends ChannelInboundHandlerAdapter { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 0e3e82d95..cfc2fbce4 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -591,16 +591,16 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException { } LOGGER.debug("Connection {} has recovered", newConn); this.addAutomaticRecoveryListener(newConn); - this.recoverShutdownListeners(newConn); - this.recoverBlockedListeners(newConn); - this.recoverChannels(newConn); - // don't assign new delegate connection until channel recovery is complete - this.delegate = newConn; - if (this.params.isTopologyRecoveryEnabled()) { - notifyTopologyRecoveryListenersStarted(); - recoverTopology(params.getTopologyRecoveryExecutor()); - } - this.notifyRecoveryListenersComplete(); + this.recoverShutdownListeners(newConn); + this.recoverBlockedListeners(newConn); + this.recoverChannels(newConn); + // don't assign new delegate connection until channel recovery is complete + this.delegate = newConn; + if (this.params.isTopologyRecoveryEnabled()) { + notifyTopologyRecoveryListenersStarted(); + recoverTopology(params.getTopologyRecoveryExecutor()); + } + this.notifyRecoveryListenersComplete(); } private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) { @@ -624,25 +624,27 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti attempts++; // No Sonar: no need to close this resource because we're the one that creates it // and hands it over to the user - RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR - synchronized(recoveryLock) { - if (!manuallyClosed) { - // This is the standard case. - return newConn; - } - } - // This is the once in a blue moon case. - // Application code just called close as the connection - // was being re-established. So we attempt to close the newly created connection. - newConn.abort(); - return null; + RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR + synchronized(recoveryLock) { + if (!manuallyClosed) { + // This is the standard case. + return newConn; + } + } + // This is the once in a blue moon case. + // Application code just called close as the connection + // was being re-established. So we attempt to close the newly created connection. + newConn.abort(); + return null; + } catch (IllegalStateException e) { + this.getExceptionHandler().handleConnectionRecoveryException(this, e); + return null; } catch (Exception e) { Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts)); this.getExceptionHandler().handleConnectionRecoveryException(this, e); } } - - return null; + return null; } private void recoverChannels(final RecoveryAwareAMQConnection newConn) { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java index b4754a217..ac4d529bc 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java @@ -71,10 +71,8 @@ public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutExc conn.start(); metricsCollector.newConnection(conn); return conn; - } catch (IOException e) { + } catch (IOException | TimeoutException e) { lastException = e; - } catch (TimeoutException te) { - lastException = te; } } diff --git a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java index 78242e9d5..3e6793218 100644 --- a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java +++ b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java @@ -31,6 +31,9 @@ import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; @@ -105,7 +108,8 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con @Override public void beforeAll(ExtensionContext context) { if (TestUtils.isNetty()) { - EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + ThreadFactory tf = new NamedThreadFactory(context.getTestClass().get().getSimpleName() + "-"); + EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(tf, NioIoHandler.newFactory()); store(context) .put("nettyEventLoopGroup", eventLoopGroup); TestUtils.eventLoopGroup(eventLoopGroup); @@ -139,17 +143,19 @@ public void afterAll(ExtensionContext context) { .getRoot() .getStore(ExtensionContext.Namespace.GLOBAL) .getOrComputeIfAbsent(ExecutorServiceCloseableResourceWrapper.class); - wrapper.executorService.submit( - () -> { - try { - eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Error while asynchronously closing Netty event loop group", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.warn("Error while asynchronously closing Netty event loop group", e); - } - }); + + wrapper + .executorService + .submit( + () -> { + try { + eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.warn("Error while asynchronously closing Netty event loop group", e); + } + }); } } @@ -166,4 +172,29 @@ public void close() { this.executorService.shutdownNow(); } } + + private static class NamedThreadFactory implements ThreadFactory { + + private final ThreadFactory backingThreadFactory; + + private final String prefix; + + private final AtomicLong count = new AtomicLong(0); + + private NamedThreadFactory(String prefix) { + this(Executors.defaultThreadFactory(), prefix); + } + + private NamedThreadFactory(ThreadFactory backingThreadFactory, String prefix) { + this.backingThreadFactory = backingThreadFactory; + this.prefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = this.backingThreadFactory.newThread(r); + thread.setName(prefix + count.getAndIncrement()); + return thread; + } + } } diff --git a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java index 4bb63c02c..26fcffb0d 100644 --- a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java +++ b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java @@ -125,7 +125,7 @@ protected void bareRestart() public void openConnection() throws IOException, TimeoutException { if (connection == null) { - connection = connectionFactory.newConnection(UUID.randomUUID().toString()); + connection = connectionFactory.newConnection(generateConnectionName()); } } @@ -327,6 +327,11 @@ protected String generateExchangeName() { this.testInfo.getTestMethod().get().getName()); } + protected String generateConnectionName() { + return name("conn", this.testInfo.getTestClass().get(), + this.testInfo.getTestMethod().get().getName()); + } + private static String name(String prefix, Class testClass, String testMethodName) { String uuid = UUID.randomUUID().toString(); return String.format( diff --git a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java index a48710fe9..2e968b21b 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java @@ -73,6 +73,7 @@ ValueWriterTest.class, BlockedConnectionTest.class, NettyTest.class, + IoDeadlockOnConnectionClosing.class, ProtocolVersionMismatch.class }) public class ClientTestSuite { diff --git a/src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java b/src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java new file mode 100644 index 000000000..6eb5a10e9 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java @@ -0,0 +1,94 @@ +// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.rabbitmq.client.test.TestUtils.IO_NETTY; +import static com.rabbitmq.client.test.TestUtils.IO_SOCKET; +import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * + */ +public class IoDeadlockOnConnectionClosing { + + static final Logger LOGGER = LoggerFactory.getLogger(IoDeadlockOnConnectionClosing.class); + + EventLoopGroup eventLoopGroup; + ConnectionFactory cf; + List connections; + + @ParameterizedTest + @ValueSource(strings = {IO_NETTY, IO_SOCKET}) + public void connectionClosing(String io) throws Exception { + init(io); + try { + for (int i = 0; i < 10; i++) { + connections.add(cf.newConnection()); + } + closeAllConnectionsAndWaitForRecovery(connections); + for (Connection connection : connections) { + assertTrue(connection.isOpen()); + } + } finally { + tearDown(io); + } + } + + private void init(String io) { + connections = new ArrayList<>(); + cf = TestUtils.connectionFactory(); + if (IO_NETTY.equals(io)) { + IoHandlerFactory ioHandlerFactory = NioIoHandler.newFactory(); + this.eventLoopGroup = new MultiThreadIoEventLoopGroup(2, ioHandlerFactory); + cf.netty().eventLoopGroup(eventLoopGroup); + } else if (IO_SOCKET.equals(io)) { + cf.useBlockingIo(); + } else { + throw new IllegalArgumentException("Unknow IO layer: " + io); + } + } + + private void tearDown(String io) { + for (Connection connection : connections) { + try { + connection.close(2000); + } catch (Exception e) { + LOGGER.warn("Error while closing test connection", e); + } + } + if (IO_NETTY.equals(io)) { + this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + + } + +} diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index 69ce57afa..2deacc708 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -50,7 +50,7 @@ public class TestUtils { public static final String IO_LAYER = System.getProperty("io.layer", "netty"); - private static final String IO_SOCKET = "socket"; + public static final String IO_SOCKET = "socket"; public static final String IO_NETTY = "netty"; public static final List IO_LAYERS = Collections.unmodifiableList(Arrays.asList(IO_SOCKET, IO_NETTY)); diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 5145929eb..573831783 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -57,7 +57,7 @@ public class ConnectionRecovery extends BrokerTestCase { @Test public void namedConnectionRecovery() throws IOException, InterruptedException, TimeoutException { - String connectionName = "custom-name"; + String connectionName = generateConnectionName(); RecoverableConnection c = newRecoveringConnection(connectionName); try { assertThat(c.isOpen()).isTrue(); @@ -151,7 +151,7 @@ public String getPassword() { return password; } }); - RecoverableConnection c = (RecoverableConnection) cf.newConnection(UUID.randomUUID().toString()); + RecoverableConnection c = (RecoverableConnection) cf.newConnection(generateConnectionName()); try { assertThat(c.isOpen()).isTrue(); assertThat(usernameRequested.get()).isEqualTo(1); @@ -787,13 +787,14 @@ public void handleDelivery(String consumerTag, @Test public void recoveryWithExponentialBackoffDelayHandler() throws Exception { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); connectionFactory.setRecoveryDelayHandler(new RecoveryDelayHandler.ExponentialBackoffDelayHandler()); - Connection testConnection = connectionFactory.newConnection(UUID.randomUUID().toString()); + String connName = generateConnectionName(); + Connection testConnection = connectionFactory.newConnection(connName); try { assertThat(testConnection.isOpen()).isTrue(); TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection); assertThat(testConnection.isOpen()).isTrue(); } finally { - connection.close(); + testConnection.close(); } } @@ -807,7 +808,7 @@ public void handleDelivery(String consumerTag, connectionFactory.setTopologyRecoveryExecutor(executor); assertThat(connectionFactory.getTopologyRecoveryExecutor()).isEqualTo(executor); RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection( - UUID.randomUUID().toString() + generateConnectionName() ); try { final List channels = new ArrayList(); @@ -970,26 +971,26 @@ protected ConnectionFactory newConnectionFactory() { return buildConnectionFactoryWithRecoveryEnabled(false); } - private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery) + private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); - return (AutorecoveringConnection) cf.newConnection(UUID.randomUUID().toString()); + return (AutorecoveringConnection) cf.newConnection(generateConnectionName()); } - private static RecoverableConnection newRecoveringConnection(Address[] addresses) + private RecoverableConnection newRecoveringConnection(Address[] addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false); // specifically use the Address[] overload - return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString()); + return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName()); } - private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List

addresses) + private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List
addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); - return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString()); + return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName()); } - private static RecoverableConnection newRecoveringConnection(List
addresses) + private RecoverableConnection newRecoveringConnection(List
addresses) throws IOException, TimeoutException { return newRecoveringConnection(false, addresses); } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java b/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java index ef71a9419..fe46bf3f2 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java @@ -40,6 +40,7 @@ public class ExceptionHandling { private ConnectionFactory newConnectionFactory(ExceptionHandler eh) { ConnectionFactory cf = TestUtils.connectionFactory(); + cf.setNetworkRecoveryInterval(2000); cf.setExceptionHandler(eh); return cf; } @@ -74,21 +75,22 @@ protected void testConsumerHandleConsumerException(ExceptionHandler eh, CountDow throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = newConnectionFactory(eh); assertEquals(cf.getExceptionHandler(), eh); - Connection conn = cf.newConnection(); - assertEquals(conn.getExceptionHandler(), eh); - Channel ch = conn.createChannel(); - String q = ch.queueDeclare().getQueue(); - ch.basicConsume(q, new DefaultConsumer(ch) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, - AMQP.BasicProperties properties, byte[] body) throws IOException { - throw new RuntimeException("exception expected here, don't freak out"); - } - }); - ch.basicPublish("", q, null, "".getBytes()); - wait(latch); + try (Connection conn = cf.newConnection()) { + assertEquals(conn.getExceptionHandler(), eh); + Channel ch = conn.createChannel(); + String q = ch.queueDeclare().getQueue(); + ch.basicConsume(q, new DefaultConsumer(ch) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { + throw new RuntimeException("exception expected here, don't freak out"); + } + }); + ch.basicPublish("", q, null, "".getBytes()); + wait(latch); - assertEquals(!expectChannelClose, ch.isOpen()); + assertEquals(!expectChannelClose, ch.isOpen()); + } } @Test public void nullExceptionHandler() { diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 853138014..a66bcb8f6 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -82,7 +82,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim Connection connection1 = null; Connection connection2 = null; try { - connection1 = connectionFactory.newConnection(); + connection1 = connectionFactory.newConnection(generateConnectionName()); assertThat(metrics.getConnections().getCount()).isEqualTo(1L); connection1.createChannel(); @@ -102,7 +102,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim channel.basicGet(QUEUE, true); assertThat(metrics.getConsumedMessages().getCount()).isEqualTo(2L); - connection2 = connectionFactory.newConnection(); + connection2 = connectionFactory.newConnection(generateConnectionName()); assertThat(metrics.getConnections().getCount()).isEqualTo(2L); connection2.createChannel(); @@ -142,7 +142,7 @@ public void metricsPublisherUnrouted(ConnectionFactory connectionFactory) throws connectionFactory.setMetricsCollector(metrics); Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); channel.confirmSelect(); assertThat(metrics.getPublishUnroutedMessages().getCount()).isEqualTo(0L); @@ -168,7 +168,7 @@ public void metricsPublisherAck(ConnectionFactory connectionFactory) throws IOEx connectionFactory.setMetricsCollector(metrics); Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); channel.confirmSelect(); assertThat(metrics.getPublishAcknowledgedMessages().getCount()).isEqualTo(0L); @@ -196,7 +196,7 @@ public void metricsAck(ConnectionFactory connectionFactory) throws IOException, Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel1 = connection.createChannel(); Channel channel2 = connection.createChannel(); @@ -264,7 +264,7 @@ public void metricsReject(ConnectionFactory connectionFactory) throws IOExceptio Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); sendMessage(channel); @@ -304,7 +304,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF try { Channel [] channels = new Channel[nbChannels]; for(int i = 0; i < nbConnections; i++) { - connections[i] = connectionFactory.newConnection(); + connections[i] = connectionFactory.newConnection(generateConnectionName()); for(int j = 0; j < nbChannelsPerConnection; j++) { Channel channel = connections[i].createChannel(); channel.basicQos(1); @@ -347,7 +347,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF executorService.shutdownNow(); executorService = Executors.newFixedThreadPool(nbTasks); - tasks = new ArrayList>(); + tasks = new ArrayList<>(); for(int i = 0; i < nbTasks; i++) { Channel channelForConsuming = channels[i]; tasks.add(random.nextBoolean() ? @@ -376,7 +376,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF executorService.shutdownNow(); executorService = Executors.newFixedThreadPool(nbTasks); - tasks = new ArrayList>(); + tasks = new ArrayList<>(); for(int i = 0; i < nbTasks; i++) { Channel channelForConsuming = channels[i]; tasks.add(random.nextBoolean() ? @@ -405,7 +405,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); assertThat(metrics.getConnections().getCount()).isEqualTo(1L); @@ -429,7 +429,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti Connection connection = null; try { - connection = connectionFactory.newConnection(UUID.randomUUID().toString()); + connection = connectionFactory.newConnection(generateConnectionName()); Collection shutdownHooks = getShutdownHooks(connection); assertThat(shutdownHooks.size()).isEqualTo(0); @@ -459,7 +459,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti Connection connection = null; try { - connection = connectionFactory.newConnection(UUID.randomUUID().toString()); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel1 = connection.createChannel(); AtomicInteger ackedMessages = new AtomicInteger(0); diff --git a/src/test/java/com/rabbitmq/tools/Host.java b/src/test/java/com/rabbitmq/tools/Host.java index e9608ec15..76d63971f 100644 --- a/src/test/java/com/rabbitmq/tools/Host.java +++ b/src/test/java/com/rabbitmq/tools/Host.java @@ -40,7 +40,7 @@ public class Host { private static final Logger LOGGER = LoggerFactory.getLogger(Host.class); private static final String DOCKER_PREFIX = "DOCKER:"; - private static final Pattern CONNECTION_NAME_PATTERN = Pattern.compile("\"connection_name\",\"(?[a-zA-Z0-9\\-]+)?\""); + private static final Pattern CONNECTION_NAME_PATTERN = Pattern.compile("\"connection_name\",\"(?[a-zA-Z0-9\\-_]+)?\""); public static String hostname() { try {