diff --git a/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md b/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md index c75337dcd3..b500c7d65b 100644 --- a/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md +++ b/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md @@ -13,3 +13,31 @@ connector. You need to indicate the name of the client using the `client-options-name` attribute: mp.messaging.incoming.prices.client-options-name=my-named-options + +## Shared connections + +By default, each channel opens its own connection to the RabbitMQ +broker. If your application has multiple channels connecting to the same +broker, you can configure them to share a single underlying connection +using the `shared-connection-name` attribute: + +``` properties +mp.messaging.incoming.orders.connector=smallrye-rabbitmq +mp.messaging.incoming.orders.shared-connection-name=my-connection + +mp.messaging.outgoing.confirmations.connector=smallrye-rabbitmq +mp.messaging.outgoing.confirmations.shared-connection-name=my-connection +``` + +In the above example, the `orders` incoming channel and the +`confirmations` outgoing channel share the same RabbitMQ connection. + +All channels sharing a connection name **must** use identical connection +options (host, port, credentials, SSL settings, virtual host, etc.). +If two channels declare the same `shared-connection-name` but have +different connection options, the connector throws an +`IllegalStateException` at startup. + +Shared connections are useful when your application has many channels +connecting to the same broker and you want to reduce the number of TCP +connections, or when the broker imposes connection limits. diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 5c9ef286a2..eae70ea412 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -3,7 +3,10 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -13,55 +16,34 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.vertx.mutiny.core.Context; -import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; public class ClientHolder { private final RabbitMQClient client; - private final AtomicBoolean connected = new AtomicBoolean(false); - private final AtomicReference connectionHolder = new AtomicReference<>(); + private final AtomicBoolean hasBeenConnected = new AtomicBoolean(false); + private final AtomicReference> ongoingConnection = new AtomicReference<>(); private final Uni connection; + private final Set channels = ConcurrentHashMap.newKeySet(); - private final Vertx vertx; - - public ClientHolder(RabbitMQClient client, - RabbitMQConnectorCommonConfiguration configuration, - Vertx vertx, - Context root) { + public ClientHolder(RabbitMQClient client) { this.client = client; - this.vertx = vertx; this.connection = Uni.createFrom().deferred(() -> client.start() .onSubscription().invoke(() -> { - connected.set(true); - log.connectionEstablished(configuration.getChannel()); + hasBeenConnected.set(true); + log.connectionEstablished(String.join(", ", channels)); }) .onItem().transform(ignored -> { - connectionHolder - .set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root)); - // handle the case we are already disconnected. - if (!client.isConnected() || connectionHolder.get() == null) { + if (!client.isConnected()) { // Throwing the exception would trigger a retry. - connectionHolder.set(null); throw ex.illegalStateConnectionDisconnected(); } return client; }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().invoke(t -> { - connectionHolder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - })) - .memoize().until(() -> { - CurrentConnection connection = connectionHolder.get(); - if (connection == null) { - return true; - } - return !connection.client.isConnected(); - }); - + .onFailure().invoke(log::unableToConnectToBroker)) + .memoize().until(() -> !client.isConnected()); } public static CompletionStage runOnContext(Context context, IncomingRabbitMQMessage msg, @@ -80,21 +62,12 @@ public static CompletionStage runOnContextAndReportFailure(Context context }); } - public Context getContext() { - CurrentConnection connection = connectionHolder.get(); - if (connection != null) { - return connection.context; - } else { - return null; - } - } - public RabbitMQClient client() { return client; } public boolean hasBeenConnected() { - return connected.get(); + return hasBeenConnected.get(); } @CheckReturnValue @@ -106,24 +79,46 @@ public Function> getNack(final long deliveryTag, final bool return t -> client.basicNack(deliveryTag, false, requeue); } - public Vertx getVertx() { - return vertx; - } - @CheckReturnValue public Uni getOrEstablishConnection() { - return connection; + return Uni.createFrom().deferred(this::establishConnection); } - private static class CurrentConnection { - - final RabbitMQClient client; - final Context context; + private Uni establishConnection() { + CompletableFuture existing = ongoingConnection.get(); + if (existing != null) { + if (!existing.isDone() || client.isConnected()) { + return Uni.createFrom().completionStage(existing); + } + ongoingConnection.compareAndSet(existing, null); + } - private CurrentConnection(RabbitMQClient client, Context context) { - this.client = client; - this.context = context; + CompletableFuture placeholder = new CompletableFuture<>(); + CompletableFuture current = ongoingConnection.compareAndExchange(null, placeholder); + if (current != null) { + return Uni.createFrom().completionStage(current); } + connection.subscribe().with(placeholder::complete, placeholder::completeExceptionally); + placeholder.whenComplete((result, error) -> { + if (error != null) { + ongoingConnection.compareAndSet(placeholder, null); + } + }); + return Uni.createFrom().completionStage(placeholder); + } + + public Set channels() { + return channels; + } + + public ClientHolder retain(String channel) { + channels.add(channel); + return this; + } + + public boolean release(String channel) { + channels.remove(channel); + return channels.isEmpty(); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index f795b37f17..837822d2ee 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -60,18 +60,18 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Meta private final String contentTypeOverride; private final T payload; - public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, + public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { - this(delegate.getDelegate(), holder, onNack, onAck, contentTypeOverride); + this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride); } - IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, + IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { this.message = msg; this.deliveryTag = msg.envelope().getDeliveryTag(); this.holder = holder; - this.context = holder.getContext(); + this.context = context; this.contentTypeOverride = contentTypeOverride; this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride); this.onNack = onNack; diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index bd02070c57..392a692343 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -1,6 +1,9 @@ package io.smallrye.reactive.messaging.rabbitmq; -import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.*; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; +import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; import java.util.List; @@ -38,6 +41,7 @@ import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.smallrye.reactive.messaging.rabbitmq.internals.IncomingRabbitMQChannel; import io.smallrye.reactive.messaging.rabbitmq.internals.OutgoingRabbitMQChannel; +import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.rabbitmq.RabbitMQOptions; @@ -64,6 +68,7 @@ @ConnectorAttribute(name = "reconnect-interval", direction = INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two reconnection attempts", type = "int", alias = "rabbitmq-reconnect-interval", defaultValue = "10") @ConnectorAttribute(name = "network-recovery-interval", direction = INCOMING_AND_OUTGOING, description = "How long (ms) will automatic recovery wait before attempting to reconnect", type = "int", defaultValue = "5000") @ConnectorAttribute(name = "user", direction = INCOMING_AND_OUTGOING, description = "The user name to use when connecting to the broker", type = "string", defaultValue = "guest") +@ConnectorAttribute(name = "shared-connection-name", direction = INCOMING_AND_OUTGOING, description = "Optional identifier allowing multiple channels to share the same RabbitMQ connection when set to the same value", type = "string") @ConnectorAttribute(name = "include-properties", direction = INCOMING_AND_OUTGOING, description = "Whether to include properties when a broker message is passed on the event bus", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "requested-channel-max", direction = INCOMING_AND_OUTGOING, description = "The initially requested maximum channel number", type = "int", defaultValue = "2047") @ConnectorAttribute(name = "requested-heartbeat", direction = INCOMING_AND_OUTGOING, description = "The initially requested heartbeat interval (seconds), zero for none", type = "int", defaultValue = "60") @@ -162,9 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H @Inject @Any Instance failureHandlerFactories; - private List incomings = new CopyOnWriteArrayList<>(); - private List outgoings = new CopyOnWriteArrayList<>(); - private Map clients = new ConcurrentHashMap<>(); + private final List incomings = new CopyOnWriteArrayList<>(); + private final List outgoings = new CopyOnWriteArrayList<>(); + private final Map clients = new ConcurrentHashMap<>(); + // connection-name to fingerprint map to check against same connection-name but different options + private final Map connectionFingerprints = new ConcurrentHashMap<>(); @Inject @Any @@ -263,28 +270,20 @@ public void terminate( outgoing.terminate(); } - clients.forEach((channel, rabbitMQClient) -> rabbitMQClient.stopAndAwait()); + for (Map.Entry entry : clients.entrySet()) { + stopClient(entry.getValue().client(), true); + } clients.clear(); + connectionFingerprints.clear(); } public Vertx vertx() { return executionHolder.vertx(); } - public void registerClient(String channel, RabbitMQClient client) { - RabbitMQClient old = clients.put(channel, client); - if (old != null) { - old.stopAndForget(); - } - } - public void reportIncomingFailure(String channel, Throwable reason) { log.failureReported(channel, reason); - RabbitMQClient client = clients.remove(channel); - if (client != null) { - // Called on vertx context, we can't block: stop clients without waiting - client.stopAndForget(); - } + releaseClient(channel, false); } public Instance failureHandlerFactories() { @@ -306,4 +305,43 @@ public Instance credentialsProviders() { public Instance> configMaps() { return configMaps; } + + public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config) { + String channel = config.getChannel(); + RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config); + String connectionName = options.getConnectionName(); + String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options); + String existing = connectionFingerprints.putIfAbsent(connectionName, fingerprint); + if (existing != null && !existing.equals(fingerprint)) { + throw ex.illegalStateSharedConnectionConfigMismatch(connectionName); + } + return clients.compute(fingerprint, + (key, current) -> (current == null ? new ClientHolder(RabbitMQClient.create(vertx(), options)) : current) + .retain(channel)); + } + + public void releaseClient(String channel, boolean await) { + for (var e : clients.entrySet()) { + ClientHolder shared = e.getValue(); + if (shared.channels().contains(channel)) { + if (clients.computeIfPresent(e.getKey(), (k, c) -> c.release(channel) ? null : c) == null) { + connectionFingerprints.values().remove(e.getKey()); + stopClient(shared.client(), await); + } + return; + } + } + } + + private void stopClient(RabbitMQClient client, boolean await) { + if (client == null) { + return; + } + if (await) { + client.stopAndAwait(); + } else { + client.stopAndForget(); + } + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java index 2ca6b0c0e1..2a47f36a4c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java @@ -41,4 +41,7 @@ public interface RabbitMQExceptions { @Message(id = 16009, value = "Unable to create a client, probably a config error") IllegalStateException illegalStateUnableToCreateClient(@Cause Throwable t); + + @Message(id = 16010, value = "Shared connection '%s' has mismatched configuration; ensure all channels using the same shared-connection-name have identical connection settings") + IllegalStateException illegalStateSharedConnectionConfigMismatch(String sharedConnectionName); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 6420558b84..cfd37b71a0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -43,10 +43,11 @@ public class IncomingRabbitMQChannel { private final RabbitMQOpenTelemetryInstrumenter instrumenter; private final AtomicReference subscription = new AtomicReference<>(); - private volatile RabbitMQClient client; private final RabbitMQConnectorIncomingConfiguration config; private final Multi> stream; private final RabbitMQConnector connector; + private final Context incomingContext; + private volatile RabbitMQClient client; public IncomingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic, Instance openTelemetryInstance) { @@ -58,6 +59,8 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, } this.config = ic; this.connector = connector; + this.incomingContext = Context + .newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); @@ -65,7 +68,8 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, Multi> multi = createConsumer(connector, ic) .invoke(tuple -> client = tuple.getItem1().client()) // Translate all consumers into a merged stream of messages - .onItem().transformToMulti(tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), ic, onNack, onAck)); + .onItem().transformToMulti( + tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); if (ic.getBroadcast()) { multi = multi.broadcast().toAllSubscribers(); @@ -111,8 +115,8 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder private Uni> createConsumer(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic) { - // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic); + ClientHolder holder = connector.getClientHolder(ic); + final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { Uni uni; @@ -130,10 +134,7 @@ private Uni> createConsumer(RabbitMQConne .subscribe().with(ignored -> promise.complete(), promise::fail); }); - Context root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); - final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root); return holder.getOrEstablishConnection() - .invoke(() -> log.connectionEstablished(ic.getChannel())) .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); } @@ -236,6 +237,7 @@ private Uni createConsumer(RabbitMQConnectorIncomingConfigurat private Multi> getStreamOfMessages( RabbitMQConsumer receiver, ClientHolder holder, + Context context, RabbitMQConnectorIncomingConfiguration ic, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) { @@ -247,8 +249,8 @@ private Multi> getStreamOfMessages( Multi> multi = receiver.toMulti() // close the consumer on stream termination .onTermination().call(receiver::cancel) - .emitOn(c -> VertxContext.runOnContext(holder.getContext().getDelegate(), c)) - .map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, contentTypeOverride)); + .emitOn(c -> VertxContext.runOnContext(context.getDelegate(), c)) + .map(m -> new IncomingRabbitMQMessage<>(m, holder, context, onNack, onAck, contentTypeOverride)); if (ic.getTracingEnabled()) { return multi.map(msg -> instrumenter.traceIncoming(msg, RabbitMQTrace.traceQueue(queueName, msg.message.envelope().getRoutingKey(), msg.getHeaders()))); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index 5302118942..a7462c763f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -26,21 +26,22 @@ public class OutgoingRabbitMQChannel { private final Flow.Subscriber> subscriber; private final RabbitMQConnectorOutgoingConfiguration config; private final ClientHolder holder; + private final RabbitMQMessageSender processor; private volatile RabbitMQPublisher publisher; public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc, Instance openTelemetryInstance) { this.config = oc; + holder = connector.getClientHolder(oc); // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc); + final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { // Ensure we create the exchange to which messages are to be sent RabbitMQClientHelper.declareExchangeIfNeeded(client, oc, connector.configMaps()) .subscribe().with((ignored) -> promise.complete(), promise::fail); }); - holder = new ClientHolder(client, oc, connector.vertx(), null); final Uni getSender = holder.getOrEstablishConnection() .onItem() .transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(connector.vertx(), connection, @@ -54,10 +55,22 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut .onFailure().recoverWithNull().memoize().indefinitely(); // Set up a sender based on the publisher we established above - final RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); + processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); // Return a SubscriberBuilder - subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t))); + subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t)) + .onTermination().call(() -> { + RabbitMQPublisher pub = publisher; + publisher = null; + if (pub != null) { + return pub.stop() + .ifNoItem().after(Duration.ofSeconds(oc.getConnectionTimeout())).fail() + .onFailure() + .invoke(e -> log.infof(e, "Error terminating outgoing channel %s", config.getChannel())) + .onFailure().recoverWithNull(); + } + return Uni.createFrom().voidItem(); + })); } public Flow.Subscriber> getSubscriber() { @@ -95,12 +108,6 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder } public void terminate() { - if (publisher != null) { - try { - publisher.stop().await().atMost(Duration.ofMillis(config.getConnectionTimeout())); - } catch (Exception e) { - log.infof(e, "Error terminating outgoing channel %s", config.getChannel()); - } - } + processor.cancel(); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java index 79ba298c1f..70143fbbce 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java @@ -7,6 +7,9 @@ import static io.vertx.core.net.ClientOptionsBase.DEFAULT_METRICS_NAME; import static java.time.Duration.ofSeconds; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.*; import java.util.stream.Collectors; @@ -42,29 +45,44 @@ private RabbitMQClientHelper() { // avoid direct instantiation. } - static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { - Optional clientOptionsName = config.getClientOptionsName(); - Vertx vertx = connector.vertx(); - RabbitMQOptions options; + public static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { try { - if (clientOptionsName.isPresent()) { - options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); - } else { - options = getClientOptions(vertx, config, connector.credentialsProviders()); - } - if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { - options.setMetricsName("rabbitmq|" + config.getChannel()); - } - RabbitMQOptions intercepted = ConfigUtils.customize(config.config(), connector.configCustomizers(), options); - RabbitMQClient client = RabbitMQClient.create(vertx, intercepted); - connector.registerClient(config.getChannel(), client); - return client; + RabbitMQOptions options = buildClientOptions(connector, config); + return RabbitMQClient.create(connector.vertx(), options); } catch (Exception e) { log.unableToCreateClient(e); throw ex.illegalStateUnableToCreateClient(e); } } + public static RabbitMQOptions buildClientOptions(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { + Optional clientOptionsName = config.getClientOptionsName(); + Vertx vertx = connector.vertx(); + RabbitMQOptions options; + String connectionLabel = config.getSharedConnectionName().orElse(config.getChannel()); + if (clientOptionsName.isPresent()) { + options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); + } else { + options = getClientOptions(vertx, config, connector.credentialsProviders()); + } + if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { + options.setMetricsName("rabbitmq|" + connectionLabel); + } + if (options.getConnectionName() == null || options.getConnectionName().isEmpty()) { + options.setConnectionName(resolveConnectionName(config)); + } + return ConfigUtils.customize(config.config(), connector.configCustomizers(), options); + } + + public static String computeConnectionFingerprint(RabbitMQOptions options) { + JsonObject json = options.toJson(); + List
addresses = options.getAddresses(); + if (addresses != null) { + json.put("addresses", addresses.stream().map(Address::toString).collect(Collectors.toList())); + } + return sha256(json.encode()); + } + static RabbitMQOptions getClientOptionsFromBean(Instance options, String optionsBeanName) { options = options.select(Identifier.Literal.of(optionsBeanName)); if (options.isUnsatisfied()) { @@ -83,9 +101,7 @@ static RabbitMQOptions getClientOptionsFromBean(Instance option static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConfiguration config, Instance credentialsProviders) { - String connectionName = String.format("%s (%s)", - config.getChannel(), - config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing"); + String connectionName = resolveConnectionName(config); List
addresses = config.getAddresses() .map(s -> Arrays.asList(Address.parseAddresses(s))) .orElseGet(() -> Collections.singletonList(new Address(config.getHost(), config.getPort()))); @@ -160,6 +176,27 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf return options; } + private static String resolveConnectionName(RabbitMQConnectorCommonConfiguration config) { + return config.getSharedConnectionName() + .orElseGet(() -> String.format("%s (%s)", + config.getChannel(), + config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing")); + } + + private static String sha256(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(hash.length * 2); + for (byte b : hash) { + hex.append(String.format("%02x", b)); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("Unable to compute SHA-256 hash", e); + } + } + public static String serverQueueName(String name) { if (name.equals("(server.auto)")) { return ""; diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java new file mode 100644 index 0000000000..ee2c2f7823 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java @@ -0,0 +1,69 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class DualIncomingContextBean { + + private final CountDownLatch latch1 = new CountDownLatch(1); + private final CountDownLatch latch2 = new CountDownLatch(1); + private final AtomicReference context1 = new AtomicReference<>(); + private final AtomicReference context2 = new AtomicReference<>(); + private final AtomicReference eventLoop1 = new AtomicReference<>(); + private final AtomicReference eventLoop2 = new AtomicReference<>(); + + @Incoming("data1") + public Uni consume1(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + context1.set(context); + eventLoop1.set(context.isEventLoopContext()); + }); + latch1.countDown(); + return Uni.createFrom().voidItem(); + } + + @Incoming("data2") + public Uni consume2(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + context2.set(context); + eventLoop2.set(context.isEventLoopContext()); + }); + latch2.countDown(); + return Uni.createFrom().voidItem(); + } + + public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException { + return latch1.await(timeout, unit) && latch2.await(timeout, unit); + } + + public Context getContext1() { + return context1.get(); + } + + public Context getContext2() { + return context2.get(); + } + + public boolean isEventLoop1() { + Boolean value = eventLoop1.get(); + return value != null && value; + } + + public boolean isEventLoop2() { + Boolean value = eventLoop2.get(); + return value != null && value; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java new file mode 100644 index 0000000000..46eaa7214b --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java @@ -0,0 +1,46 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class IncomingContextBean { + + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference messageContext = new AtomicReference<>(); + private final AtomicReference eventLoopContext = new AtomicReference<>(); + + @Incoming("data") + public Uni consume(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + messageContext.set(context); + eventLoopContext.set(context.isEventLoopContext()); + }); + latch.countDown(); + return Uni.createFrom().voidItem(); + } + + public boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + public Context getMessageContext() { + return messageContext.get(); + } + + public boolean isEventLoopContext() { + Boolean value = eventLoopContext.get(); + return value != null && value; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java index 71d22eeb56..4a9ee544e3 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java @@ -47,7 +47,7 @@ public void testDoubleAckBehavior() { Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), + IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), null, doNothingNack, doNothingAck, "text/plain"); @@ -67,7 +67,7 @@ public void testDoubleNackBehavior() { Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), + IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), null, doNothingNack, doNothingAck, "text/plain"); @@ -87,7 +87,7 @@ void testConvertPayloadFallback() { RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(msg, - mock(ClientHolder.class), + mock(ClientHolder.class), null, doNothingNack, doNothingAck, null); assertThat(incomingRabbitMQMessage.getPayload()).isEqualTo(payloadBuffer); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java index 3367006052..49a6b708b9 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java @@ -2,9 +2,10 @@ import jakarta.enterprise.context.ApplicationScoped; -import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; +import io.smallrye.mutiny.Multi; + /** * A bean that can be registered to do just enough to support the * declaration of an exchange backing an outgoing rabbitmq channel. @@ -13,8 +14,8 @@ public class OutgoingBean { @Outgoing("sink") - public Message process() { - return Message.of("test"); + public Multi process() { + return Multi.createFrom().items("test", "test2", "test3"); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java index 7aef4925d4..2a0fd5b30a 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java @@ -55,7 +55,7 @@ private Proxy createContainerProxy(ToxiproxyContainer toxiproxy, int toxiPort) { } } - @Test + @Test // 15s void testSendingMessagesToRabbitMQ_connection_fails() { final String routingKey = "normal"; @@ -101,7 +101,7 @@ void testSendingMessagesToRabbitMQ_connection_fails() { } } - @Test + @Test // 17s void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { final String routingKey = "normal"; @@ -147,10 +147,95 @@ void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { } } + @Test + void testSharedConnectionReconnectionPreservesContext() { + final String routingKey = "shared"; + try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") + .asCompatibleSubstituteFor("shopify/toxiproxy")) + .withNetworkAliases("toxiproxy")) { + toxiproxy.withNetwork(Network.SHARED); + toxiproxy.start(); + await().until(toxiproxy::isRunning); + + List exposedPorts = toxiproxy.getExposedPorts(); + int toxiPort = exposedPorts.get(exposedPorts.size() - 1); + Proxy proxy = createContainerProxy(toxiproxy, toxiPort); + int exposedPort = toxiproxy.getMappedPort(toxiPort); + + weld.addBeanClass(ReconnectingContextBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.queue.durable", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", toxiproxy.getHost()) + .put("mp.messaging.incoming.data.port", exposedPort) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", toxiproxy.getHost()) + .put("mp.messaging.outgoing.sink.port", exposedPort) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-interval", 1) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + ReconnectingContextBean bean = get(container, ReconnectingContextBean.class); + + // Wait for at least one message before disconnect (from OutgoingBean) + await().atMost(1, TimeUnit.MINUTES).until(() -> !bean.getContexts().isEmpty()); + + // Verify pre-disconnect messages have event loop context + assertThat(bean.getEventLoopFlags().get(0)).isTrue(); + + int preDisconnectCount = bean.getContexts().size(); + + // Disconnect + proxy.disable(); + await().pollDelay(3, SECONDS).until(() -> !isRabbitMQConnectorAvailable(container)); + + // Reconnect + proxy.enable(); + await().atMost(1, TimeUnit.MINUTES).until(() -> isRabbitMQConnectorAvailable(container)); + + // Send messages after reconnection via the direct RabbitMQ client + AtomicInteger counter = new AtomicInteger(); + usage.produce(exchangeName, queueName, routingKey, 3, counter::getAndIncrement); + + // Wait for at least one more message after reconnection + await().atMost(1, TimeUnit.MINUTES).until(() -> bean.getContexts().size() > preDisconnectCount); + + // Verify post-reconnection messages also have event loop context. + // This should fail because the reconnection Uni closure captures the original + // null root parameter instead of reading rootContext.get(), so after reconnect + // the context falls back to Vertx.currentContext() which may not be an event loop. + List postReconnectFlags = bean.getEventLoopFlags() + .subList(preDisconnectCount, bean.getEventLoopFlags().size()); + assertThat(postReconnectFlags) + .as("After reconnection, all messages should still have event loop context") + .doesNotContain(false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Verifies that messages can be received from RabbitMQ. */ - @Test + @Test // 14s void testReceivingMessagesFromRabbitMQ_connection_fails() { final String routingKey = "xyzzy"; try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index b79c77c999..8f081d66a5 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.rabbitmq; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import java.util.Comparator; @@ -198,6 +199,227 @@ void testIncomingDeclarations() throws Exception { assertThat(binding2.getString("routing_key")).isEqualTo("urgent"); } + @Test + void testSharedConnectionIncomingAndOutgoingStartup() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + } + + @Test + void testSharedConnectionIncomingUsesEventLoopContext() throws InterruptedException { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingContextBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + IncomingContextBean bean = get(container, IncomingContextBean.class); + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> name != null && name.startsWith("shared-connection")) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + + usage.produce(exchangeName, queueName, routingKey, 1, () -> 1); + + assertThat(bean.awaitMessage(1, TimeUnit.MINUTES)).isTrue(); + assertThat(bean.getMessageContext()).isNotNull(); + assertThat(bean.isEventLoopContext()).isTrue(); + } + + @Test + void testSharedConnectionNameIsNotSuffixed() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> "shared-connection".equals(name)) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + } + + @Test + void testDefaultConnectionNameIncludesDirection() { + final String routingKey = "default"; + + weld.addBeanClass(IncomingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + boolean hasDefaultName = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .anyMatch(name -> "data (Incoming)".equals(name)); + assertThat(hasDefaultName).isTrue(); + }); + } + + private static String getConnectionName(JsonObject connection) { + String connectionName = connection.getString("connection_name"); + if (connectionName != null) { + return connectionName; + } + + JsonObject properties = connection.getJsonObject("client_properties"); + if (properties == null) { + return null; + } + + return properties.getString("connection_name"); + } + + @Test + void testSharedConnectionConfigMismatchFailsStartup() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", "some-other-host") + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + assertThatThrownBy(() -> container = weld.initialize()) + .isInstanceOf(Exception.class) + .hasStackTraceContaining("mismatched configuration"); + } + /** * Verifies that Exchanges, Queues and Bindings are correctly declared as a result of * incoming connector configuration that specifies DLQ/DLX overrides. @@ -850,4 +1072,100 @@ void testConsumerArguments() { }); } + @Test + void testSharedConnectionMultipleIncomingChannelsGetDistinctContexts() throws InterruptedException { + final String routingKey1 = "ctx1"; + final String routingKey2 = "ctx2"; + final String queueName2 = queueName + "-2"; + + weld.addBeanClass(DualIncomingContextBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data1.exchange.name", exchangeName) + .put("mp.messaging.incoming.data1.exchange.declare", true) + .put("mp.messaging.incoming.data1.queue.name", queueName) + .put("mp.messaging.incoming.data1.queue.declare", true) + .put("mp.messaging.incoming.data1.routing-keys", routingKey1) + .put("mp.messaging.incoming.data1.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data1.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data1.host", host) + .put("mp.messaging.incoming.data1.port", port) + .put("mp.messaging.incoming.data1.tracing.enabled", false) + .put("mp.messaging.incoming.data2.exchange.name", exchangeName) + .put("mp.messaging.incoming.data2.exchange.declare", true) + .put("mp.messaging.incoming.data2.queue.name", queueName2) + .put("mp.messaging.incoming.data2.queue.declare", true) + .put("mp.messaging.incoming.data2.routing-keys", routingKey2) + .put("mp.messaging.incoming.data2.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data2.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data2.host", host) + .put("mp.messaging.incoming.data2.port", port) + .put("mp.messaging.incoming.data2.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().atMost(1, TimeUnit.MINUTES).until(() -> isRabbitMQConnectorAvailable(container)); + + DualIncomingContextBean bean = get(container, DualIncomingContextBean.class); + + usage.produce(exchangeName, queueName, routingKey1, 1, () -> 1); + usage.produce(exchangeName, queueName2, routingKey2, 1, () -> 2); + + assertThat(bean.awaitMessages(1, TimeUnit.MINUTES)).isTrue(); + + assertThat(bean.isEventLoop1()).isTrue(); + assertThat(bean.isEventLoop2()).isTrue(); + assertThat(bean.getContext1()).isNotSameAs(bean.getContext2()); + + // Verify single shared connection + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> name != null && name.startsWith("shared-connection")) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + } + + @Test + void testNonSharedIncomingUsesEventLoopContext() throws InterruptedException { + final String routingKey = "nonshared"; + + weld.addBeanClass(IncomingContextBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + IncomingContextBean bean = get(container, IncomingContextBean.class); + + usage.produce(exchangeName, queueName, routingKey, 1, () -> 1); + + assertThat(bean.awaitMessage(1, TimeUnit.MINUTES)).isTrue(); + assertThat(bean.getMessageContext()).isNotNull(); + assertThat(bean.isEventLoopContext()).isTrue(); + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java index 19768102c1..8c9b3480b3 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java @@ -277,6 +277,26 @@ public JsonArray getBindings(final String exchangeName, final String queueName) } } + /** + * Returns the list of active connections. + * + * @return a {@link JsonArray} of connection descriptions + * @throws IOException if an error occurs + */ + public JsonArray getConnections() throws IOException { + final URL url = new URL(String.format("http://%s:%d/api/connections", options.getHost(), managementPort)); + final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty("Authorization", "Basic " + getBasicAuth()); + conn.connect(); + + if (conn.getResponseCode() == 200) { + final String jsonString = getResponseString(conn); + return new JsonArray(jsonString); + } else { + return null; + } + } + private JsonObject getObjectByTypeAndName(final String objectType, final String objectName) throws IOException { final URL url = new URL(String.format("http://%s:%d/api/%s/%%2F/%s", options.getHost(), managementPort, objectType, objectName)); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java new file mode 100644 index 0000000000..766065cf6b --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java @@ -0,0 +1,50 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class ReconnectingContextBean { + + private final CopyOnWriteArrayList contexts = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList eventLoopFlags = new CopyOnWriteArrayList<>(); + private volatile CountDownLatch latch = new CountDownLatch(1); + + @Incoming("data") + public Uni consume(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + contexts.add(context); + eventLoopFlags.add(context.isEventLoopContext()); + }); + latch.countDown(); + return Uni.createFrom().voidItem(); + } + + public void setExpectedMessages(int count) { + latch = new CountDownLatch(count); + } + + public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + public List getContexts() { + return contexts; + } + + public List getEventLoopFlags() { + return eventLoopFlags; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java new file mode 100644 index 0000000000..37d67bc1b2 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java @@ -0,0 +1,91 @@ +package io.smallrye.reactive.messaging.rabbitmq.internals; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.rabbitmq.client.Address; + +import io.vertx.rabbitmq.RabbitMQOptions; + +class RabbitMQClientHelperTest { + + @Test + void testIdenticalOptionsProduceSameFingerprint() { + RabbitMQOptions options1 = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest") + .setPassword("guest") + .setVirtualHost("/"); + + RabbitMQOptions options2 = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest") + .setPassword("guest") + .setVirtualHost("/"); + + String fingerprint1 = RabbitMQClientHelper.computeConnectionFingerprint(options1); + String fingerprint2 = RabbitMQClientHelper.computeConnectionFingerprint(options2); + + assertThat(fingerprint1).isEqualTo(fingerprint2); + } + + @Test + void testDifferentHostsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("host-a").setPort(5672); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("host-b").setPort(5672); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentAddressesProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setAddresses(List.of(new Address("host-a", 5672))); + RabbitMQOptions options2 = new RabbitMQOptions().setAddresses(List.of(new Address("host-a", 5673))); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentPortsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setPort(5672); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setPort(5673); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentUsersProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setUser("alice"); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setUser("bob"); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentVirtualHostsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setVirtualHost("/"); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setVirtualHost("/staging"); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentSslProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setSsl(false); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setSsl(true); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + +}