Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,31 @@ connector. You need to indicate the name of the client using the
`client-options-name` attribute:

mp.messaging.incoming.prices.client-options-name=my-named-options

## Shared connections

By default, each channel opens its own connection to the RabbitMQ
broker. If your application has multiple channels connecting to the same
broker, you can configure them to share a single underlying connection
using the `shared-connection-name` attribute:

``` properties
mp.messaging.incoming.orders.connector=smallrye-rabbitmq
mp.messaging.incoming.orders.shared-connection-name=my-connection

mp.messaging.outgoing.confirmations.connector=smallrye-rabbitmq
mp.messaging.outgoing.confirmations.shared-connection-name=my-connection
```

In the above example, the `orders` incoming channel and the
`confirmations` outgoing channel share the same RabbitMQ connection.

All channels sharing a connection name **must** use identical connection
options (host, port, credentials, SSL settings, virtual host, etc.).
If two channels declare the same `shared-connection-name` but have
different connection options, the connector throws an
`IllegalStateException` at startup.

Shared connections are useful when your application has many channels
connecting to the same broker and you want to reduce the number of TCP
connections, or when the broker imposes connection limits.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -13,55 +16,34 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;

public class ClientHolder {

private final RabbitMQClient client;

private final AtomicBoolean connected = new AtomicBoolean(false);
private final AtomicReference<CurrentConnection> connectionHolder = new AtomicReference<>();
private final AtomicBoolean hasBeenConnected = new AtomicBoolean(false);
private final AtomicReference<CompletableFuture<RabbitMQClient>> ongoingConnection = new AtomicReference<>();
private final Uni<RabbitMQClient> connection;
private final Set<String> channels = ConcurrentHashMap.newKeySet();

private final Vertx vertx;

public ClientHolder(RabbitMQClient client,
RabbitMQConnectorCommonConfiguration configuration,
Vertx vertx,
Context root) {
public ClientHolder(RabbitMQClient client) {
this.client = client;
this.vertx = vertx;
this.connection = Uni.createFrom().deferred(() -> client.start()
.onSubscription().invoke(() -> {
connected.set(true);
log.connectionEstablished(configuration.getChannel());
hasBeenConnected.set(true);
log.connectionEstablished(String.join(", ", channels));
})
.onItem().transform(ignored -> {
connectionHolder
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
if (!client.isConnected()) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}
return client;
})
.onFailure().invoke(log::unableToConnectToBroker)
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
}))
.memoize().until(() -> {
CurrentConnection connection = connectionHolder.get();
if (connection == null) {
return true;
}
return !connection.client.isConnected();
});

.onFailure().invoke(log::unableToConnectToBroker))
.memoize().until(() -> !client.isConnected());
}

public static CompletionStage<Void> runOnContext(Context context, IncomingRabbitMQMessage<?> msg,
Expand All @@ -80,21 +62,12 @@ public static CompletionStage<Void> runOnContextAndReportFailure(Context context
});
}

public Context getContext() {
CurrentConnection connection = connectionHolder.get();
if (connection != null) {
return connection.context;
} else {
return null;
}
}

public RabbitMQClient client() {
return client;
}

public boolean hasBeenConnected() {
return connected.get();
return hasBeenConnected.get();
}

@CheckReturnValue
Expand All @@ -106,24 +79,46 @@ public Function<Throwable, Uni<Void>> getNack(final long deliveryTag, final bool
return t -> client.basicNack(deliveryTag, false, requeue);
}

public Vertx getVertx() {
return vertx;
}

@CheckReturnValue
public Uni<RabbitMQClient> getOrEstablishConnection() {
return connection;
return Uni.createFrom().deferred(this::establishConnection);
}

private static class CurrentConnection {

final RabbitMQClient client;
final Context context;
private Uni<RabbitMQClient> establishConnection() {
CompletableFuture<RabbitMQClient> existing = ongoingConnection.get();
if (existing != null) {
if (!existing.isDone() || client.isConnected()) {
return Uni.createFrom().completionStage(existing);
}
ongoingConnection.compareAndSet(existing, null);
}

private CurrentConnection(RabbitMQClient client, Context context) {
this.client = client;
this.context = context;
CompletableFuture<RabbitMQClient> placeholder = new CompletableFuture<>();
CompletableFuture<RabbitMQClient> current = ongoingConnection.compareAndExchange(null, placeholder);
if (current != null) {
return Uni.createFrom().completionStage(current);
}
connection.subscribe().with(placeholder::complete, placeholder::completeExceptionally);
placeholder.whenComplete((result, error) -> {
if (error != null) {
ongoingConnection.compareAndSet(placeholder, null);
}
});
return Uni.createFrom().completionStage(placeholder);
}

public Set<String> channels() {
return channels;
}

public ClientHolder retain(String channel) {
channels.add(channel);
return this;
}

public boolean release(String channel) {
channels.remove(channel);
return channels.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Meta
private final String contentTypeOverride;
private final T payload;

public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder,
public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context,
RabbitMQFailureHandler onNack,
RabbitMQAckHandler onAck, String contentTypeOverride) {
this(delegate.getDelegate(), holder, onNack, onAck, contentTypeOverride);
this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride);
}

IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder,
IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context,
RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) {
this.message = msg;
this.deliveryTag = msg.envelope().getDeliveryTag();
this.holder = holder;
this.context = holder.getContext();
this.context = context;
this.contentTypeOverride = contentTypeOverride;
this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride);
this.onNack = onNack;
Expand Down Expand Up @@ -134,17 +134,6 @@ public void acknowledgeMessage() {
holder.getAck(this.deliveryTag).subscribeAsCompletionStage();
}

