diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index 2d9c614a2d..ad587697fd 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -18,8 +18,6 @@ import static java.util.concurrent.TimeUnit.MINUTES; import com.rabbitmq.client.impl.*; -import com.rabbitmq.client.impl.nio.NioParams; -import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier; import com.rabbitmq.client.impl.recovery.RetryHandler; @@ -154,11 +152,9 @@ public class ConnectionFactory implements Cloneable { private MetricsCollector metricsCollector; private ObservationCollector observationCollector = ObservationCollector.NO_OP; - private boolean nio = false; private boolean netty = false; private FrameHandlerFactory frameHandlerFactory; private final NettyConfiguration nettyConf = new NettyConfiguration(this); - private NioParams nioParams = new NioParams(); private SslContextFactory sslContextFactory; @@ -917,9 +913,6 @@ public ConnectionFactory useSslProtocol(SSLContext context) { * com.rabbitmq.client.ConnectionFactory.NettyConfiguration#sslContext(io.netty.handler.ssl.SslContext)} * instead. * - * @see NioParams#enableHostnameVerification() - * @see NioParams#setSslEngineConfigurator(SslEngineConfigurator) - * @see SslEngineConfigurators#ENABLE_HOSTNAME_VERIFICATION * @see SocketConfigurators#ENABLE_HOSTNAME_VERIFICATION * @see ConnectionFactory#useSslProtocol(String) * @see ConnectionFactory#useSslProtocol(SSLContext) @@ -928,18 +921,10 @@ public ConnectionFactory useSslProtocol(SSLContext context) { * @since 5.4.0 */ public ConnectionFactory enableHostnameVerification() { - enableHostnameVerificationForNio(); enableHostnameVerificationForBlockingIo(); return this; } - protected void enableHostnameVerificationForNio() { - if (this.nioParams == null) { - this.nioParams = new NioParams(); - } - this.nioParams = this.nioParams.enableHostnameVerification(); - } - protected void enableHostnameVerificationForBlockingIo() { if (this.socketConf == null) { this.socketConf = @@ -1076,21 +1061,7 @@ public ConnectionFactory setCredentialsRefreshService( } protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException { - if (nio) { - if (this.frameHandlerFactory == null) { - if (this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) { - this.nioParams.setThreadFactory(getThreadFactory()); - } - this.frameHandlerFactory = - new SocketChannelFrameHandlerFactory( - connectionTimeout, - nioParams, - isSSL(), - sslContextFactory, - this.maxInboundMessageBodySize); - } - return this.frameHandlerFactory; - } else if (netty) { + if (netty) { if (this.frameHandlerFactory == null) { this.frameHandlerFactory = new NettyFrameHandlerFactory( @@ -1659,59 +1630,11 @@ public ConnectionFactory setRecoveryDelayHandler( return this; } - /** - * Sets the parameters when using NIO. - * - * @param nioParams - * @see NioParams - * @deprecated user {@link #netty()} instead - */ - @Deprecated - public ConnectionFactory setNioParams(NioParams nioParams) { - this.nioParams = nioParams; - return this; - } - - /** - * Retrieve the parameters for NIO mode. - * - * @return - * @deprecated Use {@link #netty()} - */ - @Deprecated - public NioParams getNioParams() { - return nioParams; - } - - /** - * Use non-blocking IO (NIO) for communication with the server. With NIO, several connections - * created from the same {@link ConnectionFactory} can use the same IO thread. - * - *

A client process using a lot of not-so-active connections can benefit from NIO, as it would - * use fewer threads than with the traditional, blocking IO mode. - * - *

Use {@link NioParams} to tune NIO and a {@link SocketChannelConfigurator} to configure the - * underlying {@link java.nio.channels.SocketChannel}s for connections. - * - * @see NioParams - * @see SocketChannelConfigurator - * @see java.nio.channels.SocketChannel - * @see java.nio.channels.Selector - * @deprecated Use {@link #netty()} instead - */ - @Deprecated - public ConnectionFactory useNio() { - this.nio = true; - this.netty = false; - return this; - } - /** * Use blocking IO for communication with the server. With blocking IO, each connection creates * its own thread to read data from the server. */ public ConnectionFactory useBlockingIo() { - this.nio = false; this.netty = false; return this; } @@ -1885,7 +1808,6 @@ public ConnectionFactory setTrafficListener(TrafficListener trafficListener) { private ConnectionFactory useNetty() { this.netty = true; - this.nio = false; return this; } diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java b/src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java index 12ebcadbbc..64427ffdc6 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2017-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -16,7 +16,6 @@ package com.rabbitmq.client; import com.rabbitmq.client.impl.AMQConnection; -import com.rabbitmq.client.impl.nio.NioParams; import javax.net.ssl.*; import java.io.FileInputStream; @@ -69,12 +68,6 @@ public class ConnectionFactoryConfigurator { public static final String CONNECTION_RECOVERY_INTERVAL = "connection.recovery.interval"; public static final String CHANNEL_RPC_TIMEOUT = "channel.rpc.timeout"; public static final String CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE = "channel.should.check.rpc.response.type"; - public static final String USE_NIO = "use.nio"; - public static final String NIO_READ_BYTE_BUFFER_SIZE = "nio.read.byte.buffer.size"; - public static final String NIO_WRITE_BYTE_BUFFER_SIZE = "nio.write.byte.buffer.size"; - public static final String NIO_NB_IO_THREADS = "nio.nb.io.threads"; - public static final String NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS = "nio.write.enqueuing.timeout.in.ms"; - public static final String NIO_WRITE_QUEUE_CAPACITY = "nio.write.queue.capacity"; public static final String SSL_ALGORITHM = "ssl.algorithm"; public static final String SSL_ENABLED = "ssl.enabled"; public static final String SSL_KEY_STORE = "ssl.key.store"; @@ -225,35 +218,6 @@ public static void load(ConnectionFactory cf, Map properties, St cf.setChannelShouldCheckRpcResponseType(Boolean.valueOf(channelShouldCheckRpcResponseType)); } - String useNio = lookUp(USE_NIO, properties, prefix); - if (useNio != null && Boolean.valueOf(useNio)) { - cf.useNio(); - - NioParams nioParams = new NioParams(); - - String readByteBufferSize = lookUp(NIO_READ_BYTE_BUFFER_SIZE, properties, prefix); - if (readByteBufferSize != null) { - nioParams.setReadByteBufferSize(Integer.valueOf(readByteBufferSize)); - } - String writeByteBufferSize = lookUp(NIO_WRITE_BYTE_BUFFER_SIZE, properties, prefix); - if (writeByteBufferSize != null) { - nioParams.setWriteByteBufferSize(Integer.valueOf(writeByteBufferSize)); - } - String nbIoThreads = lookUp(NIO_NB_IO_THREADS, properties, prefix); - if (nbIoThreads != null) { - nioParams.setNbIoThreads(Integer.valueOf(nbIoThreads)); - } - String writeEnqueuingTime = lookUp(NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS, properties, prefix); - if (writeEnqueuingTime != null) { - nioParams.setWriteEnqueuingTimeoutInMs(Integer.valueOf(writeEnqueuingTime)); - } - String writeQueueCapacity = lookUp(NIO_WRITE_QUEUE_CAPACITY, properties, prefix); - if (writeQueueCapacity != null) { - nioParams.setWriteQueueCapacity(Integer.valueOf(writeQueueCapacity)); - } - cf.setNioParams(nioParams); - } - String useSsl = lookUp(SSL_ENABLED, properties, prefix); if (useSsl != null && Boolean.valueOf(useSsl)) { setUpSsl(cf, properties, prefix); diff --git a/src/main/java/com/rabbitmq/client/DefaultSocketChannelConfigurator.java b/src/main/java/com/rabbitmq/client/DefaultSocketChannelConfigurator.java deleted file mode 100644 index 3dbd82d9ff..0000000000 --- a/src/main/java/com/rabbitmq/client/DefaultSocketChannelConfigurator.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client; - -import java.io.IOException; -import java.nio.channels.SocketChannel; - -public class DefaultSocketChannelConfigurator implements SocketChannelConfigurator { - - /** - * Provides a hook to insert custom configuration of the {@link SocketChannel}s - * used to connect to an AMQP server before they connect. - * - * The default behaviour of this method is to disable Nagle's - * algorithm to get more consistently low latency. However it - * may be overridden freely and there is no requirement to retain - * this behaviour. - * - * @param socketChannel The socket channel that is to be used for the Connection - */ - @Override - public void configure(SocketChannel socketChannel) throws IOException { - socketChannel.socket().setTcpNoDelay(true); - } -} diff --git a/src/main/java/com/rabbitmq/client/SocketChannelConfigurator.java b/src/main/java/com/rabbitmq/client/SocketChannelConfigurator.java deleted file mode 100644 index 60182de799..0000000000 --- a/src/main/java/com/rabbitmq/client/SocketChannelConfigurator.java +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client; - -import java.io.IOException; -import java.nio.channels.SocketChannel; -import java.util.Objects; - -/** - * - * @deprecated Use {@link ConnectionFactory#netty()} with a channel customizer instead. - */ -@FunctionalInterface -@Deprecated -public interface SocketChannelConfigurator { - - /** - * Provides a hook to insert custom configuration of the {@link SocketChannel}s - * used to connect to an AMQP server before they connect. - */ - void configure(SocketChannel socketChannel) throws IOException; - - /** - * Returns a composed configurator that performs, in sequence, this - * operation followed by the {@code after} operation. - * - * @param after the operation to perform after this operation - * @return a composed configurator that performs in sequence this - * operation followed by the {@code after} operation - * @throws NullPointerException if {@code after} is null - */ - default SocketChannelConfigurator andThen(SocketChannelConfigurator after) { - Objects.requireNonNull(after); - return t -> { configure(t); after.configure(t); }; - } - -} diff --git a/src/main/java/com/rabbitmq/client/SocketChannelConfigurators.java b/src/main/java/com/rabbitmq/client/SocketChannelConfigurators.java deleted file mode 100644 index a4b35b92f6..0000000000 --- a/src/main/java/com/rabbitmq/client/SocketChannelConfigurators.java +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright (c) 2018-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client; - -/** - * Ready-to-use instances and builder for {@link SocketChannelConfigurator}. - *

- * Note {@link SocketChannelConfigurator}s can be combined with - * {@link SocketChannelConfigurator#andThen(SocketChannelConfigurator)}. - * - * @since 5.4.0 - * @deprecated Use {@link ConnectionFactory#netty()} with a channel customizer instead. - */ -@Deprecated -public abstract class SocketChannelConfigurators { - - /** - * Disable Nagle's algorithm. - */ - public static final SocketChannelConfigurator DISABLE_NAGLE_ALGORITHM = - socketChannel -> SocketConfigurators.DISABLE_NAGLE_ALGORITHM.configure(socketChannel.socket()); - - /** - * Default {@link SocketChannelConfigurator} that disables Nagle's algorithm. - */ - public static final SocketChannelConfigurator DEFAULT = DISABLE_NAGLE_ALGORITHM; - - /** - * The default {@link SocketChannelConfigurator} that disables Nagle's algorithm. - * - * @return - */ - public static SocketChannelConfigurator defaultConfigurator() { - return DEFAULT; - } - - /** - * {@link SocketChannelConfigurator} that disables Nagle's algorithm. - * - * @return - */ - public static SocketChannelConfigurator disableNagleAlgorithm() { - return DISABLE_NAGLE_ALGORITHM; - } - - /** - * Builder to configure and creates a {@link SocketChannelConfigurator} instance. - * - * @return - */ - public static SocketChannelConfigurators.Builder builder() { - return new SocketChannelConfigurators.Builder(); - } - - public static class Builder { - - private SocketChannelConfigurator configurator = channel -> { - }; - - /** - * Set default configuration. - * - * @return - */ - public Builder defaultConfigurator() { - configurator = configurator.andThen(DEFAULT); - return this; - } - - /** - * Disable Nagle's Algorithm. - * - * @return - */ - public Builder disableNagleAlgorithm() { - configurator = configurator.andThen(DISABLE_NAGLE_ALGORITHM); - return this; - } - - /** - * Add an extra configuration step. - * - * @param extraConfiguration - * @return - */ - public Builder add(SocketChannelConfigurator extraConfiguration) { - configurator = configurator.andThen(extraConfiguration); - return this; - } - - /** - * Return the configured {@link SocketConfigurator}. - * - * @return - */ - public SocketChannelConfigurator build() { - return configurator; - } - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/BlockingQueueNioQueue.java b/src/main/java/com/rabbitmq/client/impl/nio/BlockingQueueNioQueue.java deleted file mode 100644 index 9fd90d9005..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/BlockingQueueNioQueue.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.rabbitmq.client.impl.nio; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Bridge between {@link NioQueue} and JDK's {@link BlockingQueue}. - * - * @see NioQueue - * @since 5.5.0 - */ -public class BlockingQueueNioQueue implements NioQueue { - - private final BlockingQueue delegate; - private final int writeEnqueuingTimeoutInMs; - - public BlockingQueueNioQueue(BlockingQueue delegate, int writeEnqueuingTimeoutInMs) { - this.delegate = delegate; - this.writeEnqueuingTimeoutInMs = writeEnqueuingTimeoutInMs; - } - - @Override - public boolean offer(WriteRequest writeRequest) throws InterruptedException { - return this.delegate.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS); - } - - @Override - public int size() { - return this.delegate.size(); - } - - @Override - public WriteRequest poll() { - return this.delegate.poll(); - } - - @Override - public boolean isEmpty() { - return this.delegate.isEmpty(); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/ByteBufferFactory.java b/src/main/java/com/rabbitmq/client/impl/nio/ByteBufferFactory.java deleted file mode 100644 index 984b5482c9..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/ByteBufferFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.rabbitmq.client.impl.nio; - -import java.nio.ByteBuffer; - -/** - * Contract to create {@link ByteBuffer}s. - * - * @see NioParams - * @since 5.5.0 - */ -public interface ByteBufferFactory { - - /** - * Create the {@link ByteBuffer} that contains inbound frames. - * This buffer is the network buffer for plain connections. - * When using SSL/TLS, this buffer isn't directly connected to - * the network, the encrypted read buffer is. - * - * @param nioContext - * @return - */ - ByteBuffer createReadBuffer(NioContext nioContext); - - /** - * Create the {@link ByteBuffer} that contains outbound frames. - * This buffer is the network buffer for plain connections. - * When using SSL/TLS, this buffer isn't directed connected to - * the network, the encrypted write buffer is. - * - * @param nioContext - * @return - */ - ByteBuffer createWriteBuffer(NioContext nioContext); - - /** - * Create the network read {@link ByteBuffer}. - * This buffer contains encrypted frames read from the network. - * The {@link javax.net.ssl.SSLEngine} decrypts frame and pass them - * over to the read buffer. - * - * @param nioContext - * @return - */ - ByteBuffer createEncryptedReadBuffer(NioContext nioContext); - - /** - * Create the network write {@link ByteBuffer}. - * This buffer contains encrypted outbound frames. These - * frames come from the write buffer that sends them through - * the {@link javax.net.ssl.SSLContext} for encryption to - * this buffer. - * - * @param nioContext - * @return - */ - ByteBuffer createEncryptedWriteBuffer(NioContext nioContext); -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/ByteBufferOutputStream.java b/src/main/java/com/rabbitmq/client/impl/nio/ByteBufferOutputStream.java deleted file mode 100644 index 8e69cebb22..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/ByteBufferOutputStream.java +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; - -/** - * Bridge between the byte buffer and stream worlds. - */ -public class ByteBufferOutputStream extends OutputStream { - - private final WritableByteChannel channel; - - private final ByteBuffer buffer; - - public ByteBufferOutputStream(WritableByteChannel channel, ByteBuffer buffer) { - this.buffer = buffer; - this.channel = channel; - } - - @Override - public void write(int b) throws IOException { - if(!buffer.hasRemaining()) { - drain(channel, buffer); - } - buffer.put((byte) b); - } - - @Override - public void flush() throws IOException { - drain(channel, buffer); - } - - public static void drain(WritableByteChannel channel, ByteBuffer buffer) throws IOException { - buffer.flip(); - while(buffer.hasRemaining() && channel.write(buffer) != -1); - buffer.clear(); - } - -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/DefaultByteBufferFactory.java b/src/main/java/com/rabbitmq/client/impl/nio/DefaultByteBufferFactory.java deleted file mode 100644 index f1bb528b04..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/DefaultByteBufferFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.rabbitmq.client.impl.nio; - -import java.nio.ByteBuffer; -import java.util.function.Function; - -/** - * Default {@link ByteBufferFactory} that creates heap-based {@link ByteBuffer}s. - * This behavior can be changed by passing in a custom {@link Function} - * to the constructor. - * - * @see NioParams - * @see ByteBufferFactory - * @since 5.5.0 - */ -public class DefaultByteBufferFactory implements ByteBufferFactory { - - private final Function allocator; - - public DefaultByteBufferFactory(Function allocator) { - this.allocator = allocator; - } - - public DefaultByteBufferFactory() { - this(capacity -> ByteBuffer.allocate(capacity)); - } - - @Override - public ByteBuffer createReadBuffer(NioContext nioContext) { - if (nioContext.getSslEngine() == null) { - return allocator.apply(nioContext.getNioParams().getReadByteBufferSize()); - } else { - return allocator.apply(nioContext.getSslEngine().getSession().getApplicationBufferSize()); - } - } - - @Override - public ByteBuffer createWriteBuffer(NioContext nioContext) { - if (nioContext.getSslEngine() == null) { - return allocator.apply(nioContext.getNioParams().getWriteByteBufferSize()); - } else { - return allocator.apply(nioContext.getSslEngine().getSession().getApplicationBufferSize()); - } - } - - @Override - public ByteBuffer createEncryptedReadBuffer(NioContext nioContext) { - return createEncryptedByteBuffer(nioContext); - } - - @Override - public ByteBuffer createEncryptedWriteBuffer(NioContext nioContext) { - return createEncryptedByteBuffer(nioContext); - } - - protected ByteBuffer createEncryptedByteBuffer(NioContext nioContext) { - if (nioContext.getSslEngine() == null) { - throw new IllegalArgumentException("Encrypted byte buffer should be created only in SSL/TLS context"); - } else { - return allocator.apply(nioContext.getSslEngine().getSession().getPacketBufferSize()); - } - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/FrameBuilder.java b/src/main/java/com/rabbitmq/client/impl/nio/FrameBuilder.java deleted file mode 100644 index f9631d1598..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/FrameBuilder.java +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.MalformedFrameException; -import com.rabbitmq.client.impl.Frame; - -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import static java.lang.String.format; - -/** - * Class to create AMQP frames from a {@link ReadableByteChannel}. - * Supports partial frames: a frame can be read in several attempts - * from the {@link NioLoop}. This can happen when the channel won't - * read any more bytes in the middle of a frame building. The state - * of the outstanding frame is saved up, and the builder will - * start where it left off when the {@link NioLoop} comes back to - * this connection. - * This class is not thread safe. - * - * @since 4.4.0 - */ -public class FrameBuilder { - - private static final int PAYLOAD_OFFSET = 1 /* type */ + 2 /* channel */ + 4 /* payload size */; - - protected final ReadableByteChannel channel; - - protected final ByteBuffer applicationBuffer; - private final int maxPayloadSize; - // to store the bytes of the outstanding data - // 3 byte-long because the longest we read is an unsigned int - // (not need to store the latest byte) - private final int[] frameBuffer = new int[3]; - private int frameType; - private int frameChannel; - private byte[] framePayload; - private int bytesRead = 0; - - public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer, int maxPayloadSize) { - this.channel = channel; - this.applicationBuffer = buffer; - this.maxPayloadSize = maxPayloadSize; - } - - /** - * Read a frame from the network. - * This method returns null if a frame could not have been fully built from - * the network. The client must then retry later (typically - * when the channel notifies it has something to read). - * - * @return a complete frame or null if a frame couldn't have been fully built - * @throws IOException - * @see Frame#readFrom(DataInputStream, int) - */ - public Frame readFrame() throws IOException { - while (somethingToRead()) { - if (bytesRead == 0) { - // type - frameType = readFromBuffer(); - if (frameType == 'A') { - handleProtocolVersionMismatch(); - } - } else if (bytesRead == 1) { - // channel 1/2 - frameBuffer[0] = readFromBuffer(); - } else if (bytesRead == 2) { - // channel 2/2 - frameChannel = (frameBuffer[0] << 8) + readFromBuffer(); - } else if (bytesRead == 3) { - // payload size 1/4 - frameBuffer[0] = readFromBuffer(); - } else if (bytesRead == 4) { - // payload size 2/4 - frameBuffer[1] = readFromBuffer(); - } else if (bytesRead == 5) { - // payload size 3/4 - frameBuffer[2] = readFromBuffer(); - } else if (bytesRead == 6) { - // payload size 4/4 - int framePayloadSize = (frameBuffer[0] << 24) + (frameBuffer[1] << 16) + (frameBuffer[2] << 8) + readFromBuffer(); - if (framePayloadSize >= maxPayloadSize) { - throw new IllegalStateException(format( - "Frame body is too large (%d), maximum configured size is %d. " + - "See ConnectionFactory#setMaxInboundMessageBodySize " + - "if you need to increase the limit.", - framePayloadSize, maxPayloadSize - )); - } - framePayload = new byte[framePayloadSize]; - } else if (bytesRead >= PAYLOAD_OFFSET && bytesRead < framePayload.length + PAYLOAD_OFFSET) { - framePayload[bytesRead - PAYLOAD_OFFSET] = (byte) readFromBuffer(); - } else if (bytesRead == framePayload.length + PAYLOAD_OFFSET) { - int frameEndMarker = readFromBuffer(); - if (frameEndMarker != AMQP.FRAME_END) { - throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker); - } - bytesRead = 0; - return new Frame(frameType, frameChannel, framePayload); - } else { - throw new IllegalStateException("Number of read bytes incorrect: " + bytesRead); - } - bytesRead++; - } - return null; - } - - /** - * Tells whether there's something to read in the application buffer or not. - * Tries to read from the network if necessary. - * - * @return true if there's something to read in the application buffer - * @throws IOException - */ - protected boolean somethingToRead() throws IOException { - if (!applicationBuffer.hasRemaining()) { - applicationBuffer.clear(); - int read = NioHelper.read(channel, applicationBuffer); - applicationBuffer.flip(); - if (read > 0) { - return true; - } else { - return false; - } - } else { - return true; - } - } - - private int readFromBuffer() { - return applicationBuffer.get() & 0xff; - } - - /** - * Handle a protocol version mismatch. - * @return - * @throws IOException - * @see Frame#protocolVersionMismatch(DataInputStream) - */ - private void handleProtocolVersionMismatch() throws IOException { - // Probably an AMQP.... header indicating a version mismatch - // Otherwise meaningless, so try to read the version, - // and throw an exception, whether we read the version - // okay or not. - // Try to read everything from the network, this header - // is small and should never require several network reads. - byte[] expectedBytes = new byte[] { 'M', 'Q', 'P' }; - int expectedBytesCount = 0; - while (somethingToRead() && expectedBytesCount < 3) { - // We expect the letters M, Q, P in that order: generate an informative error if they're not found - int nextByte = readFromBuffer(); - if (nextByte != expectedBytes[expectedBytesCount]) { - throw new MalformedFrameException("Invalid AMQP protocol header from server: expected character " + - expectedBytes[expectedBytesCount] + ", got " + nextByte); - } - expectedBytesCount++; - } - - if (expectedBytesCount != 3) { - throw new MalformedFrameException("Invalid AMQP protocol header from server: read only " - + (expectedBytesCount + 1) + " byte(s) instead of 4"); - } - - int[] signature = new int[4]; - - for (int i = 0; i < 4; i++) { - if (somethingToRead()) { - signature[i] = readFromBuffer(); - } else { - throw new MalformedFrameException("Invalid AMQP protocol header from server"); - } - } - - MalformedFrameException x; - - if (signature[0] == 1 && - signature[1] == 1 && - signature[2] == 8 && - signature[3] == 0) { - x = new MalformedFrameException("AMQP protocol version mismatch; we are version " + - AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION + - ", server is 0-8"); - } else { - String sig = ""; - for (int i = 0; i < 4; i++) { - if (i != 0) - sig += ","; - sig += signature[i]; - } - - x = new MalformedFrameException("AMQP protocol version mismatch; we are version " + - AMQP.PROTOCOL.MAJOR + "-" + AMQP.PROTOCOL.MINOR + "-" + AMQP.PROTOCOL.REVISION + - ", server sent signature " + sig); - } - throw x; - } - - //Indicates ssl underflow state - means that cipherBuffer should aggregate next chunks of bytes - public boolean isUnderflowHandlingEnabled() { - return false; - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/FrameWriteRequest.java b/src/main/java/com/rabbitmq/client/impl/nio/FrameWriteRequest.java deleted file mode 100644 index 961e3a4f19..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/FrameWriteRequest.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.impl.Frame; - -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * - */ -public class FrameWriteRequest implements WriteRequest { - - final Frame frame; - - public FrameWriteRequest(Frame frame) { - this.frame = frame; - } - - @Override - public void handle(DataOutputStream outputStream) throws IOException { - frame.writeTo(outputStream); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/HeaderWriteRequest.java b/src/main/java/com/rabbitmq/client/impl/nio/HeaderWriteRequest.java deleted file mode 100644 index 3559d91e86..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/HeaderWriteRequest.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.AMQP; - -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * - */ -public class HeaderWriteRequest implements WriteRequest { - - public static final WriteRequest SINGLETON = new HeaderWriteRequest(); - - private HeaderWriteRequest() { } - - @Override - public void handle(DataOutputStream outputStream) throws IOException { - outputStream.write("AMQP".getBytes("US-ASCII")); - outputStream.write(0); - outputStream.write(AMQP.PROTOCOL.MAJOR); - outputStream.write(AMQP.PROTOCOL.MINOR); - outputStream.write(AMQP.PROTOCOL.REVISION); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioContext.java b/src/main/java/com/rabbitmq/client/impl/nio/NioContext.java deleted file mode 100644 index 28147925d9..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioContext.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.rabbitmq.client.impl.nio; - -import javax.net.ssl.SSLEngine; - -/** - * Context when creating resources for a NIO-based connection. - * - * @see ByteBufferFactory - * @since 5.5.0 - * @deprecated use {@link com.rabbitmq.client.ConnectionFactory#netty()} instead - */ -@Deprecated -public class NioContext { - - private final NioParams nioParams; - - private final SSLEngine sslEngine; - - NioContext(NioParams nioParams, SSLEngine sslEngine) { - this.nioParams = nioParams; - this.sslEngine = sslEngine; - } - - /** - * NIO params. - * - * @return - */ - public NioParams getNioParams() { - return nioParams; - } - - /** - * {@link SSLEngine} for SSL/TLS connection. - * Null for plain connection. - * - * @return - */ - public SSLEngine getSslEngine() { - return sslEngine; - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java b/src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java deleted file mode 100644 index a0521b03c6..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; - -public class NioHelper { - - static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { - int read = channel.read(buffer); - if(read < 0) { - throw new IOException("I/O thread: reached EOF"); - } - return read; - } - -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java b/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java deleted file mode 100644 index 7894320768..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java +++ /dev/null @@ -1,325 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.impl.Environment; -import com.rabbitmq.client.impl.Frame; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; - -/** - * Logic of the NIO loop. - */ -public class NioLoop implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(NioLoop.class); - - private final NioLoopContext context; - - private final NioParams nioParams; - - private final ExecutorService connectionShutdownExecutor; - - public NioLoop(NioParams nioParams, NioLoopContext loopContext) { - this.nioParams = nioParams; - this.context = loopContext; - this.connectionShutdownExecutor = nioParams.getConnectionShutdownExecutor(); - } - - @Override - public void run() { - final SelectorHolder selectorState = context.readSelectorState; - final Selector selector = selectorState.selector; - final Set registrations = selectorState.registrations; - - final ByteBuffer buffer = context.readBuffer; - - final SelectorHolder writeSelectorState = context.writeSelectorState; - final Selector writeSelector = writeSelectorState.selector; - final Set writeRegistrations = writeSelectorState.registrations; - - // whether there have been write registrations in the previous loop - // registrations are done after Selector.select(), to work on clean keys - // thus, any write operation is performed in the next loop - // we don't want to wait in the read Selector.select() if there are - // pending writes - boolean writeRegistered = false; - - try { - while (!Thread.currentThread().isInterrupted()) { - - for (SelectionKey selectionKey : selector.keys()) { - SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment(); - if (state.getConnection() != null && state.getHeartbeatNanoSeconds() > 0) { - long now = System.nanoTime(); - if ((now - state.getLastActivity()) > state.getHeartbeatNanoSeconds() * 2) { - try { - handleHeartbeatFailure(state); - } catch (Exception e) { - LOGGER.warn("Error after heartbeat failure of connection {}", state.getConnection()); - } finally { - selectionKey.cancel(); - } - } - } - } - - int select; - if (!writeRegistered && registrations.isEmpty() && writeRegistrations.isEmpty()) { - // we can block, registrations will call Selector.wakeup() - select = selector.select(1000); - if (selector.keys().isEmpty()) { - // we haven't been doing anything for a while, shutdown state - boolean clean = context.cleanUp(); - if (clean) { - // we stop this thread - return; - } - // there may be incoming connections, keep going - } - } else { - // we don't have to block, we need to select and clean cancelled keys before registration - select = selector.selectNow(); - } - - writeRegistered = false; - - // registrations should be done after select, - // once the cancelled keys have been actually removed - SocketChannelRegistration registration; - Iterator registrationIterator = registrations.iterator(); - while (registrationIterator.hasNext()) { - registration = registrationIterator.next(); - registrationIterator.remove(); - int operations = registration.operations; - try { - if (registration.state.getChannel().isOpen()) { - registration.state.getChannel().register(selector, operations, registration.state); - } - } catch (Exception e) { - // can happen if the channel has been closed since the operation has been enqueued - LOGGER.info("Error while registering socket channel for read: {}", e.getMessage()); - } - } - - if (select > 0) { - Set readyKeys = selector.selectedKeys(); - Iterator iterator = readyKeys.iterator(); - while (iterator.hasNext()) { - SelectionKey key = iterator.next(); - iterator.remove(); - - if (!key.isValid()) { - continue; - } - final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment(); - try { - if (key.isReadable()) { - if (!state.getChannel().isOpen()) { - key.cancel(); - continue; - } - if(state.getConnection() == null) { - // we're in AMQConnection#start, between the header sending and the FrameHandler#initialize - // let's wait a bit more - continue; - } - - state.prepareForReadSequence(); - - while (state.continueReading()) { - final Frame frame = state.frameBuilder.readFrame(); - - if (frame != null) { - try { - state.getConnection().ioLoopThread(Thread.currentThread()); - boolean noProblem = state.getConnection().handleReadFrame(frame); - if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) { - // looks like the frame was Close-Ok or Close - dispatchShutdownToConnection(state); - key.cancel(); - break; - } - } catch (Throwable ex) { - // problem during frame processing, tell connection, and - // we can stop for this channel - handleIoError(state, ex); - key.cancel(); - break; - } - } - } - - state.setLastActivity(System.nanoTime()); - } - } catch (final Exception e) { - LOGGER.warn("Error during reading frames", e); - handleIoError(state, e); - key.cancel(); - } finally { - buffer.clear(); - } - } - } - - // write loop - - select = writeSelector.selectNow(); - - // registrations should be done after select, - // once the cancelled keys have been actually removed - SocketChannelRegistration writeRegistration; - Iterator writeRegistrationIterator = writeRegistrations.iterator(); - while (writeRegistrationIterator.hasNext()) { - writeRegistration = writeRegistrationIterator.next(); - writeRegistrationIterator.remove(); - int operations = writeRegistration.operations; - try { - if (writeRegistration.state.getChannel().isOpen()) { - writeRegistration.state.getChannel().register(writeSelector, operations, writeRegistration.state); - writeRegistered = true; - } - } catch (Exception e) { - // can happen if the channel has been closed since the operation has been enqueued - LOGGER.info("Error while registering socket channel for write: {}", e.getMessage()); - } - } - - if (select > 0) { - Set readyKeys = writeSelector.selectedKeys(); - Iterator iterator = readyKeys.iterator(); - while (iterator.hasNext()) { - SelectionKey key = iterator.next(); - iterator.remove(); - SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment(); - - if (!key.isValid()) { - continue; - } - - try { - if (key.isWritable()) { - if (!state.getChannel().isOpen()) { - key.cancel(); - continue; - } - - state.prepareForWriteSequence(); - - int toBeWritten = state.getWriteQueue().size(); - int written = 0; - - DataOutputStream outputStream = state.outputStream; - - WriteRequest request; - while (written <= toBeWritten && (request = state.getWriteQueue().poll()) != null) { - request.handle(outputStream); - written++; - } - outputStream.flush(); - } - } catch (Exception e) { - handleIoError(state, e); - } finally { - state.endWriteSequence(); - key.cancel(); - } - } - } - } - } catch (Exception e) { - LOGGER.error("Error in NIO loop", e); - } - } - - protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex) { - if (needToDispatchIoError(state)) { - dispatchIoErrorToConnection(state, ex); - } else { - try { - state.close(); - } catch (IOException ignored) { - - } - } - } - - protected void handleHeartbeatFailure(SocketChannelFrameHandlerState state) { - if (needToDispatchIoError(state)) { - dispatchShutdownToConnection( - () -> state.getConnection().handleHeartbeatFailure(), - state.getConnection().toString() - ); - } else { - try { - state.close(); - } catch (IOException ignored) { - - } - } - } - - protected boolean needToDispatchIoError(final SocketChannelFrameHandlerState state) { - return state.getConnection().isOpen(); - } - - protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState state, final Throwable ex) { - dispatchShutdownToConnection( - () -> state.getConnection().handleIoError(ex), - state.getConnection().toString() - ); - } - - protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) { - dispatchShutdownToConnection( - () -> state.getConnection().doFinalShutdown(), - state.getConnection().toString() - ); - } - - protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable, String connectionName) { - // In case of recovery after the shutdown, - // the new connection shouldn't be initialized in - // the NIO thread, to avoid a deadlock. - if (this.connectionShutdownExecutor != null) { - connectionShutdownExecutor.execute(connectionShutdownRunnable); - } else if (executorService() != null) { - executorService().execute(connectionShutdownRunnable); - } else { - String name = "rabbitmq-connection-shutdown-" + connectionName; - Thread shutdownThread = Environment.newThread(threadFactory(), connectionShutdownRunnable, name); - shutdownThread.start(); - } - } - - private ExecutorService executorService() { - return nioParams.getNioExecutor(); - } - - private ThreadFactory threadFactory() { - return nioParams.getThreadFactory(); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioLoopContext.java b/src/main/java/com/rabbitmq/client/impl/nio/NioLoopContext.java deleted file mode 100644 index a31f142de0..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioLoopContext.java +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.impl.Environment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Selector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; - -/** - * - */ -public class NioLoopContext { - - private static final Logger LOGGER = LoggerFactory.getLogger(NioLoopContext.class); - - private final SocketChannelFrameHandlerFactory socketChannelFrameHandlerFactory; - - private final ExecutorService executorService; - - private final ThreadFactory threadFactory; - - final ByteBuffer readBuffer, writeBuffer; - - SelectorHolder readSelectorState; - SelectorHolder writeSelectorState; - - public NioLoopContext(SocketChannelFrameHandlerFactory socketChannelFrameHandlerFactory, - NioParams nioParams) { - this.socketChannelFrameHandlerFactory = socketChannelFrameHandlerFactory; - this.executorService = nioParams.getNioExecutor(); - this.threadFactory = nioParams.getThreadFactory(); - NioContext nioContext = new NioContext(nioParams, null); - this.readBuffer = nioParams.getByteBufferFactory().createReadBuffer(nioContext); - this.writeBuffer = nioParams.getByteBufferFactory().createWriteBuffer(nioContext); - } - - void initStateIfNecessary() throws IOException { - // This code is supposed to be called only from the SocketChannelFrameHandlerFactory - // and while holding the lock. - // We lock just in case some other code calls this method in the future. - socketChannelFrameHandlerFactory.lock(); - try { - if (this.readSelectorState == null) { - this.readSelectorState = new SelectorHolder(Selector.open()); - this.writeSelectorState = new SelectorHolder(Selector.open()); - - startIoLoops(); - } - } finally { - socketChannelFrameHandlerFactory.unlock(); - } - } - - private void startIoLoops() { - if (executorService == null) { - Thread nioThread = Environment.newThread( - threadFactory, - new NioLoop(socketChannelFrameHandlerFactory.nioParams, this), - "rabbitmq-nio" - ); - nioThread.start(); - } else { - this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this)); - } - } - - protected boolean cleanUp() { - int readRegistrationsCount = readSelectorState.registrations.size(); - if(readRegistrationsCount != 0) { - return false; - } - socketChannelFrameHandlerFactory.lock(); - try { - if (readRegistrationsCount != readSelectorState.registrations.size()) { - // a connection request has come in meanwhile, don't do anything - return false; - } - - try { - readSelectorState.selector.close(); - } catch (IOException e) { - LOGGER.warn("Could not close read selector: {}", e.getMessage()); - } - try { - writeSelectorState.selector.close(); - } catch (IOException e) { - LOGGER.warn("Could not close write selector: {}", e.getMessage()); - } - - this.readSelectorState = null; - this.writeSelectorState = null; - } finally { - socketChannelFrameHandlerFactory.unlock(); - } - return true; - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java b/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java deleted file mode 100644 index 8ecaf5e97e..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java +++ /dev/null @@ -1,432 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.SocketChannelConfigurator; -import com.rabbitmq.client.SocketChannelConfigurators; -import com.rabbitmq.client.SslEngineConfigurator; - -import javax.net.ssl.SSLEngine; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.function.Function; - -import static com.rabbitmq.client.SslEngineConfigurators.ENABLE_HOSTNAME_VERIFICATION; - -/** - * Parameters used to configure the NIO mode of a {@link com.rabbitmq.client.ConnectionFactory}. - * - * - * - * @since 4.0.0 - * @deprecated use {@link com.rabbitmq.client.ConnectionFactory#netty()} instead - */ -@Deprecated -public class NioParams { - - static Function DEFAULT_WRITE_QUEUE_FACTORY = - ctx -> new BlockingQueueNioQueue( - new ArrayBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity(), true), - ctx.getNioParams().getWriteEnqueuingTimeoutInMs() - ); - - /** - * size of the byte buffer used for inbound data - */ - private int readByteBufferSize = 32768; - - /** - * size of the byte buffer used for outbound data - */ - private int writeByteBufferSize = 32768; - - /** - * the max number of IO threads - */ - private int nbIoThreads = 1; - - /** - * the timeout to enqueue outbound frames - */ - private int writeEnqueuingTimeoutInMs = 10 * 1000; - - /** - * the capacity of the queue used for outbound frames - */ - private int writeQueueCapacity = 10000; - - /** - * the executor service used for IO threads and connections shutdown - */ - private ExecutorService nioExecutor; - - /** - * the thread factory used for IO threads and connections shutdown - */ - private ThreadFactory threadFactory; - - /** - * the hook to configure the socket channel before it's open - */ - private SocketChannelConfigurator socketChannelConfigurator = SocketChannelConfigurators.defaultConfigurator(); - - /** - * the hook to configure the SSL engine before the connection is open - */ - private SslEngineConfigurator sslEngineConfigurator = sslEngine -> { - }; - - /** - * the executor service used for connection shutdown - * - * @since 5.4.0 - */ - private ExecutorService connectionShutdownExecutor; - - /** - * The factory to create {@link java.nio.ByteBuffer}s. - * The default is to create heap-based {@link java.nio.ByteBuffer}s. - * - * @since 5.5.0 - */ - private ByteBufferFactory byteBufferFactory = new DefaultByteBufferFactory(); - - /** - * Factory to create a {@link NioQueue}. - * - * @since 5.5.0 - */ - private Function writeQueueFactory = - DEFAULT_WRITE_QUEUE_FACTORY; - - public NioParams() { - } - - public NioParams(NioParams nioParams) { - setReadByteBufferSize(nioParams.getReadByteBufferSize()); - setWriteByteBufferSize(nioParams.getWriteByteBufferSize()); - setNbIoThreads(nioParams.getNbIoThreads()); - setWriteEnqueuingTimeoutInMs(nioParams.getWriteEnqueuingTimeoutInMs()); - setWriteQueueCapacity(nioParams.getWriteQueueCapacity()); - setNioExecutor(nioParams.getNioExecutor()); - setThreadFactory(nioParams.getThreadFactory()); - setSocketChannelConfigurator(nioParams.getSocketChannelConfigurator()); - setSslEngineConfigurator(nioParams.getSslEngineConfigurator()); - setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor()); - setByteBufferFactory(nioParams.getByteBufferFactory()); - setWriteQueueFactory(nioParams.getWriteQueueFactory()); - } - - /** - * Enable server hostname verification for TLS connections. - * - * @return this {@link NioParams} instance - * @see NioParams#setSslEngineConfigurator(SslEngineConfigurator) - * @see com.rabbitmq.client.SslEngineConfigurators#ENABLE_HOSTNAME_VERIFICATION - */ - public NioParams enableHostnameVerification() { - if (this.sslEngineConfigurator == null) { - this.sslEngineConfigurator = ENABLE_HOSTNAME_VERIFICATION; - } else { - this.sslEngineConfigurator = this.sslEngineConfigurator.andThen(ENABLE_HOSTNAME_VERIFICATION); - } - return this; - } - - public int getReadByteBufferSize() { - return readByteBufferSize; - } - - /** - * Sets the size in byte of the read {@link java.nio.ByteBuffer} used in the NIO loop. - * Default is 32768. - *

- * This parameter isn't used when using SSL/TLS, where {@link java.nio.ByteBuffer} - * size is set up according to the {@link javax.net.ssl.SSLSession} packet size. - * - * @param readByteBufferSize size of the {@link java.nio.ByteBuffer} for inbound data - * @return this {@link NioParams} instance - */ - public NioParams setReadByteBufferSize(int readByteBufferSize) { - if (readByteBufferSize <= 0) { - throw new IllegalArgumentException("Buffer size must be greater than 0"); - } - this.readByteBufferSize = readByteBufferSize; - return this; - } - - public int getWriteByteBufferSize() { - return writeByteBufferSize; - } - - /** - * Sets the size in byte of the write {@link java.nio.ByteBuffer} used in the NIO loop. - * Default is 32768. - *

- * This parameter isn't used when using SSL/TLS, where {@link java.nio.ByteBuffer} - * size is set up according to the {@link javax.net.ssl.SSLSession} packet size. - * - * @param writeByteBufferSize size of the {@link java.nio.ByteBuffer} used for outbound data - * @return this {@link NioParams} instance - */ - public NioParams setWriteByteBufferSize(int writeByteBufferSize) { - if (writeByteBufferSize <= 0) { - throw new IllegalArgumentException("Buffer size must be greater than 0"); - } - this.writeByteBufferSize = writeByteBufferSize; - return this; - } - - public int getNbIoThreads() { - return nbIoThreads; - } - - /** - * Sets the max number of threads/tasks used for NIO. Default is 1. - * Set this number according to the number of simultaneous connections - * and their activity. - * Threads/tasks are created as necessary (e.g. with 10 threads, when - * 10 connections have been created). - * Once a connection is created, it's assigned to a thread/task and - * all its IO activity is handled by this thread/task. - *

- * When idle for a few seconds (i.e. without any connection to perform IO for), - * a thread/task stops and is recreated if necessary. - * - * @param nbIoThreads - * @return this {@link NioParams} instance - */ - public NioParams setNbIoThreads(int nbIoThreads) { - if (nbIoThreads <= 0) { - throw new IllegalArgumentException("Number of threads must be greater than 0"); - } - this.nbIoThreads = nbIoThreads; - return this; - } - - public int getWriteEnqueuingTimeoutInMs() { - return writeEnqueuingTimeoutInMs; - } - - /** - * Sets the timeout for queuing outbound frames. Default is 10,000 ms. - * Every requests to the server is divided into frames - * that are then queued in a {@link java.util.concurrent.BlockingQueue} before - * being sent on the network by a IO thread. - *

- * If the IO thread cannot cope with the frames dispatch, the - * {@link java.util.concurrent.BlockingQueue} gets filled up and blocks - * (blocking the calling thread by the same occasion). This timeout is the - * time the {@link java.util.concurrent.BlockingQueue} will wait before - * rejecting the outbound frame. The calling thread will then received - * an exception. - *

- * The appropriate value depends on the application scenarios: - * rate of outbound data (published messages, acknowledgment, etc), network speed... - * - * @param writeEnqueuingTimeoutInMs - * @return this {@link NioParams} instance - * @see NioParams#setWriteQueueCapacity(int) - */ - public NioParams setWriteEnqueuingTimeoutInMs(int writeEnqueuingTimeoutInMs) { - this.writeEnqueuingTimeoutInMs = writeEnqueuingTimeoutInMs; - return this; - } - - public ExecutorService getNioExecutor() { - return nioExecutor; - } - - /** - * Sets the {@link ExecutorService} to use for NIO threads/tasks. - * Default is to use the thread factory. - *

- * The {@link ExecutorService} should be able to run the - * number of requested IO threads, plus a few more, as it's also - * used to dispatch the shutdown of connections. - *

- * Connection shutdown can also be handled by a dedicated {@link ExecutorService}, - * see {@link #setConnectionShutdownExecutor(ExecutorService)}. - *

- * It's developer's responsibility to shut down the executor - * when it is no longer needed. - *

- * The thread factory isn't used if an executor service is set up. - * - * @param nioExecutor {@link ExecutorService} used for IO threads and connection shutdown - * @return this {@link NioParams} instance - * @see NioParams#setNbIoThreads(int) - * @see NioParams#setThreadFactory(ThreadFactory) - * @see NioParams#setConnectionShutdownExecutor(ExecutorService) - */ - public NioParams setNioExecutor(ExecutorService nioExecutor) { - this.nioExecutor = nioExecutor; - return this; - } - - public ThreadFactory getThreadFactory() { - return threadFactory; - } - - /** - * Sets the {@link ThreadFactory} to use for NIO threads/tasks. - * Default is to use the {@link com.rabbitmq.client.ConnectionFactory}'s - * {@link ThreadFactory}. - *

- * The {@link ThreadFactory} is used to spawn the IO threads - * and dispatch the shutdown of connections. - * - * @param threadFactory {@link ThreadFactory} used for IO threads and connection shutdown - * @return this {@link NioParams} instance - * @see NioParams#setNbIoThreads(int) - * @see NioParams#setNioExecutor(ExecutorService) - */ - public NioParams setThreadFactory(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - return this; - } - - public int getWriteQueueCapacity() { - return writeQueueCapacity; - } - - /** - * Set the capacity of the queue used for outbound frames. - * Default capacity is 10,000. - * - * @param writeQueueCapacity - * @return this {@link NioParams} instance - * @see NioParams#setWriteEnqueuingTimeoutInMs(int) - */ - public NioParams setWriteQueueCapacity(int writeQueueCapacity) { - if (writeQueueCapacity <= 0) { - throw new IllegalArgumentException("Write queue capacity must be greater than 0"); - } - this.writeQueueCapacity = writeQueueCapacity; - return this; - } - - public SocketChannelConfigurator getSocketChannelConfigurator() { - return socketChannelConfigurator; - } - - /** - * Set the {@link java.nio.channels.SocketChannel} configurator. - * This gets a chance to "configure" a socket channel - * before it has been opened. The default implementation disables - * Nagle's algorithm. - * - * @param configurator the configurator to use - * @return this {@link NioParams} instance - */ - public NioParams setSocketChannelConfigurator(SocketChannelConfigurator configurator) { - this.socketChannelConfigurator = configurator; - return this; - } - - public SslEngineConfigurator getSslEngineConfigurator() { - return sslEngineConfigurator; - } - - /** - * Set the {@link SSLEngine} configurator. - * This gets a change to "configure" the SSL engine - * before the connection has been opened. This can be - * used e.g. to set {@link javax.net.ssl.SSLParameters}. - * The default implementation doesn't do anything. - * - * @param configurator the configurator to use - * @return this {@link NioParams} instance - */ - public NioParams setSslEngineConfigurator(SslEngineConfigurator configurator) { - this.sslEngineConfigurator = configurator; - return this; - } - - public ExecutorService getConnectionShutdownExecutor() { - return connectionShutdownExecutor; - } - - /** - * Set the {@link ExecutorService} used for connection shutdown. - * If not set, falls back to the NIO executor and then the thread factory. - * This executor service is useful when strict control of the number of threads - * is necessary, the application can experience the closing of several connections - * at once, and automatic recovery is enabled. In such cases, the connection recovery - * can take place in the same pool of threads as the NIO operations, which can - * create deadlocks (all the threads of the pool are busy recovering, and there's no - * thread left for NIO, so connections never recover). - *

- * Note it's developer's responsibility to shut down the executor - * when it is no longer needed. - *

- * Using the thread factory for such scenarios avoid the deadlocks, at the price - * of potentially creating many short-lived threads in case of massive connection lost. - *

- * With both the NIO and connection shutdown executor services set and configured - * accordingly, the application can control reliably the number of threads used. - * - * @param connectionShutdownExecutor the executor service to use - * @return this {@link NioParams} instance - * @see NioParams#setNioExecutor(ExecutorService) - * @since 5.4.0 - */ - public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdownExecutor) { - this.connectionShutdownExecutor = connectionShutdownExecutor; - return this; - } - - /** - * Set the factory to create {@link java.nio.ByteBuffer}s. - *

- * The default implementation creates heap-based {@link java.nio.ByteBuffer}s. - * - * @param byteBufferFactory the factory to use - * @return this {@link NioParams} instance - * @see ByteBufferFactory - * @see DefaultByteBufferFactory - * @since 5.5.0 - */ - public NioParams setByteBufferFactory(ByteBufferFactory byteBufferFactory) { - this.byteBufferFactory = byteBufferFactory; - return this; - } - - public ByteBufferFactory getByteBufferFactory() { - return byteBufferFactory; - } - - /** - * Set the factory to create {@link NioQueue}s. - *

- * The default uses a {@link ArrayBlockingQueue}. - * - * @param writeQueueFactory the factory to use - * @return this {@link NioParams} instance - * @see NioQueue - * @since 5.5.0 - */ - public NioParams setWriteQueueFactory( - Function writeQueueFactory) { - this.writeQueueFactory = writeQueueFactory; - return this; - } - - public Function getWriteQueueFactory() { - return writeQueueFactory; - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioQueue.java b/src/main/java/com/rabbitmq/client/impl/nio/NioQueue.java deleted file mode 100644 index e44cf12d8e..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioQueue.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.rabbitmq.client.impl.nio; - -/** - * Contract to exchange frame between application threads and NIO thread. - *

- * This is a simplified subset of {@link java.util.concurrent.BlockingQueue}. - * This interface is considered a SPI and is likely to move between - * minor and patch releases. - * - * @see NioParams - * @since 5.5.0 - * @deprecated use {@link com.rabbitmq.client.ConnectionFactory#netty()} instead - */ -@Deprecated -public interface NioQueue { - - /** - * Enqueue a frame, block if the queue is full. - * - * @param writeRequest - * @return - * @throws InterruptedException - */ - boolean offer(WriteRequest writeRequest) throws InterruptedException; - - /** - * Get the current size of the queue. - * - * @return - */ - int size(); - - /** - * Retrieves and removes the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty - */ - WriteRequest poll(); - - /** - * Returns {@code true} if the queue contains no element. - * - * @return {@code true} if the queue contains no element - */ - boolean isEmpty(); -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SelectorHolder.java b/src/main/java/com/rabbitmq/client/impl/nio/SelectorHolder.java deleted file mode 100644 index 48a058fcca..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SelectorHolder.java +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import java.nio.channels.Selector; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * - */ -public class SelectorHolder { - - final Selector selector; - - final Set registrations = Collections - .newSetFromMap(new ConcurrentHashMap()); - - SelectorHolder(Selector selector) { - this.selector = selector; - } - - public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int operations) { - registrations.add(new SocketChannelRegistration(state, operations)); - selector.wakeup(); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java deleted file mode 100644 index 656a9d7039..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.impl.AMQConnection; -import com.rabbitmq.client.impl.Frame; -import com.rabbitmq.client.impl.FrameHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.SocketException; -import java.time.Duration; - -/** - * - */ -public class SocketChannelFrameHandler implements FrameHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(SocketChannelFrameHandler.class); - - private final SocketChannelFrameHandlerState state; - - public SocketChannelFrameHandler(SocketChannelFrameHandlerState state) { - this.state = state; - } - - @Override - public InetAddress getLocalAddress() { - return state.getChannel().socket().getLocalAddress(); - } - - @Override - public int getLocalPort() { - return state.getChannel().socket().getLocalPort(); - } - - @Override - public InetAddress getAddress() { - return state.getChannel().socket().getInetAddress(); - } - - @Override - public int getPort() { - return state.getChannel().socket().getPort(); - } - - @Override - public void setTimeout(int timeoutMs) throws SocketException { - state.getChannel().socket().setSoTimeout(timeoutMs); - if (state.getConnection() != null) { - state.setHeartbeat(Duration.ofSeconds(state.getConnection().getHeartbeat())); - } - } - - @Override - public int getTimeout() throws SocketException { - return state.getChannel().socket().getSoTimeout(); - } - - @Override - public void sendHeader() throws IOException { - state.sendHeader(); - } - - @Override - public void initialize(AMQConnection connection) { - state.setConnection(connection); - } - - @Override - public Frame readFrame() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void writeFrame(Frame frame) throws IOException { - state.write(frame); - } - - @Override - public void flush() throws IOException { - - } - - @Override - public void close() { - try { - state.close(); - } catch (IOException e) { - LOGGER.warn("Error while closing SocketChannel", e); - } - } - - public SocketChannelFrameHandlerState getState() { - return state; - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerFactory.java deleted file mode 100644 index 34ec7d3f7d..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerFactory.java +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.Address; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.SslContextFactory; -import com.rabbitmq.client.impl.AbstractFrameHandlerFactory; -import com.rabbitmq.client.impl.FrameHandler; -import com.rabbitmq.client.impl.TlsUtils; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLHandshakeException; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * - */ -public class SocketChannelFrameHandlerFactory extends AbstractFrameHandlerFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(SocketChannelFrameHandler.class); - - final NioParams nioParams; - - private final SslContextFactory sslContextFactory; - - private final Lock stateLock = new ReentrantLock(); - - private final AtomicLong globalConnectionCount = new AtomicLong(); - - private final List nioLoopContexts; - - public SocketChannelFrameHandlerFactory(int connectionTimeout, NioParams nioParams, boolean ssl, - SslContextFactory sslContextFactory, - int maxInboundMessageBodySize) { - super(connectionTimeout, null, ssl, maxInboundMessageBodySize); - this.nioParams = new NioParams(nioParams); - this.sslContextFactory = sslContextFactory; - this.nioLoopContexts = new ArrayList<>(this.nioParams.getNbIoThreads()); - for (int i = 0; i < this.nioParams.getNbIoThreads(); i++) { - this.nioLoopContexts.add(new NioLoopContext(this, this.nioParams)); - } - } - - @Override - public FrameHandler create(Address addr, String connectionName) throws IOException { - int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl); - - SSLEngine sslEngine = null; - SocketChannel channel = null; - - try { - if (ssl) { - SSLContext sslContext = sslContextFactory.create(connectionName); - sslEngine = sslContext.createSSLEngine(addr.getHost(), portNumber); - sslEngine.setUseClientMode(true); - if (nioParams.getSslEngineConfigurator() != null) { - nioParams.getSslEngineConfigurator().configure(sslEngine); - } - } - - SocketAddress address = addr.toInetSocketAddress(portNumber); - // No Sonar: the channel is closed in case of error and it cannot - // be closed here because it's part of the state of the connection - // to be returned. - channel = SocketChannel.open(); //NOSONAR - channel.configureBlocking(true); - if(nioParams.getSocketChannelConfigurator() != null) { - nioParams.getSocketChannelConfigurator().configure(channel); - } - - channel.socket().connect(address, this.connectionTimeout); - - - if (ssl) { - int initialSoTimeout = channel.socket().getSoTimeout(); - channel.socket().setSoTimeout(this.connectionTimeout); - sslEngine.beginHandshake(); - try { - ReadableByteChannel wrappedReadChannel = Channels.newChannel( - channel.socket().getInputStream()); - WritableByteChannel wrappedWriteChannel = Channels.newChannel( - channel.socket().getOutputStream()); - boolean handshake = SslEngineHelper.doHandshake( - wrappedWriteChannel, wrappedReadChannel, sslEngine); - if (!handshake) { - LOGGER.error("TLS connection failed"); - throw new SSLException("TLS handshake failed"); - } - channel.socket().setSoTimeout(initialSoTimeout); - } catch (SSLHandshakeException e) { - LOGGER.error("TLS connection failed: {}", e.getMessage()); - throw e; - } - TlsUtils.logPeerCertificateInfo(sslEngine.getSession()); - } - - channel.configureBlocking(false); - - // lock - stateLock.lock(); - NioLoopContext nioLoopContext = null; - try { - long modulo = globalConnectionCount.getAndIncrement() % nioParams.getNbIoThreads(); - nioLoopContext = nioLoopContexts.get((int) modulo); - nioLoopContext.initStateIfNecessary(); - SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState( - channel, - nioLoopContext, - nioParams, - sslEngine, - this.maxInboundMessageBodySize - ); - state.startReading(); - SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state); - return frameHandler; - } finally { - stateLock.unlock(); - } - - - } catch(IOException e) { - try { - if(sslEngine != null && channel != null) { - SslEngineHelper.close(channel, sslEngine); - } - if (channel != null) { - channel.close(); - } - } catch(IOException closingException) { - // ignore - } - throw e; - } - - } - - void lock() { - stateLock.lock(); - } - - void unlock() { - stateLock.unlock(); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java deleted file mode 100644 index 89a5d6e45c..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import com.rabbitmq.client.impl.AMQConnection; -import com.rabbitmq.client.impl.Frame; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLEngine; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.time.Duration; - -/** - * - */ -public class SocketChannelFrameHandlerState { - - private static final Logger LOGGER = LoggerFactory.getLogger(SocketChannelFrameHandlerState.class); - - /** Time to linger before closing the socket forcefully. */ - private static final int SOCKET_CLOSING_TIMEOUT = 1; - - private final SocketChannel channel; - - private final NioQueue writeQueue; - - private volatile AMQConnection connection; - private volatile long heartbeatNanoSeconds = -1; - - /** should be used only in the NIO read thread */ - private long lastActivity; - - private final SelectorHolder writeSelectorState; - - private final SelectorHolder readSelectorState; - - final boolean ssl; - - final SSLEngine sslEngine; - - /** outbound app data (to be crypted if TLS is on) */ - final ByteBuffer plainOut; - - /** inbound app data (deciphered if TLS is on) */ - final ByteBuffer plainIn; - - /** outbound net data (ciphered if TLS is on) */ - final ByteBuffer cipherOut; - - /** inbound data (ciphered if TLS is on) */ - final ByteBuffer cipherIn; - - final DataOutputStream outputStream; - - final FrameBuilder frameBuilder; - - public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioLoopsState, - NioParams nioParams, SSLEngine sslEngine, - int maxFramePayloadSize) { - this.channel = channel; - this.readSelectorState = nioLoopsState.readSelectorState; - this.writeSelectorState = nioLoopsState.writeSelectorState; - - NioContext nioContext = new NioContext(nioParams, sslEngine); - - this.writeQueue = nioParams.getWriteQueueFactory() == null ? - NioParams.DEFAULT_WRITE_QUEUE_FACTORY.apply(nioContext) : - nioParams.getWriteQueueFactory().apply(nioContext); - - this.sslEngine = sslEngine; - if(this.sslEngine == null) { - this.ssl = false; - this.plainOut = nioLoopsState.writeBuffer; - this.cipherOut = null; - this.plainIn = nioLoopsState.readBuffer; - this.cipherIn = null; - - this.outputStream = new DataOutputStream( - new ByteBufferOutputStream(channel, plainOut) - ); - - this.frameBuilder = new FrameBuilder(channel, plainIn, maxFramePayloadSize); - - } else { - this.ssl = true; - this.plainOut = nioParams.getByteBufferFactory().createWriteBuffer(nioContext); - this.cipherOut = nioParams.getByteBufferFactory().createEncryptedWriteBuffer(nioContext); - this.plainIn = nioParams.getByteBufferFactory().createReadBuffer(nioContext); - this.cipherIn = nioParams.getByteBufferFactory().createEncryptedReadBuffer(nioContext); - - this.outputStream = new DataOutputStream( - new SslEngineByteBufferOutputStream(sslEngine, plainOut, cipherOut, channel) - ); - this.frameBuilder = new SslEngineFrameBuilder(sslEngine, plainIn, - cipherIn, channel, maxFramePayloadSize); - } - - } - - public SocketChannel getChannel() { - return channel; - } - - public NioQueue getWriteQueue() { - return writeQueue; - } - - public void sendHeader() throws IOException { - sendWriteRequest(HeaderWriteRequest.SINGLETON); - } - - public void write(Frame frame) throws IOException { - sendWriteRequest(new FrameWriteRequest(frame)); - } - - private void sendWriteRequest(WriteRequest writeRequest) throws IOException { - try { - boolean offered = this.writeQueue.offer(writeRequest); - if(offered) { - this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE); - this.readSelectorState.selector.wakeup(); - } else { - throw new IOException("Frame enqueuing failed"); - } - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted during enqueuing frame in write queue"); - Thread.currentThread().interrupt(); - } - } - - public void startReading() { - this.readSelectorState.registerFrameHandlerState(this, SelectionKey.OP_READ); - } - - public AMQConnection getConnection() { - return connection; - } - - public void setConnection(AMQConnection connection) { - this.connection = connection; - } - - void setHeartbeat(Duration ht) { - this.heartbeatNanoSeconds = ht.toNanos(); - } - - public void setLastActivity(long lastActivity) { - this.lastActivity = lastActivity; - } - - public long getLastActivity() { - return lastActivity; - } - - long getHeartbeatNanoSeconds() { - return this.heartbeatNanoSeconds; - } - - void prepareForWriteSequence() { - if(ssl) { - plainOut.clear(); - cipherOut.clear(); - } - } - - void endWriteSequence() { - if(!ssl) { - plainOut.clear(); - } - } - - void prepareForReadSequence() throws IOException { - if(ssl) { - if (!frameBuilder.isUnderflowHandlingEnabled()) { - cipherIn.clear(); - cipherIn.flip(); - } - - plainIn.clear(); - plainIn.flip(); - - } else { - NioHelper.read(channel, plainIn); - plainIn.flip(); - } - } - - boolean continueReading() throws IOException { - if(ssl) { - if (frameBuilder.isUnderflowHandlingEnabled()) { - int bytesRead = NioHelper.read(channel, cipherIn); - if (bytesRead == 0) { - return false; - } else { - cipherIn.flip(); - return true; - } - } - if (!plainIn.hasRemaining() && !cipherIn.hasRemaining()) { - // need to try to read something - cipherIn.clear(); - int bytesRead = NioHelper.read(channel, cipherIn); - if (bytesRead == 0) { - return false; - } else { - cipherIn.flip(); - return true; - } - } else { - return true; - } - } else { - if (!plainIn.hasRemaining()) { - plainIn.clear(); - NioHelper.read(channel, plainIn); - plainIn.flip(); - } - return plainIn.hasRemaining(); - } - } - - void close() throws IOException { - if(ssl) { - SslEngineHelper.close(channel, sslEngine); - } - if(channel.isOpen()) { - channel.socket().setSoLinger(true, SOCKET_CLOSING_TIMEOUT); - channel.close(); - } - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelRegistration.java b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelRegistration.java deleted file mode 100644 index 256f534230..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelRegistration.java +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -/** - * - */ -public class SocketChannelRegistration { - - final SocketChannelFrameHandlerState state; - final int operations; - - public SocketChannelRegistration(SocketChannelFrameHandlerState state, int operations) { - this.state = state; - this.operations = operations; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - SocketChannelRegistration that = (SocketChannelRegistration) o; - - return state.getChannel().equals(that.state.getChannel()); - } - - @Override - public int hashCode() { - return state.getChannel().hashCode(); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferOutputStream.java b/src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferOutputStream.java deleted file mode 100644 index d8a0920a9f..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferOutputStream.java +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import javax.net.ssl.SSLEngine; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; - -/** - * Bridge between the byte buffer and stream worlds. - */ -public class SslEngineByteBufferOutputStream extends OutputStream { - - private final SSLEngine sslEngine; - - private final ByteBuffer plainOut, cypherOut; - - private final WritableByteChannel channel; - - public SslEngineByteBufferOutputStream(SSLEngine sslEngine, ByteBuffer plainOut, ByteBuffer cypherOut, WritableByteChannel channel) { - this.sslEngine = sslEngine; - this.plainOut = plainOut; - this.cypherOut = cypherOut; - this.channel = channel; - } - - @Override - public void write(int b) throws IOException { - if (!plainOut.hasRemaining()) { - doFlush(); - } - plainOut.put((byte) b); - } - - @Override - public void flush() throws IOException { - if (plainOut.position() > 0) { - doFlush(); - } - } - - private void doFlush() throws IOException { - plainOut.flip(); - SslEngineHelper.write(channel, sslEngine, plainOut, cypherOut); - plainOut.clear(); - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SslEngineFrameBuilder.java b/src/main/java/com/rabbitmq/client/impl/nio/SslEngineFrameBuilder.java deleted file mode 100644 index 31601afa6c..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SslEngineFrameBuilder.java +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; - - -/** - * Sub-class of {@link FrameBuilder} that unwraps crypted data from the network. - * @since 4.4.0 - */ -public class SslEngineFrameBuilder extends FrameBuilder { - - private final SSLEngine sslEngine; - - private final ByteBuffer cipherBuffer; - - private boolean isUnderflowHandlingEnabled = false; - - public SslEngineFrameBuilder(SSLEngine sslEngine, ByteBuffer plainIn, - ByteBuffer cipherIn, ReadableByteChannel channel, - int maxPayloadSize) { - super(channel, plainIn, maxPayloadSize); - this.sslEngine = sslEngine; - this.cipherBuffer = cipherIn; - } - - @Override - protected boolean somethingToRead() throws IOException { - if (applicationBuffer.hasRemaining() && !isUnderflowHandlingEnabled) { - return true; - } else { - applicationBuffer.clear(); - - boolean underflowHandling = false; - - try { - SSLEngineResult result = sslEngine.unwrap(cipherBuffer, applicationBuffer); - switch (result.getStatus()) { - case OK: - applicationBuffer.flip(); - if (applicationBuffer.hasRemaining()) { - return true; - } - applicationBuffer.clear(); - break; - case BUFFER_OVERFLOW: - throw new SSLException("buffer overflow in read"); - case BUFFER_UNDERFLOW: - cipherBuffer.compact(); - underflowHandling = true; - return false; - case CLOSED: - throw new SSLException("closed in read"); - default: - throw new IllegalStateException("Invalid SSL status: " + result.getStatus()); - } - } finally { - isUnderflowHandlingEnabled = underflowHandling; - } - - return false; - } - } - - @Override - public boolean isUnderflowHandlingEnabled() { - return isUnderflowHandlingEnabled; - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SslEngineHelper.java b/src/main/java/com/rabbitmq/client/impl/nio/SslEngineHelper.java deleted file mode 100644 index c191233f42..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/SslEngineHelper.java +++ /dev/null @@ -1,243 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED; -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK; -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP; -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING; - -/** - * - */ -public class SslEngineHelper { - - private static final Logger LOGGER = LoggerFactory.getLogger(SslEngineHelper.class); - - public static boolean doHandshake(WritableByteChannel writeChannel, ReadableByteChannel readChannel, SSLEngine engine) throws IOException { - - ByteBuffer plainOut = ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()); - ByteBuffer plainIn = ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()); - ByteBuffer cipherOut = ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); - ByteBuffer cipherIn = ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); - - LOGGER.debug("Starting TLS handshake"); - - SSLEngineResult.HandshakeStatus handshakeStatus = engine.getHandshakeStatus(); - LOGGER.debug("Initial handshake status is {}", handshakeStatus); - while (handshakeStatus != FINISHED && handshakeStatus != NOT_HANDSHAKING) { - LOGGER.debug("Handshake status is {}", handshakeStatus); - switch (handshakeStatus) { - case NEED_TASK: - LOGGER.debug("Running tasks"); - handshakeStatus = runDelegatedTasks(engine); - break; - case NEED_UNWRAP: - LOGGER.debug("Unwrapping..."); - handshakeStatus = unwrap(cipherIn, plainIn, readChannel, engine); - break; - case NEED_WRAP: - LOGGER.debug("Wrapping..."); - handshakeStatus = wrap(plainOut, cipherOut, writeChannel, engine); - break; - case FINISHED: - break; - case NOT_HANDSHAKING: - break; - default: - throw new SSLException("Unexpected handshake status " + handshakeStatus); - } - } - - - LOGGER.debug("TLS handshake completed"); - return true; - } - - private static SSLEngineResult.HandshakeStatus runDelegatedTasks(SSLEngine sslEngine) { - // FIXME run in executor? - Runnable runnable; - while ((runnable = sslEngine.getDelegatedTask()) != null) { - LOGGER.debug("Running delegated task"); - runnable.run(); - } - return sslEngine.getHandshakeStatus(); - } - - private static SSLEngineResult.HandshakeStatus unwrap(ByteBuffer cipherIn, ByteBuffer plainIn, - ReadableByteChannel channel, SSLEngine sslEngine) throws IOException { - SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus(); - LOGGER.debug("Handshake status is {} before unwrapping", handshakeStatus); - - LOGGER.debug("Cipher in position {}", cipherIn.position()); - int read; - if (cipherIn.position() == 0) { - LOGGER.debug("Reading from channel"); - read = channel.read(cipherIn); - LOGGER.debug("Read {} byte(s) from channel", read); - if (read < 0) { - throw new SSLException("Could not read from socket channel"); - } - cipherIn.flip(); - } else { - LOGGER.debug("Not reading"); - } - - SSLEngineResult.Status status; - SSLEngineResult unwrapResult; - do { - int positionBeforeUnwrapping = cipherIn.position(); - LOGGER.debug("Before unwrapping cipherIn is {}, with {} remaining byte(s)", cipherIn, cipherIn.remaining()); - unwrapResult = sslEngine.unwrap(cipherIn, plainIn); - LOGGER.debug("SSL engine result is {} after unwrapping", unwrapResult); - status = unwrapResult.getStatus(); - switch (status) { - case OK: - plainIn.clear(); - if (unwrapResult.getHandshakeStatus() == NEED_TASK) { - handshakeStatus = runDelegatedTasks(sslEngine); - cipherIn.position(positionBeforeUnwrapping + unwrapResult.bytesConsumed()); - } else { - handshakeStatus = unwrapResult.getHandshakeStatus(); - } - break; - case BUFFER_OVERFLOW: - throw new SSLException("Buffer overflow during handshake"); - case BUFFER_UNDERFLOW: - LOGGER.debug("Buffer underflow"); - cipherIn.compact(); - LOGGER.debug("Reading from channel..."); - read = NioHelper.read(channel, cipherIn); - if(read <= 0) { - retryRead(channel, cipherIn); - } - LOGGER.debug("Done reading from channel..."); - cipherIn.flip(); - break; - case CLOSED: - sslEngine.closeInbound(); - break; - default: - throw new SSLException("Unexpected status from " + unwrapResult); - } - } - while (unwrapResult.getHandshakeStatus() != NEED_WRAP && unwrapResult.getHandshakeStatus() != FINISHED); - - LOGGER.debug("cipherIn position after unwrap {}", cipherIn.position()); - return handshakeStatus; - } - - private static int retryRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { - int attempt = 0; - int read = 0; - while(attempt < 3) { - try { - Thread.sleep(100L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - read = NioHelper.read(channel, buffer); - if(read > 0) { - break; - } - attempt++; - } - return read; - } - - private static SSLEngineResult.HandshakeStatus wrap(ByteBuffer plainOut, ByteBuffer cipherOut, - WritableByteChannel channel, SSLEngine sslEngine) throws IOException { - SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus(); - LOGGER.debug("Handshake status is {} before wrapping", handshakeStatus); - SSLEngineResult result = sslEngine.wrap(plainOut, cipherOut); - LOGGER.debug("SSL engine result is {} after wrapping", result); - switch (result.getStatus()) { - case OK: - cipherOut.flip(); - while (cipherOut.hasRemaining()) { - int written = channel.write(cipherOut); - LOGGER.debug("Wrote {} byte(s)", written); - } - cipherOut.clear(); - if (result.getHandshakeStatus() == NEED_TASK) { - handshakeStatus = runDelegatedTasks(sslEngine); - } else { - handshakeStatus = result.getHandshakeStatus(); - } - - break; - case BUFFER_OVERFLOW: - throw new SSLException("Buffer overflow during handshake"); - default: - throw new SSLException("Unexpected status " + result.getStatus()); - } - return handshakeStatus; - } - - public static void write(WritableByteChannel socketChannel, SSLEngine engine, ByteBuffer plainOut, ByteBuffer cypherOut) throws IOException { - while (plainOut.hasRemaining()) { - cypherOut.clear(); - SSLEngineResult result = engine.wrap(plainOut, cypherOut); - switch (result.getStatus()) { - case OK: - cypherOut.flip(); - while (cypherOut.hasRemaining()) { - socketChannel.write(cypherOut); - } - break; - case BUFFER_OVERFLOW: - throw new SSLException("Buffer overflow occured after a wrap."); - case BUFFER_UNDERFLOW: - throw new SSLException("Buffer underflow occured after a wrap."); - case CLOSED: - throw new SSLException("Buffer closed"); - default: - throw new IllegalStateException("Invalid SSL status: " + result.getStatus()); - } - } - } - - public static void close(WritableByteChannel channel, SSLEngine engine) throws IOException { - ByteBuffer plainOut = ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()); - ByteBuffer cipherOut = ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); - - // won't be sending any more data - engine.closeOutbound(); - - while (!engine.isOutboundDone()) { - engine.wrap(plainOut, cipherOut); - cipherOut.flip(); - while (cipherOut.hasRemaining()) { - int num = channel.write(cipherOut); - if (num == -1) { - // the channel has been closed - break; - } - } - cipherOut.clear(); - } - } -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/WriteRequest.java b/src/main/java/com/rabbitmq/client/impl/nio/WriteRequest.java deleted file mode 100644 index 1550af8246..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/WriteRequest.java +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.impl.nio; - -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * - */ -public interface WriteRequest { - - void handle(DataOutputStream dataOutputStream) throws IOException; - -} diff --git a/src/main/java/com/rabbitmq/client/impl/nio/package-info.java b/src/main/java/com/rabbitmq/client/impl/nio/package-info.java deleted file mode 100644 index 9d6f23e3cb..0000000000 --- a/src/main/java/com/rabbitmq/client/impl/nio/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * NIO network connector. - */ -package com.rabbitmq.client.impl.nio; \ No newline at end of file diff --git a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java index 5b9fba3179..4bb63c02c2 100644 --- a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java +++ b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java @@ -16,7 +16,6 @@ package com.rabbitmq.client.test; import com.rabbitmq.client.*; -import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.tools.Host; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assumptions; @@ -42,17 +41,10 @@ public class BrokerTestCase { protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); - if(TestUtils.isNio()) { - connectionFactory.setNioParams(nioParams()); - } connectionFactory.setAutomaticRecoveryEnabled(isAutomaticRecoveryEnabled()); return connectionFactory; } - protected NioParams nioParams() { - return new NioParams(); - } - protected boolean isAutomaticRecoveryEnabled() { return true; } diff --git a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java index 6fd29f7aa5..a48710fe90 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java @@ -46,14 +46,12 @@ MetricsCollectorTest.class, MicrometerMetricsCollectorTest.class, DnsSrvRecordAddressResolverTest.class, - JavaNioTest.class, ConnectionFactoryTest.class, RecoveryAwareAMQConnectionFactoryTest.class, RpcTest.class, LambdaCallbackTest.class, ChannelAsyncCompletableFutureTest.class, RecoveryDelayHandlerTest.class, - FrameBuilderTest.class, PropertyFileInitialisationTest.class, ClientVersionTest.class, TestUtilsTest.class, @@ -62,7 +60,6 @@ JacksonJsonRpcTest.class, AddressTest.class, DefaultRetryHandlerTest.class, - NioDeadlockOnConnectionClosing.class, GeneratedClassesTest.class, RpcTopologyRecordingTest.class, ConnectionTest.class, diff --git a/src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java b/src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java index 978d57e9fe..43a31d3ff4 100644 --- a/src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java +++ b/src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java @@ -17,7 +17,6 @@ import com.rabbitmq.client.*; import com.rabbitmq.client.impl.*; -import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier; import com.rabbitmq.client.impl.recovery.RetryHandler; import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; @@ -213,7 +212,6 @@ public void shouldBeConfigurableUsingFluentAPI() throws Exception { MetricsCollector metricsCollector = mock(MetricsCollector.class); CredentialsRefreshService credentialsRefreshService = mock(CredentialsRefreshService.class); RecoveryDelayHandler recoveryDelayHandler = mock(RecoveryDelayHandler.class); - NioParams nioParams = mock(NioParams.class); SslContextFactory sslContextFactory = mock(SslContextFactory.class); TopologyRecoveryFilter topologyRecoveryFilter = mock(TopologyRecoveryFilter.class); Predicate connectionRecoveryTriggeringCondition = (ShutdownSignalException) -> true; @@ -249,8 +247,6 @@ public void shouldBeConfigurableUsingFluentAPI() throws Exception { .setCredentialsRefreshService(credentialsRefreshService) .setNetworkRecoveryInterval(7) .setRecoveryDelayHandler(recoveryDelayHandler) - .setNioParams(nioParams) - .useNio() .useBlockingIo() .setChannelRpcTimeout(8) .setSslContextFactory(sslContextFactory) @@ -282,7 +278,6 @@ public void shouldBeConfigurableUsingFluentAPI() throws Exception { assertThat(connectionFactory.getMetricsCollector()).isEqualTo(metricsCollector); assertThat(connectionFactory.getNetworkRecoveryInterval()).isEqualTo(7); assertThat(connectionFactory.getRecoveryDelayHandler()).isEqualTo(recoveryDelayHandler); - assertThat(connectionFactory.getNioParams()).isEqualTo(nioParams); assertThat(connectionFactory.getChannelRpcTimeout()).isEqualTo(8); assertThat(connectionFactory.isChannelShouldCheckRpcResponseType()).isEqualTo(true); assertThat(connectionFactory.getWorkPoolTimeout()).isEqualTo(9); diff --git a/src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java b/src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java deleted file mode 100644 index e5ecbdc324..0000000000 --- a/src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.test; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.MalformedFrameException; -import com.rabbitmq.client.impl.Frame; -import com.rabbitmq.client.impl.nio.FrameBuilder; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; - -/** - * - */ -public class FrameBuilderTest { - - @Mock - ReadableByteChannel channel; - - ByteBuffer buffer; - - FrameBuilder builder; - - AutoCloseable mocks; - - @BeforeEach - void init() { - this.mocks = MockitoAnnotations.openMocks(this); - } - - @AfterEach - void tearDown() throws Exception { - mocks.close(); - } - - @Test - public void buildFrameInOneGo() throws IOException { - buffer = ByteBuffer.wrap(new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end() }); - builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE); - Frame frame = builder.readFrame(); - assertThat(frame).isNotNull(); - assertThat(frame.getType()).isEqualTo(1); - assertThat(frame.getChannel()).isEqualTo(0); - assertThat(frame.getPayload()).hasSize(3); - } - - @Test - public void buildFramesInOneGo() throws IOException { - byte[] frameContent = new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end() }; - int nbFrames = 13; - byte[] frames = new byte[frameContent.length * nbFrames]; - for (int i = 0; i < nbFrames; i++) { - for (int j = 0; j < frameContent.length; j++) { - frames[i * frameContent.length + j] = frameContent[j]; - } - } - buffer = ByteBuffer.wrap(frames); - builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE); - int frameCount = 0; - Frame frame; - while ((frame = builder.readFrame()) != null) { - assertThat(frame).isNotNull(); - assertThat(frame.getType()).isEqualTo(1); - assertThat(frame.getChannel()).isEqualTo(0); - assertThat(frame.getPayload()).hasSize(3); - frameCount++; - } - assertThat(frameCount).isEqualTo(nbFrames); - } - - @Test - public void buildFrameInSeveralCalls() throws IOException { - buffer = ByteBuffer.wrap(new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2 }); - builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE); - Frame frame = builder.readFrame(); - assertThat(frame).isNull(); - - buffer.clear(); - buffer.put(b(3)).put(end()); - buffer.flip(); - - frame = builder.readFrame(); - assertThat(frame).isNotNull(); - assertThat(frame.getType()).isEqualTo(1); - assertThat(frame.getChannel()).isEqualTo(0); - assertThat(frame.getPayload()).hasSize(3); - } - - @Test - public void protocolMismatchHeader() throws IOException { - ByteBuffer[] buffers = new ByteBuffer[] { - ByteBuffer.wrap(new byte[] { 'A' }), - ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q' }), - ByteBuffer.wrap(new byte[] { 'A', 'N', 'Q', 'P' }), - ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P' }), - ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P', 1, 1, 8 }), - ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P', 1, 1, 8, 0 }), - ByteBuffer.wrap(new byte[] { 'A', 'M', 'Q', 'P', 1, 1, 9, 1 }) - }; - String[] messages = new String[] { - "Invalid AMQP protocol header from server: read only 1 byte(s) instead of 4", - "Invalid AMQP protocol header from server: read only 3 byte(s) instead of 4", - "Invalid AMQP protocol header from server: expected character 77, got 78", - "Invalid AMQP protocol header from server", - "Invalid AMQP protocol header from server", - "AMQP protocol version mismatch; we are version 0-9-1, server is 0-8", - "AMQP protocol version mismatch; we are version 0-9-1, server sent signature 1,1,9,1" - }; - - for (int i = 0; i < buffers.length; i++) { - builder = new FrameBuilder(channel, buffers[i], Integer.MAX_VALUE); - try { - builder.readFrame(); - fail("protocol header not correct, exception should have been thrown"); - } catch (MalformedFrameException e) { - assertThat(e.getMessage()).isEqualTo(messages[i]); - } - } - } - - byte b(int b) { - return (byte) b; - } - - byte end() { - return (byte) AMQP.FRAME_END; - } -} diff --git a/src/test/java/com/rabbitmq/client/test/FrameTest.java b/src/test/java/com/rabbitmq/client/test/FrameTest.java index 45742dcd2f..1e99a3611b 100644 --- a/src/test/java/com/rabbitmq/client/test/FrameTest.java +++ b/src/test/java/com/rabbitmq/client/test/FrameTest.java @@ -2,11 +2,11 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.impl.Frame; -import com.rabbitmq.client.impl.nio.ByteBufferOutputStream; import org.junit.jupiter.api.Test; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; @@ -105,4 +105,36 @@ public static void drain(WritableByteChannel channel, ByteBuffer buffer) throws buffer.clear(); } + private static class ByteBufferOutputStream extends OutputStream { + + private final WritableByteChannel channel; + + private final ByteBuffer buffer; + + public ByteBufferOutputStream(WritableByteChannel channel, ByteBuffer buffer) { + this.buffer = buffer; + this.channel = channel; + } + + @Override + public void write(int b) throws IOException { + if(!buffer.hasRemaining()) { + drain(channel, buffer); + } + buffer.put((byte) b); + } + + @Override + public void flush() throws IOException { + drain(channel, buffer); + } + + public static void drain(WritableByteChannel channel, ByteBuffer buffer) throws IOException { + buffer.flip(); + while(buffer.hasRemaining() && channel.write(buffer) != -1); + buffer.clear(); + } + + } + } diff --git a/src/test/java/com/rabbitmq/client/test/JavaNioTest.java b/src/test/java/com/rabbitmq/client/test/JavaNioTest.java deleted file mode 100644 index cdc095e40c..0000000000 --- a/src/test/java/com/rabbitmq/client/test/JavaNioTest.java +++ /dev/null @@ -1,251 +0,0 @@ -package com.rabbitmq.client.test; - -import com.rabbitmq.client.*; -import com.rabbitmq.client.impl.nio.BlockingQueueNioQueue; -import com.rabbitmq.client.impl.nio.DefaultByteBufferFactory; -import com.rabbitmq.client.impl.nio.NioParams; -import org.assertj.core.api.Condition; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * - */ -public class JavaNioTest { - - public static final String QUEUE = "nio.queue"; - - private Connection testConnection; - - @BeforeEach - public void init() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio(); - testConnection = connectionFactory.newConnection(); - } - - @AfterEach - public void tearDown() throws Exception { - if (testConnection != null) { - testConnection.createChannel().queueDelete(QUEUE); - testConnection.close(); - } - } - - @Test - public void connection() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio(); - Connection connection = null; - try { - connection = basicGetBasicConsume(connectionFactory, "nio.queue", latch); - boolean messagesReceived = latch.await(5, TimeUnit.SECONDS); - assertTrue(messagesReceived, "Message has not been received"); - } finally { - safeClose(connection); - } - } - - @Test - public void twoConnections() throws IOException, TimeoutException, InterruptedException { - CountDownLatch latch = new CountDownLatch(2); - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio() - .setNioParams(new NioParams().setNbIoThreads(4)); - Connection connection1 = null; - Connection connection2 = null; - try { - connection1 = basicGetBasicConsume(connectionFactory, "nio.queue.1", latch); - connection2 = basicGetBasicConsume(connectionFactory, "nio.queue.2", latch); - - boolean messagesReceived = latch.await(5, TimeUnit.SECONDS); - assertTrue(messagesReceived, "Messages have not been received"); - } finally { - safeClose(connection1); - safeClose(connection2); - } - } - - @Test - public void twoConnectionsWithNioExecutor() throws IOException, TimeoutException, InterruptedException { - CountDownLatch latch = new CountDownLatch(2); - ExecutorService nioExecutor = Executors.newFixedThreadPool(5); - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio(); - Connection connection1 = null; - Connection connection2 = null; - try { - connection1 = basicGetBasicConsume(connectionFactory, "nio.queue.1", latch); - connection2 = basicGetBasicConsume(connectionFactory, "nio.queue.2", latch); - - boolean messagesReceived = latch.await(5, TimeUnit.SECONDS); - assertTrue(messagesReceived, "Messages have not been received"); - } finally { - safeClose(connection1); - safeClose(connection2); - nioExecutor.shutdownNow(); - } - } - - @Test - public void shutdownListenerCalled() throws IOException, TimeoutException, InterruptedException { - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio(); - Connection connection = connectionFactory.newConnection(); - try { - final CountDownLatch latch = new CountDownLatch(1); - connection.addShutdownListener(new ShutdownListener() { - - @Override - public void shutdownCompleted(ShutdownSignalException cause) { - latch.countDown(); - } - }); - safeClose(connection); - assertTrue(latch.await(5, TimeUnit.SECONDS), "Shutdown listener should have been called"); - } finally { - safeClose(connection); - } - } - - @Test - public void nioLoopCleaning() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio(); - for (int i = 0; i < 10; i++) { - Connection connection = connectionFactory.newConnection(); - connection.abort(); - } - } - - @Test - public void messageSize() throws Exception { - for (int i = 0; i < 50; i++) { - sendAndVerifyMessage(testConnection, 76390); - } - } - - @Test - public void byteBufferFactory() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio(); - int baseCapacity = 32768; - NioParams nioParams = new NioParams(); - nioParams.setReadByteBufferSize(baseCapacity / 2); - nioParams.setWriteByteBufferSize(baseCapacity / 4); - List byteBuffers = new CopyOnWriteArrayList<>(); - connectionFactory.setNioParams(nioParams.setByteBufferFactory(new DefaultByteBufferFactory(capacity -> { - ByteBuffer bb = ByteBuffer.allocate(capacity); - byteBuffers.add(bb); - return bb; - }))); - - try (Connection c = connectionFactory.newConnection()) { - sendAndVerifyMessage(c, 100); - } - - assertThat(byteBuffers).hasSize(2); - Condition condition = new Condition<>(c -> c == nioParams.getReadByteBufferSize() || - c == nioParams.getWriteByteBufferSize(), "capacity set by factory"); - assertThat(byteBuffers.get(0).capacity()).is(condition); - assertThat(byteBuffers.get(1).capacity()).is(condition); - } - - @Test - public void directByteBuffers() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio() - .setNioParams(new NioParams().setByteBufferFactory(new DefaultByteBufferFactory(capacity -> ByteBuffer.allocateDirect(capacity)))); - try (Connection c = connectionFactory.newConnection()) { - sendAndVerifyMessage(c, 100); - } - } - - @Test - public void customWriteQueue() throws Exception { - AtomicInteger count = new AtomicInteger(0); - ConnectionFactory connectionFactory = new ConnectionFactory() - .useNio() - .setNioParams(new NioParams().setWriteQueueFactory(ctx -> { - count.incrementAndGet(); - return new BlockingQueueNioQueue( - new LinkedBlockingQueue<>(ctx.getNioParams().getWriteQueueCapacity()), - ctx.getNioParams().getWriteEnqueuingTimeoutInMs() - ); - })); - try (Connection c = connectionFactory.newConnection()) { - sendAndVerifyMessage(c, 100); - } - assertEquals(1, count.get()); - } - - private void sendAndVerifyMessage(Connection connection, int size) throws Exception { - CountDownLatch latch = new CountDownLatch(1); - boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size); - assertTrue(messageReceived, "Message has not been received"); - } - - private Connection basicGetBasicConsume(ConnectionFactory connectionFactory, String queue, final CountDownLatch latch) - throws IOException, TimeoutException { - Connection connection = connectionFactory.newConnection(); - Channel channel = connection.createChannel(); - channel.queueDeclare(queue, false, false, false, null); - channel.queuePurge(queue); - - channel.basicPublish("", queue, null, new byte[20000]); - - channel.basicConsume(queue, false, new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - getChannel().basicAck(envelope.getDeliveryTag(), false); - latch.countDown(); - } - }); - - return connection; - } - - private boolean basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize) - throws Exception { - Channel channel = connection.createChannel(); - channel.queueDeclare(queue, false, false, false, null); - channel.queuePurge(queue); - - channel.basicPublish("", queue, null, new byte[msgSize]); - - final String tag = channel.basicConsume(queue, false, new DefaultConsumer(channel) { - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - getChannel().basicAck(envelope.getDeliveryTag(), false); - latch.countDown(); - } - }); - - boolean done = latch.await(20, TimeUnit.SECONDS); - channel.basicCancel(tag); - return done; - } - - private void safeClose(Connection connection) { - if (connection != null) { - try { - connection.abort(); - } catch (Exception e) { - // OK - } - } - } -} diff --git a/src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java b/src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java deleted file mode 100644 index 50efb34bb0..0000000000 --- a/src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.test; - -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.impl.nio.NioParams; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * - */ -public class NioDeadlockOnConnectionClosing { - - static final Logger LOGGER = LoggerFactory.getLogger(NioDeadlockOnConnectionClosing.class); - - ExecutorService nioExecutorService, connectionShutdownExecutorService; - ConnectionFactory cf; - List connections; - - @BeforeEach - public void setUp() { - nioExecutorService = Executors.newFixedThreadPool(2); - connectionShutdownExecutorService = Executors.newFixedThreadPool(2); - cf = TestUtils.connectionFactory(); - cf.setAutomaticRecoveryEnabled(true); - cf.useNio(); - cf.setNetworkRecoveryInterval(1000); - NioParams params = new NioParams() - .setNioExecutor(nioExecutorService) - .setConnectionShutdownExecutor(connectionShutdownExecutorService) - .setNbIoThreads(2); - cf.setNioParams(params); - connections = new ArrayList<>(); - } - - @AfterEach - public void tearDown() throws Exception { - for (Connection connection : connections) { - try { - connection.close(2000); - } catch (Exception e) { - LOGGER.warn("Error while closing test connection", e); - } - } - - shutdownExecutorService(nioExecutorService); - shutdownExecutorService(connectionShutdownExecutorService); - } - - private void shutdownExecutorService(ExecutorService executorService) throws InterruptedException { - if (executorService == null) { - return; - } - executorService.shutdown(); - boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS); - if (!terminated) { - LOGGER.warn("Couldn't terminate executor after 5 seconds"); - } - } - - @Test - public void connectionClosing() throws Exception { - for (int i = 0; i < 10; i++) { - connections.add(cf.newConnection()); - } - closeAllConnectionsAndWaitForRecovery(connections); - for (Connection connection : connections) { - assertTrue(connection.isOpen()); - } - } -} diff --git a/src/test/java/com/rabbitmq/client/test/NoAutoRecoveryWhenTcpWindowIsFullTest.java b/src/test/java/com/rabbitmq/client/test/NoAutoRecoveryWhenTcpWindowIsFullTest.java index ecd82d05a5..4c474a2b4d 100644 --- a/src/test/java/com/rabbitmq/client/test/NoAutoRecoveryWhenTcpWindowIsFullTest.java +++ b/src/test/java/com/rabbitmq/client/test/NoAutoRecoveryWhenTcpWindowIsFullTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2018-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -24,7 +24,6 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.recovery.AutorecoveringChannel; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; import com.rabbitmq.client.test.TestUtils.DisabledIfBrokerRunningOnDocker; @@ -107,11 +106,6 @@ public void configure(Socket socket) throws IOException { factory.setShutdownExecutor(executorService); factory.setNetworkRecoveryInterval(2000); - if (TestUtils.isNio()) { - factory.setWorkPoolTimeout(10 * 1000); - factory.setNioParams(new NioParams().setWriteQueueCapacity(10 * 1000 * 1000).setNbIoThreads(4)); - } - producingConnection = (AutorecoveringConnection) factory.newConnection("Producer Connection"); producingChannel = (AutorecoveringChannel) producingConnection.createChannel(); consumingConnection = (AutorecoveringConnection) factory.newConnection("Consuming Connection"); diff --git a/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java b/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java index 57660c17ba..7107e4347f 100644 --- a/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java +++ b/src/test/java/com/rabbitmq/client/test/PropertyFileInitialisationTest.java @@ -119,15 +119,6 @@ public void propertyInitialisationOverrideDefaultClientProperty() { assertThat(cf.getClientProperties()).extracting(key).isEqualTo("whatever"); } - @Test - public void propertyInitialisationDoNotUseNio() throws Exception { - cf.load(new HashMap() {{ - put("rabbitmq.use.nio", "false"); - put("rabbitmq.nio.nb.io.threads", "2"); - }}); - assertThat(cf.getNioParams().getNbIoThreads()).isNotEqualTo(2); - } - @Test public void lookUp() { assertThat(ConnectionFactoryConfigurator.lookUp( @@ -265,13 +256,6 @@ private void checkConnectionFactory(ConnectionFactory connectionFactory) { assertThat(connectionFactory.getNetworkRecoveryInterval()).isEqualTo(10000l); assertThat(connectionFactory.getChannelRpcTimeout()).isEqualTo(10000); assertThat(connectionFactory.isChannelShouldCheckRpcResponseType()).isTrue(); - - assertThat(connectionFactory.getNioParams()).isNotNull(); - assertThat(connectionFactory.getNioParams().getReadByteBufferSize()).isEqualTo(32000); - assertThat(connectionFactory.getNioParams().getWriteByteBufferSize()).isEqualTo(32000); - assertThat(connectionFactory.getNioParams().getNbIoThreads()).isEqualTo(2); - assertThat(connectionFactory.getNioParams().getWriteEnqueuingTimeoutInMs()).isEqualTo(5000); - assertThat(connectionFactory.getNioParams().getWriteQueueCapacity()).isEqualTo(1000); } private Properties getPropertiesWitPrefix(String prefix) throws IOException { diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index 5fec97b80b..69ce57afaa 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -51,10 +51,9 @@ public class TestUtils { public static final String IO_LAYER = System.getProperty("io.layer", "netty"); private static final String IO_SOCKET = "socket"; - private static final String IO_NIO = "nio"; public static final String IO_NETTY = "netty"; public static final List IO_LAYERS = - Collections.unmodifiableList(Arrays.asList(IO_SOCKET, IO_NIO, IO_NETTY)); + Collections.unmodifiableList(Arrays.asList(IO_SOCKET, IO_NETTY)); private static final ThreadLocal EVENT_LOOP_GROUP = new ThreadLocal<>(); @@ -72,10 +71,6 @@ public static boolean isSocket() { return isSocket(IO_LAYER); } - public static boolean isNio() { - return isNio(IO_LAYER); - } - public static boolean isNetty() { return isNetty(IO_LAYER); } @@ -84,10 +79,6 @@ private static boolean isSocket(String layer) { return IO_SOCKET.equals(layer); } - private static boolean isNio(String layer) { - return IO_NIO.equals(layer); - } - private static boolean isNetty(String layer) { return IO_NETTY.equals(layer); } @@ -100,9 +91,7 @@ public static void setIoLayer(ConnectionFactory cf) { } public static void setIoLayer(ConnectionFactory cf, String layer) { - if (isNio(layer)) { - cf.useNio(); - } else if (isNetty(layer)) { + if (isNetty(layer)) { cf.netty().enqueuingTimeout(Duration.ofSeconds(30)); } else if (isSocket(layer)) { cf.useBlockingIo(); diff --git a/src/test/java/com/rabbitmq/client/test/server/PersistenceGuarantees.java b/src/test/java/com/rabbitmq/client/test/server/PersistenceGuarantees.java index 8a085d598e..558346ed8f 100644 --- a/src/test/java/com/rabbitmq/client/test/server/PersistenceGuarantees.java +++ b/src/test/java/com/rabbitmq/client/test/server/PersistenceGuarantees.java @@ -19,7 +19,6 @@ import java.io.IOException; -import com.rabbitmq.client.impl.nio.NioParams; import org.junit.jupiter.api.Test; import com.rabbitmq.client.MessageProperties; @@ -29,15 +28,6 @@ public class PersistenceGuarantees extends BrokerTestCase { private static final int COUNT = 10000; private String queue; - @Override - protected NioParams nioParams() { - NioParams nioParams = super.nioParams(); - // may need a higher enqueuing timeout on slow environments - return nioParams - .setWriteEnqueuingTimeoutInMs(nioParams.getWriteEnqueuingTimeoutInMs() * 3) - .setWriteQueueCapacity(nioParams.getWriteQueueCapacity() * 2); - } - protected void declareQueue() throws IOException { queue = channel.queueDeclare("", true, false, false, null).getQueue(); } diff --git a/src/test/java/com/rabbitmq/client/test/ssl/HostnameVerification.java b/src/test/java/com/rabbitmq/client/test/ssl/HostnameVerification.java index 6cc6024949..6eab9c70ed 100644 --- a/src/test/java/com/rabbitmq/client/test/ssl/HostnameVerification.java +++ b/src/test/java/com/rabbitmq/client/test/ssl/HostnameVerification.java @@ -43,7 +43,6 @@ public class HostnameVerification { public static Consumer[] data() { return new Consumer[] { blockingIo(enableHostnameVerification()), - nio(enableHostnameVerification()), (Consumer) cf -> { try { cf.netty().sslContext(SslContextBuilder.forClient() @@ -65,13 +64,6 @@ private static Consumer blockingIo(final Consumer nio(final Consumer customizer) { - return connectionFactory -> { - connectionFactory.useNio(); - customizer.accept(connectionFactory); - }; - } - private static Consumer enableHostnameVerification() { return ConnectionFactory::enableHostnameVerification; } diff --git a/src/test/java/com/rabbitmq/client/test/ssl/NioTlsUnverifiedConnection.java b/src/test/java/com/rabbitmq/client/test/ssl/NioTlsUnverifiedConnection.java deleted file mode 100644 index 15c026c0f4..0000000000 --- a/src/test/java/com/rabbitmq/client/test/ssl/NioTlsUnverifiedConnection.java +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.test.ssl; - -import static com.rabbitmq.client.test.TestUtils.basicGetBasicConsume; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.TrustEverythingTrustManager; -import com.rabbitmq.client.impl.nio.NioParams; -import com.rabbitmq.client.test.BrokerTestCase; -import com.rabbitmq.client.test.TestUtils; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.TrustManager; -import org.junit.jupiter.api.Test; -import org.netcrusher.core.reactor.NioReactor; -import org.netcrusher.tcp.TcpCrusher; -import org.netcrusher.tcp.TcpCrusherBuilder; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class NioTlsUnverifiedConnection extends BrokerTestCase { - - public static final String QUEUE = "tls.nio.queue"; - - public void openConnection() - throws IOException, TimeoutException { - try { - connectionFactory.useSslProtocol(); - connectionFactory.useNio(); - } catch (Exception ex) { - throw new IOException(ex.toString()); - } - - int attempt = 0; - while(attempt < 3) { - try { - connection = connectionFactory.newConnection(); - break; - } catch(Exception e) { - LoggerFactory.getLogger(getClass()).warn("Error when opening TLS connection"); - attempt++; - } - } - if(connection == null) { - fail("Couldn't open TLS connection after 3 attempts"); - } - - } - - @Override - protected void releaseResources() throws IOException { - channel.queueDelete(QUEUE); - } - - @Test - public void connectionGetConsume() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - basicGetBasicConsume(connection, QUEUE, latch, 100 * 1000); - boolean messagesReceived = latch.await(5, TimeUnit.SECONDS); - assertTrue(messagesReceived, "Message has not been received"); - } - - @Test - public void connectionGetConsumeProtocols() throws Exception { - Collection availableProtocols = TlsTestUtils.availableTlsProtocols(); - Collection protocols = Stream.of("TLSv1.2", "TLSv1.3") - .filter(p -> availableProtocols.contains(p)) - .collect(Collectors.toList()); - for (String protocol : protocols) { - SSLContext sslContext = SSLContext.getInstance(protocol); - sslContext.init(null, new TrustManager[] {new TrustEverythingTrustManager()}, null); - ConnectionFactory cf = TestUtils.connectionFactory(); - cf.useSslProtocol(sslContext); - cf.useNio(); - AtomicReference engine = new AtomicReference<>(); - cf.setNioParams(new NioParams() - .setSslEngineConfigurator(sslEngine -> engine.set(sslEngine))); - try (Connection c = cf.newConnection()) { - CountDownLatch latch = new CountDownLatch(1); - basicGetBasicConsume(c, QUEUE, latch, 100); - boolean messagesReceived = latch.await(5, TimeUnit.SECONDS); - assertTrue(messagesReceived, "Message has not been received"); - assertThat(engine.get()).isNotNull(); - assertThat(engine.get().getEnabledProtocols()).contains(protocol); - } - } - } - - @Test public void socketChannelConfigurator() throws Exception { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.useNio(); - connectionFactory.useSslProtocol(); - NioParams nioParams = new NioParams(); - final AtomicBoolean sslEngineHasBeenCalled = new AtomicBoolean(false); - nioParams.setSslEngineConfigurator(sslEngine -> sslEngineHasBeenCalled.set(true)); - - connectionFactory.setNioParams(nioParams); - - Connection connection = null; - try { - connection = connectionFactory.newConnection(); - assertTrue(sslEngineHasBeenCalled.get(), "The SSL engine configurator should have called"); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - @Test public void messageSize() throws Exception { - int[] sizes = new int[]{100, 1000, 10 * 1000, 1 * 1000 * 1000, 5 * 1000 * 1000}; - for (int size : sizes) { - sendAndVerifyMessage(size); - } - } - - // The purpose of this test is to put some stress on client TLS layer (SslEngineByteBufferInputStream to be specific) - // in an attempt to trigger condition described in https://github.com/rabbitmq/rabbitmq-java-client/issues/317 - // Unfortunately it is not guaranteed to be reproducible - @Test public void largeMessagesTlsTraffic() throws Exception { - for (int i = 0; i < 50; i++) { - sendAndVerifyMessage(76390); - } - } - - @Test - public void connectionShouldEnforceConnectionTimeout() throws Exception { - int amqpPort = 5671; // assumes RabbitMQ server running on localhost; - int amqpProxyPort = TestUtils.randomNetworkPort(); - - int connectionTimeout = 3_000; - int handshakeTimeout = 1_000; - - try (NioReactor reactor = new NioReactor(); - TcpCrusher tcpProxy = - TcpCrusherBuilder.builder() - .withReactor(reactor) - .withBindAddress(new InetSocketAddress(amqpProxyPort)) - .withConnectAddress("localhost", amqpPort) - .build()) { - - tcpProxy.open(); - tcpProxy.freeze(); - - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); - factory.setPort(amqpProxyPort); - - factory.useSslProtocol(); - factory.useNio(); - - factory.setConnectionTimeout(connectionTimeout); - factory.setHandshakeTimeout(handshakeTimeout); - - ExecutorService executorService = Executors.newSingleThreadExecutor(); - try { - CountDownLatch latch = new CountDownLatch(1); - executorService.submit( - () -> { - try { - factory.newConnection(); - latch.countDown(); - } catch (SocketTimeoutException e) { - latch.countDown(); - } catch (Exception e) { - // not supposed to happen - } - }); - - boolean connectionCreatedTimedOut = latch.await(10, TimeUnit.SECONDS); - assertThat(connectionCreatedTimedOut).isTrue(); - - } finally { - executorService.shutdownNow(); - } - } - } - - private void sendAndVerifyMessage(int size) throws Exception { - CountDownLatch latch = new CountDownLatch(1); - boolean messageReceived = basicGetBasicConsume(connection, QUEUE, latch, size); - assertTrue(messageReceived, "Message has not been received"); - } - -} diff --git a/src/test/java/com/rabbitmq/client/test/ssl/SslContextFactoryTest.java b/src/test/java/com/rabbitmq/client/test/ssl/SslContextFactoryTest.java index 276d9d8571..1a9d0acc48 100644 --- a/src/test/java/com/rabbitmq/client/test/ssl/SslContextFactoryTest.java +++ b/src/test/java/com/rabbitmq/client/test/ssl/SslContextFactoryTest.java @@ -44,18 +44,10 @@ public class SslContextFactoryTest { .useBlockingIo() .setAutomaticRecoveryEnabled(true) ); - doTestSetSslContextFactory(() -> new ConnectionFactory() - .useNio() - .setAutomaticRecoveryEnabled(true) - ); doTestSetSslContextFactory(() -> new ConnectionFactory() .useBlockingIo() .setAutomaticRecoveryEnabled(false) ); - doTestSetSslContextFactory(() -> new ConnectionFactory() - .useNio() - .setAutomaticRecoveryEnabled(false) - ); doTestSetSslContextFactory(() -> { SslContextFactory sslContextFactory = sslContextFactory(); return new ConnectionFactory() diff --git a/src/test/java/com/rabbitmq/client/test/ssl/SslTestSuite.java b/src/test/java/com/rabbitmq/client/test/ssl/SslTestSuite.java index 9e02b883ce..4c389cb8d3 100644 --- a/src/test/java/com/rabbitmq/client/test/ssl/SslTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/ssl/SslTestSuite.java @@ -25,7 +25,6 @@ VerifiedConnection.class, BadVerifiedConnection.class, ConnectionFactoryDefaultTlsVersion.class, - NioTlsUnverifiedConnection.class, HostnameVerification.class, TlsConnectionLogging.class, SslContextFactoryTest.class diff --git a/src/test/java/com/rabbitmq/client/test/ssl/TlsConnectionLogging.java b/src/test/java/com/rabbitmq/client/test/ssl/TlsConnectionLogging.java index bb096c8287..6724e51db5 100644 --- a/src/test/java/com/rabbitmq/client/test/ssl/TlsConnectionLogging.java +++ b/src/test/java/com/rabbitmq/client/test/ssl/TlsConnectionLogging.java @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2019-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -18,17 +18,14 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.impl.TlsUtils; -import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.test.TestUtils; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import org.assertj.core.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import wiremock.org.apache.hc.core5.ssl.SSLContextBuilder; import javax.net.ssl.*; -import java.security.cert.X509Certificate; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -38,7 +35,7 @@ public class TlsConnectionLogging { public static Object[] certificateInfoAreProperlyExtracted() { - return new Object[]{blockingIo(), nio(), netty()}; + return new Object[]{blockingIo(), netty()}; } public static Function> blockingIo() { @@ -50,16 +47,6 @@ public static Function> blockingIo() { }; } - public static Function> nio() { - return connectionFactory -> { - connectionFactory.useNio(); - AtomicReference sslEngineCaptor = new AtomicReference<>(); - connectionFactory.setNioParams(new NioParams() - .setSslEngineConfigurator(sslEngine -> sslEngineCaptor.set(sslEngine))); - return () -> sslEngineCaptor.get().getSession(); - }; - } - public static Function> netty() { return connectionFactory -> { AtomicReference sslEngineCaptor = new AtomicReference<>(); diff --git a/src/test/java/com/rabbitmq/client/test/ssl/VerifiedConnection.java b/src/test/java/com/rabbitmq/client/test/ssl/VerifiedConnection.java index 7dd37a4bf8..5b52465f73 100644 --- a/src/test/java/com/rabbitmq/client/test/ssl/VerifiedConnection.java +++ b/src/test/java/com/rabbitmq/client/test/ssl/VerifiedConnection.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.impl.nio.NioParams; import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -60,13 +59,7 @@ public void connectionGetConsumeProtocols() throws Exception { cf.useSslProtocol(sslContext); TlsTestUtils.maybeConfigureNetty(cf, sslContext); AtomicReference> protocolsSupplier = new AtomicReference<>(); - if (TestUtils.isNio()) { - cf.useNio(); - cf.setNioParams(new NioParams() - .setSslEngineConfigurator(sslEngine -> { - protocolsSupplier.set(() -> sslEngine.getEnabledProtocols()); - })); - } else if (TestUtils.isSocket()) { + if (TestUtils.isSocket()) { cf.setSocketConfigurator(socket -> { SSLSocket s = (SSLSocket) socket; protocolsSupplier.set(s::getEnabledProtocols); diff --git a/src/test/resources/property-file-initialisation/configuration.properties b/src/test/resources/property-file-initialisation/configuration.properties index 7ec04b03fa..468dcc2435 100644 --- a/src/test/resources/property-file-initialisation/configuration.properties +++ b/src/test/resources/property-file-initialisation/configuration.properties @@ -16,10 +16,4 @@ rabbitmq.connection.recovery.enabled=false rabbitmq.topology.recovery.enabled=false rabbitmq.connection.recovery.interval=10000 rabbitmq.channel.rpc.timeout=10000 -rabbitmq.channel.should.check.rpc.response.type=true -rabbitmq.use.nio=true -rabbitmq.nio.read.byte.buffer.size=32000 -rabbitmq.nio.write.byte.buffer.size=32000 -rabbitmq.nio.nb.io.threads=2 -rabbitmq.nio.write.enqueuing.timeout.in.ms=5000 -rabbitmq.nio.write.queue.capacity=1000 +rabbitmq.channel.should.check.rpc.response.type=true \ No newline at end of file