Skip to content

Commit c19aa66

Browse files
committed
Simplify the ClientHolder, context and shared client mechanism
1 parent e2745d3 commit c19aa66

File tree

6 files changed

+86
-201
lines changed

6 files changed

+86
-201
lines changed

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java

Lines changed: 43 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
44
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
55

6+
import java.util.List;
7+
import java.util.concurrent.CompletableFuture;
68
import java.util.concurrent.CompletionStage;
9+
import java.util.concurrent.CopyOnWriteArrayList;
710
import java.util.concurrent.atomic.AtomicBoolean;
811
import java.util.concurrent.atomic.AtomicReference;
912
import java.util.function.Consumer;
@@ -13,49 +16,39 @@
1316
import io.smallrye.mutiny.Uni;
1417
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
1518
import io.vertx.mutiny.core.Context;
16-
import io.vertx.mutiny.core.Vertx;
1719
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
1820

1921
public class ClientHolder {
2022

2123
private final RabbitMQClient client;
2224

23-
private final AtomicBoolean connected = new AtomicBoolean(false);
24-
private final AtomicReference<CurrentConnection> connectionHolder = new AtomicReference<>();
25-
private final AtomicReference<Context> rootContext;
26-
private final AtomicReference<CompletionStage<RabbitMQClient>> connectionStage = new AtomicReference<>();
25+
private final AtomicBoolean hasBeenConnected = new AtomicBoolean(false);
26+
private final AtomicBoolean isConnected = new AtomicBoolean(false);
27+
private final AtomicReference<CompletableFuture<RabbitMQClient>> ongoingConnection = new AtomicReference<>();
2728
private final Uni<RabbitMQClient> connection;
29+
private final List<String> channels = new CopyOnWriteArrayList<>();
2830

29-
private final Vertx vertx;
30-
31-
public ClientHolder(RabbitMQClient client,
32-
String channel,
33-
Vertx vertx,
34-
Context root) {
31+
public ClientHolder(RabbitMQClient client) {
3532
this.client = client;
36-
this.vertx = vertx;
37-
this.rootContext = new AtomicReference<>(root);
3833
this.connection = Uni.createFrom().deferred(() -> client.start()
3934
.onSubscription().invoke(() -> {
40-
connected.set(true);
41-
log.connectionEstablished(channel);
35+
hasBeenConnected.set(true);
36+
log.connectionEstablished(String.join(", ", channels));
4237
})
4338
.onItem().transform(ignored -> {
44-
Context ctx = rootContext.get();
45-
connectionHolder
46-
.set(new CurrentConnection(client, ctx == null ? Vertx.currentContext() : ctx));
39+
isConnected.set(true);
4740

4841
// handle the case we are already disconnected.
49-
if (!client.isConnected() || connectionHolder.get() == null) {
42+
if (!client.isConnected() || !isConnected.get()) {
5043
// Throwing the exception would trigger a retry.
51-
connectionHolder.set(null);
44+
isConnected.set(false);
5245
throw ex.illegalStateConnectionDisconnected();
5346
}
5447
return client;
5548
})
5649
.onFailure().invoke(log::unableToConnectToBroker)
5750
.onFailure().invoke(t -> {
58-
connectionHolder.set(null);
51+
isConnected.set(false);
5952
log.unableToRecoverFromConnectionDisruption(t);
6053
}));
6154
}
@@ -76,32 +69,12 @@ public static CompletionStage<Void> runOnContextAndReportFailure(Context context
7669
});
7770
}
7871

79-
public Context getContext() {
80-
CurrentConnection connection = connectionHolder.get();
81-
if (connection != null) {
82-
return connection.context;
83-
} else {
84-
return null;
85-
}
86-
}
87-
88-
public void ensureContext(Context context) {
89-
if (context == null) {
90-
return;
91-
}
92-
rootContext.compareAndSet(null, context);
93-
CurrentConnection connection = connectionHolder.get();
94-
if (connection != null && connection.context == null) {
95-
connectionHolder.compareAndSet(connection, new CurrentConnection(connection.client, context));
96-
}
97-
}
98-
9972
public RabbitMQClient client() {
10073
return client;
10174
}
10275

10376
public boolean hasBeenConnected() {
104-
return connected.get();
77+
return hasBeenConnected.get();
10578
}
10679

10780
@CheckReturnValue
@@ -113,46 +86,46 @@ public Function<Throwable, Uni<Void>> getNack(final long deliveryTag, final bool
11386
return t -> client.basicNack(deliveryTag, false, requeue);
11487
}
11588

116-
public Vertx getVertx() {
117-
return vertx;
118-
}
119-
12089
@CheckReturnValue
12190
public Uni<RabbitMQClient> getOrEstablishConnection() {
122-
CompletionStage<RabbitMQClient> existing = connectionStage.get();
91+
return Uni.createFrom().deferred(this::establishConnection);
92+
}
93+
94+
private Uni<RabbitMQClient> establishConnection() {
95+
CompletableFuture<RabbitMQClient> existing = ongoingConnection.get();
12396
if (existing != null) {
124-
if (!existing.toCompletableFuture().isDone() || client.isConnected()) {
97+
if (!existing.isDone() || client.isConnected()) {
12598
return Uni.createFrom().completionStage(existing);
12699
}
127-
connectionStage.compareAndSet(existing, null);
100+
ongoingConnection.compareAndSet(existing, null);
128101
}
129102

130-
for (;;) {
131-
CompletionStage<RabbitMQClient> current = connectionStage.get();
132-
if (current != null) {
133-
return Uni.createFrom().completionStage(current);
134-
}
135-
CompletionStage<RabbitMQClient> created = connection.subscribeAsCompletionStage();
136-
if (connectionStage.compareAndSet(null, created)) {
137-
created.whenComplete((result, error) -> {
138-
if (error != null) {
139-
connectionStage.compareAndSet(created, null);
140-
}
141-
});
142-
return Uni.createFrom().completionStage(created);
143-
}
103+
CompletableFuture<RabbitMQClient> placeholder = new CompletableFuture<>();
104+
CompletableFuture<RabbitMQClient> current = ongoingConnection.compareAndExchange(null, placeholder);
105+
if (current != null) {
106+
return Uni.createFrom().completionStage(current);
144107
}
108+
connection.subscribe().with(placeholder::complete, placeholder::completeExceptionally);
109+
placeholder.whenComplete((result, error) -> {
110+
if (error != null) {
111+
ongoingConnection.compareAndSet(placeholder, null);
112+
}
113+
});
114+
return Uni.createFrom().completionStage(placeholder);
145115
}
146116

147-
private static class CurrentConnection {
117+
public List<String> channels() {
118+
return channels;
119+
}
148120

149-
final RabbitMQClient client;
150-
final Context context;
121+
public ClientHolder retain(String channel) {
122+
channels.add(channel);
123+
return this;
124+
}
151125

152-
private CurrentConnection(RabbitMQClient client, Context context) {
153-
this.client = client;
154-
this.context = context;
155-
}
126+
public boolean release(String channel) {
127+
channels.remove(channel);
128+
return channels.isEmpty();
156129
}
157130

158131
}

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Meta
6060
private final String contentTypeOverride;
6161
private final T payload;
6262

63-
public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder,
64-
RabbitMQFailureHandler onNack,
65-
RabbitMQAckHandler onAck, String contentTypeOverride) {
66-
this(delegate.getDelegate(), holder, holder.getContext(), onNack, onAck, contentTypeOverride);
67-
}
68-
6963
public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Context context,
7064
RabbitMQFailureHandler onNack,
7165
RabbitMQAckHandler onAck, String contentTypeOverride) {
@@ -77,7 +71,7 @@ public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Co
7771
this.message = msg;
7872
this.deliveryTag = msg.envelope().getDeliveryTag();
7973
this.holder = holder;
80-
this.context = context != null ? context : holder.getContext();
74+
this.context = context;
8175
this.contentTypeOverride = contentTypeOverride;
8276
this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride);
8377
this.onNack = onNack;

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java

Lines changed: 28 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@
66
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
77
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
88

9-
import java.util.ArrayList;
109
import java.util.List;
1110
import java.util.Map;
1211
import java.util.NoSuchElementException;
1312
import java.util.concurrent.ConcurrentHashMap;
1413
import java.util.concurrent.CopyOnWriteArrayList;
1514
import java.util.concurrent.Flow;
16-
import java.util.concurrent.atomic.AtomicInteger;
1715

1816
import jakarta.annotation.Priority;
1917
import jakarta.enterprise.context.ApplicationScoped;
@@ -169,10 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H
169167
@Inject
170168
@Any
171169
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories;
172-
private List<IncomingRabbitMQChannel> incomings = new CopyOnWriteArrayList<>();
173-
private List<OutgoingRabbitMQChannel> outgoings = new CopyOnWriteArrayList<>();
174-
private Map<String, ClientRegistration> clientRegistrations = new ConcurrentHashMap<>();
175-
private Map<String, SharedClient> sharedClients = new ConcurrentHashMap<>();
170+
private final List<IncomingRabbitMQChannel> incomings = new CopyOnWriteArrayList<>();
171+
private final List<OutgoingRabbitMQChannel> outgoings = new CopyOnWriteArrayList<>();
172+
private final Map<String, ClientHolder> clients = new ConcurrentHashMap<>();
173+
// connection-name to fingerprint map to check against same connection-name but different options
174+
private final Map<String, String> connectionFingerprints = new ConcurrentHashMap<>();
176175

177176
@Inject
178177
@Any
@@ -271,11 +270,11 @@ public void terminate(
271270
outgoing.terminate();
272271
}
273272

274-
List<String> registeredChannels = new ArrayList<>(clientRegistrations.keySet());
275-
for (String channel : registeredChannels) {
276-
releaseClient(channel);
273+
for (Map.Entry<String, ClientHolder> entry : clients.entrySet()) {
274+
stopClient(entry.getValue().client(), true);
277275
}
278-
sharedClients.clear();
276+
clients.clear();
277+
connectionFingerprints.clear();
279278
}
280279

281280
public Vertx vertx() {
@@ -284,16 +283,7 @@ public Vertx vertx() {
284283

285284
public void reportIncomingFailure(String channel, Throwable reason) {
286285
log.failureReported(channel, reason);
287-
ClientRegistration registration = clientRegistrations.remove(channel);
288-
if (registration == null) {
289-
return;
290-
}
291-
292-
if (registration.shared) {
293-
releaseSharedClient(registration.key, false);
294-
} else {
295-
stopClient(registration.holder.client(), false);
296-
}
286+
releaseClient(channel, false);
297287
}
298288

299289
public Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories() {
@@ -316,71 +306,29 @@ public Instance<CredentialsProvider> credentialsProviders() {
316306
return configMaps;
317307
}
318308

319-
public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config, io.vertx.mutiny.core.Context context) {
320-
ClientRegistration existing = clientRegistrations.get(config.getChannel());
321-
if (existing != null) {
322-
return existing.holder;
323-
}
324-
325-
return config.getSharedConnectionName()
326-
.map(name -> getOrCreateSharedHolder(config, context, name))
327-
.orElseGet(() -> createAndRegisterHolder(config, context, config.getChannel(), false));
328-
}
329-
330-
private ClientHolder createAndRegisterHolder(RabbitMQConnectorCommonConfiguration config,
331-
io.vertx.mutiny.core.Context context, String key, boolean shared) {
332-
ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config.getChannel(), vertx(),
333-
context);
334-
clientRegistrations.put(config.getChannel(), new ClientRegistration(holder, shared, key));
335-
return holder;
336-
}
337-
338-
private ClientHolder getOrCreateSharedHolder(RabbitMQConnectorCommonConfiguration config,
339-
io.vertx.mutiny.core.Context context, String name) {
309+
public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config) {
310+
String channel = config.getChannel();
340311
RabbitMQOptions options = RabbitMQClientHelper.buildClientOptions(this, config);
312+
String connectionName = options.getConnectionName();
341313
String fingerprint = RabbitMQClientHelper.computeConnectionFingerprint(options);
342-
SharedClient shared = sharedClients.compute(name, (key, existing) -> {
343-
if (existing != null) {
344-
if (!existing.fingerprint.equals(fingerprint)) {
345-
throw ex.illegalStateSharedConnectionConfigMismatch(name);
346-
}
347-
existing.retain();
348-
if (context != null) {
349-
existing.holder.ensureContext(context);
350-
}
351-
return existing;
352-
}
353-
return new SharedClient(name, new ClientHolder(
354-
RabbitMQClient.create(vertx(), options),
355-
config.getChannel(),
356-
vertx(),
357-
context), fingerprint);
358-
});
359-
clientRegistrations.put(config.getChannel(), new ClientRegistration(shared.holder, true, name));
360-
return shared.holder;
361-
}
362-
363-
public void releaseClient(String channel) {
364-
ClientRegistration registration = clientRegistrations.remove(channel);
365-
if (registration == null) {
366-
return;
367-
}
368-
369-
if (registration.shared) {
370-
releaseSharedClient(registration.key, true);
371-
} else {
372-
stopClient(registration.holder.client(), true);
314+
String existing = connectionFingerprints.putIfAbsent(connectionName, fingerprint);
315+
if (existing != null && !existing.equals(fingerprint)) {
316+
throw ex.illegalStateSharedConnectionConfigMismatch(connectionName);
373317
}
318+
return clients.compute(fingerprint,
319+
(key, current) -> (current == null ? new ClientHolder(RabbitMQClient.create(vertx(), options)) : current)
320+
.retain(channel));
374321
}
375322

376-
private void releaseSharedClient(String sharedName, boolean await) {
377-
SharedClient shared = sharedClients.get(sharedName);
378-
if (shared == null) {
379-
return;
380-
}
381-
if (shared.release()) {
382-
sharedClients.remove(sharedName, shared);
383-
stopClient(shared.holder.client(), await);
323+
public void releaseClient(String channel, boolean await) {
324+
for (var e : clients.entrySet()) {
325+
ClientHolder shared = e.getValue();
326+
if (shared.channels().contains(channel)) {
327+
if (clients.computeIfPresent(e.getKey(), (k, c) -> c.release(channel) ? null : c) == null) {
328+
stopClient(shared.client(), await);
329+
}
330+
return;
331+
}
384332
}
385333
}
386334

@@ -395,36 +343,4 @@ private void stopClient(RabbitMQClient client, boolean await) {
395343
}
396344
}
397345

398-
private static final class ClientRegistration {
399-
final ClientHolder holder;
400-
final boolean shared;
401-
final String key;
402-
403-
private ClientRegistration(ClientHolder holder, boolean shared, String key) {
404-
this.holder = holder;
405-
this.shared = shared;
406-
this.key = key;
407-
}
408-
}
409-
410-
private static final class SharedClient {
411-
final String name;
412-
final ClientHolder holder;
413-
final String fingerprint;
414-
final AtomicInteger references = new AtomicInteger(1);
415-
416-
private SharedClient(String name, ClientHolder holder, String fingerprint) {
417-
this.name = name;
418-
this.holder = holder;
419-
this.fingerprint = fingerprint;
420-
}
421-
422-
private void retain() {
423-
references.incrementAndGet();
424-
}
425-
426-
private boolean release() {
427-
return references.decrementAndGet() == 0;
428-
}
429-
}
430346
}

0 commit comments

Comments
 (0)