/**
* Rejects the message by nack'ing with requeue=false; this will either discard the message for good or
* (if a DLQ has been set up) send it to the DLQ.
*
* @param reason the cause of the rejection, which must not be null
*/
public void rejectMessage(Throwable reason) {
this.rejectMessage(reason, false);
holder.getNack(this.deliveryTag, false).apply(reason).subscribeAsCompletionStage();
}

/**
* Rejects the message by nack'ing it.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.smallrye.reactive.messaging.rabbitmq;

import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.*;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.List;
Expand Down Expand Up @@ -38,6 +41,7 @@
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.internals.IncomingRabbitMQChannel;
import io.smallrye.reactive.messaging.rabbitmq.internals.OutgoingRabbitMQChannel;
import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQOptions;
Expand All @@ -64,6 +68,7 @@
@ConnectorAttribute(name = "reconnect-interval", direction = INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two reconnection attempts", type = "int", alias = "rabbitmq-reconnect-interval", defaultValue = "10")
@ConnectorAttribute(name = "network-recovery-interval", direction = INCOMING_AND_OUTGOING, description = "How long (ms) will automatic recovery wait before attempting to reconnect", type = "int", defaultValue = "5000")
@ConnectorAttribute(name = "user", direction = INCOMING_AND_OUTGOING, description = "The user name to use when connecting to the broker", type = "string", defaultValue = "guest")
@ConnectorAttribute(name = "shared-connection-name", direction = INCOMING_AND_OUTGOING, description = "Optional identifier allowing multiple channels to share the same RabbitMQ connection when set to the same value", type = "string")
@ConnectorAttribute(name = "include-properties", direction = INCOMING_AND_OUTGOING, description = "Whether to include properties when a broker message is passed on the event bus", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "requested-channel-max", direction = INCOMING_AND_OUTGOING, description = "The initially requested maximum channel number", type = "int", defaultValue = "2047")
@ConnectorAttribute(name = "requested-heartbeat", direction = INCOMING_AND_OUTGOING, description = "The initially requested heartbeat interval (seconds), zero for none", type = "int", defaultValue = "60")
Expand Down Expand Up @@ -162,9 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H
@Inject
@Any
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories;
private List<IncomingRabbitMQChannel> incomings = new CopyOnWriteArrayList<>();
private List<OutgoingRabbitMQChannel> outgoings = new CopyOnWriteArrayList<>();
private Map<String, RabbitMQClient> clients = new ConcurrentHashMap<>();
private final List<IncomingRabbitMQChannel> incomings = new CopyOnWriteArrayList<>();
private final List<OutgoingRabbitMQChannel> outgoings = new CopyOnWriteArrayList<>();
private final Map<String, ClientHolder> clients = new ConcurrentHashMap<>();
// connection-name to fingerprint map to check against same connection-name but different options
private final Map<String, String> connectionFingerprints = new ConcurrentHashMap<>();

@Inject
@Any
Expand Down Expand Up @@ -263,28 +270,20 @@ public void terminate(
outgoing.terminate();
}

clients.forEach((channel, rabbitMQClient) -> rabbitMQClient.stopAndAwait());
for (Map.Entry<String, ClientHolder> entry : clients.entrySet()) {
stopClient(entry.getValue().client(), true);
}
clients.clear();
connectionFingerprints.clear();
}

public Vertx vertx() {
return executionHolder.vertx();
}

public void registerClient(String channel, RabbitMQClient client) {
RabbitMQClient old = clients.put(channel, client);
if (old != null) {
old.stopAndForget();
}
}

public void reportIncomingFailure(String channel, Throwable reason) {
log.failureReported(channel, reason);
RabbitMQClient client = clients.remove(channel);
if (client != null) {
// Called on vertx context, we can't block: stop clients without waiting
client.stopAndForget();
}
releaseClient(channel, false);
}

public Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories() {
Expand All @@ -306,4 +305,43 @@ public Instance<CredentialsProvider> credentialsProviders() {
public Instance<Map<String, ?>> configMaps() {
return configMaps;
}

public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config) {
String channel = config.getChannel();
RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config);
String connectionName = options.getConnectionName();
String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options);
String existing = connectionFingerprints.putIfAbsent(connectionName, fingerprint);
if (existing != null && !existing.equals(fingerprint)) {
throw ex.illegalStateSharedConnectionConfigMismatch(connectionName);
}
return clients.compute(fingerprint,
(key, current) -> (current == null ? new ClientHolder(RabbitMQClient.create(vertx(), options)) : current)
.retain(channel));
}

public void releaseClient(String channel, boolean await) {
for (var e : clients.entrySet()) {
ClientHolder shared = e.getValue();
if (shared.channels().contains(channel)) {
if (clients.computeIfPresent(e.getKey(), (k, c) -> c.release(channel) ? null : c) == null) {
connectionFingerprints.values().remove(e.getKey());
stopClient(shared.client(), await);
}
return;
}
}
}

private void stopClient(RabbitMQClient client, boolean await) {
if (client == null) {
return;
}
if (await) {
client.stopAndAwait();
} else {
client.stopAndForget();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ public interface RabbitMQExceptions {

@Message(id = 16009, value = "Unable to create a client, probably a config error")
IllegalStateException illegalStateUnableToCreateClient(@Cause Throwable t);

@Message(id = 16010, value = "Shared connection '%s' has mismatched configuration; ensure all channels using the same shared-connection-name have identical connection settings")
IllegalStateException illegalStateSharedConnectionConfigMismatch(String sharedConnectionName);
}
Loading