From 4e2025f355ce61a28949979e7958a7a90d20f44b Mon Sep 17 00:00:00 2001 From: Cassio Fiuza Date: Wed, 24 Dec 2025 16:32:04 -0300 Subject: [PATCH 1/4] feat(rabbitmq): share connections with context safety and config guard --- .../messaging/rabbitmq/ClientHolder.java | 99 +++++++--- .../rabbitmq/IncomingRabbitMQMessage.java | 15 +- .../messaging/rabbitmq/RabbitMQConnector.java | 150 ++++++++++++-- .../rabbitmq/i18n/RabbitMQExceptions.java | 3 + .../internals/IncomingRabbitMQChannel.java | 23 ++- .../internals/OutgoingRabbitMQChannel.java | 4 +- .../internals/RabbitMQClientHelper.java | 172 ++++++++++++++-- .../rabbitmq/IncomingContextBean.java | 46 +++++ .../messaging/rabbitmq/RabbitMQTest.java | 186 ++++++++++++++++++ .../messaging/rabbitmq/RabbitMQUsage.java | 20 ++ 10 files changed, 638 insertions(+), 80 deletions(-) create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 5c9ef286a2..30e8a721f8 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -22,46 +22,20 @@ public class ClientHolder { private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicReference connectionHolder = new AtomicReference<>(); - private final Uni connection; + private final AtomicReference rootContext; + private final AtomicReference> connectionStage = new AtomicReference<>(); private final Vertx vertx; + private final RabbitMQConnectorCommonConfiguration configuration; public ClientHolder(RabbitMQClient client, RabbitMQConnectorCommonConfiguration configuration, Vertx vertx, Context root) { this.client = client; + this.configuration = configuration; this.vertx = vertx; - this.connection = Uni.createFrom().deferred(() -> client.start() - .onSubscription().invoke(() -> { - connected.set(true); - log.connectionEstablished(configuration.getChannel()); - }) - .onItem().transform(ignored -> { - connectionHolder - .set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root)); - - // handle the case we are already disconnected. - if (!client.isConnected() || connectionHolder.get() == null) { - // Throwing the exception would trigger a retry. - connectionHolder.set(null); - throw ex.illegalStateConnectionDisconnected(); - } - return client; - }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().invoke(t -> { - connectionHolder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - })) - .memoize().until(() -> { - CurrentConnection connection = connectionHolder.get(); - if (connection == null) { - return true; - } - return !connection.client.isConnected(); - }); - + this.rootContext = new AtomicReference<>(root); } public static CompletionStage runOnContext(Context context, IncomingRabbitMQMessage msg, @@ -89,6 +63,17 @@ public Context getContext() { } } + public void ensureContext(Context context) { + if (context == null) { + return; + } + rootContext.compareAndSet(null, context); + CurrentConnection connection = connectionHolder.get(); + if (connection != null && connection.context == null) { + connectionHolder.compareAndSet(connection, new CurrentConnection(connection.client, context)); + } + } + public RabbitMQClient client() { return client; } @@ -112,7 +97,29 @@ public Vertx getVertx() { @CheckReturnValue public Uni getOrEstablishConnection() { - return connection; + CompletionStage existing = connectionStage.get(); + if (existing != null) { + if (!existing.toCompletableFuture().isDone() || client.isConnected()) { + return Uni.createFrom().completionStage(existing); + } + connectionStage.compareAndSet(existing, null); + } + + for (;;) { + CompletionStage current = connectionStage.get(); + if (current != null) { + return Uni.createFrom().completionStage(current); + } + CompletionStage created = createConnectionUni().subscribeAsCompletionStage(); + if (connectionStage.compareAndSet(null, created)) { + created.whenComplete((result, error) -> { + if (error != null) { + connectionStage.compareAndSet(created, null); + } + }); + return Uni.createFrom().completionStage(created); + } + } } private static class CurrentConnection { @@ -126,4 +133,32 @@ private CurrentConnection(RabbitMQClient client, Context context) { } } + private Uni createConnectionUni() { + return Uni.createFrom().deferred(() -> client.start() + .onSubscription().invoke(() -> { + connected.set(true); + log.connectionEstablished(configuration.getChannel()); + }) + .onItem().transform(ignored -> { + Context context = rootContext.get(); + if (context == null) { + context = Vertx.currentContext(); + } + connectionHolder.set(new CurrentConnection(client, context)); + + // handle the case we are already disconnected. + if (!client.isConnected() || connectionHolder.get() == null) { + // Throwing the exception would trigger a retry. + connectionHolder.set(null); + throw ex.illegalStateConnectionDisconnected(); + } + return client; + }) + .onFailure().invoke(log::unableToConnectToBroker) + .onFailure().invoke(t -> { + connectionHolder.set(null); + log.unableToRecoverFromConnectionDisruption(t); + })); + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index f795b37f17..93bb4934e4 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -63,15 +63,26 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Meta public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { - this(delegate.getDelegate(), holder, onNack, onAck, contentTypeOverride); + this(delegate.getDelegate(), holder, holder.getContext(), onNack, onAck, contentTypeOverride); + } + + public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context, + RabbitMQFailureHandler onNack, + RabbitMQAckHandler onAck, String contentTypeOverride) { + this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride); } IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { + this(msg, holder, holder.getContext(), onNack, onAck, contentTypeOverride); + } + + IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context, + RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { this.message = msg; this.deliveryTag = msg.envelope().getDeliveryTag(); this.holder = holder; - this.context = holder.getContext(); + this.context = context != null ? context : holder.getContext(); this.contentTypeOverride = contentTypeOverride; this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride); this.onNack = onNack; diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index bd02070c57..3d35b11645 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -1,14 +1,19 @@ package io.smallrye.reactive.messaging.rabbitmq; -import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.*; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; +import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicInteger; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -38,6 +43,7 @@ import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.smallrye.reactive.messaging.rabbitmq.internals.IncomingRabbitMQChannel; import io.smallrye.reactive.messaging.rabbitmq.internals.OutgoingRabbitMQChannel; +import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.rabbitmq.RabbitMQOptions; @@ -64,6 +70,7 @@ @ConnectorAttribute(name = "reconnect-interval", direction = INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two reconnection attempts", type = "int", alias = "rabbitmq-reconnect-interval", defaultValue = "10") @ConnectorAttribute(name = "network-recovery-interval", direction = INCOMING_AND_OUTGOING, description = "How long (ms) will automatic recovery wait before attempting to reconnect", type = "int", defaultValue = "5000") @ConnectorAttribute(name = "user", direction = INCOMING_AND_OUTGOING, description = "The user name to use when connecting to the broker", type = "string", defaultValue = "guest") +@ConnectorAttribute(name = "shared-connection-name", direction = INCOMING_AND_OUTGOING, description = "Optional identifier allowing multiple channels to share the same RabbitMQ connection when set to the same value", type = "string") @ConnectorAttribute(name = "include-properties", direction = INCOMING_AND_OUTGOING, description = "Whether to include properties when a broker message is passed on the event bus", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "requested-channel-max", direction = INCOMING_AND_OUTGOING, description = "The initially requested maximum channel number", type = "int", defaultValue = "2047") @ConnectorAttribute(name = "requested-heartbeat", direction = INCOMING_AND_OUTGOING, description = "The initially requested heartbeat interval (seconds), zero for none", type = "int", defaultValue = "60") @@ -164,7 +171,8 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H Instance failureHandlerFactories; private List incomings = new CopyOnWriteArrayList<>(); private List outgoings = new CopyOnWriteArrayList<>(); - private Map clients = new ConcurrentHashMap<>(); + private Map clientRegistrations = new ConcurrentHashMap<>(); + private Map sharedClients = new ConcurrentHashMap<>(); @Inject @Any @@ -263,27 +271,28 @@ public void terminate( outgoing.terminate(); } - clients.forEach((channel, rabbitMQClient) -> rabbitMQClient.stopAndAwait()); - clients.clear(); + List registeredChannels = new ArrayList<>(clientRegistrations.keySet()); + for (String channel : registeredChannels) { + releaseClient(channel); + } + sharedClients.clear(); } public Vertx vertx() { return executionHolder.vertx(); } - public void registerClient(String channel, RabbitMQClient client) { - RabbitMQClient old = clients.put(channel, client); - if (old != null) { - old.stopAndForget(); - } - } - public void reportIncomingFailure(String channel, Throwable reason) { log.failureReported(channel, reason); - RabbitMQClient client = clients.remove(channel); - if (client != null) { - // Called on vertx context, we can't block: stop clients without waiting - client.stopAndForget(); + ClientRegistration registration = clientRegistrations.remove(channel); + if (registration == null) { + return; + } + + if (registration.shared) { + releaseSharedClient(registration.key, false); + } else { + stopClient(registration.holder.client(), false); } } @@ -306,4 +315,115 @@ public Instance credentialsProviders() { public Instance> configMaps() { return configMaps; } + + public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config, io.vertx.mutiny.core.Context context) { + ClientRegistration existing = clientRegistrations.get(config.getChannel()); + if (existing != null) { + return existing.holder; + } + + return config.getSharedConnectionName() + .map(name -> getOrCreateSharedHolder(config, context, name)) + .orElseGet(() -> createAndRegisterHolder(config, context, config.getChannel(), false)); + } + + private ClientHolder createAndRegisterHolder(RabbitMQConnectorCommonConfiguration config, + io.vertx.mutiny.core.Context context, String key, boolean shared) { + ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config, vertx(), context); + clientRegistrations.put(config.getChannel(), new ClientRegistration(holder, shared, key)); + return holder; + } + + private ClientHolder getOrCreateSharedHolder(RabbitMQConnectorCommonConfiguration config, + io.vertx.mutiny.core.Context context, String name) { + RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config); + String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options); + SharedClient shared = sharedClients.compute(name, (key, existing) -> { + if (existing != null) { + if (!existing.fingerprint.equals(fingerprint)) { + throw ex.illegalStateSharedConnectionConfigMismatch(name); + } + existing.retain(); + if (context != null) { + existing.holder.ensureContext(context); + } + return existing; + } + return new SharedClient(name, new ClientHolder( + RabbitMQClient.create(vertx(), options), + config, + vertx(), + context), fingerprint); + }); + clientRegistrations.put(config.getChannel(), new ClientRegistration(shared.holder, true, name)); + return shared.holder; + } + + public void releaseClient(String channel) { + ClientRegistration registration = clientRegistrations.remove(channel); + if (registration == null) { + return; + } + + if (registration.shared) { + releaseSharedClient(registration.key, true); + } else { + stopClient(registration.holder.client(), true); + } + } + + private void releaseSharedClient(String sharedName, boolean await) { + SharedClient shared = sharedClients.get(sharedName); + if (shared == null) { + return; + } + if (shared.release()) { + sharedClients.remove(sharedName, shared); + stopClient(shared.holder.client(), await); + } + } + + private void stopClient(RabbitMQClient client, boolean await) { + if (client == null) { + return; + } + if (await) { + client.stopAndAwait(); + } else { + client.stopAndForget(); + } + } + + private static final class ClientRegistration { + final ClientHolder holder; + final boolean shared; + final String key; + + private ClientRegistration(ClientHolder holder, boolean shared, String key) { + this.holder = holder; + this.shared = shared; + this.key = key; + } + } + + private static final class SharedClient { + final String name; + final ClientHolder holder; + final String fingerprint; + final AtomicInteger references = new AtomicInteger(1); + + private SharedClient(String name, ClientHolder holder, String fingerprint) { + this.name = name; + this.holder = holder; + this.fingerprint = fingerprint; + } + + private void retain() { + references.incrementAndGet(); + } + + private boolean release() { + return references.decrementAndGet() == 0; + } + } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java index 2ca6b0c0e1..2a47f36a4c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java @@ -41,4 +41,7 @@ public interface RabbitMQExceptions { @Message(id = 16009, value = "Unable to create a client, probably a config error") IllegalStateException illegalStateUnableToCreateClient(@Cause Throwable t); + + @Message(id = 16010, value = "Shared connection '%s' has mismatched configuration; ensure all channels using the same shared-connection-name have identical connection settings") + IllegalStateException illegalStateSharedConnectionConfigMismatch(String sharedConnectionName); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 6420558b84..0614f84ac0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -43,10 +43,11 @@ public class IncomingRabbitMQChannel { private final RabbitMQOpenTelemetryInstrumenter instrumenter; private final AtomicReference subscription = new AtomicReference<>(); - private volatile RabbitMQClient client; private final RabbitMQConnectorIncomingConfiguration config; private final Multi> stream; private final RabbitMQConnector connector; + private final Context incomingContext; + private volatile RabbitMQClient client; public IncomingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic, Instance openTelemetryInstance) { @@ -58,14 +59,17 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, } this.config = ic; this.connector = connector; + this.incomingContext = Context + .newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); - Multi> multi = createConsumer(connector, ic) + Multi> multi = createConsumer(connector, ic, incomingContext) .invoke(tuple -> client = tuple.getItem1().client()) // Translate all consumers into a merged stream of messages - .onItem().transformToMulti(tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), ic, onNack, onAck)); + .onItem().transformToMulti( + tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); if (ic.getBroadcast()) { multi = multi.broadcast().toAllSubscribers(); @@ -110,9 +114,9 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder } private Uni> createConsumer(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic) { - // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic); + RabbitMQConnectorIncomingConfiguration ic, Context root) { + ClientHolder holder = connector.getClientHolder(ic, root); + final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { Uni uni; @@ -130,8 +134,6 @@ private Uni> createConsumer(RabbitMQConne .subscribe().with(ignored -> promise.complete(), promise::fail); }); - Context root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); - final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root); return holder.getOrEstablishConnection() .invoke(() -> log.connectionEstablished(ic.getChannel())) .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); @@ -236,6 +238,7 @@ private Uni createConsumer(RabbitMQConnectorIncomingConfigurat private Multi> getStreamOfMessages( RabbitMQConsumer receiver, ClientHolder holder, + Context context, RabbitMQConnectorIncomingConfiguration ic, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) { @@ -247,8 +250,8 @@ private Multi> getStreamOfMessages( Multi> multi = receiver.toMulti() // close the consumer on stream termination .onTermination().call(receiver::cancel) - .emitOn(c -> VertxContext.runOnContext(holder.getContext().getDelegate(), c)) - .map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, contentTypeOverride)); + .emitOn(c -> VertxContext.runOnContext(context.getDelegate(), c)) + .map(m -> new IncomingRabbitMQMessage<>(m, holder, context, onNack, onAck, contentTypeOverride)); if (ic.getTracingEnabled()) { return multi.map(msg -> instrumenter.traceIncoming(msg, RabbitMQTrace.traceQueue(queueName, msg.message.envelope().getRoutingKey(), msg.getHeaders()))); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index 5302118942..e56da53b5c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -32,15 +32,15 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut Instance openTelemetryInstance) { this.config = oc; + holder = connector.getClientHolder(oc, null); // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc); + final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { // Ensure we create the exchange to which messages are to be sent RabbitMQClientHelper.declareExchangeIfNeeded(client, oc, connector.configMaps()) .subscribe().with((ignored) -> promise.complete(), promise::fail); }); - holder = new ClientHolder(client, oc, connector.vertx(), null); final Uni getSender = holder.getOrEstablishConnection() .onItem() .transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(connector.vertx(), connection, diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java index 79ba298c1f..0fe91fae62 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java @@ -7,6 +7,9 @@ import static io.vertx.core.net.ClientOptionsBase.DEFAULT_METRICS_NAME; import static java.time.Duration.ofSeconds; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.*; import java.util.stream.Collectors; @@ -29,6 +32,11 @@ import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.core.json.JsonObject; import io.vertx.core.net.JksOptions; +import io.vertx.core.net.KeyCertOptions; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.core.net.PemTrustOptions; +import io.vertx.core.net.PfxOptions; +import io.vertx.core.net.TrustOptions; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.rabbitmq.RabbitMQOptions; @@ -42,29 +50,79 @@ private RabbitMQClientHelper() { // avoid direct instantiation. } - static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { - Optional clientOptionsName = config.getClientOptionsName(); - Vertx vertx = connector.vertx(); - RabbitMQOptions options; + public static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { try { - if (clientOptionsName.isPresent()) { - options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); - } else { - options = getClientOptions(vertx, config, connector.credentialsProviders()); - } - if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { - options.setMetricsName("rabbitmq|" + config.getChannel()); - } - RabbitMQOptions intercepted = ConfigUtils.customize(config.config(), connector.configCustomizers(), options); - RabbitMQClient client = RabbitMQClient.create(vertx, intercepted); - connector.registerClient(config.getChannel(), client); - return client; + RabbitMQOptions options = buildClientOptions(connector, config); + return RabbitMQClient.create(connector.vertx(), options); } catch (Exception e) { log.unableToCreateClient(e); throw ex.illegalStateUnableToCreateClient(e); } } + public static RabbitMQOptions buildClientOptions(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { + Optional clientOptionsName = config.getClientOptionsName(); + Vertx vertx = connector.vertx(); + RabbitMQOptions options; + String connectionLabel = config.getSharedConnectionName().orElse(config.getChannel()); + if (clientOptionsName.isPresent()) { + options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); + } else { + options = getClientOptions(vertx, config, connector.credentialsProviders()); + } + if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { + options.setMetricsName("rabbitmq|" + connectionLabel); + } + if (options.getConnectionName() == null || options.getConnectionName().isEmpty()) { + options.setConnectionName(resolveConnectionName(config)); + } + return ConfigUtils.customize(config.config(), connector.configCustomizers(), options); + } + + public static String computeConnectionFingerprint(RabbitMQOptions options) { + StringBuilder raw = new StringBuilder(); + append(raw, "uri", options.getUri()); + + List
addresses = options.getAddresses(); + if (addresses != null && !addresses.isEmpty()) { + List normalized = addresses.stream() + .map(address -> address.getHost() + ":" + address.getPort()) + .sorted() + .collect(Collectors.toList()); + append(raw, "addresses", String.join(",", normalized)); + } else { + append(raw, "host", options.getHost()); + append(raw, "port", Integer.toString(options.getPort())); + } + + append(raw, "virtualHost", options.getVirtualHost()); + append(raw, "user", options.getUser()); + append(raw, "passwordHash", hashValue(options.getPassword())); + + append(raw, "ssl", Boolean.toString(options.isSsl())); + append(raw, "trustAll", Boolean.toString(options.isTrustAll())); + append(raw, "hostnameVerificationAlgorithm", options.getHostnameVerificationAlgorithm()); + append(raw, "keyCertOptions", keyCertFingerprint(options.getKeyCertOptions())); + append(raw, "trustOptions", trustFingerprint(options.getTrustOptions())); + + append(raw, "connectionTimeout", Integer.toString(options.getConnectionTimeout())); + append(raw, "handshakeTimeout", Integer.toString(options.getHandshakeTimeout())); + append(raw, "requestedHeartbeat", Integer.toString(options.getRequestedHeartbeat())); + append(raw, "requestedChannelMax", Integer.toString(options.getRequestedChannelMax())); + append(raw, "networkRecoveryInterval", Long.toString(options.getNetworkRecoveryInterval())); + append(raw, "automaticRecoveryEnabled", Boolean.toString(options.isAutomaticRecoveryEnabled())); + append(raw, "automaticRecoveryOnInitialConnection", Boolean.toString(options.isAutomaticRecoveryOnInitialConnection())); + append(raw, "useNio", Boolean.toString(options.isNioEnabled())); + append(raw, "reconnectAttempts", Integer.toString(options.getReconnectAttempts())); + append(raw, "reconnectInterval", Long.toString(options.getReconnectInterval())); + + append(raw, "credentialsProvider", className(options.getCredentialsProvider())); + append(raw, "credentialsRefreshService", className(options.getCredentialsRefreshService())); + append(raw, "saslConfig", className(options.getSaslConfig())); + + return sha256(raw.toString()); + } + static RabbitMQOptions getClientOptionsFromBean(Instance options, String optionsBeanName) { options = options.select(Identifier.Literal.of(optionsBeanName)); if (options.isUnsatisfied()) { @@ -83,9 +141,7 @@ static RabbitMQOptions getClientOptionsFromBean(Instance option static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConfiguration config, Instance credentialsProviders) { - String connectionName = String.format("%s (%s)", - config.getChannel(), - config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing"); + String connectionName = resolveConnectionName(config); List
addresses = config.getAddresses() .map(s -> Arrays.asList(Address.parseAddresses(s))) .orElseGet(() -> Collections.singletonList(new Address(config.getHost(), config.getPort()))); @@ -160,6 +216,84 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf return options; } + private static String resolveConnectionName(RabbitMQConnectorCommonConfiguration config) { + return config.getSharedConnectionName() + .orElseGet(() -> String.format("%s (%s)", + config.getChannel(), + config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing")); + } + + private static void append(StringBuilder target, String key, String value) { + target.append(key).append('=').append(value == null ? "" : value).append(';'); + } + + private static String className(Object value) { + return value == null ? "" : value.getClass().getName(); + } + + private static String hashValue(String value) { + if (value == null) { + return ""; + } + return sha256(value); + } + + private static String sha256(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(hash.length * 2); + for (byte b : hash) { + hex.append(String.format("%02x", b)); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("Unable to compute SHA-256 hash", e); + } + } + + private static String keyCertFingerprint(KeyCertOptions options) { + if (options == null) { + return ""; + } + if (options instanceof JksOptions) { + JksOptions jks = (JksOptions) options; + return String.join(":", "JKS", nullToEmpty(jks.getPath()), nullToEmpty(jks.getAlias())); + } + if (options instanceof PfxOptions) { + PfxOptions pfx = (PfxOptions) options; + return String.join(":", "PFX", nullToEmpty(pfx.getPath()), nullToEmpty(pfx.getAlias())); + } + if (options instanceof PemKeyCertOptions) { + PemKeyCertOptions pem = (PemKeyCertOptions) options; + return String.join(":", "PEM", String.join(",", pem.getKeyPaths()), String.join(",", pem.getCertPaths())); + } + return options.getClass().getName(); + } + + private static String trustFingerprint(TrustOptions options) { + if (options == null) { + return ""; + } + if (options instanceof JksOptions) { + JksOptions jks = (JksOptions) options; + return String.join(":", "JKS", nullToEmpty(jks.getPath()), nullToEmpty(jks.getAlias())); + } + if (options instanceof PfxOptions) { + PfxOptions pfx = (PfxOptions) options; + return String.join(":", "PFX", nullToEmpty(pfx.getPath()), nullToEmpty(pfx.getAlias())); + } + if (options instanceof PemTrustOptions) { + PemTrustOptions pem = (PemTrustOptions) options; + return String.join(":", "PEM", String.join(",", pem.getCertPaths())); + } + return options.getClass().getName(); + } + + private static String nullToEmpty(String value) { + return value == null ? "" : value; + } + public static String serverQueueName(String name) { if (name.equals("(server.auto)")) { return ""; diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java new file mode 100644 index 0000000000..46eaa7214b --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java @@ -0,0 +1,46 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class IncomingContextBean { + + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference messageContext = new AtomicReference<>(); + private final AtomicReference eventLoopContext = new AtomicReference<>(); + + @Incoming("data") + public Uni consume(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + messageContext.set(context); + eventLoopContext.set(context.isEventLoopContext()); + }); + latch.countDown(); + return Uni.createFrom().voidItem(); + } + + public boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + public Context getMessageContext() { + return messageContext.get(); + } + + public boolean isEventLoopContext() { + Boolean value = eventLoopContext.get(); + return value != null && value; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index b79c77c999..aa9242b4b9 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -198,6 +198,192 @@ void testIncomingDeclarations() throws Exception { assertThat(binding2.getString("routing_key")).isEqualTo("urgent"); } + @Test + void testSharedConnectionIncomingAndOutgoingStartup() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + } + + @Test + void testSharedConnectionIncomingUsesEventLoopContext() throws InterruptedException { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingContextBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + IncomingContextBean bean = get(container, IncomingContextBean.class); + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> name != null && name.startsWith("shared-connection")) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + + usage.produce(exchangeName, queueName, routingKey, 1, () -> 1); + + assertThat(bean.awaitMessage(1, TimeUnit.MINUTES)).isTrue(); + assertThat(bean.getMessageContext()).isNotNull(); + assertThat(bean.isEventLoopContext()).isTrue(); + } + + @Test + void testSharedConnectionNameIsNotSuffixed() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> "shared-connection".equals(name)) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + } + + @Test + void testDefaultConnectionNameIncludesDirection() { + final String routingKey = "default"; + + weld.addBeanClass(IncomingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + boolean hasDefaultName = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .anyMatch(name -> "data (Incoming)".equals(name)); + assertThat(hasDefaultName).isTrue(); + }); + } + + private static String getConnectionName(JsonObject connection) { + String connectionName = connection.getString("connection_name"); + if (connectionName != null) { + return connectionName; + } + + JsonObject properties = connection.getJsonObject("client_properties"); + if (properties == null) { + return null; + } + + return properties.getString("connection_name"); + } + /** * Verifies that Exchanges, Queues and Bindings are correctly declared as a result of * incoming connector configuration that specifies DLQ/DLX overrides. diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java index 19768102c1..8c9b3480b3 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java @@ -277,6 +277,26 @@ public JsonArray getBindings(final String exchangeName, final String queueName) } } + /** + * Returns the list of active connections. + * + * @return a {@link JsonArray} of connection descriptions + * @throws IOException if an error occurs + */ + public JsonArray getConnections() throws IOException { + final URL url = new URL(String.format("http://%s:%d/api/connections", options.getHost(), managementPort)); + final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty("Authorization", "Basic " + getBasicAuth()); + conn.connect(); + + if (conn.getResponseCode() == 200) { + final String jsonString = getResponseString(conn); + return new JsonArray(jsonString); + } else { + return null; + } + } + private JsonObject getObjectByTypeAndName(final String objectType, final String objectName) throws IOException { final URL url = new URL(String.format("http://%s:%d/api/%s/%%2F/%s", options.getHost(), managementPort, objectType, objectName)); From e2745d3a3a575aad862cf6702a065af487c657cc Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 7 Jan 2026 16:35:31 +0100 Subject: [PATCH 2/4] Some tests and cleanup --- .../messaging/rabbitmq/ClientHolder.java | 58 ++++---- .../rabbitmq/IncomingRabbitMQMessage.java | 5 - .../messaging/rabbitmq/RabbitMQConnector.java | 5 +- .../internals/OutgoingRabbitMQChannel.java | 22 +-- .../internals/RabbitMQClientHelper.java | 105 +------------- .../rabbitmq/DualIncomingContextBean.java | 69 +++++++++ .../messaging/rabbitmq/OutgoingBean.java | 7 +- .../rabbitmq/RabbitMQReconnectionTest.java | 91 +++++++++++- .../messaging/rabbitmq/RabbitMQTest.java | 132 ++++++++++++++++++ .../rabbitmq/ReconnectingContextBean.java | 50 +++++++ .../internals/RabbitMQClientHelperTest.java | 91 ++++++++++++ 11 files changed, 480 insertions(+), 155 deletions(-) create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 30e8a721f8..39131c84d4 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -24,18 +24,40 @@ public class ClientHolder { private final AtomicReference connectionHolder = new AtomicReference<>(); private final AtomicReference rootContext; private final AtomicReference> connectionStage = new AtomicReference<>(); + private final Uni connection; private final Vertx vertx; - private final RabbitMQConnectorCommonConfiguration configuration; public ClientHolder(RabbitMQClient client, - RabbitMQConnectorCommonConfiguration configuration, + String channel, Vertx vertx, Context root) { this.client = client; - this.configuration = configuration; this.vertx = vertx; this.rootContext = new AtomicReference<>(root); + this.connection = Uni.createFrom().deferred(() -> client.start() + .onSubscription().invoke(() -> { + connected.set(true); + log.connectionEstablished(channel); + }) + .onItem().transform(ignored -> { + Context ctx = rootContext.get(); + connectionHolder + .set(new CurrentConnection(client, ctx == null ? Vertx.currentContext() : ctx)); + + // handle the case we are already disconnected. + if (!client.isConnected() || connectionHolder.get() == null) { + // Throwing the exception would trigger a retry. + connectionHolder.set(null); + throw ex.illegalStateConnectionDisconnected(); + } + return client; + }) + .onFailure().invoke(log::unableToConnectToBroker) + .onFailure().invoke(t -> { + connectionHolder.set(null); + log.unableToRecoverFromConnectionDisruption(t); + })); } public static CompletionStage runOnContext(Context context, IncomingRabbitMQMessage msg, @@ -110,7 +132,7 @@ public Uni getOrEstablishConnection() { if (current != null) { return Uni.createFrom().completionStage(current); } - CompletionStage created = createConnectionUni().subscribeAsCompletionStage(); + CompletionStage created = connection.subscribeAsCompletionStage(); if (connectionStage.compareAndSet(null, created)) { created.whenComplete((result, error) -> { if (error != null) { @@ -133,32 +155,4 @@ private CurrentConnection(RabbitMQClient client, Context context) { } } - private Uni createConnectionUni() { - return Uni.createFrom().deferred(() -> client.start() - .onSubscription().invoke(() -> { - connected.set(true); - log.connectionEstablished(configuration.getChannel()); - }) - .onItem().transform(ignored -> { - Context context = rootContext.get(); - if (context == null) { - context = Vertx.currentContext(); - } - connectionHolder.set(new CurrentConnection(client, context)); - - // handle the case we are already disconnected. - if (!client.isConnected() || connectionHolder.get() == null) { - // Throwing the exception would trigger a retry. - connectionHolder.set(null); - throw ex.illegalStateConnectionDisconnected(); - } - return client; - }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().invoke(t -> { - connectionHolder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - })); - } - } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index 93bb4934e4..40269b5d1f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -72,11 +72,6 @@ public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Co this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride); } - IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, - RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { - this(msg, holder, holder.getContext(), onNack, onAck, contentTypeOverride); - } - IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { this.message = msg; diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 3d35b11645..8d61b20de1 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -329,7 +329,8 @@ public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config, private ClientHolder createAndRegisterHolder(RabbitMQConnectorCommonConfiguration config, io.vertx.mutiny.core.Context context, String key, boolean shared) { - ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config, vertx(), context); + ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config.getChannel(), vertx(), + context); clientRegistrations.put(config.getChannel(), new ClientRegistration(holder, shared, key)); return holder; } @@ -351,7 +352,7 @@ private ClientHolder getOrCreateSharedHolder(RabbitMQConnectorCommonConfiguratio } return new SharedClient(name, new ClientHolder( RabbitMQClient.create(vertx(), options), - config, + config.getChannel(), vertx(), context), fingerprint); }); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index e56da53b5c..be9973fd75 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -26,6 +26,7 @@ public class OutgoingRabbitMQChannel { private final Flow.Subscriber> subscriber; private final RabbitMQConnectorOutgoingConfiguration config; private final ClientHolder holder; + private final RabbitMQMessageSender processor; private volatile RabbitMQPublisher publisher; public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc, @@ -54,10 +55,19 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut .onFailure().recoverWithNull().memoize().indefinitely(); // Set up a sender based on the publisher we established above - final RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); + processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); // Return a SubscriberBuilder - subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t))); + subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t)) + .onTermination().call(() -> { + if (publisher != null) { + return publisher.stop() + .ifNoItem().after(Duration.ofSeconds(oc.getReconnectInterval())).fail() + .onFailure() + .invoke(e -> log.infof(e, "Error terminating outgoing channel %s", config.getChannel())); + } + return Uni.createFrom().voidItem(); + })); } public Flow.Subscriber> getSubscriber() { @@ -95,12 +105,6 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder } public void terminate() { - if (publisher != null) { - try { - publisher.stop().await().atMost(Duration.ofMillis(config.getConnectionTimeout())); - } catch (Exception e) { - log.infof(e, "Error terminating outgoing channel %s", config.getChannel()); - } - } + processor.cancel(); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java index 0fe91fae62..70143fbbce 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java @@ -32,11 +32,6 @@ import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.core.json.JsonObject; import io.vertx.core.net.JksOptions; -import io.vertx.core.net.KeyCertOptions; -import io.vertx.core.net.PemKeyCertOptions; -import io.vertx.core.net.PemTrustOptions; -import io.vertx.core.net.PfxOptions; -import io.vertx.core.net.TrustOptions; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.rabbitmq.RabbitMQOptions; @@ -80,47 +75,12 @@ public static RabbitMQOptions buildClientOptions(RabbitMQConnector connector, Ra } public static String computeConnectionFingerprint(RabbitMQOptions options) { - StringBuilder raw = new StringBuilder(); - append(raw, "uri", options.getUri()); - + JsonObject json = options.toJson(); List
addresses = options.getAddresses(); - if (addresses != null && !addresses.isEmpty()) { - List normalized = addresses.stream() - .map(address -> address.getHost() + ":" + address.getPort()) - .sorted() - .collect(Collectors.toList()); - append(raw, "addresses", String.join(",", normalized)); - } else { - append(raw, "host", options.getHost()); - append(raw, "port", Integer.toString(options.getPort())); + if (addresses != null) { + json.put("addresses", addresses.stream().map(Address::toString).collect(Collectors.toList())); } - - append(raw, "virtualHost", options.getVirtualHost()); - append(raw, "user", options.getUser()); - append(raw, "passwordHash", hashValue(options.getPassword())); - - append(raw, "ssl", Boolean.toString(options.isSsl())); - append(raw, "trustAll", Boolean.toString(options.isTrustAll())); - append(raw, "hostnameVerificationAlgorithm", options.getHostnameVerificationAlgorithm()); - append(raw, "keyCertOptions", keyCertFingerprint(options.getKeyCertOptions())); - append(raw, "trustOptions", trustFingerprint(options.getTrustOptions())); - - append(raw, "connectionTimeout", Integer.toString(options.getConnectionTimeout())); - append(raw, "handshakeTimeout", Integer.toString(options.getHandshakeTimeout())); - append(raw, "requestedHeartbeat", Integer.toString(options.getRequestedHeartbeat())); - append(raw, "requestedChannelMax", Integer.toString(options.getRequestedChannelMax())); - append(raw, "networkRecoveryInterval", Long.toString(options.getNetworkRecoveryInterval())); - append(raw, "automaticRecoveryEnabled", Boolean.toString(options.isAutomaticRecoveryEnabled())); - append(raw, "automaticRecoveryOnInitialConnection", Boolean.toString(options.isAutomaticRecoveryOnInitialConnection())); - append(raw, "useNio", Boolean.toString(options.isNioEnabled())); - append(raw, "reconnectAttempts", Integer.toString(options.getReconnectAttempts())); - append(raw, "reconnectInterval", Long.toString(options.getReconnectInterval())); - - append(raw, "credentialsProvider", className(options.getCredentialsProvider())); - append(raw, "credentialsRefreshService", className(options.getCredentialsRefreshService())); - append(raw, "saslConfig", className(options.getSaslConfig())); - - return sha256(raw.toString()); + return sha256(json.encode()); } static RabbitMQOptions getClientOptionsFromBean(Instance options, String optionsBeanName) { @@ -223,21 +183,6 @@ private static String resolveConnectionName(RabbitMQConnectorCommonConfiguration config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing")); } - private static void append(StringBuilder target, String key, String value) { - target.append(key).append('=').append(value == null ? "" : value).append(';'); - } - - private static String className(Object value) { - return value == null ? "" : value.getClass().getName(); - } - - private static String hashValue(String value) { - if (value == null) { - return ""; - } - return sha256(value); - } - private static String sha256(String value) { try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); @@ -252,48 +197,6 @@ private static String sha256(String value) { } } - private static String keyCertFingerprint(KeyCertOptions options) { - if (options == null) { - return ""; - } - if (options instanceof JksOptions) { - JksOptions jks = (JksOptions) options; - return String.join(":", "JKS", nullToEmpty(jks.getPath()), nullToEmpty(jks.getAlias())); - } - if (options instanceof PfxOptions) { - PfxOptions pfx = (PfxOptions) options; - return String.join(":", "PFX", nullToEmpty(pfx.getPath()), nullToEmpty(pfx.getAlias())); - } - if (options instanceof PemKeyCertOptions) { - PemKeyCertOptions pem = (PemKeyCertOptions) options; - return String.join(":", "PEM", String.join(",", pem.getKeyPaths()), String.join(",", pem.getCertPaths())); - } - return options.getClass().getName(); - } - - private static String trustFingerprint(TrustOptions options) { - if (options == null) { - return ""; - } - if (options instanceof JksOptions) { - JksOptions jks = (JksOptions) options; - return String.join(":", "JKS", nullToEmpty(jks.getPath()), nullToEmpty(jks.getAlias())); - } - if (options instanceof PfxOptions) { - PfxOptions pfx = (PfxOptions) options; - return String.join(":", "PFX", nullToEmpty(pfx.getPath()), nullToEmpty(pfx.getAlias())); - } - if (options instanceof PemTrustOptions) { - PemTrustOptions pem = (PemTrustOptions) options; - return String.join(":", "PEM", String.join(",", pem.getCertPaths())); - } - return options.getClass().getName(); - } - - private static String nullToEmpty(String value) { - return value == null ? "" : value; - } - public static String serverQueueName(String name) { if (name.equals("(server.auto)")) { return ""; diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java new file mode 100644 index 0000000000..ee2c2f7823 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java @@ -0,0 +1,69 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class DualIncomingContextBean { + + private final CountDownLatch latch1 = new CountDownLatch(1); + private final CountDownLatch latch2 = new CountDownLatch(1); + private final AtomicReference context1 = new AtomicReference<>(); + private final AtomicReference context2 = new AtomicReference<>(); + private final AtomicReference eventLoop1 = new AtomicReference<>(); + private final AtomicReference eventLoop2 = new AtomicReference<>(); + + @Incoming("data1") + public Uni consume1(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + context1.set(context); + eventLoop1.set(context.isEventLoopContext()); + }); + latch1.countDown(); + return Uni.createFrom().voidItem(); + } + + @Incoming("data2") + public Uni consume2(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + context2.set(context); + eventLoop2.set(context.isEventLoopContext()); + }); + latch2.countDown(); + return Uni.createFrom().voidItem(); + } + + public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException { + return latch1.await(timeout, unit) && latch2.await(timeout, unit); + } + + public Context getContext1() { + return context1.get(); + } + + public Context getContext2() { + return context2.get(); + } + + public boolean isEventLoop1() { + Boolean value = eventLoop1.get(); + return value != null && value; + } + + public boolean isEventLoop2() { + Boolean value = eventLoop2.get(); + return value != null && value; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java index 3367006052..49a6b708b9 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java @@ -2,9 +2,10 @@ import jakarta.enterprise.context.ApplicationScoped; -import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; +import io.smallrye.mutiny.Multi; + /** * A bean that can be registered to do just enough to support the * declaration of an exchange backing an outgoing rabbitmq channel. @@ -13,8 +14,8 @@ public class OutgoingBean { @Outgoing("sink") - public Message process() { - return Message.of("test"); + public Multi process() { + return Multi.createFrom().items("test", "test2", "test3"); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java index 7aef4925d4..2a0fd5b30a 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java @@ -55,7 +55,7 @@ private Proxy createContainerProxy(ToxiproxyContainer toxiproxy, int toxiPort) { } } - @Test + @Test // 15s void testSendingMessagesToRabbitMQ_connection_fails() { final String routingKey = "normal"; @@ -101,7 +101,7 @@ void testSendingMessagesToRabbitMQ_connection_fails() { } } - @Test + @Test // 17s void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { final String routingKey = "normal"; @@ -147,10 +147,95 @@ void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { } } + @Test + void testSharedConnectionReconnectionPreservesContext() { + final String routingKey = "shared"; + try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") + .asCompatibleSubstituteFor("shopify/toxiproxy")) + .withNetworkAliases("toxiproxy")) { + toxiproxy.withNetwork(Network.SHARED); + toxiproxy.start(); + await().until(toxiproxy::isRunning); + + List exposedPorts = toxiproxy.getExposedPorts(); + int toxiPort = exposedPorts.get(exposedPorts.size() - 1); + Proxy proxy = createContainerProxy(toxiproxy, toxiPort); + int exposedPort = toxiproxy.getMappedPort(toxiPort); + + weld.addBeanClass(ReconnectingContextBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.queue.durable", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", toxiproxy.getHost()) + .put("mp.messaging.incoming.data.port", exposedPort) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", toxiproxy.getHost()) + .put("mp.messaging.outgoing.sink.port", exposedPort) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-interval", 1) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + ReconnectingContextBean bean = get(container, ReconnectingContextBean.class); + + // Wait for at least one message before disconnect (from OutgoingBean) + await().atMost(1, TimeUnit.MINUTES).until(() -> !bean.getContexts().isEmpty()); + + // Verify pre-disconnect messages have event loop context + assertThat(bean.getEventLoopFlags().get(0)).isTrue(); + + int preDisconnectCount = bean.getContexts().size(); + + // Disconnect + proxy.disable(); + await().pollDelay(3, SECONDS).until(() -> !isRabbitMQConnectorAvailable(container)); + + // Reconnect + proxy.enable(); + await().atMost(1, TimeUnit.MINUTES).until(() -> isRabbitMQConnectorAvailable(container)); + + // Send messages after reconnection via the direct RabbitMQ client + AtomicInteger counter = new AtomicInteger(); + usage.produce(exchangeName, queueName, routingKey, 3, counter::getAndIncrement); + + // Wait for at least one more message after reconnection + await().atMost(1, TimeUnit.MINUTES).until(() -> bean.getContexts().size() > preDisconnectCount); + + // Verify post-reconnection messages also have event loop context. + // This should fail because the reconnection Uni closure captures the original + // null root parameter instead of reading rootContext.get(), so after reconnect + // the context falls back to Vertx.currentContext() which may not be an event loop. + List postReconnectFlags = bean.getEventLoopFlags() + .subList(preDisconnectCount, bean.getEventLoopFlags().size()); + assertThat(postReconnectFlags) + .as("After reconnection, all messages should still have event loop context") + .doesNotContain(false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Verifies that messages can be received from RabbitMQ. */ - @Test + @Test // 14s void testReceivingMessagesFromRabbitMQ_connection_fails() { final String routingKey = "xyzzy"; try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index aa9242b4b9..8f081d66a5 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.rabbitmq; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import java.util.Comparator; @@ -384,6 +385,41 @@ private static String getConnectionName(JsonObject connection) { return properties.getString("connection_name"); } + @Test + void testSharedConnectionConfigMismatchFailsStartup() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", "some-other-host") + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + assertThatThrownBy(() -> container = weld.initialize()) + .isInstanceOf(Exception.class) + .hasStackTraceContaining("mismatched configuration"); + } + /** * Verifies that Exchanges, Queues and Bindings are correctly declared as a result of * incoming connector configuration that specifies DLQ/DLX overrides. @@ -1036,4 +1072,100 @@ void testConsumerArguments() { }); } + @Test + void testSharedConnectionMultipleIncomingChannelsGetDistinctContexts() throws InterruptedException { + final String routingKey1 = "ctx1"; + final String routingKey2 = "ctx2"; + final String queueName2 = queueName + "-2"; + + weld.addBeanClass(DualIncomingContextBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data1.exchange.name", exchangeName) + .put("mp.messaging.incoming.data1.exchange.declare", true) + .put("mp.messaging.incoming.data1.queue.name", queueName) + .put("mp.messaging.incoming.data1.queue.declare", true) + .put("mp.messaging.incoming.data1.routing-keys", routingKey1) + .put("mp.messaging.incoming.data1.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data1.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data1.host", host) + .put("mp.messaging.incoming.data1.port", port) + .put("mp.messaging.incoming.data1.tracing.enabled", false) + .put("mp.messaging.incoming.data2.exchange.name", exchangeName) + .put("mp.messaging.incoming.data2.exchange.declare", true) + .put("mp.messaging.incoming.data2.queue.name", queueName2) + .put("mp.messaging.incoming.data2.queue.declare", true) + .put("mp.messaging.incoming.data2.routing-keys", routingKey2) + .put("mp.messaging.incoming.data2.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data2.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data2.host", host) + .put("mp.messaging.incoming.data2.port", port) + .put("mp.messaging.incoming.data2.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().atMost(1, TimeUnit.MINUTES).until(() -> isRabbitMQConnectorAvailable(container)); + + DualIncomingContextBean bean = get(container, DualIncomingContextBean.class); + + usage.produce(exchangeName, queueName, routingKey1, 1, () -> 1); + usage.produce(exchangeName, queueName2, routingKey2, 1, () -> 2); + + assertThat(bean.awaitMessages(1, TimeUnit.MINUTES)).isTrue(); + + assertThat(bean.isEventLoop1()).isTrue(); + assertThat(bean.isEventLoop2()).isTrue(); + assertThat(bean.getContext1()).isNotSameAs(bean.getContext2()); + + // Verify single shared connection + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> name != null && name.startsWith("shared-connection")) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + } + + @Test + void testNonSharedIncomingUsesEventLoopContext() throws InterruptedException { + final String routingKey = "nonshared"; + + weld.addBeanClass(IncomingContextBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + IncomingContextBean bean = get(container, IncomingContextBean.class); + + usage.produce(exchangeName, queueName, routingKey, 1, () -> 1); + + assertThat(bean.awaitMessage(1, TimeUnit.MINUTES)).isTrue(); + assertThat(bean.getMessageContext()).isNotNull(); + assertThat(bean.isEventLoopContext()).isTrue(); + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java new file mode 100644 index 0000000000..766065cf6b --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java @@ -0,0 +1,50 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class ReconnectingContextBean { + + private final CopyOnWriteArrayList contexts = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList eventLoopFlags = new CopyOnWriteArrayList<>(); + private volatile CountDownLatch latch = new CountDownLatch(1); + + @Incoming("data") + public Uni consume(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + contexts.add(context); + eventLoopFlags.add(context.isEventLoopContext()); + }); + latch.countDown(); + return Uni.createFrom().voidItem(); + } + + public void setExpectedMessages(int count) { + latch = new CountDownLatch(count); + } + + public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + public List getContexts() { + return contexts; + } + + public List getEventLoopFlags() { + return eventLoopFlags; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java new file mode 100644 index 0000000000..37d67bc1b2 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java @@ -0,0 +1,91 @@ +package io.smallrye.reactive.messaging.rabbitmq.internals; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.rabbitmq.client.Address; + +import io.vertx.rabbitmq.RabbitMQOptions; + +class RabbitMQClientHelperTest { + + @Test + void testIdenticalOptionsProduceSameFingerprint() { + RabbitMQOptions options1 = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest") + .setPassword("guest") + .setVirtualHost("/"); + + RabbitMQOptions options2 = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest") + .setPassword("guest") + .setVirtualHost("/"); + + String fingerprint1 = RabbitMQClientHelper.computeConnectionFingerprint(options1); + String fingerprint2 = RabbitMQClientHelper.computeConnectionFingerprint(options2); + + assertThat(fingerprint1).isEqualTo(fingerprint2); + } + + @Test + void testDifferentHostsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("host-a").setPort(5672); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("host-b").setPort(5672); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentAddressesProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setAddresses(List.of(new Address("host-a", 5672))); + RabbitMQOptions options2 = new RabbitMQOptions().setAddresses(List.of(new Address("host-a", 5673))); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentPortsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setPort(5672); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setPort(5673); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentUsersProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setUser("alice"); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setUser("bob"); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentVirtualHostsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setVirtualHost("/"); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setVirtualHost("/staging"); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentSslProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setSsl(false); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setSsl(true); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + +} From b5d2419e49ef9fa9552d2c26f68b7679efc17662 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 26 Mar 2026 12:02:11 +0100 Subject: [PATCH 3/4] Simplify the ClientHolder, context and shared client mechanism --- .../messaging/rabbitmq/ClientHolder.java | 116 +++++--------- .../rabbitmq/IncomingRabbitMQMessage.java | 8 +- .../messaging/rabbitmq/RabbitMQConnector.java | 141 ++++-------------- .../internals/IncomingRabbitMQChannel.java | 7 +- .../internals/OutgoingRabbitMQChannel.java | 13 +- .../rabbitmq/IncomingRabbitMQMessageTest.java | 6 +- 6 files changed, 85 insertions(+), 206 deletions(-) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 39131c84d4..eae70ea412 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -3,7 +3,10 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -13,51 +16,34 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.vertx.mutiny.core.Context; -import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; public class ClientHolder { private final RabbitMQClient client; - private final AtomicBoolean connected = new AtomicBoolean(false); - private final AtomicReference connectionHolder = new AtomicReference<>(); - private final AtomicReference rootContext; - private final AtomicReference> connectionStage = new AtomicReference<>(); + private final AtomicBoolean hasBeenConnected = new AtomicBoolean(false); + private final AtomicReference> ongoingConnection = new AtomicReference<>(); private final Uni connection; + private final Set channels = ConcurrentHashMap.newKeySet(); - private final Vertx vertx; - - public ClientHolder(RabbitMQClient client, - String channel, - Vertx vertx, - Context root) { + public ClientHolder(RabbitMQClient client) { this.client = client; - this.vertx = vertx; - this.rootContext = new AtomicReference<>(root); this.connection = Uni.createFrom().deferred(() -> client.start() .onSubscription().invoke(() -> { - connected.set(true); - log.connectionEstablished(channel); + hasBeenConnected.set(true); + log.connectionEstablished(String.join(", ", channels)); }) .onItem().transform(ignored -> { - Context ctx = rootContext.get(); - connectionHolder - .set(new CurrentConnection(client, ctx == null ? Vertx.currentContext() : ctx)); - // handle the case we are already disconnected. - if (!client.isConnected() || connectionHolder.get() == null) { + if (!client.isConnected()) { // Throwing the exception would trigger a retry. - connectionHolder.set(null); throw ex.illegalStateConnectionDisconnected(); } return client; }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().invoke(t -> { - connectionHolder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - })); + .onFailure().invoke(log::unableToConnectToBroker)) + .memoize().until(() -> !client.isConnected()); } public static CompletionStage runOnContext(Context context, IncomingRabbitMQMessage msg, @@ -76,32 +62,12 @@ public static CompletionStage runOnContextAndReportFailure(Context context }); } - public Context getContext() { - CurrentConnection connection = connectionHolder.get(); - if (connection != null) { - return connection.context; - } else { - return null; - } - } - - public void ensureContext(Context context) { - if (context == null) { - return; - } - rootContext.compareAndSet(null, context); - CurrentConnection connection = connectionHolder.get(); - if (connection != null && connection.context == null) { - connectionHolder.compareAndSet(connection, new CurrentConnection(connection.client, context)); - } - } - public RabbitMQClient client() { return client; } public boolean hasBeenConnected() { - return connected.get(); + return hasBeenConnected.get(); } @CheckReturnValue @@ -113,46 +79,46 @@ public Function> getNack(final long deliveryTag, final bool return t -> client.basicNack(deliveryTag, false, requeue); } - public Vertx getVertx() { - return vertx; - } - @CheckReturnValue public Uni getOrEstablishConnection() { - CompletionStage existing = connectionStage.get(); + return Uni.createFrom().deferred(this::establishConnection); + } + + private Uni establishConnection() { + CompletableFuture existing = ongoingConnection.get(); if (existing != null) { - if (!existing.toCompletableFuture().isDone() || client.isConnected()) { + if (!existing.isDone() || client.isConnected()) { return Uni.createFrom().completionStage(existing); } - connectionStage.compareAndSet(existing, null); + ongoingConnection.compareAndSet(existing, null); } - for (;;) { - CompletionStage current = connectionStage.get(); - if (current != null) { - return Uni.createFrom().completionStage(current); - } - CompletionStage created = connection.subscribeAsCompletionStage(); - if (connectionStage.compareAndSet(null, created)) { - created.whenComplete((result, error) -> { - if (error != null) { - connectionStage.compareAndSet(created, null); - } - }); - return Uni.createFrom().completionStage(created); - } + CompletableFuture placeholder = new CompletableFuture<>(); + CompletableFuture current = ongoingConnection.compareAndExchange(null, placeholder); + if (current != null) { + return Uni.createFrom().completionStage(current); } + connection.subscribe().with(placeholder::complete, placeholder::completeExceptionally); + placeholder.whenComplete((result, error) -> { + if (error != null) { + ongoingConnection.compareAndSet(placeholder, null); + } + }); + return Uni.createFrom().completionStage(placeholder); } - private static class CurrentConnection { + public Set channels() { + return channels; + } - final RabbitMQClient client; - final Context context; + public ClientHolder retain(String channel) { + channels.add(channel); + return this; + } - private CurrentConnection(RabbitMQClient client, Context context) { - this.client = client; - this.context = context; - } + public boolean release(String channel) { + channels.remove(channel); + return channels.isEmpty(); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index 40269b5d1f..837822d2ee 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -60,12 +60,6 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Meta private final String contentTypeOverride; private final T payload; - public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, - RabbitMQFailureHandler onNack, - RabbitMQAckHandler onAck, String contentTypeOverride) { - this(delegate.getDelegate(), holder, holder.getContext(), onNack, onAck, contentTypeOverride); - } - public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { @@ -77,7 +71,7 @@ public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Co this.message = msg; this.deliveryTag = msg.envelope().getDeliveryTag(); this.holder = holder; - this.context = context != null ? context : holder.getContext(); + this.context = context; this.contentTypeOverride = contentTypeOverride; this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride); this.onNack = onNack; diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 8d61b20de1..392a692343 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -6,14 +6,12 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicInteger; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -169,10 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H @Inject @Any Instance failureHandlerFactories; - private List incomings = new CopyOnWriteArrayList<>(); - private List outgoings = new CopyOnWriteArrayList<>(); - private Map clientRegistrations = new ConcurrentHashMap<>(); - private Map sharedClients = new ConcurrentHashMap<>(); + private final List incomings = new CopyOnWriteArrayList<>(); + private final List outgoings = new CopyOnWriteArrayList<>(); + private final Map clients = new ConcurrentHashMap<>(); + // connection-name to fingerprint map to check against same connection-name but different options + private final Map connectionFingerprints = new ConcurrentHashMap<>(); @Inject @Any @@ -271,11 +270,11 @@ public void terminate( outgoing.terminate(); } - List registeredChannels = new ArrayList<>(clientRegistrations.keySet()); - for (String channel : registeredChannels) { - releaseClient(channel); + for (Map.Entry entry : clients.entrySet()) { + stopClient(entry.getValue().client(), true); } - sharedClients.clear(); + clients.clear(); + connectionFingerprints.clear(); } public Vertx vertx() { @@ -284,16 +283,7 @@ public Vertx vertx() { public void reportIncomingFailure(String channel, Throwable reason) { log.failureReported(channel, reason); - ClientRegistration registration = clientRegistrations.remove(channel); - if (registration == null) { - return; - } - - if (registration.shared) { - releaseSharedClient(registration.key, false); - } else { - stopClient(registration.holder.client(), false); - } + releaseClient(channel, false); } public Instance failureHandlerFactories() { @@ -316,71 +306,30 @@ public Instance credentialsProviders() { return configMaps; } - public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config, io.vertx.mutiny.core.Context context) { - ClientRegistration existing = clientRegistrations.get(config.getChannel()); - if (existing != null) { - return existing.holder; - } - - return config.getSharedConnectionName() - .map(name -> getOrCreateSharedHolder(config, context, name)) - .orElseGet(() -> createAndRegisterHolder(config, context, config.getChannel(), false)); - } - - private ClientHolder createAndRegisterHolder(RabbitMQConnectorCommonConfiguration config, - io.vertx.mutiny.core.Context context, String key, boolean shared) { - ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config.getChannel(), vertx(), - context); - clientRegistrations.put(config.getChannel(), new ClientRegistration(holder, shared, key)); - return holder; - } - - private ClientHolder getOrCreateSharedHolder(RabbitMQConnectorCommonConfiguration config, - io.vertx.mutiny.core.Context context, String name) { + public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config) { + String channel = config.getChannel(); RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config); + String connectionName = options.getConnectionName(); String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options); - SharedClient shared = sharedClients.compute(name, (key, existing) -> { - if (existing != null) { - if (!existing.fingerprint.equals(fingerprint)) { - throw ex.illegalStateSharedConnectionConfigMismatch(name); - } - existing.retain(); - if (context != null) { - existing.holder.ensureContext(context); - } - return existing; - } - return new SharedClient(name, new ClientHolder( - RabbitMQClient.create(vertx(), options), - config.getChannel(), - vertx(), - context), fingerprint); - }); - clientRegistrations.put(config.getChannel(), new ClientRegistration(shared.holder, true, name)); - return shared.holder; - } - - public void releaseClient(String channel) { - ClientRegistration registration = clientRegistrations.remove(channel); - if (registration == null) { - return; - } - - if (registration.shared) { - releaseSharedClient(registration.key, true); - } else { - stopClient(registration.holder.client(), true); + String existing = connectionFingerprints.putIfAbsent(connectionName, fingerprint); + if (existing != null && !existing.equals(fingerprint)) { + throw ex.illegalStateSharedConnectionConfigMismatch(connectionName); } + return clients.compute(fingerprint, + (key, current) -> (current == null ? new ClientHolder(RabbitMQClient.create(vertx(), options)) : current) + .retain(channel)); } - private void releaseSharedClient(String sharedName, boolean await) { - SharedClient shared = sharedClients.get(sharedName); - if (shared == null) { - return; - } - if (shared.release()) { - sharedClients.remove(sharedName, shared); - stopClient(shared.holder.client(), await); + public void releaseClient(String channel, boolean await) { + for (var e : clients.entrySet()) { + ClientHolder shared = e.getValue(); + if (shared.channels().contains(channel)) { + if (clients.computeIfPresent(e.getKey(), (k, c) -> c.release(channel) ? null : c) == null) { + connectionFingerprints.values().remove(e.getKey()); + stopClient(shared.client(), await); + } + return; + } } } @@ -395,36 +344,4 @@ private void stopClient(RabbitMQClient client, boolean await) { } } - private static final class ClientRegistration { - final ClientHolder holder; - final boolean shared; - final String key; - - private ClientRegistration(ClientHolder holder, boolean shared, String key) { - this.holder = holder; - this.shared = shared; - this.key = key; - } - } - - private static final class SharedClient { - final String name; - final ClientHolder holder; - final String fingerprint; - final AtomicInteger references = new AtomicInteger(1); - - private SharedClient(String name, ClientHolder holder, String fingerprint) { - this.name = name; - this.holder = holder; - this.fingerprint = fingerprint; - } - - private void retain() { - references.incrementAndGet(); - } - - private boolean release() { - return references.decrementAndGet() == 0; - } - } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 0614f84ac0..cfd37b71a0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -65,7 +65,7 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); - Multi> multi = createConsumer(connector, ic, incomingContext) + Multi> multi = createConsumer(connector, ic) .invoke(tuple -> client = tuple.getItem1().client()) // Translate all consumers into a merged stream of messages .onItem().transformToMulti( @@ -114,8 +114,8 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder } private Uni> createConsumer(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic, Context root) { - ClientHolder holder = connector.getClientHolder(ic, root); + RabbitMQConnectorIncomingConfiguration ic) { + ClientHolder holder = connector.getClientHolder(ic); final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { @@ -135,7 +135,6 @@ private Uni> createConsumer(RabbitMQConne }); return holder.getOrEstablishConnection() - .invoke(() -> log.connectionEstablished(ic.getChannel())) .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index be9973fd75..a7462c763f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -33,7 +33,7 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut Instance openTelemetryInstance) { this.config = oc; - holder = connector.getClientHolder(oc, null); + holder = connector.getClientHolder(oc); // Create a client final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { @@ -60,11 +60,14 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut // Return a SubscriberBuilder subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t)) .onTermination().call(() -> { - if (publisher != null) { - return publisher.stop() - .ifNoItem().after(Duration.ofSeconds(oc.getReconnectInterval())).fail() + RabbitMQPublisher pub = publisher; + publisher = null; + if (pub != null) { + return pub.stop() + .ifNoItem().after(Duration.ofSeconds(oc.getConnectionTimeout())).fail() .onFailure() - .invoke(e -> log.infof(e, "Error terminating outgoing channel %s", config.getChannel())); + .invoke(e -> log.infof(e, "Error terminating outgoing channel %s", config.getChannel())) + .onFailure().recoverWithNull(); } return Uni.createFrom().voidItem(); })); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java index 71d22eeb56..4a9ee544e3 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java @@ -47,7 +47,7 @@ public void testDoubleAckBehavior() { Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), + IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), null, doNothingNack, doNothingAck, "text/plain"); @@ -67,7 +67,7 @@ public void testDoubleNackBehavior() { Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), + IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), null, doNothingNack, doNothingAck, "text/plain"); @@ -87,7 +87,7 @@ void testConvertPayloadFallback() { RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(msg, - mock(ClientHolder.class), + mock(ClientHolder.class), null, doNothingNack, doNothingAck, null); assertThat(incomingRabbitMQMessage.getPayload()).isEqualTo(payloadBuffer); From eb6fc829e02419df1945fdf217dd1101583ff49e Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 27 Mar 2026 11:01:35 +0100 Subject: [PATCH 4/4] Added some docs for RabbitMQ shared-connection-name property --- .../rabbitmq/rabbitmq-client-customization.md | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md b/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md index c75337dcd3..b500c7d65b 100644 --- a/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md +++ b/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md @@ -13,3 +13,31 @@ connector. You need to indicate the name of the client using the `client-options-name` attribute: mp.messaging.incoming.prices.client-options-name=my-named-options + +## Shared connections + +By default, each channel opens its own connection to the RabbitMQ +broker. If your application has multiple channels connecting to the same +broker, you can configure them to share a single underlying connection +using the `shared-connection-name` attribute: + +``` properties +mp.messaging.incoming.orders.connector=smallrye-rabbitmq +mp.messaging.incoming.orders.shared-connection-name=my-connection + +mp.messaging.outgoing.confirmations.connector=smallrye-rabbitmq +mp.messaging.outgoing.confirmations.shared-connection-name=my-connection +``` + +In the above example, the `orders` incoming channel and the +`confirmations` outgoing channel share the same RabbitMQ connection. + +All channels sharing a connection name **must** use identical connection +options (host, port, credentials, SSL settings, virtual host, etc.). +If two channels declare the same `shared-connection-name` but have +different connection options, the connector throws an +`IllegalStateException` at startup. + +Shared connections are useful when your application has many channels +connecting to the same broker and you want to reduce the number of TCP +connections, or when the broker imposes connection limits.