Skip to content

Commit 1edffdd

Browse files
ozangunalpCassio Fiuza
andauthored
RabbitMQ improve test coverage of tests (#3354)
* feat(rabbitmq): share connections with context safety and config guard * Some tests and cleanup * Simplify the ClientHolder, context and shared client mechanism * Added some docs for RabbitMQ shared-connection-name property * Test coverage rabbitmq * RabbitMQ container shared between tests Bump the image to 4.2.5 --------- Co-authored-by: Cassio Fiuza <cassio.silva@syonet.com>
1 parent 34424be commit 1edffdd

27 files changed

+2642
-200
lines changed

documentation/src/main/docs/rabbitmq/rabbitmq-client-customization.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,31 @@ connector. You need to indicate the name of the client using the
1313
`client-options-name` attribute:
1414

1515
mp.messaging.incoming.prices.client-options-name=my-named-options
16+
17+
## Shared connections
18+
19+
By default, each channel opens its own connection to the RabbitMQ
20+
broker. If your application has multiple channels connecting to the same
21+
broker, you can configure them to share a single underlying connection
22+
using the `shared-connection-name` attribute:
23+
24+
``` properties
25+
mp.messaging.incoming.orders.connector=smallrye-rabbitmq
26+
mp.messaging.incoming.orders.shared-connection-name=my-connection
27+
28+
mp.messaging.outgoing.confirmations.connector=smallrye-rabbitmq
29+
mp.messaging.outgoing.confirmations.shared-connection-name=my-connection
30+
```
31+
32+
In the above example, the `orders` incoming channel and the
33+
`confirmations` outgoing channel share the same RabbitMQ connection.
34+
35+
All channels sharing a connection name **must** use identical connection
36+
options (host, port, credentials, SSL settings, virtual host, etc.).
37+
If two channels declare the same `shared-connection-name` but have
38+
different connection options, the connector throws an
39+
`IllegalStateException` at startup.
40+
41+
Shared connections are useful when your application has many channels
42+
connecting to the same broker and you want to reduce the number of TCP
43+
connections, or when the broker imposes connection limits.

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java

Lines changed: 47 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
44
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
55

6+
import java.util.Set;
7+
import java.util.concurrent.CompletableFuture;
68
import java.util.concurrent.CompletionStage;
9+
import java.util.concurrent.ConcurrentHashMap;
710
import java.util.concurrent.atomic.AtomicBoolean;
811
import java.util.concurrent.atomic.AtomicReference;
912
import java.util.function.Consumer;
@@ -13,55 +16,34 @@
1316
import io.smallrye.mutiny.Uni;
1417
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
1518
import io.vertx.mutiny.core.Context;
16-
import io.vertx.mutiny.core.Vertx;
1719
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
1820

1921
public class ClientHolder {
2022

2123
private final RabbitMQClient client;
2224

23-
private final AtomicBoolean connected = new AtomicBoolean(false);
24-
private final AtomicReference<CurrentConnection> connectionHolder = new AtomicReference<>();
25+
private final AtomicBoolean hasBeenConnected = new AtomicBoolean(false);
26+
private final AtomicReference<CompletableFuture<RabbitMQClient>> ongoingConnection = new AtomicReference<>();
2527
private final Uni<RabbitMQClient> connection;
28+
private final Set<String> channels = ConcurrentHashMap.newKeySet();
2629

27-
private final Vertx vertx;
28-
29-
public ClientHolder(RabbitMQClient client,
30-
RabbitMQConnectorCommonConfiguration configuration,
31-
Vertx vertx,
32-
Context root) {
30+
public ClientHolder(RabbitMQClient client) {
3331
this.client = client;
34-
this.vertx = vertx;
3532
this.connection = Uni.createFrom().deferred(() -> client.start()
3633
.onSubscription().invoke(() -> {
37-
connected.set(true);
38-
log.connectionEstablished(configuration.getChannel());
34+
hasBeenConnected.set(true);
35+
log.connectionEstablished(String.join(", ", channels));
3936
})
4037
.onItem().transform(ignored -> {
41-
connectionHolder
42-
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));
43-
4438
// handle the case we are already disconnected.
45-
if (!client.isConnected() || connectionHolder.get() == null) {
39+
if (!client.isConnected()) {
4640
// Throwing the exception would trigger a retry.
47-
connectionHolder.set(null);
4841
throw ex.illegalStateConnectionDisconnected();
4942
}
5043
return client;
5144
})
52-
.onFailure().invoke(log::unableToConnectToBroker)
53-
.onFailure().invoke(t -> {
54-
connectionHolder.set(null);
55-
log.unableToRecoverFromConnectionDisruption(t);
56-
}))
57-
.memoize().until(() -> {
58-
CurrentConnection connection = connectionHolder.get();
59-
if (connection == null) {
60-
return true;
61-
}
62-
return !connection.client.isConnected();
63-
});
64-
45+
.onFailure().invoke(log::unableToConnectToBroker))
46+
.memoize().until(() -> !client.isConnected());
6547
}
6648

6749
public static CompletionStage<Void> runOnContext(Context context, IncomingRabbitMQMessage<?> msg,
@@ -80,21 +62,12 @@ public static CompletionStage<Void> runOnContextAndReportFailure(Context context
8062
});
8163
}
8264

83-
public Context getContext() {
84-
CurrentConnection connection = connectionHolder.get();
85-
if (connection != null) {
86-
return connection.context;
87-
} else {
88-
return null;
89-
}
90-
}
91-
9265
public RabbitMQClient client() {
9366
return client;
9467
}
9568

9669
public boolean hasBeenConnected() {
97-
return connected.get();
70+
return hasBeenConnected.get();
9871
}
9972

10073
@CheckReturnValue
@@ -106,24 +79,46 @@ public Function<Throwable, Uni<Void>> getNack(final long deliveryTag, final bool
10679
return t -> client.basicNack(deliveryTag, false, requeue);
10780
}
10881

109-
public Vertx getVertx() {
110-
return vertx;
111-
}
112-
11382
@CheckReturnValue
11483
public Uni<RabbitMQClient> getOrEstablishConnection() {
115-
return connection;
84+
return Uni.createFrom().deferred(this::establishConnection);
11685
}
11786

118-
private static class CurrentConnection {
119-
120-
final RabbitMQClient client;
121-
final Context context;
87+
private Uni<RabbitMQClient> establishConnection() {
88+
CompletableFuture<RabbitMQClient> existing = ongoingConnection.get();
89+
if (existing != null) {
90+
if (!existing.isDone() || client.isConnected()) {
91+
return Uni.createFrom().completionStage(existing);
92+
}
93+
ongoingConnection.compareAndSet(existing, null);
94+
}
12295

123-
private CurrentConnection(RabbitMQClient client, Context context) {
124-
this.client = client;
125-
this.context = context;
96+
CompletableFuture<RabbitMQClient> placeholder = new CompletableFuture<>();
97+
CompletableFuture<RabbitMQClient> current = ongoingConnection.compareAndExchange(null, placeholder);
98+
if (current != null) {
99+
return Uni.createFrom().completionStage(current);
126100
}
101+
connection.subscribe().with(placeholder::complete, placeholder::completeExceptionally);
102+
placeholder.whenComplete((result, error) -> {
103+
if (error != null) {
104+
ongoingConnection.compareAndSet(placeholder, null);
105+
}
106+
});
107+
return Uni.createFrom().completionStage(placeholder);
108+
}
109+
110+
public Set<String> channels() {
111+
return channels;
112+
}
113+
114+
public ClientHolder retain(String channel) {
115+
channels.add(channel);
116+
return this;
117+
}
118+
119+
public boolean release(String channel) {
120+
channels.remove(channel);
121+
return channels.isEmpty();
127122
}
128123

129124
}

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Meta
6060
private final String contentTypeOverride;
6161
private final T payload;
6262

63-
public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder,
63+
public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context,
6464
RabbitMQFailureHandler onNack,
6565
RabbitMQAckHandler onAck, String contentTypeOverride) {
66-
this(delegate.getDelegate(), holder, onNack, onAck, contentTypeOverride);
66+
this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride);
6767
}
6868

69-
IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder,
69+
IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context,
7070
RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) {
7171
this.message = msg;
7272
this.deliveryTag = msg.envelope().getDeliveryTag();
7373
this.holder = holder;
74-
this.context = holder.getContext();
74+
this.context = context;
7575
this.contentTypeOverride = contentTypeOverride;
7676
this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride);
7777
this.onNack = onNack;
@@ -134,17 +134,6 @@ public void acknowledgeMessage() {
134134
holder.getAck(this.deliveryTag).subscribeAsCompletionStage();
135135
}
136136

137-
/**
138-
* Rejects the message by nack'ing with requeue=false; this will either discard the message for good or
139-
* (if a DLQ has been set up) send it to the DLQ.
140-
*
141-
* @param reason the cause of the rejection, which must not be null
142-
*/
143-
public void rejectMessage(Throwable reason) {
144-
this.rejectMessage(reason, false);
145-
holder.getNack(this.deliveryTag, false).apply(reason).subscribeAsCompletionStage();
146-
}
147-
148137
/**
149138
* Rejects the message by nack'ing it.
150139
* <p>

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.smallrye.reactive.messaging.rabbitmq;
22

3-
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.*;
3+
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
4+
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
5+
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
6+
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
47
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
58

69
import java.util.List;
@@ -38,6 +41,7 @@
3841
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
3942
import io.smallrye.reactive.messaging.rabbitmq.internals.IncomingRabbitMQChannel;
4043
import io.smallrye.reactive.messaging.rabbitmq.internals.OutgoingRabbitMQChannel;
44+
import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper;
4145
import io.vertx.mutiny.core.Vertx;
4246
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
4347
import io.vertx.rabbitmq.RabbitMQOptions;
@@ -64,6 +68,7 @@
6468
@ConnectorAttribute(name = "reconnect-interval", direction = INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two reconnection attempts", type = "int", alias = "rabbitmq-reconnect-interval", defaultValue = "10")
6569
@ConnectorAttribute(name = "network-recovery-interval", direction = INCOMING_AND_OUTGOING, description = "How long (ms) will automatic recovery wait before attempting to reconnect", type = "int", defaultValue = "5000")
6670
@ConnectorAttribute(name = "user", direction = INCOMING_AND_OUTGOING, description = "The user name to use when connecting to the broker", type = "string", defaultValue = "guest")
71+
@ConnectorAttribute(name = "shared-connection-name", direction = INCOMING_AND_OUTGOING, description = "Optional identifier allowing multiple channels to share the same RabbitMQ connection when set to the same value", type = "string")
6772
@ConnectorAttribute(name = "include-properties", direction = INCOMING_AND_OUTGOING, description = "Whether to include properties when a broker message is passed on the event bus", type = "boolean", defaultValue = "false")
6873
@ConnectorAttribute(name = "requested-channel-max", direction = INCOMING_AND_OUTGOING, description = "The initially requested maximum channel number", type = "int", defaultValue = "2047")
6974
@ConnectorAttribute(name = "requested-heartbeat", direction = INCOMING_AND_OUTGOING, description = "The initially requested heartbeat interval (seconds), zero for none", type = "int", defaultValue = "60")
@@ -162,9 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H
162167
@Inject
163168
@Any
164169
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories;
165-
private List<IncomingRabbitMQChannel> incomings = new CopyOnWriteArrayList<>();
166-
private List<OutgoingRabbitMQChannel> outgoings = new CopyOnWriteArrayList<>();
167-
private Map<String, RabbitMQClient> clients = new ConcurrentHashMap<>();
170+
private final List<IncomingRabbitMQChannel> incomings = new CopyOnWriteArrayList<>();
171+
private final List<OutgoingRabbitMQChannel> outgoings = new CopyOnWriteArrayList<>();
172+
private final Map<String, ClientHolder> clients = new ConcurrentHashMap<>();
173+
// connection-name to fingerprint map to check against same connection-name but different options
174+
private final Map<String, String> connectionFingerprints = new ConcurrentHashMap<>();
168175

169176
@Inject
170177
@Any
@@ -263,28 +270,20 @@ public void terminate(
263270
outgoing.terminate();
264271
}
265272

266-
clients.forEach((channel, rabbitMQClient) -> rabbitMQClient.stopAndAwait());
273+
for (Map.Entry<String, ClientHolder> entry : clients.entrySet()) {
274+
stopClient(entry.getValue().client(), true);
275+
}
267276
clients.clear();
277+
connectionFingerprints.clear();
268278
}
269279

270280
public Vertx vertx() {
271281
return executionHolder.vertx();
272282
}
273283

274-
public void registerClient(String channel, RabbitMQClient client) {
275-
RabbitMQClient old = clients.put(channel, client);
276-
if (old != null) {
277-
old.stopAndForget();
278-
}
279-
}
280-
281284
public void reportIncomingFailure(String channel, Throwable reason) {
282285
log.failureReported(channel, reason);
283-
RabbitMQClient client = clients.remove(channel);
284-
if (client != null) {
285-
// Called on vertx context, we can't block: stop clients without waiting
286-
client.stopAndForget();
287-
}
286+
releaseClient(channel, false);
288287
}
289288

290289
public Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories() {
@@ -306,4 +305,43 @@ public Instance<CredentialsProvider> credentialsProviders() {
306305
public Instance<Map<String, ?>> configMaps() {
307306
return configMaps;
308307
}
308+
309+
public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config) {
310+
String channel = config.getChannel();
311+
RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config);
312+
String connectionName = options.getConnectionName();
313+
String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options);
314+
String existing = connectionFingerprints.putIfAbsent(connectionName, fingerprint);
315+
if (existing != null && !existing.equals(fingerprint)) {
316+
throw ex.illegalStateSharedConnectionConfigMismatch(connectionName);
317+
}
318+
return clients.compute(fingerprint,
319+
(key, current) -> (current == null ? new ClientHolder(RabbitMQClient.create(vertx(), options)) : current)
320+
.retain(channel));
321+
}
322+
323+
public void releaseClient(String channel, boolean await) {
324+
for (var e : clients.entrySet()) {
325+
ClientHolder shared = e.getValue();
326+
if (shared.channels().contains(channel)) {
327+
if (clients.computeIfPresent(e.getKey(), (k, c) -> c.release(channel) ? null : c) == null) {
328+
connectionFingerprints.values().remove(e.getKey());
329+
stopClient(shared.client(), await);
330+
}
331+
return;
332+
}
333+
}
334+
}
335+
336+
private void stopClient(RabbitMQClient client, boolean await) {
337+
if (client == null) {
338+
return;
339+
}
340+
if (await) {
341+
client.stopAndAwait();
342+
} else {
343+
client.stopAndForget();
344+
}
345+
}
346+
309347
}

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQExceptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,7 @@ public interface RabbitMQExceptions {
4141

4242
@Message(id = 16009, value = "Unable to create a client, probably a config error")
4343
IllegalStateException illegalStateUnableToCreateClient(@Cause Throwable t);
44+
45+
@Message(id = 16010, value = "Shared connection '%s' has mismatched configuration; ensure all channels using the same shared-connection-name have identical connection settings")
46+
IllegalStateException illegalStateSharedConnectionConfigMismatch(String sharedConnectionName);
4447
}

0 commit comments

Comments
 (0)