diff --git a/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md b/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md index c75337dcd3..b500c7d65b 100644 --- a/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md +++ b/documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md @@ -13,3 +13,31 @@ connector. You need to indicate the name of the client using the `client-options-name` attribute: mp.messaging.incoming.prices.client-options-name=my-named-options + +## Shared connections + +By default, each channel opens its own connection to the RabbitMQ +broker. If your application has multiple channels connecting to the same +broker, you can configure them to share a single underlying connection +using the `shared-connection-name` attribute: + +``` properties +mp.messaging.incoming.orders.connector=smallrye-rabbitmq +mp.messaging.incoming.orders.shared-connection-name=my-connection + +mp.messaging.outgoing.confirmations.connector=smallrye-rabbitmq +mp.messaging.outgoing.confirmations.shared-connection-name=my-connection +``` + +In the above example, the `orders` incoming channel and the +`confirmations` outgoing channel share the same RabbitMQ connection. + +All channels sharing a connection name **must** use identical connection +options (host, port, credentials, SSL settings, virtual host, etc.). +If two channels declare the same `shared-connection-name` but have +different connection options, the connector throws an +`IllegalStateException` at startup. + +Shared connections are useful when your application has many channels +connecting to the same broker and you want to reduce the number of TCP +connections, or when the broker imposes connection limits. diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 5c9ef286a2..eae70ea412 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -3,7 +3,10 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -13,55 +16,34 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.vertx.mutiny.core.Context; -import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; public class ClientHolder { private final RabbitMQClient client; - private final AtomicBoolean connected = new AtomicBoolean(false); - private final AtomicReference connectionHolder = new AtomicReference<>(); + private final AtomicBoolean hasBeenConnected = new AtomicBoolean(false); + private final AtomicReference> ongoingConnection = new AtomicReference<>(); private final Uni connection; + private final Set channels = ConcurrentHashMap.newKeySet(); - private final Vertx vertx; - - public ClientHolder(RabbitMQClient client, - RabbitMQConnectorCommonConfiguration configuration, - Vertx vertx, - Context root) { + public ClientHolder(RabbitMQClient client) { this.client = client; - this.vertx = vertx; this.connection = Uni.createFrom().deferred(() -> client.start() .onSubscription().invoke(() -> { - connected.set(true); - log.connectionEstablished(configuration.getChannel()); + hasBeenConnected.set(true); + log.connectionEstablished(String.join(", ", channels)); }) .onItem().transform(ignored -> { - connectionHolder - .set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root)); - // handle the case we are already disconnected. - if (!client.isConnected() || connectionHolder.get() == null) { + if (!client.isConnected()) { // Throwing the exception would trigger a retry. - connectionHolder.set(null); throw ex.illegalStateConnectionDisconnected(); } return client; }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().invoke(t -> { - connectionHolder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - })) - .memoize().until(() -> { - CurrentConnection connection = connectionHolder.get(); - if (connection == null) { - return true; - } - return !connection.client.isConnected(); - }); - + .onFailure().invoke(log::unableToConnectToBroker)) + .memoize().until(() -> !client.isConnected()); } public static CompletionStage runOnContext(Context context, IncomingRabbitMQMessage msg, @@ -80,21 +62,12 @@ public static CompletionStage runOnContextAndReportFailure(Context context }); } - public Context getContext() { - CurrentConnection connection = connectionHolder.get(); - if (connection != null) { - return connection.context; - } else { - return null; - } - } - public RabbitMQClient client() { return client; } public boolean hasBeenConnected() { - return connected.get(); + return hasBeenConnected.get(); } @CheckReturnValue @@ -106,24 +79,46 @@ public Function> getNack(final long deliveryTag, final bool return t -> client.basicNack(deliveryTag, false, requeue); } - public Vertx getVertx() { - return vertx; - } - @CheckReturnValue public Uni getOrEstablishConnection() { - return connection; + return Uni.createFrom().deferred(this::establishConnection); } - private static class CurrentConnection { - - final RabbitMQClient client; - final Context context; + private Uni establishConnection() { + CompletableFuture existing = ongoingConnection.get(); + if (existing != null) { + if (!existing.isDone() || client.isConnected()) { + return Uni.createFrom().completionStage(existing); + } + ongoingConnection.compareAndSet(existing, null); + } - private CurrentConnection(RabbitMQClient client, Context context) { - this.client = client; - this.context = context; + CompletableFuture placeholder = new CompletableFuture<>(); + CompletableFuture current = ongoingConnection.compareAndExchange(null, placeholder); + if (current != null) { + return Uni.createFrom().completionStage(current); } + connection.subscribe().with(placeholder::complete, placeholder::completeExceptionally); + placeholder.whenComplete((result, error) -> { + if (error != null) { + ongoingConnection.compareAndSet(placeholder, null); + } + }); + return Uni.createFrom().completionStage(placeholder); + } + + public Set channels() { + return channels; + } + + public ClientHolder retain(String channel) { + channels.add(channel); + return this; + } + + public boolean release(String channel) { + channels.remove(channel); + return channels.isEmpty(); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index f795b37f17..dbcc592b3d 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -60,18 +60,18 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Meta private final String contentTypeOverride; private final T payload; - public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, + public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { - this(delegate.getDelegate(), holder, onNack, onAck, contentTypeOverride); + this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride); } - IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, + IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) { this.message = msg; this.deliveryTag = msg.envelope().getDeliveryTag(); this.holder = holder; - this.context = holder.getContext(); + this.context = context; this.contentTypeOverride = contentTypeOverride; this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride); this.onNack = onNack; @@ -134,17 +134,6 @@ public void acknowledgeMessage() { holder.getAck(this.deliveryTag).subscribeAsCompletionStage(); } - /** - * Rejects the message by nack'ing with requeue=false; this will either discard the message for good or - * (if a DLQ has been set up) send it to the DLQ. - * - * @param reason the cause of the rejection, which must not be null - */ - public void rejectMessage(Throwable reason) { - this.rejectMessage(reason, false); - holder.getNack(this.deliveryTag, false).apply(reason).subscribeAsCompletionStage(); - } - /** * Rejects the message by nack'ing it. *

diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index bd02070c57..392a692343 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -1,6 +1,9 @@ package io.smallrye.reactive.messaging.rabbitmq; -import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.*; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; +import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; +import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; import java.util.List; @@ -38,6 +41,7 @@ import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.smallrye.reactive.messaging.rabbitmq.internals.IncomingRabbitMQChannel; import io.smallrye.reactive.messaging.rabbitmq.internals.OutgoingRabbitMQChannel; +import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.rabbitmq.RabbitMQOptions; @@ -64,6 +68,7 @@ @ConnectorAttribute(name = "reconnect-interval", direction = INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two reconnection attempts", type = "int", alias = "rabbitmq-reconnect-interval", defaultValue = "10") @ConnectorAttribute(name = "network-recovery-interval", direction = INCOMING_AND_OUTGOING, description = "How long (ms) will automatic recovery wait before attempting to reconnect", type = "int", defaultValue = "5000") @ConnectorAttribute(name = "user", direction = INCOMING_AND_OUTGOING, description = "The user name to use when connecting to the broker", type = "string", defaultValue = "guest") +@ConnectorAttribute(name = "shared-connection-name", direction = INCOMING_AND_OUTGOING, description = "Optional identifier allowing multiple channels to share the same RabbitMQ connection when set to the same value", type = "string") @ConnectorAttribute(name = "include-properties", direction = INCOMING_AND_OUTGOING, description = "Whether to include properties when a broker message is passed on the event bus", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "requested-channel-max", direction = INCOMING_AND_OUTGOING, description = "The initially requested maximum channel number", type = "int", defaultValue = "2047") @ConnectorAttribute(name = "requested-heartbeat", direction = INCOMING_AND_OUTGOING, description = "The initially requested heartbeat interval (seconds), zero for none", type = "int", defaultValue = "60") @@ -162,9 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H @Inject @Any Instance failureHandlerFactories; - private List incomings = new CopyOnWriteArrayList<>(); - private List outgoings = new CopyOnWriteArrayList<>(); - private Map clients = new ConcurrentHashMap<>(); + private final List incomings = new CopyOnWriteArrayList<>(); + private final List outgoings = new CopyOnWriteArrayList<>(); + private final Map clients = new ConcurrentHashMap<>(); + // connection-name to fingerprint map to check against same connection-name but different options + private final Map connectionFingerprints = new ConcurrentHashMap<>(); @Inject @Any @@ -263,28 +270,20 @@ public void terminate( outgoing.terminate(); } - clients.forEach((channel, rabbitMQClient) -> rabbitMQClient.stopAndAwait()); + for (Map.Entry entry : clients.entrySet()) { + stopClient(entry.getValue().client(), true); + } clients.clear(); + connectionFingerprints.clear(); } public Vertx vertx() { return executionHolder.vertx(); } - public void registerClient(String channel, RabbitMQClient client) { - RabbitMQClient old = clients.put(channel, client); - if (old != null) { - old.stopAndForget(); - } - } - public void reportIncomingFailure(String channel, Throwable reason) { log.failureReported(channel, reason); - RabbitMQClient client = clients.remove(channel); - if (client != null) { - // Called on vertx context, we can't block: stop clients without waiting - client.stopAndForget(); - } + releaseClient(channel, false); } public Instance failureHandlerFactories() { @@ -306,4 +305,43 @@ public Instance credentialsProviders() { public Instance> configMaps() { return configMaps; } + + public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config) { + String channel = config.getChannel(); + RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config); + String connectionName = options.getConnectionName(); + String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options); + String existing = connectionFingerprints.putIfAbsent(connectionName, fingerprint); + if (existing != null && !existing.equals(fingerprint)) { + throw ex.illegalStateSharedConnectionConfigMismatch(connectionName); + } + return clients.compute(fingerprint, + (key, current) -> (current == null ? new ClientHolder(RabbitMQClient.create(vertx(), options)) : current) + .retain(channel)); + } + + public void releaseClient(String channel, boolean await) { + for (var e : clients.entrySet()) { + ClientHolder shared = e.getValue(); + if (shared.channels().contains(channel)) { + if (clients.computeIfPresent(e.getKey(), (k, c) -> c.release(channel) ? null : c) == null) { + connectionFingerprints.values().remove(e.getKey()); + stopClient(shared.client(), await); + } + return; + } + } + } + + private void stopClient(RabbitMQClient client, boolean await) { + if (client == null) { + return; + } + if (await) { + client.stopAndAwait(); + } else { + client.stopAndForget(); + } + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java index 2ca6b0c0e1..2a47f36a4c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java @@ -41,4 +41,7 @@ public interface RabbitMQExceptions { @Message(id = 16009, value = "Unable to create a client, probably a config error") IllegalStateException illegalStateUnableToCreateClient(@Cause Throwable t); + + @Message(id = 16010, value = "Shared connection '%s' has mismatched configuration; ensure all channels using the same shared-connection-name have identical connection settings") + IllegalStateException illegalStateSharedConnectionConfigMismatch(String sharedConnectionName); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 6420558b84..cfd37b71a0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -43,10 +43,11 @@ public class IncomingRabbitMQChannel { private final RabbitMQOpenTelemetryInstrumenter instrumenter; private final AtomicReference subscription = new AtomicReference<>(); - private volatile RabbitMQClient client; private final RabbitMQConnectorIncomingConfiguration config; private final Multi> stream; private final RabbitMQConnector connector; + private final Context incomingContext; + private volatile RabbitMQClient client; public IncomingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic, Instance openTelemetryInstance) { @@ -58,6 +59,8 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, } this.config = ic; this.connector = connector; + this.incomingContext = Context + .newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); @@ -65,7 +68,8 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, Multi> multi = createConsumer(connector, ic) .invoke(tuple -> client = tuple.getItem1().client()) // Translate all consumers into a merged stream of messages - .onItem().transformToMulti(tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), ic, onNack, onAck)); + .onItem().transformToMulti( + tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); if (ic.getBroadcast()) { multi = multi.broadcast().toAllSubscribers(); @@ -111,8 +115,8 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder private Uni> createConsumer(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration ic) { - // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic); + ClientHolder holder = connector.getClientHolder(ic); + final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { Uni uni; @@ -130,10 +134,7 @@ private Uni> createConsumer(RabbitMQConne .subscribe().with(ignored -> promise.complete(), promise::fail); }); - Context root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); - final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root); return holder.getOrEstablishConnection() - .invoke(() -> log.connectionEstablished(ic.getChannel())) .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); } @@ -236,6 +237,7 @@ private Uni createConsumer(RabbitMQConnectorIncomingConfigurat private Multi> getStreamOfMessages( RabbitMQConsumer receiver, ClientHolder holder, + Context context, RabbitMQConnectorIncomingConfiguration ic, RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) { @@ -247,8 +249,8 @@ private Multi> getStreamOfMessages( Multi> multi = receiver.toMulti() // close the consumer on stream termination .onTermination().call(receiver::cancel) - .emitOn(c -> VertxContext.runOnContext(holder.getContext().getDelegate(), c)) - .map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, contentTypeOverride)); + .emitOn(c -> VertxContext.runOnContext(context.getDelegate(), c)) + .map(m -> new IncomingRabbitMQMessage<>(m, holder, context, onNack, onAck, contentTypeOverride)); if (ic.getTracingEnabled()) { return multi.map(msg -> instrumenter.traceIncoming(msg, RabbitMQTrace.traceQueue(queueName, msg.message.envelope().getRoutingKey(), msg.getHeaders()))); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index 5302118942..a7462c763f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -26,21 +26,22 @@ public class OutgoingRabbitMQChannel { private final Flow.Subscriber> subscriber; private final RabbitMQConnectorOutgoingConfiguration config; private final ClientHolder holder; + private final RabbitMQMessageSender processor; private volatile RabbitMQPublisher publisher; public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc, Instance openTelemetryInstance) { this.config = oc; + holder = connector.getClientHolder(oc); // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc); + final RabbitMQClient client = holder.client(); client.getDelegate().addConnectionEstablishedCallback(promise -> { // Ensure we create the exchange to which messages are to be sent RabbitMQClientHelper.declareExchangeIfNeeded(client, oc, connector.configMaps()) .subscribe().with((ignored) -> promise.complete(), promise::fail); }); - holder = new ClientHolder(client, oc, connector.vertx(), null); final Uni getSender = holder.getOrEstablishConnection() .onItem() .transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(connector.vertx(), connection, @@ -54,10 +55,22 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut .onFailure().recoverWithNull().memoize().indefinitely(); // Set up a sender based on the publisher we established above - final RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); + processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); // Return a SubscriberBuilder - subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t))); + subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t)) + .onTermination().call(() -> { + RabbitMQPublisher pub = publisher; + publisher = null; + if (pub != null) { + return pub.stop() + .ifNoItem().after(Duration.ofSeconds(oc.getConnectionTimeout())).fail() + .onFailure() + .invoke(e -> log.infof(e, "Error terminating outgoing channel %s", config.getChannel())) + .onFailure().recoverWithNull(); + } + return Uni.createFrom().voidItem(); + })); } public Flow.Subscriber> getSubscriber() { @@ -95,12 +108,6 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder } public void terminate() { - if (publisher != null) { - try { - publisher.stop().await().atMost(Duration.ofMillis(config.getConnectionTimeout())); - } catch (Exception e) { - log.infof(e, "Error terminating outgoing channel %s", config.getChannel()); - } - } + processor.cancel(); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java index 79ba298c1f..b2f993114f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java @@ -7,6 +7,9 @@ import static io.vertx.core.net.ClientOptionsBase.DEFAULT_METRICS_NAME; import static java.time.Duration.ofSeconds; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.*; import java.util.stream.Collectors; @@ -42,27 +45,32 @@ private RabbitMQClientHelper() { // avoid direct instantiation. } - static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { + public static RabbitMQOptions buildClientOptions(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { Optional clientOptionsName = config.getClientOptionsName(); Vertx vertx = connector.vertx(); RabbitMQOptions options; - try { - if (clientOptionsName.isPresent()) { - options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); - } else { - options = getClientOptions(vertx, config, connector.credentialsProviders()); - } - if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { - options.setMetricsName("rabbitmq|" + config.getChannel()); - } - RabbitMQOptions intercepted = ConfigUtils.customize(config.config(), connector.configCustomizers(), options); - RabbitMQClient client = RabbitMQClient.create(vertx, intercepted); - connector.registerClient(config.getChannel(), client); - return client; - } catch (Exception e) { - log.unableToCreateClient(e); - throw ex.illegalStateUnableToCreateClient(e); + String connectionLabel = config.getSharedConnectionName().orElse(config.getChannel()); + if (clientOptionsName.isPresent()) { + options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); + } else { + options = getClientOptions(vertx, config, connector.credentialsProviders()); } + if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { + options.setMetricsName("rabbitmq|" + connectionLabel); + } + if (options.getConnectionName() == null || options.getConnectionName().isEmpty()) { + options.setConnectionName(resolveConnectionName(config)); + } + return ConfigUtils.customize(config.config(), connector.configCustomizers(), options); + } + + public static String computeConnectionFingerprint(RabbitMQOptions options) { + JsonObject json = options.toJson(); + List

addresses = options.getAddresses(); + if (addresses != null) { + json.put("addresses", addresses.stream().map(Address::toString).collect(Collectors.toList())); + } + return sha256(json.encode()); } static RabbitMQOptions getClientOptionsFromBean(Instance options, String optionsBeanName) { @@ -83,9 +91,7 @@ static RabbitMQOptions getClientOptionsFromBean(Instance option static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConfiguration config, Instance credentialsProviders) { - String connectionName = String.format("%s (%s)", - config.getChannel(), - config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing"); + String connectionName = resolveConnectionName(config); List
addresses = config.getAddresses() .map(s -> Arrays.asList(Address.parseAddresses(s))) .orElseGet(() -> Collections.singletonList(new Address(config.getHost(), config.getPort()))); @@ -160,6 +166,27 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf return options; } + private static String resolveConnectionName(RabbitMQConnectorCommonConfiguration config) { + return config.getSharedConnectionName() + .orElseGet(() -> String.format("%s (%s)", + config.getChannel(), + config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing")); + } + + private static String sha256(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(hash.length * 2); + for (byte b : hash) { + hex.append(String.format("%02x", b)); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("Unable to compute SHA-256 hash", e); + } + } + public static String serverQueueName(String name) { if (name.equals("(server.auto)")) { return ""; diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java new file mode 100644 index 0000000000..ee2c2f7823 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/DualIncomingContextBean.java @@ -0,0 +1,69 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class DualIncomingContextBean { + + private final CountDownLatch latch1 = new CountDownLatch(1); + private final CountDownLatch latch2 = new CountDownLatch(1); + private final AtomicReference context1 = new AtomicReference<>(); + private final AtomicReference context2 = new AtomicReference<>(); + private final AtomicReference eventLoop1 = new AtomicReference<>(); + private final AtomicReference eventLoop2 = new AtomicReference<>(); + + @Incoming("data1") + public Uni consume1(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + context1.set(context); + eventLoop1.set(context.isEventLoopContext()); + }); + latch1.countDown(); + return Uni.createFrom().voidItem(); + } + + @Incoming("data2") + public Uni consume2(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + context2.set(context); + eventLoop2.set(context.isEventLoopContext()); + }); + latch2.countDown(); + return Uni.createFrom().voidItem(); + } + + public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException { + return latch1.await(timeout, unit) && latch2.await(timeout, unit); + } + + public Context getContext1() { + return context1.get(); + } + + public Context getContext2() { + return context2.get(); + } + + public boolean isEventLoop1() { + Boolean value = eventLoop1.get(); + return value != null && value; + } + + public boolean isEventLoop2() { + Boolean value = eventLoop2.get(); + return value != null && value; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java new file mode 100644 index 0000000000..46eaa7214b --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingContextBean.java @@ -0,0 +1,46 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class IncomingContextBean { + + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference messageContext = new AtomicReference<>(); + private final AtomicReference eventLoopContext = new AtomicReference<>(); + + @Incoming("data") + public Uni consume(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + messageContext.set(context); + eventLoopContext.set(context.isEventLoopContext()); + }); + latch.countDown(); + return Uni.createFrom().voidItem(); + } + + public boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + public Context getMessageContext() { + return messageContext.get(); + } + + public boolean isEventLoopContext() { + Boolean value = eventLoopContext.get(); + return value != null && value; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java index 71d22eeb56..d280bdac99 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java @@ -2,9 +2,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -12,13 +15,11 @@ import org.junit.jupiter.api.Test; import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Envelope; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler; import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.vertx.core.buffer.Buffer; import io.vertx.mutiny.core.Context; -import io.vertx.mutiny.rabbitmq.RabbitMQMessage; public class IncomingRabbitMQMessageTest { @@ -39,15 +40,13 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Meta @Test public void testDoubleAckBehavior() { - io.vertx.rabbitmq.RabbitMQMessage mockMsg = mock(io.vertx.rabbitmq.RabbitMQMessage.class); - when(mockMsg.body()).thenReturn(Buffer.buffer()); - when(mockMsg.properties()).thenReturn(new BasicProperties()); - when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); - RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .envelope(13456L, false, "test", "test") + .build(); Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), + IncomingRabbitMQMessage ackMsg = new IncomingRabbitMQMessage<>(testMsg, null, null, doNothingNack, doNothingAck, "text/plain"); @@ -59,15 +58,13 @@ public void testDoubleAckBehavior() { @Test public void testDoubleNackBehavior() { - io.vertx.rabbitmq.RabbitMQMessage mockMsg = mock(io.vertx.rabbitmq.RabbitMQMessage.class); - when(mockMsg.body()).thenReturn(Buffer.buffer()); - when(mockMsg.properties()).thenReturn(new BasicProperties()); - when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); - RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .envelope(13456L, false, "test", "test") + .build(); Exception nackReason = new Exception("test"); - IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(msg, mock(ClientHolder.class), + IncomingRabbitMQMessage nackMsg = new IncomingRabbitMQMessage<>(testMsg, null, null, doNothingNack, doNothingAck, "text/plain"); @@ -79,17 +76,241 @@ public void testDoubleNackBehavior() { @Test void testConvertPayloadFallback() { - io.vertx.rabbitmq.RabbitMQMessage mockMsg = mock(io.vertx.rabbitmq.RabbitMQMessage.class); Buffer payloadBuffer = Buffer.buffer("payload"); - when(mockMsg.body()).thenReturn(payloadBuffer); - when(mockMsg.properties()).thenReturn(new BasicProperties.Builder().contentType("application/json").build()); - when(mockMsg.envelope()).thenReturn(new Envelope(13456, false, "test", "test")); - RabbitMQMessage msg = RabbitMQMessage.newInstance(mockMsg); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body(payloadBuffer) + .properties(new BasicProperties.Builder().contentType("application/json").build()) + .envelope(13456L, false, "test", "test") + .build(); - IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(msg, - mock(ClientHolder.class), + IncomingRabbitMQMessage incomingRabbitMQMessage = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); assertThat(incomingRabbitMQMessage.getPayload()).isEqualTo(payloadBuffer); } + + // --- getEffectiveContentType tests --- + + @Test + void testEffectiveContentTypeWithOverride() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .properties(new BasicProperties.Builder().contentType("application/json").build()) + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, "text/plain"); + + // Override takes precedence over the property + assertThat(incoming.getEffectiveContentType()).hasValue("text/plain"); + // getContentType returns the raw property value + assertThat(incoming.getContentType()).hasValue("application/json"); + } + + @Test + void testEffectiveContentTypeWithNullOverride() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .properties(new BasicProperties.Builder().contentType("application/xml").build()) + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + // No override → falls back to property + assertThat(incoming.getEffectiveContentType()).hasValue("application/xml"); + } + + @Test + void testEffectiveContentTypeWithNullOverrideAndNullProperty() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + assertThat(incoming.getEffectiveContentType()).isEmpty(); + } + + // --- Content encoding warning path --- + + @Test + void testConstructorWithContentEncodingAndNonOctetStreamContentType() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("data") + .properties(new BasicProperties.Builder() + .contentType("text/plain") + .contentEncoding("UTF-8") + .build()) + .envelope(1L, false, "test", "test") + .build(); + + // Should not throw — the warning is logged but creation succeeds + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + assertThat(incoming.getContentEncoding()).hasValue("UTF-8"); + assertThat(incoming.getContentType()).hasValue("text/plain"); + } + + @Test + void testConstructorWithContentEncodingAndOctetStreamContentType() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body(new byte[] { 0x01, 0x02 }) + .properties(new BasicProperties.Builder() + .contentType("application/octet-stream") + .contentEncoding("binary") + .build()) + .envelope(1L, false, "test", "test") + .build(); + + // Binary content with encoding — no warning should be logged + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + assertThat(incoming.getContentEncoding()).hasValue("binary"); + } + + @Test + void testConstructorWithNullContentEncoding() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("data") + .properties(new BasicProperties.Builder() + .contentType("text/plain") + .build()) + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + assertThat(incoming.getContentEncoding()).isEmpty(); + } + + // --- injectMetadata --- + + @Test + void testInjectMetadata() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + // Metadata should contain IncomingRabbitMQMetadata by default + assertThat(incoming.getMetadata(IncomingRabbitMQMetadata.class)).isPresent(); + + // Inject custom metadata + String customMeta = "custom-metadata"; + incoming.injectMetadata(customMeta); + + // Both original and injected metadata should be accessible + assertThat(incoming.getMetadata(IncomingRabbitMQMetadata.class)).isPresent(); + assertThat(incoming.getMetadata(String.class)).hasValue(customMeta); + } + + // --- Metadata delegation methods --- + + @Test + void testMetadataDelegation() { + Map headers = new HashMap<>(); + headers.put("x-custom", "header-value"); + + Date timestamp = new Date(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("body") + .properties(new BasicProperties.Builder() + .contentType("text/plain") + .contentEncoding("UTF-8") + .headers(headers) + .deliveryMode(2) + .priority(5) + .correlationId("corr-123") + .replyTo("reply-queue") + .expiration("60000") + .messageId("msg-001") + .timestamp(timestamp) + .type("test-type") + .userId("user1") + .appId("app1") + .build()) + .envelope(42L, true, "exchange1", "rk1") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + assertThat(incoming.getHeaders()).containsEntry("x-custom", "header-value"); + assertThat(incoming.getContentType()).hasValue("text/plain"); + assertThat(incoming.getContentEncoding()).hasValue("UTF-8"); + assertThat(incoming.getDeliveryMode()).hasValue(2); + assertThat(incoming.getPriority()).hasValue(5); + assertThat(incoming.getCorrelationId()).hasValue("corr-123"); + assertThat(incoming.getReplyTo()).hasValue("reply-queue"); + assertThat(incoming.getExpiration()).hasValue("60000"); + assertThat(incoming.getMessageId()).hasValue("msg-001"); + assertThat(incoming.getTimestamp(ZoneOffset.UTC)).isPresent(); + assertThat(incoming.getType()).hasValue("test-type"); + assertThat(incoming.getUserId()).hasValue("user1"); + assertThat(incoming.getAppId()).hasValue("app1"); + } + + @Test + void testMetadataDelegationWithEmptyProperties() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + assertThat(incoming.getContentType()).isEmpty(); + assertThat(incoming.getContentEncoding()).isEmpty(); + assertThat(incoming.getDeliveryMode()).isEmpty(); + assertThat(incoming.getPriority()).isEmpty(); + assertThat(incoming.getCorrelationId()).isEmpty(); + assertThat(incoming.getReplyTo()).isEmpty(); + assertThat(incoming.getExpiration()).isEmpty(); + assertThat(incoming.getMessageId()).isEmpty(); + assertThat(incoming.getTimestamp(ZoneOffset.UTC)).isEmpty(); + assertThat(incoming.getType()).isEmpty(); + assertThat(incoming.getUserId()).isEmpty(); + assertThat(incoming.getAppId()).isEmpty(); + } + + @Test + void testGetRabbitMQMessage() { + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .envelope(1L, false, "exchange", "routing-key") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + io.vertx.mutiny.rabbitmq.RabbitMQMessage retrieved = incoming.getRabbitMQMessage(); + assertThat(retrieved).isNotNull(); + assertThat(retrieved.envelope().getRoutingKey()).isEqualTo("routing-key"); + assertThat(retrieved.envelope().getExchange()).isEqualTo("exchange"); + } + + @SuppressWarnings("deprecation") + @Test + void testDeprecatedGetCreationTime() { + Date timestamp = new Date(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .properties(new BasicProperties.Builder().timestamp(timestamp).build()) + .envelope(1L, false, "test", "test") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, doNothingNack, doNothingAck, null); + + // getCreationTime is deprecated and delegates to getTimestamp + assertThat(incoming.getCreationTime(ZoneId.of("UTC"))).isPresent(); + assertThat(incoming.getCreationTime(ZoneId.of("UTC"))) + .isEqualTo(incoming.getTimestamp(ZoneId.of("UTC"))); + } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java index 3367006052..49a6b708b9 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java @@ -2,9 +2,10 @@ import jakarta.enterprise.context.ApplicationScoped; -import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; +import io.smallrye.mutiny.Multi; + /** * A bean that can be registered to do just enough to support the * declaration of an exchange backing an outgoing rabbitmq channel. @@ -13,8 +14,8 @@ public class OutgoingBean { @Outgoing("sink") - public Message process() { - return Message.of("test"); + public Multi process() { + return Multi.createFrom().items("test", "test2", "test3"); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQArgumentsCDIConfigTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQArgumentsCDIConfigTest.java index 8e0ff201b1..234aa90260 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQArgumentsCDIConfigTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQArgumentsCDIConfigTest.java @@ -49,7 +49,7 @@ public void testConfigByCDIQueueArguments() throws IOException { JsonObject queue = usage.getQueue(queueName); assertThat(queue.getJsonObject("arguments").getMap()) - .containsExactlyInAnyOrderEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); + .containsAllEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10); assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @@ -84,11 +84,11 @@ public void testConfigByCDIQueueDefaultArguments() throws IOException { JsonObject queue = usage.getQueue(queueName); assertThat(queue.getJsonObject("arguments").getMap()) - .containsExactlyInAnyOrderEntriesOf(Map.of("default-queue-arg", "default-value")); + .containsAllEntriesOf(Map.of("default-queue-arg", "default-value")); JsonObject exchange = usage.getExchange(queueName); assertThat(exchange.getJsonObject("arguments").getMap()) - .containsExactlyInAnyOrderEntriesOf(Map.of("default-exchange-arg", "default-value")); + .containsAllEntriesOf(Map.of("default-exchange-arg", "default-value")); await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10); assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @@ -124,7 +124,7 @@ public void testConfigByCDIExchangeArguments() throws IOException { JsonObject exchange = usage.getExchange(queueName); assertThat(exchange.getJsonObject("arguments").getMap()) - .containsExactlyInAnyOrderEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); + .containsAllEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10); assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @@ -165,11 +165,11 @@ public void testConfigByCDIDLQArguments() throws IOException { JsonObject queue = usage.getQueue(dlqName); assertThat(queue.getJsonObject("arguments").getMap()) - .containsExactlyInAnyOrderEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); + .containsAllEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); JsonObject exchange = usage.getExchange("DLX"); assertThat(exchange.getJsonObject("arguments").getMap()) - .containsExactlyInAnyOrderEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); + .containsAllEntriesOf(Map.of("my-str-arg", "str-value", "my-int-arg", 4)); await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10); assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerExtension.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerExtension.java new file mode 100644 index 0000000000..9b1637fad4 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerExtension.java @@ -0,0 +1,121 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import static org.junit.jupiter.api.extension.ExtensionContext.Namespace.GLOBAL; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.time.Duration; + +import org.jboss.logging.Logger; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +/** + * JUnit 5 extension that manages a singleton RabbitMQ container shared across all test classes. + * The container starts once on first use and stops when the JVM shuts down. + */ +public class RabbitMQBrokerExtension implements BeforeAllCallback, ParameterResolver, CloseableResource { + + private static final Logger LOGGER = Logger.getLogger(RabbitMQBrokerExtension.class.getName()); + + public static final String RABBITMQ_IMAGE_NAME = "rabbitmq:4.2.5-management-alpine"; + public static final String RABBITMQ_IMAGE_NAME_KEY = "rabbitmq.container.image"; + + private GenericContainer rabbit; + private String host; + private int port; + private int managementPort; + + @Override + public void beforeAll(ExtensionContext context) { + ExtensionContext.Store globalStore = context.getRoot().getStore(GLOBAL); + RabbitMQBrokerExtension extension = (RabbitMQBrokerExtension) globalStore.get(RabbitMQBrokerExtension.class); + if (extension == null) { + LOGGER.info("Starting RabbitMQ broker"); + startBroker(); + globalStore.put(RabbitMQBrokerExtension.class, this); + } + } + + @Override + public void close() { + LOGGER.info("Stopping RabbitMQ broker"); + if (rabbit != null) { + try { + rabbit.stop(); + } catch (Exception e) { + // Ignore it. + } + } + } + + private void startBroker() { + String imageName = System.getProperty(RABBITMQ_IMAGE_NAME_KEY, RABBITMQ_IMAGE_NAME); + rabbit = new GenericContainer<>(DockerImageName.parse(imageName)) + .withExposedPorts(5672, 15672) + .withNetworkAliases("rabbitmq") + .withNetwork(Network.SHARED) + .withLogConsumer(of -> LOGGER.debug(of.getUtf8String())) + .waitingFor(Wait.forLogMessage(".*Server startup complete.*\\n", 1) + .withStartupTimeout(Duration.ofSeconds(30))) + .withCopyFileToContainer(MountableFile.forClasspathResource("rabbitmq/enabled_plugins"), + "/etc/rabbitmq/enabled_plugins"); + rabbit.start(); + + host = rabbit.getHost(); + port = rabbit.getMappedPort(5672); + managementPort = rabbit.getMappedPort(15672); + LOGGER.infof("RabbitMQ broker started: %s:%d (management: %d)", host, port, managementPort); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + return parameterContext.isAnnotated(RabbitMQHost.class) + || parameterContext.isAnnotated(RabbitMQPort.class) + || parameterContext.isAnnotated(RabbitMQManagementPort.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + ExtensionContext.Store globalStore = extensionContext.getRoot().getStore(GLOBAL); + RabbitMQBrokerExtension extension = (RabbitMQBrokerExtension) globalStore.get(RabbitMQBrokerExtension.class); + if (parameterContext.isAnnotated(RabbitMQHost.class)) { + return extension.host; + } + if (parameterContext.isAnnotated(RabbitMQPort.class)) { + return extension.port; + } + if (parameterContext.isAnnotated(RabbitMQManagementPort.class)) { + return extension.managementPort; + } + return null; + } + + @Target({ ElementType.FIELD, ElementType.PARAMETER }) + @Retention(RetentionPolicy.RUNTIME) + public @interface RabbitMQHost { + } + + @Target({ ElementType.FIELD, ElementType.PARAMETER }) + @Retention(RetentionPolicy.RUNTIME) + public @interface RabbitMQPort { + } + + @Target({ ElementType.FIELD, ElementType.PARAMETER }) + @Retention(RetentionPolicy.RUNTIME) + public @interface RabbitMQManagementPort { + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java index c6b1c2e553..8f4886688c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java @@ -2,7 +2,6 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.time.Duration; import java.util.UUID; import jakarta.enterprise.inject.Any; @@ -10,18 +9,11 @@ import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.weld.environment.se.WeldContainer; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; +import org.junit.jupiter.api.extension.ExtendWith; import io.smallrye.config.SmallRyeConfigProviderResolver; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; @@ -32,21 +24,9 @@ /** * Provides a basis for test classes, by managing the RabbitMQ broker test container. */ +@ExtendWith(RabbitMQBrokerExtension.class) public class RabbitMQBrokerTestBase { - private static final Logger LOGGER = LoggerFactory.getLogger("RabbitMQ"); - - private static final GenericContainer RABBIT = new GenericContainer<>( - DockerImageName.parse("rabbitmq:3.12-management")) - .withExposedPorts(5672, 15672) - .withNetworkAliases("rabbitmq") - .withNetwork(Network.SHARED) - .withLogConsumer(of -> LOGGER.debug(of.getUtf8String())) - .waitingFor(Wait.forLogMessage(".*Server startup complete.*\\n", 1) - .withStartupTimeout(Duration.ofSeconds(30))) - .withCopyFileToContainer(MountableFile.forClasspathResource("rabbitmq/enabled_plugins"), - "/etc/rabbitmq/enabled_plugins"); - protected static String host; protected static int port; protected static int managementPort; @@ -59,12 +39,13 @@ public class RabbitMQBrokerTestBase { protected String queueName; @BeforeAll - public static void startBroker() { - RABBIT.start(); - - port = RABBIT.getMappedPort(5672); - managementPort = RABBIT.getMappedPort(15672); - host = RABBIT.getContainerIpAddress(); + public static void initBroker( + @RabbitMQBrokerExtension.RabbitMQHost String h, + @RabbitMQBrokerExtension.RabbitMQPort int p, + @RabbitMQBrokerExtension.RabbitMQManagementPort int mp) { + host = h; + port = p; + managementPort = mp; System.setProperty("rabbitmq-host", host); System.setProperty("rabbitmq-port", Integer.toString(port)); @@ -72,13 +53,6 @@ public static void startBroker() { System.setProperty("rabbitmq-password", password); } - @AfterAll - public static void stopBroker() { - RABBIT.stop(); - System.clearProperty("rabbitmq-host"); - System.clearProperty("rabbitmq-port"); - } - @BeforeEach public void setup() { executionHolder = new ExecutionHolder(Vertx.vertx()); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverterTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverterTest.java new file mode 100644 index 0000000000..ee2a01caf6 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQMessageConverterTest.java @@ -0,0 +1,484 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.junit.jupiter.api.Test; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BasicProperties; + +import io.netty.handler.codec.http.HttpHeaderValues; +import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler; +import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.buffer.Buffer; + +class RabbitMQMessageConverterTest { + + RabbitMQAckHandler doNothingAck = new RabbitMQAckHandler() { + @Override + public CompletionStage handle(IncomingRabbitMQMessage message, Context context) { + return CompletableFuture.completedFuture(null); + } + }; + + RabbitMQFailureHandler doNothingNack = new RabbitMQFailureHandler() { + @Override + public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, + Throwable reason) { + return CompletableFuture.completedFuture(null); + } + }; + + // --- convert: plain message (no RabbitMQMessage), various payload types --- + + @Test + void convertWithStringPayload() { + Message message = Message.of("hello"); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("hello"); + assertThat(result.getRoutingKey()).isEqualTo("default-key"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + @Test + void convertWithNullPayload() { + Message message = Message.of(null); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().length()).isZero(); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_OCTET_STREAM.toString()); + } + + @Test + void convertWithUUIDPayload() { + UUID uuid = UUID.randomUUID(); + Message message = Message.of(uuid); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo(uuid.toString()); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + @Test + void convertWithIntegerPayload() { + Message message = Message.of(42); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("42"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + @Test + void convertWithBooleanPayload() { + Message message = Message.of(true); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("true"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + @Test + void convertWithMutinyBufferPayload() { + Buffer buffer = Buffer.buffer("buffer-content"); + Message message = Message.of(buffer); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("buffer-content"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_OCTET_STREAM.toString()); + } + + @Test + void convertWithVertxCoreBufferPayload() { + io.vertx.core.buffer.Buffer coreBuffer = io.vertx.core.buffer.Buffer.buffer("core-buffer"); + Message message = Message.of(coreBuffer); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("core-buffer"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_OCTET_STREAM.toString()); + } + + @Test + void convertWithByteArrayPayload() { + byte[] bytes = "bytes".getBytes(); + Message message = Message.of(bytes); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("bytes"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_OCTET_STREAM.toString()); + } + + @Test + void convertWithJsonObjectPayload() { + JsonObject json = new JsonObject().put("key", "value"); + Message message = Message.of(json); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo(json.encode()); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString()); + } + + @Test + void convertWithJsonArrayPayload() { + JsonArray array = new JsonArray().add("a").add("b"); + Message message = Message.of(array); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo(array.encode()); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString()); + } + + @Test + void convertWithPojoPayload() { + // A plain object should be JSON-serialized + Map pojo = Map.of("foo", "bar"); + Message> message = Message.of(pojo); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).contains("foo").contains("bar"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString()); + } + + // --- convert: with OutgoingRabbitMQMetadata --- + + @Test + void convertWithOutgoingMetadataAndTimestamp() { + ZonedDateTime ts = ZonedDateTime.of(2024, 1, 15, 12, 0, 0, 0, ZoneOffset.UTC); + OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder() + .withTimestamp(ts) + .withContentType("text/plain") + .withCorrelationId("corr-123") + .withRoutingKey("meta-key") + .build(); + + Message message = Message.of("payload", Metadata.of(metadata)); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getRoutingKey()).isEqualTo("meta-key"); + assertThat(result.getProperties().getContentType()).isEqualTo("text/plain"); + assertThat(result.getProperties().getCorrelationId()).isEqualTo("corr-123"); + assertThat(result.getProperties().getTimestamp()).isNotNull(); + } + + @Test + void convertWithOutgoingMetadataWithoutTimestamp() { + OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder() + .withContentType("application/xml") + .build(); + + Message message = Message.of("payload", Metadata.of(metadata)); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getProperties().getTimestamp()).isNull(); + assertThat(result.getProperties().getContentType()).isEqualTo("application/xml"); + } + + @Test + void convertWithoutOutgoingMetadataFallsBackToDefaults() { + Message message = Message.of("hello"); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.of(5000L), false); + + // No OutgoingRabbitMQMetadata → creates default with defaultTtl + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + assertThat(result.getProperties().getExpiration()).isEqualTo("5000"); + } + + @Test + void convertWithoutMetadataAndNoDefaultTtl() { + Message message = Message.of("hello"); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getProperties().getExpiration()).isNull(); + } + + @Test + void convertMetadataContentTypeFallsBackToDefault() { + // Metadata with null contentType → falls back to default for payload type + OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder().build(); + Message message = Message.of("hello", Metadata.of(metadata)); + + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + // --- convert: with IncomingRabbitMQMessage (RabbitMQMessage present) --- + + @Test + void convertWithIncomingRabbitMQMessage() { + Map headers = new HashMap<>(); + headers.put("x-custom", "val"); + BasicProperties props = new AMQP.BasicProperties.Builder() + .contentType("text/plain") + .contentEncoding("UTF-8") + .headers(headers) + .correlationId("corr-1") + .expiration("3000") + .build(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("incoming-body") + .properties(props) + .envelope(1L, false, "exchange", "incoming-key") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, + doNothingNack, + doNothingAck, + null); + + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, incoming, "exchange", "default-key", Optional.of(9999L), false); + + // Routing key comes from the envelope + assertThat(result.getRoutingKey()).isEqualTo("incoming-key"); + // Content type from source properties + assertThat(result.getProperties().getContentType()).isEqualTo("text/plain"); + // Expiration from source properties (not overridden by default TTL since it's already set) + assertThat(result.getProperties().getExpiration()).isEqualTo("3000"); + assertThat(result.getProperties().getCorrelationId()).isEqualTo("corr-1"); + } + + @Test + void convertWithIncomingRabbitMQMessageNoExpirationUsesDefaultTtl() { + Map headers = new HashMap<>(); + BasicProperties props = new AMQP.BasicProperties.Builder() + .headers(headers) + .build(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("body") + .properties(props) + .envelope(1L, false, "exchange", "rk") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, + doNothingNack, + doNothingAck, + null); + + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, incoming, "exchange", "default-key", Optional.of(7000L), false); + + // No expiration on source → should use default TTL + assertThat(result.getProperties().getExpiration()).isEqualTo("7000"); + } + + @Test + void convertWithIncomingRabbitMQMessageNoContentTypeFallsBackToDefault() { + Map headers = new HashMap<>(); + BasicProperties props = new AMQP.BasicProperties.Builder() + .headers(headers) + .build(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("body") + .properties(props) + .envelope(1L, false, "exchange", "rk") + .build(); + + IncomingRabbitMQMessage incoming = new IncomingRabbitMQMessage<>(testMsg, + null, null, + doNothingNack, + doNothingAck, + null); + + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, incoming, "exchange", "default-key", Optional.empty(), false); + + // No content type on source → falls back to default for payload type + assertThat(result.getProperties().getContentType()).isNotNull(); + } + + // --- convert: with mutiny RabbitMQMessage payload --- + + @Test + void convertWithMutinyRabbitMQMessagePayload() { + Map headers = new HashMap<>(); + BasicProperties props = new AMQP.BasicProperties.Builder() + .contentType("application/json") + .headers(headers) + .build(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("mutiny-payload") + .properties(props) + .envelope(1L, false, "ex", "mutiny-rk") + .build(); + + io.vertx.mutiny.rabbitmq.RabbitMQMessage mutinyMsg = io.vertx.mutiny.rabbitmq.RabbitMQMessage + .newInstance(testMsg); + + // Message wrapping a mutiny RabbitMQMessage as payload (not IncomingRabbitMQMessage) + Message message = Message.of(mutinyMsg); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getRoutingKey()).isEqualTo("mutiny-rk"); + assertThat(result.getProperties().getContentType()).isEqualTo("application/json"); + } + + // --- convert: with core RabbitMQMessage payload --- + + @Test + void convertWithCoreRabbitMQMessagePayload() { + Map headers = new HashMap<>(); + BasicProperties props = new AMQP.BasicProperties.Builder() + .contentType("text/xml") + .headers(headers) + .build(); + io.vertx.rabbitmq.RabbitMQMessage testMsg = TestRabbitMQMessage.builder() + .body("core-payload") + .properties(props) + .envelope(2L, false, "ex", "core-rk") + .build(); + + // Message wrapping a core (non-mutiny) RabbitMQMessage as payload + Message message = Message.of(testMsg); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getRoutingKey()).isEqualTo("core-rk"); + assertThat(result.getProperties().getContentType()).isEqualTo("text/xml"); + } + + // --- getRoutingKey: from OutgoingRabbitMQMetadata --- + + @Test + void convertUsesRoutingKeyFromOutgoingMetadata() { + OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder() + .withRoutingKey("meta-routing-key") + .build(); + Message message = Message.of("data", Metadata.of(metadata)); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "default-key", Optional.empty(), false); + + assertThat(result.getRoutingKey()).isEqualTo("meta-routing-key"); + } + + @Test + void convertFallsBackToDefaultRoutingKey() { + Message message = Message.of("data"); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "fallback-key", Optional.empty(), false); + + assertThat(result.getRoutingKey()).isEqualTo("fallback-key"); + } + + // --- convert: Long and other primitive types --- + + @Test + void convertWithLongPayload() { + Message message = Message.of(123456789L); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("123456789"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + @Test + void convertWithDoublePayload() { + Message message = Message.of(3.14); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("3.14"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + @Test + void convertWithCharacterPayload() { + Message message = Message.of('A'); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + assertThat(result.getBody().toString()).isEqualTo("A"); + assertThat(result.getProperties().getContentType()) + .isEqualTo(HttpHeaderValues.TEXT_PLAIN.toString()); + } + + // --- Metadata properties preservation --- + + @Test + void convertPreservesAllOutgoingMetadataProperties() { + ZonedDateTime ts = ZonedDateTime.of(2024, 6, 1, 10, 30, 0, 0, ZoneOffset.UTC); + OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder() + .withContentType("application/xml") + .withContentEncoding("gzip") + .withDeliveryMode(2) + .withPriority(5) + .withCorrelationId("c-id") + .withReplyTo("reply-queue") + .withExpiration("10000") + .withMessageId("msg-id") + .withTimestamp(ts) + .withType("my-type") + .withUserId("user") + .withAppId("app") + .withClusterId("cluster") + .build(); + + Message message = Message.of("data", Metadata.of(metadata)); + RabbitMQMessageConverter.OutgoingRabbitMQMessage result = RabbitMQMessageConverter.convert( + null, message, "exchange", "key", Optional.empty(), false); + + BasicProperties props = result.getProperties(); + assertThat(props.getContentType()).isEqualTo("application/xml"); + assertThat(props.getContentEncoding()).isEqualTo("gzip"); + assertThat(props.getDeliveryMode()).isEqualTo(2); + assertThat(props.getPriority()).isEqualTo(5); + assertThat(props.getCorrelationId()).isEqualTo("c-id"); + assertThat(props.getReplyTo()).isEqualTo("reply-queue"); + assertThat(props.getExpiration()).isEqualTo("10000"); + assertThat(props.getMessageId()).isEqualTo("msg-id"); + assertThat(props.getTimestamp()).isNotNull(); + assertThat(props.getType()).isEqualTo("my-type"); + assertThat(props.getUserId()).isEqualTo("user"); + assertThat(props.getAppId()).isEqualTo("app"); + // clusterId is set in OutgoingRabbitMQMetadata but not exposed by BasicProperties interface + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java index 7aef4925d4..59540fad83 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java @@ -5,34 +5,127 @@ import static org.awaitility.Awaitility.await; import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.time.Duration; import java.util.List; +import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.se.SeContainer; + import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; import org.jboss.weld.environment.se.Weld; import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.smallrye.reactive.messaging.providers.extension.HealthCenter; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; -public class RabbitMQReconnectionTest extends RabbitMQBrokerTestBase { +/** + * Tests RabbitMQ reconnection scenarios using Toxiproxy for fault injection. + * Uses its own dedicated RabbitMQ broker to avoid disrupting the shared container. + */ +public class RabbitMQReconnectionTest { - private WeldContainer container; + private static Network network; + private static GenericContainer rabbit; + private static String host; + private static int port; + private static int managementPort; + static final String username = "guest"; + static final String password = "guest"; + + private RabbitMQUsage usage; + private ExecutionHolder executionHolder; + private String exchangeName; + private String queueName; + + private WeldContainer container; Weld weld = new Weld(); + @BeforeAll + static void startBroker() { + network = Network.newNetwork(); + rabbit = new GenericContainer<>(DockerImageName.parse("rabbitmq:3.12-management")) + .withExposedPorts(5672, 15672) + .withNetworkAliases("rabbitmq") + .withNetwork(network) + .waitingFor(Wait.forLogMessage(".*Server startup complete.*\\n", 1) + .withStartupTimeout(Duration.ofSeconds(30))) + .withCopyFileToContainer(MountableFile.forClasspathResource("rabbitmq/enabled_plugins"), + "/etc/rabbitmq/enabled_plugins"); + rabbit.start(); + + host = rabbit.getHost(); + port = rabbit.getMappedPort(5672); + managementPort = rabbit.getMappedPort(15672); + + System.setProperty("rabbitmq-host", host); + System.setProperty("rabbitmq-port", Integer.toString(port)); + System.setProperty("rabbitmq-username", username); + System.setProperty("rabbitmq-password", password); + } + + @AfterAll + static void stopBroker() { + if (rabbit != null) { + rabbit.stop(); + } + if (network != null) { + network.close(); + } + System.clearProperty("rabbitmq-host"); + System.clearProperty("rabbitmq-port"); + } + + @BeforeEach + void setup() { + executionHolder = new ExecutionHolder(Vertx.vertx()); + usage = new RabbitMQUsage(executionHolder.vertx(), host, port, managementPort, username, password); + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + MapBasedConfig.cleanup(); + } + + @BeforeEach + void initQueueExchange(TestInfo testInfo) { + String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString()); + String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString()); + queueName = "queue" + cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits(); + exchangeName = "exchange" + cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits(); + } + + @AfterEach + void tearDown() { + usage.close(); + executionHolder.terminate(null); + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + MapBasedConfig.cleanup(); + } + @AfterEach - public void cleanup() { + void cleanup() { if (container != null) { container.select(RabbitMQConnector.class, ConnectorLiteral.of(RabbitMQConnector.CONNECTOR_NAME)).get() .terminate(null); @@ -43,6 +136,20 @@ public void cleanup() { SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); } + boolean isRabbitMQConnectorAvailable(WeldContainer container) { + final RabbitMQConnector connector = get(container, RabbitMQConnector.class, Any.Literal.INSTANCE); + return connector.getLiveness().isOk(); + } + + boolean isRabbitMQConnectorAlive(SeContainer container) { + HealthCenter health = get(container, HealthCenter.class); + return health.getLiveness().isOk(); + } + + T get(SeContainer container, Class beanType, Annotation... annotations) { + return container.getBeanManager().createInstance().select(beanType, annotations).get(); + } + private Proxy createContainerProxy(ToxiproxyContainer toxiproxy, int toxiPort) { try { // Create toxiproxy client @@ -55,7 +162,7 @@ private Proxy createContainerProxy(ToxiproxyContainer toxiproxy, int toxiPort) { } } - @Test + @Test // 15s void testSendingMessagesToRabbitMQ_connection_fails() { final String routingKey = "normal"; @@ -64,7 +171,7 @@ void testSendingMessagesToRabbitMQ_connection_fails() { try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") .asCompatibleSubstituteFor("shopify/toxiproxy")) .withNetworkAliases("toxiproxy")) { - toxiproxy.withNetwork(Network.SHARED); + toxiproxy.withNetwork(network); toxiproxy.start(); await().until(toxiproxy::isRunning); @@ -101,7 +208,7 @@ void testSendingMessagesToRabbitMQ_connection_fails() { } } - @Test + @Test // 17s void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { final String routingKey = "normal"; @@ -110,7 +217,7 @@ void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") .asCompatibleSubstituteFor("shopify/toxiproxy")) .withNetworkAliases("toxiproxy")) { - toxiproxy.withNetwork(Network.SHARED); + toxiproxy.withNetwork(network); toxiproxy.start(); await().until(toxiproxy::isRunning); @@ -147,16 +254,101 @@ void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { } } + @Test + void testSharedConnectionReconnectionPreservesContext() { + final String routingKey = "shared"; + try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") + .asCompatibleSubstituteFor("shopify/toxiproxy")) + .withNetworkAliases("toxiproxy")) { + toxiproxy.withNetwork(network); + toxiproxy.start(); + await().until(toxiproxy::isRunning); + + List exposedPorts = toxiproxy.getExposedPorts(); + int toxiPort = exposedPorts.get(exposedPorts.size() - 1); + Proxy proxy = createContainerProxy(toxiproxy, toxiPort); + int exposedPort = toxiproxy.getMappedPort(toxiPort); + + weld.addBeanClass(ReconnectingContextBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.queue.durable", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", toxiproxy.getHost()) + .put("mp.messaging.incoming.data.port", exposedPort) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", toxiproxy.getHost()) + .put("mp.messaging.outgoing.sink.port", exposedPort) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-interval", 1) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + ReconnectingContextBean bean = get(container, ReconnectingContextBean.class); + + // Wait for at least one message before disconnect (from OutgoingBean) + await().atMost(1, TimeUnit.MINUTES).until(() -> !bean.getContexts().isEmpty()); + + // Verify pre-disconnect messages have event loop context + assertThat(bean.getEventLoopFlags().get(0)).isTrue(); + + int preDisconnectCount = bean.getContexts().size(); + + // Disconnect + proxy.disable(); + await().pollDelay(3, SECONDS).until(() -> !isRabbitMQConnectorAvailable(container)); + + // Reconnect + proxy.enable(); + await().atMost(1, TimeUnit.MINUTES).until(() -> isRabbitMQConnectorAvailable(container)); + + // Send messages after reconnection via the direct RabbitMQ client + AtomicInteger counter = new AtomicInteger(); + usage.produce(exchangeName, queueName, routingKey, 3, counter::getAndIncrement); + + // Wait for at least one more message after reconnection + await().atMost(1, TimeUnit.MINUTES).until(() -> bean.getContexts().size() > preDisconnectCount); + + // Verify post-reconnection messages also have event loop context. + // This should fail because the reconnection Uni closure captures the original + // null root parameter instead of reading rootContext.get(), so after reconnect + // the context falls back to Vertx.currentContext() which may not be an event loop. + List postReconnectFlags = bean.getEventLoopFlags() + .subList(preDisconnectCount, bean.getEventLoopFlags().size()); + assertThat(postReconnectFlags) + .as("After reconnection, all messages should still have event loop context") + .doesNotContain(false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Verifies that messages can be received from RabbitMQ. */ - @Test + @Test // 14s void testReceivingMessagesFromRabbitMQ_connection_fails() { final String routingKey = "xyzzy"; try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") .asCompatibleSubstituteFor("shopify/toxiproxy")) .withNetworkAliases("toxiproxy")) { - toxiproxy.withNetwork(Network.SHARED); + toxiproxy.withNetwork(network); toxiproxy.start(); await().until(toxiproxy::isRunning); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index b79c77c999..8f081d66a5 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.rabbitmq; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import java.util.Comparator; @@ -198,6 +199,227 @@ void testIncomingDeclarations() throws Exception { assertThat(binding2.getString("routing_key")).isEqualTo("urgent"); } + @Test + void testSharedConnectionIncomingAndOutgoingStartup() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + } + + @Test + void testSharedConnectionIncomingUsesEventLoopContext() throws InterruptedException { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingContextBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + IncomingContextBean bean = get(container, IncomingContextBean.class); + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> name != null && name.startsWith("shared-connection")) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + + usage.produce(exchangeName, queueName, routingKey, 1, () -> 1); + + assertThat(bean.awaitMessage(1, TimeUnit.MINUTES)).isTrue(); + assertThat(bean.getMessageContext()).isNotNull(); + assertThat(bean.isEventLoopContext()).isTrue(); + } + + @Test + void testSharedConnectionNameIsNotSuffixed() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared-connection") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> "shared-connection".equals(name)) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + } + + @Test + void testDefaultConnectionNameIncludesDirection() { + final String routingKey = "default"; + + weld.addBeanClass(IncomingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + boolean hasDefaultName = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .anyMatch(name -> "data (Incoming)".equals(name)); + assertThat(hasDefaultName).isTrue(); + }); + } + + private static String getConnectionName(JsonObject connection) { + String connectionName = connection.getString("connection_name"); + if (connectionName != null) { + return connectionName; + } + + JsonObject properties = connection.getJsonObject("client_properties"); + if (properties == null) { + return null; + } + + return properties.getString("connection_name"); + } + + @Test + void testSharedConnectionConfigMismatchFailsStartup() { + final String routingKey = "shared"; + + weld.addBeanClass(IncomingBean.class); + weld.addBeanClass(OutgoingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.shared-connection-name", "shared") + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", true) + .put("mp.messaging.outgoing.sink.shared-connection-name", "shared") + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", "some-other-host") + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + assertThatThrownBy(() -> container = weld.initialize()) + .isInstanceOf(Exception.class) + .hasStackTraceContaining("mismatched configuration"); + } + /** * Verifies that Exchanges, Queues and Bindings are correctly declared as a result of * incoming connector configuration that specifies DLQ/DLX overrides. @@ -850,4 +1072,100 @@ void testConsumerArguments() { }); } + @Test + void testSharedConnectionMultipleIncomingChannelsGetDistinctContexts() throws InterruptedException { + final String routingKey1 = "ctx1"; + final String routingKey2 = "ctx2"; + final String queueName2 = queueName + "-2"; + + weld.addBeanClass(DualIncomingContextBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data1.exchange.name", exchangeName) + .put("mp.messaging.incoming.data1.exchange.declare", true) + .put("mp.messaging.incoming.data1.queue.name", queueName) + .put("mp.messaging.incoming.data1.queue.declare", true) + .put("mp.messaging.incoming.data1.routing-keys", routingKey1) + .put("mp.messaging.incoming.data1.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data1.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data1.host", host) + .put("mp.messaging.incoming.data1.port", port) + .put("mp.messaging.incoming.data1.tracing.enabled", false) + .put("mp.messaging.incoming.data2.exchange.name", exchangeName) + .put("mp.messaging.incoming.data2.exchange.declare", true) + .put("mp.messaging.incoming.data2.queue.name", queueName2) + .put("mp.messaging.incoming.data2.queue.declare", true) + .put("mp.messaging.incoming.data2.routing-keys", routingKey2) + .put("mp.messaging.incoming.data2.shared-connection-name", "shared-connection") + .put("mp.messaging.incoming.data2.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data2.host", host) + .put("mp.messaging.incoming.data2.port", port) + .put("mp.messaging.incoming.data2.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().atMost(1, TimeUnit.MINUTES).until(() -> isRabbitMQConnectorAvailable(container)); + + DualIncomingContextBean bean = get(container, DualIncomingContextBean.class); + + usage.produce(exchangeName, queueName, routingKey1, 1, () -> 1); + usage.produce(exchangeName, queueName2, routingKey2, 1, () -> 2); + + assertThat(bean.awaitMessages(1, TimeUnit.MINUTES)).isTrue(); + + assertThat(bean.isEventLoop1()).isTrue(); + assertThat(bean.isEventLoop2()).isTrue(); + assertThat(bean.getContext1()).isNotSameAs(bean.getContext2()); + + // Verify single shared connection + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + JsonArray connections = usage.getConnections(); + assertThat(connections).isNotNull(); + + List sharedConnectionNames = connections.stream() + .map(JsonObject.class::cast) + .map(RabbitMQTest::getConnectionName) + .filter(name -> name != null && name.startsWith("shared-connection")) + .distinct() + .collect(Collectors.toList()); + assertThat(sharedConnectionNames).hasSize(1); + }); + } + + @Test + void testNonSharedIncomingUsesEventLoopContext() throws InterruptedException { + final String routingKey = "nonshared"; + + weld.addBeanClass(IncomingContextBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + IncomingContextBean bean = get(container, IncomingContextBean.class); + + usage.produce(exchangeName, queueName, routingKey, 1, () -> 1); + + assertThat(bean.awaitMessage(1, TimeUnit.MINUTES)).isTrue(); + assertThat(bean.getMessageContext()).isNotNull(); + assertThat(bean.isEventLoopContext()).isTrue(); + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java index 19768102c1..8c9b3480b3 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java @@ -277,6 +277,26 @@ public JsonArray getBindings(final String exchangeName, final String queueName) } } + /** + * Returns the list of active connections. + * + * @return a {@link JsonArray} of connection descriptions + * @throws IOException if an error occurs + */ + public JsonArray getConnections() throws IOException { + final URL url = new URL(String.format("http://%s:%d/api/connections", options.getHost(), managementPort)); + final HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty("Authorization", "Basic " + getBasicAuth()); + conn.connect(); + + if (conn.getResponseCode() == 200) { + final String jsonString = getResponseString(conn); + return new JsonArray(jsonString); + } else { + return null; + } + } + private JsonObject getObjectByTypeAndName(final String objectType, final String objectName) throws IOException { final URL url = new URL(String.format("http://%s:%d/api/%s/%%2F/%s", options.getHost(), managementPort, objectType, objectName)); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java new file mode 100644 index 0000000000..766065cf6b --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ReconnectingContextBean.java @@ -0,0 +1,50 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.vertx.core.Context; + +@ApplicationScoped +public class ReconnectingContextBean { + + private final CopyOnWriteArrayList contexts = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList eventLoopFlags = new CopyOnWriteArrayList<>(); + private volatile CountDownLatch latch = new CountDownLatch(1); + + @Incoming("data") + public Uni consume(Message message) { + message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> { + Context context = metadata.context(); + contexts.add(context); + eventLoopFlags.add(context.isEventLoopContext()); + }); + latch.countDown(); + return Uni.createFrom().voidItem(); + } + + public void setExpectedMessages(int count) { + latch = new CountDownLatch(count); + } + + public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + public List getContexts() { + return contexts; + } + + public List getEventLoopFlags() { + return eventLoopFlags; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TestRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TestRabbitMQMessage.java new file mode 100644 index 0000000000..302882f22a --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/TestRabbitMQMessage.java @@ -0,0 +1,111 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BasicProperties; +import com.rabbitmq.client.Envelope; + +import io.vertx.core.buffer.Buffer; +import io.vertx.rabbitmq.RabbitMQMessage; + +/** + * A simple concrete implementation of {@link RabbitMQMessage} for unit tests, + * avoiding the need for Mockito mocks. + */ +public class TestRabbitMQMessage implements RabbitMQMessage { + + private final Buffer body; + private final BasicProperties properties; + private final Envelope envelope; + private final String consumerTag; + private final Integer messageCount; + + private TestRabbitMQMessage(Buffer body, BasicProperties properties, Envelope envelope, + String consumerTag, Integer messageCount) { + this.body = body; + this.properties = properties; + this.envelope = envelope; + this.consumerTag = consumerTag; + this.messageCount = messageCount; + } + + @Override + public Buffer body() { + return body; + } + + @Override + public String consumerTag() { + return consumerTag; + } + + @Override + public Envelope envelope() { + return envelope; + } + + @Override + public BasicProperties properties() { + return properties; + } + + @Override + public Integer messageCount() { + return messageCount; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Buffer body = Buffer.buffer(); + private BasicProperties properties = new AMQP.BasicProperties(); + private Envelope envelope = new Envelope(1L, false, "test-exchange", "test-key"); + private String consumerTag = "test-consumer"; + private Integer messageCount = 0; + + public Builder body(Buffer body) { + this.body = body; + return this; + } + + public Builder body(String body) { + this.body = Buffer.buffer(body); + return this; + } + + public Builder body(byte[] body) { + this.body = Buffer.buffer(body); + return this; + } + + public Builder properties(BasicProperties properties) { + this.properties = properties; + return this; + } + + public Builder envelope(Envelope envelope) { + this.envelope = envelope; + return this; + } + + public Builder envelope(long deliveryTag, boolean redeliver, String exchange, String routingKey) { + this.envelope = new Envelope(deliveryTag, redeliver, exchange, routingKey); + return this; + } + + public Builder consumerTag(String consumerTag) { + this.consumerTag = consumerTag; + return this; + } + + public Builder messageCount(Integer messageCount) { + this.messageCount = messageCount; + return this; + } + + public TestRabbitMQMessage build() { + return new TestRabbitMQMessage(body, properties, envelope, consumerTag, messageCount); + } + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/UnsatisfiedInstance.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/UnsatisfiedInstance.java new file mode 100644 index 0000000000..8c57f29713 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/UnsatisfiedInstance.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.lang.annotation.Annotation; +import java.util.Collections; +import java.util.Iterator; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +/** + * A trivial {@link Instance} implementation that is always unsatisfied. + */ +public class UnsatisfiedInstance implements Instance { + + private static final UnsatisfiedInstance INSTANCE = new UnsatisfiedInstance<>(); + + @SuppressWarnings("unchecked") + public static Instance instance() { + return (Instance) INSTANCE; + } + + private UnsatisfiedInstance() { + } + + @Override + public Instance select(Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(Class subtype, Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(TypeLiteral subtype, Annotation... qualifiers) { + return instance(); + } + + @Override + public boolean isUnsatisfied() { + return true; + } + + @Override + public boolean isAmbiguous() { + return false; + } + + @Override + public void destroy(T instance) { + } + + @Override + public Handle getHandle() { + return null; + } + + @Override + public Iterable> handles() { + return Collections.emptyList(); + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public T get() { + throw new UnsupportedOperationException("Unsatisfied instance"); + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandlerTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandlerTest.java new file mode 100644 index 0000000000..9385611e5e --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandlerTest.java @@ -0,0 +1,224 @@ +package io.smallrye.reactive.messaging.rabbitmq.fault; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata; +import io.smallrye.reactive.messaging.rabbitmq.WeldTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class RabbitMQFailureHandlerTest extends WeldTestBase { + + @Override + @BeforeEach + public void initWeld() { + super.initWeld(); + weld.addBeanClass(RabbitMQRequeue.Factory.class); + } + + private MapBasedConfig dataconfig(String failureStrategy) { + return commonConfig() + .with("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.queue.name", queueName) + .with("mp.messaging.incoming.data.exchange.name", exchangeName) + .with("mp.messaging.incoming.data.exchange.routing-keys", routingKeys) + .with("mp.messaging.incoming.data.tracing.enabled", false) + .with("mp.messaging.incoming.data.failure-strategy", failureStrategy); + } + + private void produceMessages() { + AtomicInteger counter = new AtomicInteger(1); + usage.produce(exchangeName, queueName, routingKeys, 5, counter::getAndIncrement); + } + + @Test + void rejectStrategyDropsMessages() { + RejectBean bean = runApplication(dataconfig("reject"), RejectBean.class); + produceMessages(); + + await().until(() -> bean.getProcessed().size() >= 5); + assertThat(bean.getProcessed()).containsExactly(1, 2, 3, 4, 5); + // With reject (requeue=false by default), messages are dropped, not redelivered + assertThat(bean.getRedelivered()).isEmpty(); + } + + @Test + void requeueStrategyRequeuesMessages() { + RequeueBean bean = runApplication(dataconfig("requeue"), RequeueBean.class); + produceMessages(); + + // Each message is nacked on first delivery (requeued), then acked on redelivery + await().until(() -> bean.getRedelivered().size() >= 5); + assertThat(bean.getFirstDeliveries()).containsExactlyInAnyOrder(1, 2, 3, 4, 5); + assertThat(bean.getRedelivered()).containsExactlyInAnyOrder(1, 2, 3, 4, 5); + } + + @Test + void rejectWithRequeueMetadataOverride() { + RejectWithRequeueOverrideBean bean = runApplication(dataconfig("reject"), + RejectWithRequeueOverrideBean.class); + produceMessages(); + + // Despite reject strategy (default requeue=false), metadata overrides to requeue=true + await().until(() -> bean.getRedelivered().size() >= 5); + assertThat(bean.getFirstDeliveries()).containsExactlyInAnyOrder(1, 2, 3, 4, 5); + assertThat(bean.getRedelivered()).containsExactlyInAnyOrder(1, 2, 3, 4, 5); + } + + @Test + void requeueWithNoRequeueMetadataOverride() { + RequeueWithNoRequeueOverrideBean bean = runApplication(dataconfig("requeue"), + RequeueWithNoRequeueOverrideBean.class); + produceMessages(); + + await().until(() -> bean.getProcessed().size() >= 5); + assertThat(bean.getProcessed()).containsExactly(1, 2, 3, 4, 5); + // Despite requeue strategy (default requeue=true), metadata overrides to requeue=false + assertThat(bean.getRedelivered()).isEmpty(); + } + + // --- Inner beans --- + + /** + * Nacks every message (reject strategy will drop them). + */ + @ApplicationScoped + public static class RejectBean { + private final List processed = new CopyOnWriteArrayList<>(); + private final List redelivered = new CopyOnWriteArrayList<>(); + + @Incoming("data") + public CompletionStage process(Message msg) { + int value = Integer.parseInt(msg.getPayload()); + processed.add(value); + boolean redeliver = msg.getMetadata(IncomingRabbitMQMetadata.class) + .map(IncomingRabbitMQMetadata::isRedeliver) + .orElse(false); + if (redeliver) { + redelivered.add(value); + } + return msg.nack(new RuntimeException("reject")); + } + + public List getProcessed() { + return processed; + } + + public List getRedelivered() { + return redelivered; + } + } + + /** + * Nacks on first delivery, acks on redelivery (requeue strategy will requeue). + */ + @ApplicationScoped + public static class RequeueBean { + private final List firstDeliveries = new CopyOnWriteArrayList<>(); + private final List redelivered = new CopyOnWriteArrayList<>(); + + @Incoming("data") + public CompletionStage process(Message msg) { + int value = Integer.parseInt(msg.getPayload()); + boolean redeliver = msg.getMetadata(IncomingRabbitMQMetadata.class) + .map(IncomingRabbitMQMetadata::isRedeliver) + .orElse(false); + if (redeliver) { + redelivered.add(value); + return msg.ack(); + } else { + firstDeliveries.add(value); + return msg.nack(new RuntimeException("requeue")); + } + } + + public List getFirstDeliveries() { + return firstDeliveries; + } + + public List getRedelivered() { + return redelivered; + } + } + + /** + * Uses reject strategy but overrides requeue to true via metadata. + * Nacks on first delivery with requeue=true metadata, acks on redelivery. + */ + @ApplicationScoped + public static class RejectWithRequeueOverrideBean { + private final List firstDeliveries = new CopyOnWriteArrayList<>(); + private final List redelivered = new CopyOnWriteArrayList<>(); + + @Incoming("data") + public CompletionStage process(Message msg) { + int value = Integer.parseInt(msg.getPayload()); + boolean redeliver = msg.getMetadata(IncomingRabbitMQMetadata.class) + .map(IncomingRabbitMQMetadata::isRedeliver) + .orElse(false); + if (redeliver) { + redelivered.add(value); + return msg.ack(); + } else { + firstDeliveries.add(value); + return msg.nack(new RuntimeException("reject-with-requeue"), + Metadata.of(new RabbitMQRejectMetadata(true))); + } + } + + public List getFirstDeliveries() { + return firstDeliveries; + } + + public List getRedelivered() { + return redelivered; + } + } + + /** + * Uses requeue strategy but overrides requeue to false via metadata. + * Nacks every message with requeue=false metadata. + */ + @ApplicationScoped + public static class RequeueWithNoRequeueOverrideBean { + private final List processed = new CopyOnWriteArrayList<>(); + private final List redelivered = new CopyOnWriteArrayList<>(); + + @Incoming("data") + public CompletionStage process(Message msg) { + int value = Integer.parseInt(msg.getPayload()); + processed.add(value); + boolean redeliver = msg.getMetadata(IncomingRabbitMQMetadata.class) + .map(IncomingRabbitMQMetadata::isRedeliver) + .orElse(false); + if (redeliver) { + redelivered.add(value); + } + return msg.nack(new RuntimeException("no-requeue"), + Metadata.of(new RabbitMQRejectMetadata(false))); + } + + public List getProcessed() { + return processed; + } + + public List getRedelivered() { + return redelivered; + } + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java new file mode 100644 index 0000000000..a4ebb62f7e --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelperTest.java @@ -0,0 +1,192 @@ +package io.smallrye.reactive.messaging.rabbitmq.internals; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.junit.jupiter.api.Test; + +import com.rabbitmq.client.Address; + +import io.vertx.rabbitmq.RabbitMQOptions; + +class RabbitMQClientHelperTest { + + @Test + void testIdenticalOptionsProduceSameFingerprint() { + RabbitMQOptions options1 = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest") + .setPassword("guest") + .setVirtualHost("/"); + + RabbitMQOptions options2 = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest") + .setPassword("guest") + .setVirtualHost("/"); + + String fingerprint1 = RabbitMQClientHelper.computeConnectionFingerprint(options1); + String fingerprint2 = RabbitMQClientHelper.computeConnectionFingerprint(options2); + + assertThat(fingerprint1).isEqualTo(fingerprint2); + } + + @Test + void testDifferentHostsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("host-a").setPort(5672); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("host-b").setPort(5672); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentAddressesProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setAddresses(List.of(new Address("host-a", 5672))); + RabbitMQOptions options2 = new RabbitMQOptions().setAddresses(List.of(new Address("host-a", 5673))); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentPortsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setPort(5672); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setPort(5673); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentUsersProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setUser("alice"); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setUser("bob"); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentVirtualHostsProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setVirtualHost("/"); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setVirtualHost("/staging"); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + @Test + void testDifferentSslProduceDifferentFingerprints() { + RabbitMQOptions options1 = new RabbitMQOptions().setHost("localhost").setSsl(false); + RabbitMQOptions options2 = new RabbitMQOptions().setHost("localhost").setSsl(true); + + assertThat(RabbitMQClientHelper.computeConnectionFingerprint(options1)) + .isNotEqualTo(RabbitMQClientHelper.computeConnectionFingerprint(options2)); + } + + // --- serverQueueName --- + + @Test + void testServerQueueNameWithServerAuto() { + assertThat(RabbitMQClientHelper.serverQueueName("(server.auto)")).isEmpty(); + } + + @Test + void testServerQueueNameWithRegularName() { + assertThat(RabbitMQClientHelper.serverQueueName("my-queue")).isEqualTo("my-queue"); + } + + @Test + void testServerQueueNameWithEmptyString() { + assertThat(RabbitMQClientHelper.serverQueueName("")).isEmpty(); + } + + // --- parseArguments --- + + @Test + void testParseArgumentsWithEmpty() { + Map result = RabbitMQClientHelper.parseArguments(Optional.empty()); + assertThat(result).isEmpty(); + } + + @Test + void testParseArgumentsWithSingleStringArgument() { + Map result = RabbitMQClientHelper.parseArguments(Optional.of("x-queue-type:quorum")); + assertThat(result).containsEntry("x-queue-type", "quorum"); + } + + @Test + void testParseArgumentsWithSingleIntegerArgument() { + Map result = RabbitMQClientHelper.parseArguments(Optional.of("x-priority:10")); + assertThat(result).containsEntry("x-priority", 10); + } + + @Test + void testParseArgumentsWithMultipleArguments() { + Map result = RabbitMQClientHelper.parseArguments( + Optional.of("x-priority:10,x-queue-type:quorum")); + assertThat(result) + .containsEntry("x-priority", 10) + .containsEntry("x-queue-type", "quorum") + .hasSize(2); + } + + @Test + void testParseArgumentsWithSpaces() { + Map result = RabbitMQClientHelper.parseArguments( + Optional.of(" x-priority:5 , x-queue-type:classic ")); + assertThat(result) + .containsEntry("x-priority", 5) + .containsEntry("x-queue-type", "classic"); + } + + @Test + void testParseArgumentsIgnoresMalformedSegments() { + // Segments without ":" separator or with multiple ":" are skipped (length != 2) + Map result = RabbitMQClientHelper.parseArguments( + Optional.of("no-colon,valid-key:valid-value")); + assertThat(result) + .containsEntry("valid-key", "valid-value") + .hasSize(1); + } + + // --- computeConnectionFingerprint determinism --- + + @Test + void testFingerprintIsDeterministic() { + RabbitMQOptions options = new RabbitMQOptions() + .setHost("localhost") + .setPort(5672) + .setUser("guest"); + + String first = RabbitMQClientHelper.computeConnectionFingerprint(options); + String second = RabbitMQClientHelper.computeConnectionFingerprint(options); + + assertThat(first).isEqualTo(second); + } + + @Test + void testFingerprintWithNullAddresses() { + RabbitMQOptions options = new RabbitMQOptions().setHost("localhost"); + // No addresses set → should still compute fingerprint without errors + String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options); + assertThat(fingerprint).isNotEmpty(); + } + + @Test + void testFingerprintIsHexSha256() { + RabbitMQOptions options = new RabbitMQOptions().setHost("localhost"); + String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options); + + // SHA-256 produces 64-character hex string + assertThat(fingerprint).hasSize(64); + assertThat(fingerprint).matches("[0-9a-f]+"); + } + +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSenderTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSenderTest.java new file mode 100644 index 0000000000..2ec1e29428 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSenderTest.java @@ -0,0 +1,108 @@ +package io.smallrye.reactive.messaging.rabbitmq.internals; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.junit.jupiter.api.Test; + +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; +import io.smallrye.reactive.messaging.rabbitmq.WeldTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class RabbitMQMessageSenderTest extends WeldTestBase { + + private MapBasedConfig outgoingConfig() { + return commonConfig() + .with("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .with("mp.messaging.outgoing.sink.exchange.declare", false) + .with("mp.messaging.outgoing.sink.default-routing-key", routingKeys) + .with("mp.messaging.outgoing.sink.tracing.enabled", false); + } + + @Test + void sendingMessages() { + List received = new CopyOnWriteArrayList<>(); + usage.consumeIntegers(exchangeName, routingKeys, received::add); + + SenderBean bean = runApplication(outgoingConfig(), SenderBean.class); + for (int i = 1; i <= 5; i++) { + bean.send(i); + } + + await().until(() -> received.size() >= 5); + assertThat(received).containsExactlyInAnyOrder(1, 2, 3, 4, 5); + } + + @Test + void sendingWithDefaultTtl() { + List expirations = new CopyOnWriteArrayList<>(); + usage.consume(exchangeName, routingKeys, msg -> { + expirations.add(msg.properties().getExpiration()); + }); + + SenderBean bean = runApplication(outgoingConfig() + .with("mp.messaging.outgoing.sink.default-ttl", 5000L), + SenderBean.class); + for (int i = 1; i <= 3; i++) { + bean.send(i); + } + + await().until(() -> expirations.size() >= 3); + assertThat(expirations).allMatch("5000"::equals); + } + + @Test + void sendingWithPublishConfirms() { + List received = new CopyOnWriteArrayList<>(); + usage.consumeIntegers(exchangeName, routingKeys, received::add); + + SenderBean bean = runApplication(outgoingConfig() + .with("mp.messaging.outgoing.sink.publish-confirms", true), + SenderBean.class); + for (int i = 1; i <= 5; i++) { + bean.send(i); + } + + await().until(() -> received.size() >= 5); + assertThat(received).containsExactlyInAnyOrder(1, 2, 3, 4, 5); + } + + @Test + void sendingWithMaxInflightMessages() { + List received = new CopyOnWriteArrayList<>(); + usage.consumeIntegers(exchangeName, routingKeys, received::add); + + SenderBean bean = runApplication(outgoingConfig() + .with("mp.messaging.outgoing.sink.max-inflight-messages", 2), + SenderBean.class); + for (int i = 1; i <= 10; i++) { + bean.send(i); + } + + await().until(() -> received.size() >= 10); + assertThat(received).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + // --- Inner beans --- + + @ApplicationScoped + public static class SenderBean { + @Inject + @Channel("sink") + Emitter emitter; + + public void send(int value) { + emitter.send(value); + } + } + +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceTextMapGetterTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceTextMapGetterTest.java new file mode 100644 index 0000000000..60848af782 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceTextMapGetterTest.java @@ -0,0 +1,89 @@ +package io.smallrye.reactive.messaging.rabbitmq.tracing; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +class RabbitMQTraceTextMapGetterTest { + + @Test + void keysWithNonNullHeaders() { + Map headers = new HashMap<>(); + headers.put("traceparent", "00-abc-def-01"); + headers.put("tracestate", "key=value"); + + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", headers); + + Iterable keys = RabbitMQTraceTextMapGetter.INSTANCE.keys(trace); + assertThat(keys).containsExactlyInAnyOrder("traceparent", "tracestate"); + } + + @Test + void keysWithNullHeaders() { + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", null); + + Iterable keys = RabbitMQTraceTextMapGetter.INSTANCE.keys(trace); + assertThat(keys).isEmpty(); + } + + @Test + void getWithExistingKey() { + Map headers = new HashMap<>(); + headers.put("traceparent", "00-abc-def-01"); + + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", headers); + + String value = RabbitMQTraceTextMapGetter.INSTANCE.get(trace, "traceparent"); + assertThat(value).isEqualTo("00-abc-def-01"); + } + + @Test + void getWithMissingKey() { + Map headers = new HashMap<>(); + headers.put("traceparent", "00-abc-def-01"); + + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", headers); + + String value = RabbitMQTraceTextMapGetter.INSTANCE.get(trace, "nonexistent"); + assertThat(value).isNull(); + } + + @Test + void getWithNullHeaders() { + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", null); + + String value = RabbitMQTraceTextMapGetter.INSTANCE.get(trace, "traceparent"); + assertThat(value).isNull(); + } + + @Test + void getWithNullCarrier() { + String value = RabbitMQTraceTextMapGetter.INSTANCE.get(null, "traceparent"); + assertThat(value).isNull(); + } + + @Test + void getWithNonStringHeaderValue() { + Map headers = new HashMap<>(); + headers.put("x-count", 42); + + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", headers); + + String value = RabbitMQTraceTextMapGetter.INSTANCE.get(trace, "x-count"); + assertThat(value).isEqualTo("42"); + } + + @Test + void getWithNullHeaderValue() { + Map headers = new HashMap<>(); + headers.put("x-null", null); + + RabbitMQTrace trace = RabbitMQTrace.traceQueue("dest", "rk", headers); + + String value = RabbitMQTraceTextMapGetter.INSTANCE.get(trace, "x-null"); + assertThat(value).isNull(); + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceTextMapSetterTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceTextMapSetterTest.java new file mode 100644 index 0000000000..67b4a55ff1 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQTraceTextMapSetterTest.java @@ -0,0 +1,60 @@ +package io.smallrye.reactive.messaging.rabbitmq.tracing; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +class RabbitMQTraceTextMapSetterTest { + + @Test + void setAddsHeaderToCarrier() { + Map headers = new HashMap<>(); + RabbitMQTrace trace = RabbitMQTrace.traceExchange("exchange", "rk", headers); + + RabbitMQTraceTextMapSetter.INSTANCE.set(trace, "traceparent", "00-abc-def-01"); + + assertThat(headers).containsEntry("traceparent", "00-abc-def-01"); + } + + @Test + void setOverwritesExistingHeader() { + Map headers = new HashMap<>(); + headers.put("traceparent", "old-value"); + RabbitMQTrace trace = RabbitMQTrace.traceExchange("exchange", "rk", headers); + + RabbitMQTraceTextMapSetter.INSTANCE.set(trace, "traceparent", "new-value"); + + assertThat(headers).containsEntry("traceparent", "new-value"); + } + + @Test + void setWithNullCarrierDoesNotThrow() { + // Should silently do nothing + RabbitMQTraceTextMapSetter.INSTANCE.set(null, "key", "value"); + } + + @Test + void setWithNullHeadersDoesNotThrow() { + RabbitMQTrace trace = RabbitMQTrace.traceExchange("exchange", "rk", null); + + // Should silently do nothing when headers are null + RabbitMQTraceTextMapSetter.INSTANCE.set(trace, "key", "value"); + } + + @Test + void setMultipleHeaders() { + Map headers = new HashMap<>(); + RabbitMQTrace trace = RabbitMQTrace.traceExchange("exchange", "rk", headers); + + RabbitMQTraceTextMapSetter.INSTANCE.set(trace, "traceparent", "parent-val"); + RabbitMQTraceTextMapSetter.INSTANCE.set(trace, "tracestate", "state-val"); + + assertThat(headers) + .containsEntry("traceparent", "parent-val") + .containsEntry("tracestate", "state-val") + .hasSize(2); + } +}