diff --git a/src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java index 4fac19c12..d2f91c6c5 100644 --- a/src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java @@ -17,6 +17,9 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + /** Builder for {@link Connection} instances. */ public interface ConnectionBuilder extends ConnectionSettings { @@ -35,6 +38,22 @@ public interface ConnectionBuilder extends ConnectionSettings */ ConnectionBuilder listeners(Resource.StateListener... listeners); + /** + * Set the executor to use for incoming message delivery. + * + *

The executor is shared between the connection consumers. + * + *

By default, an {@link ExecutorService} with {@link Runtime#availableProcessors()} thread(s) + * is created for the connection. + * + *

It is the developer's responsibility to shut down the executor when it is no longer needed. + * + * @param executor executor for incoming message delivery + * @return this builder instance + * @see com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder#dispatchingExecutor(ExecutorService) + */ + ConnectionBuilder dispatchingExecutor(Executor executor); + /** * Create the connection instance. * diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index a3c984ea7..c550f7339 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -52,6 +52,8 @@ final class AmqpConnection extends ResourceBase implements Connection { + private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.AVAILABLE_PROCESSORS); + /** Connection-related issues */ private static final Predicate CONNECTION_EXCEPTION_PREDICATE = e -> e instanceof AmqpException.AmqpConnectionException; @@ -96,7 +98,9 @@ final class AmqpConnection extends ResourceBase implements Connection { private final String name; private final Lock instanceLock = new ReentrantLock(); private final boolean filterExpressionsSupported, setTokenSupported; - private volatile ExecutorService dispatchingExecutorService; + private volatile ConsumerWorkService consumerWorkService; + private volatile Executor dispatchingExecutor; + private final boolean privateDispatchingExecutor; private final CredentialsManager.Registration credentialsRegistration; AmqpConnection(AmqpConnectionBuilder builder) { @@ -116,6 +120,18 @@ final class AmqpConnection extends ResourceBase implements Connection { this.topologyListener = createTopologyListener(builder); + Executor de = + builder.dispatchingExecutor() == null + ? environment.dispatchingExecutorService() + : builder.dispatchingExecutor(); + + if (de == null) { + this.privateDispatchingExecutor = true; + } else { + this.privateDispatchingExecutor = false; + this.dispatchingExecutor = de; + } + if (recoveryConfiguration.activated()) { disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, this.name()); } else { @@ -680,22 +696,26 @@ ScheduledExecutorService scheduledExecutorService() { return this.environment.scheduledExecutorService(); } - ExecutorService dispatchingExecutorService() { + ConsumerWorkService consumerWorkService() { checkOpen(); - ExecutorService result = this.dispatchingExecutorService; + ConsumerWorkService result = this.consumerWorkService; if (result != null) { return result; } this.instanceLock.lock(); try { - if (this.dispatchingExecutorService == null) { - this.dispatchingExecutorService = - Executors.newSingleThreadExecutor( - Utils.threadFactory("dispatching-" + this.name() + "-")); + if (this.consumerWorkService == null) { + if (this.privateDispatchingExecutor) { + this.dispatchingExecutor = + Executors.newFixedThreadPool( + DEFAULT_NUM_THREADS, Utils.threadFactory("dispatching-" + this.name() + "-")); + } + this.consumerWorkService = + new WorkPoolConsumerWorkService(this.dispatchingExecutor, Duration.ZERO); } - return this.dispatchingExecutorService; + return this.consumerWorkService; } finally { this.instanceLock.unlock(); } @@ -852,16 +872,28 @@ private void close(Throwable cause) { } catch (InterruptedException e) { LOGGER.info("Interrupted while waiting for connection lock"); } + if (this.consumerWorkService != null) { + try { + this.consumerWorkService.close(); + } catch (Exception e) { + LOGGER.info( + "Error while closing consumer work service for connection '{}': {}", + this.name(), + e.getMessage()); + } + } try { - ExecutorService es = this.dispatchingExecutorService; - if (es != null) { - try { - es.shutdownNow(); - } catch (Exception e) { - LOGGER.info( - "Error while shutting down dispatching executor service for connection '{}': {}", - this.name(), - e.getMessage()); + if (this.privateDispatchingExecutor) { + Executor es = this.dispatchingExecutor; + if (es != null) { + try { + ((ExecutorService) es).shutdownNow(); + } catch (Exception e) { + LOGGER.info( + "Error while shutting down dispatching executor service for connection '{}': {}", + this.name(), + e.getMessage()); + } } } try { diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java index e27fefd6d..22d98c4d2 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; class AmqpConnectionBuilder implements ConnectionBuilder { @@ -31,6 +32,7 @@ class AmqpConnectionBuilder implements ConnectionBuilder { private final DefaultConnectionSettings connectionSettings = new AmqpConnectionBuilderConnectionSettings(this); private final List listeners = new ArrayList<>(); + private Executor dispatchingExecutor; private String name; private TopologyListener topologyListener; private boolean isolateResources = false; @@ -120,6 +122,12 @@ public ConnectionBuilder listeners(Resource.StateListener... listeners) { return this; } + @Override + public ConnectionBuilder dispatchingExecutor(Executor executor) { + this.dispatchingExecutor = executor; + return this; + } + @Override public RecoveryConfiguration recovery() { this.recoveryConfiguration.activated(true); @@ -135,6 +143,10 @@ boolean isolateResources() { return isolateResources; } + Executor dispatchingExecutor() { + return this.dispatchingExecutor; + } + @Override public Connection build() { return this.environment.connection(this); @@ -147,6 +159,7 @@ void copyTo(AmqpConnectionBuilder copy) { copy.name(this.name); copy.topologyListener(this.topologyListener); copy.isolateResources(this.isolateResources); + copy.dispatchingExecutor(this.dispatchingExecutor); } AmqpConnectionBuilder name(String name) { diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 5b1b73425..1d148dcba 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -75,9 +75,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer { private final SessionHandler sessionHandler; private final AtomicLong unsettledMessageCount = new AtomicLong(0); private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded; - private final ExecutorService dispatchingExecutorService; private final java.util.function.Consumer nativeHandler; private final java.util.function.Consumer nativeCloseHandler; + private final ConsumerWorkService consumerWorkService; // native receiver internal state, accessed only in the native executor/scheduler private ProtonReceiver protonReceiver; private volatile Scheduler protonExecutor; @@ -104,16 +104,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer { ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER); this.connection = builder.connection(); this.sessionHandler = this.connection.createSessionHandler(); - - this.dispatchingExecutorService = connection.dispatchingExecutorService(); this.nativeHandler = createNativeHandler(messageHandler); this.nativeCloseHandler = - e -> - this.dispatchingExecutorService.submit( - () -> { - // get result to make spotbugs happy - boolean ignored = maybeCloseConsumerOnException(this, e); - }); + e -> { + this.connection + .consumerWorkService() + .dispatch( + () -> { + // get result to make spotbugs happy + boolean ignored = maybeCloseConsumerOnException(this, e); + }); + }; + this.consumerWorkService = connection.consumerWorkService(); + this.consumerWorkService.register(this); this.nativeReceiver = this.createNativeReceiver( this.sessionHandler.session(), @@ -233,33 +236,36 @@ private ClientReceiver createNativeReceiver( private java.util.function.Consumer createNativeHandler(MessageHandler handler) { return delivery -> { - this.unsettledMessageCount.incrementAndGet(); - this.metricsCollector.consume(); - this.dispatchingExecutorService.submit( - () -> { - AmqpMessage message; - try { - message = new AmqpMessage(delivery.message()); - } catch (ClientException e) { - LOGGER.warn("Error while decoding message: {}", e.getMessage()); + if (this.state() == OPEN) { + this.unsettledMessageCount.incrementAndGet(); + this.metricsCollector.consume(); + this.consumerWorkService.dispatch( + this, + () -> { + AmqpMessage message; try { - delivery.disposition(DeliveryState.rejected("", ""), true); - } catch (ClientException ex) { - LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage()); + message = new AmqpMessage(delivery.message()); + } catch (ClientException e) { + LOGGER.warn("Error while decoding message: {}", e.getMessage()); + try { + delivery.disposition(DeliveryState.rejected("", ""), true); + } catch (ClientException ex) { + LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage()); + } + return; } - return; - } - Consumer.Context context = - new DeliveryContext( - delivery, - this.protonExecutor, - this.protonReceiver, - this.metricsCollector, - this.unsettledMessageCount, - this.replenishCreditOperation, - this); - handler.handle(context, message); - }); + Consumer.Context context = + new DeliveryContext( + delivery, + this.protonExecutor, + this.protonReceiver, + this.metricsCollector, + this.unsettledMessageCount, + this.replenishCreditOperation, + this); + handler.handle(context, message); + }); + } }; } @@ -296,6 +302,9 @@ void recoverAfterConnectionFailure() { void close(Throwable cause) { if (this.closed.compareAndSet(false, true)) { this.state(CLOSING, cause); + if (this.consumerWorkService != null) { + this.consumerWorkService.unregister(this); + } this.connection.removeConsumer(this); try { if (this.nativeReceiver != null) { @@ -305,9 +314,11 @@ void close(Throwable cause) { } catch (Exception e) { LOGGER.warn("Error while closing receiver", e); } - this.state(CLOSED, cause); - this.metricsCollector.closeConsumer(); + MetricsCollector mc = this.metricsCollector; + if (mc != null) { + mc.closeConsumer(); + } } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java index 1a111f6c6..58b5f72b8 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java @@ -43,6 +43,7 @@ class AmqpEnvironment implements Environment { private final boolean internalPublisherExecutor; private final ExecutorService executorService; private final ScheduledExecutorService scheduledExecutorService; + private final Executor dispatchingExecutorService; private final ExecutorService publisherExecutorService; private final ConnectionManager connectionManager = new ConnectionManager(this); private final long id; @@ -60,6 +61,7 @@ class AmqpEnvironment implements Environment { AmqpEnvironment( ExecutorService executorService, ScheduledExecutorService scheduledExecutorService, + Executor dispatchingExecutorService, ExecutorService publisherExecutorService, DefaultConnectionSettings connectionSettings, MetricsCollector metricsCollector, @@ -86,6 +88,7 @@ class AmqpEnvironment implements Environment { this.scheduledExecutorService = scheduledExecutorService; this.internalScheduledExecutor = false; } + this.dispatchingExecutorService = dispatchingExecutorService; if (publisherExecutorService == null) { this.publisherExecutorService = Utils.executorService(threadPrefix); this.internalPublisherExecutor = true; @@ -163,6 +166,10 @@ ExecutorService executorService() { return this.executorService; } + Executor dispatchingExecutorService() { + return this.dispatchingExecutorService; + } + ExecutorService publisherExecutorService() { return this.publisherExecutorService; } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java index 892887b7d..e612f020a 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java @@ -17,13 +17,11 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp.impl; -import com.rabbitmq.client.amqp.ConnectionSettings; -import com.rabbitmq.client.amqp.Environment; -import com.rabbitmq.client.amqp.EnvironmentBuilder; -import com.rabbitmq.client.amqp.ObservationCollector; +import com.rabbitmq.client.amqp.*; import com.rabbitmq.client.amqp.metrics.MetricsCollector; import com.rabbitmq.client.amqp.metrics.NoOpMetricsCollector; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -34,6 +32,7 @@ public class AmqpEnvironmentBuilder implements EnvironmentBuilder { new DefaultEnvironmentConnectionSettings(this); private ExecutorService executorService; private ScheduledExecutorService scheduledExecutorService; + private Executor dispatchingExecutor; private ExecutorService publisherExecutorService; private MetricsCollector metricsCollector = NoOpMetricsCollector.INSTANCE; private ObservationCollector observationCollector = Utils.NO_OP_OBSERVATION_COLLECTOR; @@ -53,6 +52,24 @@ public AmqpEnvironmentBuilder executorService(ExecutorService executorService) { return this; } + /** + * Set the shared executor to use for incoming message delivery in this environment instance + * connections. + * + *

There is no shared executor by default, each connection uses its own, see {@link + * ConnectionBuilder#dispatchingExecutor(Executor)}. + * + *

It is the developer's responsibility to shut down the executor when it is no longer needed. + * + * @param executor the executor for incoming message delivery + * @return this builder instance + * @see ConnectionBuilder#dispatchingExecutor(Executor) + */ + public AmqpEnvironmentBuilder dispatchingExecutor(Executor executor) { + this.dispatchingExecutor = executor; + return this; + } + /** * Set scheduled executor service used for internal tasks (e.g. connection recovery). * @@ -145,6 +162,7 @@ public Environment build() { return new AmqpEnvironment( executorService, scheduledExecutorService, + dispatchingExecutor, publisherExecutorService, connectionSettings, metricsCollector, diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java index 629fded0b..e8c72b202 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java @@ -55,7 +55,6 @@ final class AmqpPublisher extends ResourceBase implements Publisher { private final Duration publishTimeout; private final SessionHandler sessionHandler; private volatile ObservationCollector.ConnectionInfo connectionInfo; - private final ExecutorService dispatchingExecutorService; private final java.util.function.Consumer nativeCloseHandler; AmqpPublisher(AmqpPublisherBuilder builder) { @@ -67,14 +66,16 @@ final class AmqpPublisher extends ResourceBase implements Publisher { this.connection = builder.connection(); this.publishTimeout = builder.publishTimeout(); this.sessionHandler = this.connection.createSessionHandler(); - this.dispatchingExecutorService = connection.dispatchingExecutorService(); this.nativeCloseHandler = - e -> - this.dispatchingExecutorService.submit( - () -> { - // get result to make spotbugs happy - boolean ignored = maybeCloseConsumerOnException(this, e); - }); + e -> { + this.connection + .consumerWorkService() + .dispatch( + () -> { + // get result to make spotbugs happy + boolean ignored = maybeCloseConsumerOnException(this, e); + }); + }; this.sender = this.createSender( sessionHandler.session(), this.address, this.publishTimeout, this.nativeCloseHandler); @@ -215,7 +216,10 @@ void close(Throwable cause) { this.sessionHandler, e -> LOGGER.info("Error while closing publisher session handler", e)); this.state(State.CLOSED, cause); - this.metricsCollector.closePublisher(); + MetricsCollector mc = this.metricsCollector; + if (mc != null) { + mc.closePublisher(); + } } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/ConsumerWorkService.java b/src/main/java/com/rabbitmq/client/amqp/impl/ConsumerWorkService.java new file mode 100644 index 000000000..91371577f --- /dev/null +++ b/src/main/java/com/rabbitmq/client/amqp/impl/ConsumerWorkService.java @@ -0,0 +1,31 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +interface ConsumerWorkService extends AutoCloseable { + + void register(AmqpConsumer consumer); + + void dispatch(AmqpConsumer consumer, Runnable runnable); + + void unregister(AmqpConsumer consumer); + + void dispatch(Runnable runnable); + + void close(); +} diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index 8613af94c..10304103a 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -37,6 +37,8 @@ final class Utils { + static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); + static final Supplier NAME_SUPPLIER = new NameSupplier("client.gen-"); private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/WorkPool.java b/src/main/java/com/rabbitmq/client/amqp/impl/WorkPool.java new file mode 100644 index 000000000..59293fe7d --- /dev/null +++ b/src/main/java/com/rabbitmq/client/amqp/impl/WorkPool.java @@ -0,0 +1,383 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; + +/** + * This is based on the WorkPool class from RabbitMQ AMQP 091 Java client library. The + * source code is available here. + * + *

This is a generic implementation of the channels specification in Channeling Work, Nov + * 2010 (channels.pdf). Objects of type K must be registered, with + * registerKey(K), and then they become clients and a queue of items (type + * W) is stored for each client. Each client has a state which is exactly one of + * dormant, in progress or ready. Immediately after registration a client is + * dormant. Items may be (singly) added to (the end of) a client's queue with {@link + * WorkPool#addWorkItem(Object, Object)}. If the client is dormant it becomes ready + * thereby. All other states remain unchanged. The next ready client, together with a + * collection of its items, may be retrieved with nextWorkBlock(collection,max) + * (making that client in progress). An in progress client can finish (processing a + * batch of items) with finishWorkBlock(K). It then becomes either + * dormant or ready, depending if its queue of work items is empty or no. If a client + * has items queued, it is either in progress or ready but cannot be both. When work + * is finished it may be marked ready if there is further work, or dormant if there is + * not. There is never any work for a dormant client. A client may be unregistered, with + * unregisterKey(K), which removes the client from all parts of the state, and + * any queue of items stored with it. All clients may be unregistered with + * unregisterAllKeys(). + * + *

Concurrent Semantics

+ * + * This implementation is thread-safe. + * + * @param Key -- type of client + * @param Work -- type of work item + */ +final class WorkPool { + private static final int MAX_QUEUE_LENGTH = 1000; + + /** An injective queue of ready clients. */ + private final SetQueue ready = new SetQueue<>(); + + /** The set of clients which have work in progress. */ + private final Set inProgress = new HashSet<>(); + + /** The pool of registered clients, with their work queues. */ + private final Map> pool = new HashMap<>(); + + private final BiConsumer, W> enqueueingCallback; + private final Lock lock = new ReentrantLock(); + + public WorkPool(Duration queueingTimeout) { + if (queueingTimeout.toNanos() > 0) { + long timeout = queueingTimeout.toMillis(); + this.enqueueingCallback = + (queue, item) -> { + try { + boolean offered = queue.offer(item, timeout, TimeUnit.MILLISECONDS); + if (!offered) { + throw new WorkPoolFullException( + "Could not enqueue in work pool after " + queueingTimeout + " ms."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + } else { + this.enqueueingCallback = + (queue, item) -> { + try { + queue.put(item); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + } + } + + /** + * Add client key to pool of item queues, with an empty queue. A client is + * initially dormant. No-op if key already present. + * + * @param key client to add to pool + */ + public void registerKey(K key) { + this.lock.lock(); + try { + if (!this.pool.containsKey(key)) { + this.pool.put(key, new LinkedBlockingQueue<>(MAX_QUEUE_LENGTH)); + } + } finally { + this.lock.unlock(); + } + } + + /** + * Remove client from pool and from any other state. Has no effect if client already absent. + * + * @param key of client to unregister + */ + public void unregisterKey(K key) { + this.lock.lock(); + try { + this.pool.remove(key); + this.ready.remove(key); + this.inProgress.remove(key); + } finally { + this.lock.unlock(); + } + } + + /** Remove all clients from pool and from any other state. */ + public void unregisterAllKeys() { + this.lock.lock(); + try { + this.pool.clear(); + this.ready.clear(); + this.inProgress.clear(); + } finally { + this.lock.unlock(); + } + } + + /** + * Return the next ready client, and transfer a collection of that client's items to + * process. Mark client in progress. If there is no ready client, return + * null. + * + * @param to collection object in which to transfer items + * @param size max number of items to transfer + * @return key of client to whom items belong, or null if there is none. + */ + public K nextWorkBlock(Collection to, int size) { + this.lock.lock(); + try { + K nextKey = readyToInProgress(); + if (nextKey != null) { + LinkedBlockingQueue queue = this.pool.get(nextKey); + drainTo(queue, to, size); + } + return nextKey; + } finally { + this.lock.unlock(); + } + } + + /** + * Private implementation of drainTo (not implemented for + * LinkedList<W>s). + * + * @param deList to take (poll) elements from + * @param c to add elements to + * @param maxElements to take from deList + * @return number of elements actually taken + */ + private int drainTo(LinkedBlockingQueue deList, Collection c, int maxElements) { + int n = 0; + while (n < maxElements) { + W first = deList.poll(); + if (first == null) break; + c.add(first); + ++n; + } + return n; + } + + /** + * Add (enqueue) an item for a specific client. No change and returns false if + * client not registered. If dormant, the client will be marked ready. + * + * @param key the client to add to the work item to + * @param item the work item to add to the client queue + * @return true if and only if the client is marked readyas + * a result of this work item + */ + public boolean addWorkItem(K key, W item) { + LinkedBlockingQueue queue; + this.lock.lock(); + try { + queue = this.pool.get(key); + } finally { + this.lock.unlock(); + } + // The put operation may block. We need to make sure we are not holding the lock while that + // happens. + if (queue != null) { + enqueueingCallback.accept(queue, item); + + this.lock.lock(); + try { + if (isDormant(key)) { + dormantToReady(key); + return true; + } + } finally { + this.lock.unlock(); + } + } + return false; + } + + /** + * Set client no longer in progress. Ignore unknown clients (and return false + * ). + * + * @param key client that has finished work + * @return true if and only if client becomes ready + * @throws IllegalStateException if registered client not in progress + */ + public boolean finishWorkBlock(K key) { + this.lock.lock(); + try { + if (!this.isRegistered(key)) return false; + if (!this.inProgress.contains(key)) { + throw new IllegalStateException("Client " + key + " not in progress"); + } + + if (moreWorkItems(key)) { + inProgressToReady(key); + return true; + } else { + inProgressToDormant(key); + return false; + } + } finally { + this.lock.unlock(); + } + } + + private boolean moreWorkItems(K key) { + LinkedBlockingQueue leList = this.pool.get(key); + return leList != null && !leList.isEmpty(); + } + + /* State identification functions */ + private boolean isInProgress(K key) { + return this.inProgress.contains(key); + } + + private boolean isReady(K key) { + return this.ready.contains(key); + } + + private boolean isRegistered(K key) { + return this.pool.containsKey(key); + } + + private boolean isDormant(K key) { + return !isInProgress(key) && !isReady(key) && isRegistered(key); + } + + /* State transition methods - all assume key registered */ + private void inProgressToReady(K key) { + this.inProgress.remove(key); + this.ready.addIfNotPresent(key); + } + + private void inProgressToDormant(K key) { + this.inProgress.remove(key); + } + + private void dormantToReady(K key) { + this.ready.addIfNotPresent(key); + } + + /* Basic work selector and state transition step */ + private K readyToInProgress() { + K key = this.ready.poll(); + if (key != null) { + this.inProgress.add(key); + } + return key; + } + + /** + * A generic queue-like implementation (supporting operations addIfNotPresent, + * poll, contains, and isEmpty) which restricts a queue element + * to appear at most once. If the element is already present {@link #addIfNotPresent} returns + * false. Elements must not be null. + * + *

Concurrent Semantics

+ * + * This implementation is not thread-safe. + * + * @param type of elements in the queue + */ + private static final class SetQueue { + private final Set members = new HashSet(); + private final Queue queue = new LinkedList(); + + /** + * Add an element to the back of the queue and return true, or else return + * false. + * + * @param item to add + * @return true if the element was added, false if it is + * already present. + */ + public boolean addIfNotPresent(T item) { + if (this.members.contains(item)) { + return false; + } + this.members.add(item); + this.queue.offer(item); + return true; + } + + /** + * Remove the head of the queue and return it. + * + * @return head element of the queue, or null if the queue is empty. + */ + public T poll() { + T item = this.queue.poll(); + if (item != null) { + this.members.remove(item); + } + return item; + } + + /** + * @param item to look for in queue + * @return true if and only if item is in the queue. + */ + public boolean contains(T item) { + return this.members.contains(item); + } + + /** + * @return true if and only if the queue is empty. + */ + public boolean isEmpty() { + return this.members.isEmpty(); + } + + /** + * Remove item from queue, if present. + * + * @param item to remove + * @return true if and only if item was initially present and was removed. + */ + public boolean remove(T item) { + this.queue.remove(item); // there can only be one such item in the queue + return this.members.remove(item); + } + + /** Remove all items from the queue. */ + public void clear() { + this.queue.clear(); + this.members.clear(); + } + } + + /** Exception thrown when {@link WorkPool} enqueueing times out. */ + static class WorkPoolFullException extends RuntimeException { + + public WorkPoolFullException(String msg) { + super(msg); + } + } +} diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/WorkPoolConsumerWorkService.java b/src/main/java/com/rabbitmq/client/amqp/impl/WorkPoolConsumerWorkService.java new file mode 100644 index 000000000..37b42c177 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/amqp/impl/WorkPoolConsumerWorkService.java @@ -0,0 +1,89 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +final class WorkPoolConsumerWorkService implements ConsumerWorkService { + + private static final int MAX_RUNNABLE_BLOCK_SIZE = 256; + + private final Executor executor; + private final WorkPool workPool; + + WorkPoolConsumerWorkService(Executor executorService, Duration queueingTimeout) { + this.executor = executorService; + this.workPool = new WorkPool<>(queueingTimeout); + } + + @Override + public void dispatch(AmqpConsumer consumer, Runnable runnable) { + if (this.workPool.addWorkItem(consumer, runnable)) { + this.executor.execute(new WorkPoolRunnable()); + } + } + + @Override + public void dispatch(Runnable runnable) { + this.executor.execute(runnable); + } + + @Override + public void register(AmqpConsumer consumer) { + this.workPool.registerKey(consumer); + } + + @Override + public void unregister(AmqpConsumer consumer) { + this.workPool.unregisterKey(consumer); + } + + @Override + public void close() { + this.workPool.unregisterAllKeys(); + } + + private final class WorkPoolRunnable implements Runnable { + + @Override + public void run() { + int size = MAX_RUNNABLE_BLOCK_SIZE; + List block = new ArrayList(size); + try { + AmqpConsumer key = WorkPoolConsumerWorkService.this.workPool.nextWorkBlock(block, size); + if (key == null) { + return; // nothing ready to run + } + try { + for (Runnable runnable : block) { + runnable.run(); + } + } finally { + if (WorkPoolConsumerWorkService.this.workPool.finishWorkBlock(key)) { + WorkPoolConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); + } + } + } catch (RuntimeException e) { + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java index eba8a438e..8bcff6885 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java @@ -59,6 +59,7 @@ static void initAll() { null, null, null, + null, connectionSettings, NoOpMetricsCollector.INSTANCE, Utils.NO_OP_OBSERVATION_COLLECTOR); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java index 76fdc471b..02c3a831b 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java @@ -24,9 +24,11 @@ import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3; import static com.rabbitmq.client.amqp.impl.TestUtils.*; +import static com.rabbitmq.client.amqp.impl.Utils.threadFactory; import static java.nio.charset.StandardCharsets.*; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.stream.IntStream.range; import static java.util.stream.Stream.of; import static org.assertj.core.api.Assertions.*; @@ -38,8 +40,10 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.IntStream; import org.junit.jupiter.api.*; @@ -53,6 +57,28 @@ public class AmqpTest { Connection connection; String name; + private static String uuid() { + return UUID.randomUUID().toString(); + } + + private static Resource.StateListener closedListener( + Sync sync, Consumer callback) { + return context -> { + if (context.currentState() == Resource.State.CLOSED) { + callback.accept(context); + sync.down(); + } + }; + } + + private static Publisher.Callback acceptedCallback(Sync sync) { + return ctx -> { + if (ctx.status() == Publisher.Status.ACCEPTED) { + sync.down(); + } + }; + } + @BeforeEach void init(TestInfo info) { this.name = TestUtils.name(info); @@ -769,25 +795,43 @@ void types(TestInfo info) { assertThat(message.get()).hasProperty("key1", -1L); } - private static String uuid() { - return UUID.randomUUID().toString(); - } + @Test + void messagesAreDispatchedToEnvironmentConnectionExecutorService() { + String envPrefix = "env-"; + String connPrefix = "conn-"; + ExecutorService envExecutor = newSingleThreadExecutor(threadFactory(envPrefix)); + ExecutorService connExecutor = newSingleThreadExecutor(threadFactory(connPrefix)); + Environment env = TestUtils.environmentBuilder().dispatchingExecutor(envExecutor).build(); + try { + BiConsumer operation = + (c, prefix) -> { + String q = c.management().queue().exclusive(true).declare().name(); + Publisher p = c.publisherBuilder().queue(q).build(); + p.publish(p.message(), ctx -> {}); + Sync sync = sync(); + AtomicReference threadName = new AtomicReference<>(); + c.consumerBuilder() + .queue(q) + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + threadName.set(Thread.currentThread().getName()); + sync.down(); + }) + .build(); + assertThat(sync).completes(); + org.assertj.core.api.Assertions.assertThat(threadName.get()).startsWith(prefix); + }; - private static Resource.StateListener closedListener( - Sync sync, Consumer callback) { - return context -> { - if (context.currentState() == Resource.State.CLOSED) { - callback.accept(context); - sync.down(); - } - }; - } + Connection c1 = env.connectionBuilder().build(); + Connection c2 = env.connectionBuilder().dispatchingExecutor(connExecutor).build(); - private static Publisher.Callback acceptedCallback(Sync sync) { - return ctx -> { - if (ctx.status() == Publisher.Status.ACCEPTED) { - sync.down(); - } - }; + operation.accept(c1, envPrefix); + operation.accept(c2, connPrefix); + } finally { + env.close(); + envExecutor.shutdown(); + connExecutor.shutdown(); + } } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java index 91f77ceba..957bb9536 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java @@ -44,7 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -@TestUtils.DisabledIfNotCluster +@DisabledIfNotCluster public class ClusterTest { static final String[] URIS = diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java b/src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java index aa2db0437..17c16d3f4 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java @@ -230,6 +230,7 @@ void tokenShouldBeRefreshedAutomatically(boolean shared, TestInfo info) throws E } @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) void tokenOnHttpsShouldBeRefreshed(TestInfo info) throws Exception { KeyStore keyStore = generateKeyPair(); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java index e123817c4..d8f42bc05 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java @@ -35,6 +35,8 @@ import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -62,6 +64,7 @@ public class RecoveryClusterTest { static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = fixedWithInitialDelay(RECOVERY_INITIAL_DELAY, RECOVERY_DELAY); static List nodes; + ExecutorService dispatchingExecutorService; Environment environment; AmqpConnection connection; Management management; @@ -75,8 +78,15 @@ static void initAll() { @BeforeEach void init(TestInfo info) { + dispatchingExecutorService = + Executors.newSingleThreadExecutor(Utils.threadFactory("env-dispatching-executor-service-")); environment = - new AmqpEnvironmentBuilder().connectionSettings().uris(URIS).environmentBuilder().build(); + new AmqpEnvironmentBuilder() + .dispatchingExecutor(dispatchingExecutorService) + .connectionSettings() + .uris(URIS) + .environmentBuilder() + .build(); this.connection = connection(b -> b.name("c-management").recovery().connectionBuilder()); this.management = connection.management(); this.testInfo = info; @@ -85,6 +95,7 @@ void init(TestInfo info) { @AfterEach void tearDown() { environment.close(); + dispatchingExecutorService.shutdown(); } private static class QueueConfiguration { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/WorkPoolTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/WorkPoolTest.java new file mode 100644 index 000000000..97e059c83 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/amqp/impl/WorkPoolTest.java @@ -0,0 +1,165 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link WorkPool} */ +public class WorkPoolTest { + + private final WorkPool pool = new WorkPool<>(Duration.ZERO); + + /** Test unknown key tolerated silently */ + @Test + public void unknownKey() { + assertFalse(this.pool.addWorkItem("test", new Object())); + } + + /** Test add work and remove work */ + @Test + public void basicInOut() { + Object one = new Object(); + Object two = new Object(); + + this.pool.registerKey("test"); + assertTrue(this.pool.addWorkItem("test", one)); + assertFalse(this.pool.addWorkItem("test", two)); + + List workList = new ArrayList(16); + String key = this.pool.nextWorkBlock(workList, 1); + assertEquals("test", key); + assertEquals(1, workList.size()); + assertEquals(one, workList.get(0)); + + assertTrue(this.pool.finishWorkBlock(key), "Should be made ready"); + + workList.clear(); + key = this.pool.nextWorkBlock(workList, 1); + assertEquals("test", key, "Work client key wrong"); + assertEquals(two, workList.get(0), "Wrong work delivered"); + + assertFalse(this.pool.finishWorkBlock(key), "Should not be made ready after this."); + assertNull(this.pool.nextWorkBlock(workList, 1), "Shouldn't be more work"); + } + + /** Test add work when work in progress. */ + @Test + public void workInWhileInProgress() { + Object one = new Object(); + Object two = new Object(); + + this.pool.registerKey("test"); + assertTrue(this.pool.addWorkItem("test", one)); + + List workList = new ArrayList(16); + String key = this.pool.nextWorkBlock(workList, 1); + assertEquals("test", key); + assertEquals(1, workList.size()); + assertEquals(one, workList.get(0)); + + assertFalse(this.pool.addWorkItem("test", two)); + + assertTrue(this.pool.finishWorkBlock(key)); + + workList.clear(); + key = this.pool.nextWorkBlock(workList, 1); + assertEquals("test", key); + assertEquals(1, workList.size()); + assertEquals(two, workList.get(0)); + } + + /** Test multiple work keys. */ + @Test + public void interleavingKeys() { + Object one = new Object(); + Object two = new Object(); + Object three = new Object(); + + this.pool.registerKey("test1"); + this.pool.registerKey("test2"); + + assertTrue(this.pool.addWorkItem("test1", one)); + assertTrue(this.pool.addWorkItem("test2", two)); + assertFalse(this.pool.addWorkItem("test1", three)); + + List workList = new ArrayList(16); + String key = this.pool.nextWorkBlock(workList, 3); + assertEquals("test1", key); + assertEquals(2, workList.size()); + assertEquals(one, workList.get(0)); + assertEquals(three, workList.get(1)); + + workList.clear(); + + key = this.pool.nextWorkBlock(workList, 2); + assertEquals("test2", key); + assertEquals(1, workList.size()); + assertEquals(two, workList.get(0)); + } + + /** Test removal of key (with work) */ + @Test + public void unregisterKey() { + Object one = new Object(); + Object two = new Object(); + Object three = new Object(); + + this.pool.registerKey("test1"); + this.pool.registerKey("test2"); + + assertTrue(this.pool.addWorkItem("test1", one)); + assertTrue(this.pool.addWorkItem("test2", two)); + assertFalse(this.pool.addWorkItem("test1", three)); + + this.pool.unregisterKey("test1"); + + List workList = new ArrayList(16); + String key = this.pool.nextWorkBlock(workList, 3); + assertEquals("test2", key); + assertEquals(1, workList.size()); + assertEquals(two, workList.get(0)); + } + + /** Test removal of all keys (with work). */ + @Test + public void unregisterAllKeys() { + Object one = new Object(); + Object two = new Object(); + Object three = new Object(); + + this.pool.registerKey("test1"); + this.pool.registerKey("test2"); + + assertTrue(this.pool.addWorkItem("test1", one)); + assertTrue(this.pool.addWorkItem("test2", two)); + assertFalse(this.pool.addWorkItem("test1", three)); + + this.pool.unregisterAllKeys(); + + List workList = new ArrayList(16); + assertNull(this.pool.nextWorkBlock(workList, 1)); + } +}