Skip to content

Commit 922b20c

Browse files
committed
Some tests and cleanup
1 parent d0afa90 commit 922b20c

File tree

11 files changed

+480
-155
lines changed

11 files changed

+480
-155
lines changed

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,40 @@ public class ClientHolder {
2424
private final AtomicReference<CurrentConnection> connectionHolder = new AtomicReference<>();
2525
private final AtomicReference<Context> rootContext;
2626
private final AtomicReference<CompletionStage<RabbitMQClient>> connectionStage = new AtomicReference<>();
27+
private final Uni<RabbitMQClient> connection;
2728

2829
private final Vertx vertx;
29-
private final RabbitMQConnectorCommonConfiguration configuration;
3030

3131
public ClientHolder(RabbitMQClient client,
32-
RabbitMQConnectorCommonConfiguration configuration,
32+
String channel,
3333
Vertx vertx,
3434
Context root) {
3535
this.client = client;
36-
this.configuration = configuration;
3736
this.vertx = vertx;
3837
this.rootContext = new AtomicReference<>(root);
38+
this.connection = Uni.createFrom().deferred(() -> client.start()
39+
.onSubscription().invoke(() -> {
40+
connected.set(true);
41+
log.connectionEstablished(channel);
42+
})
43+
.onItem().transform(ignored -> {
44+
Context ctx = rootContext.get();
45+
connectionHolder
46+
.set(new CurrentConnection(client, ctx == null ? Vertx.currentContext() : ctx));
47+
48+
// handle the case we are already disconnected.
49+
if (!client.isConnected() || connectionHolder.get() == null) {
50+
// Throwing the exception would trigger a retry.
51+
connectionHolder.set(null);
52+
throw ex.illegalStateConnectionDisconnected();
53+
}
54+
return client;
55+
})
56+
.onFailure().invoke(log::unableToConnectToBroker)
57+
.onFailure().invoke(t -> {
58+
connectionHolder.set(null);
59+
log.unableToRecoverFromConnectionDisruption(t);
60+
}));
3961
}
4062

4163
public static CompletionStage<Void> runOnContext(Context context, IncomingRabbitMQMessage<?> msg,
@@ -110,7 +132,7 @@ public Uni<RabbitMQClient> getOrEstablishConnection() {
110132
if (current != null) {
111133
return Uni.createFrom().completionStage(current);
112134
}
113-
CompletionStage<RabbitMQClient> created = createConnectionUni().subscribeAsCompletionStage();
135+
CompletionStage<RabbitMQClient> created = connection.subscribeAsCompletionStage();
114136
if (connectionStage.compareAndSet(null, created)) {
115137
created.whenComplete((result, error) -> {
116138
if (error != null) {
@@ -133,32 +155,4 @@ private CurrentConnection(RabbitMQClient client, Context context) {
133155
}
134156
}
135157

136-
private Uni<RabbitMQClient> createConnectionUni() {
137-
return Uni.createFrom().deferred(() -> client.start()
138-
.onSubscription().invoke(() -> {
139-
connected.set(true);
140-
log.connectionEstablished(configuration.getChannel());
141-
})
142-
.onItem().transform(ignored -> {
143-
Context context = rootContext.get();
144-
if (context == null) {
145-
context = Vertx.currentContext();
146-
}
147-
connectionHolder.set(new CurrentConnection(client, context));
148-
149-
// handle the case we are already disconnected.
150-
if (!client.isConnected() || connectionHolder.get() == null) {
151-
// Throwing the exception would trigger a retry.
152-
connectionHolder.set(null);
153-
throw ex.illegalStateConnectionDisconnected();
154-
}
155-
return client;
156-
})
157-
.onFailure().invoke(log::unableToConnectToBroker)
158-
.onFailure().invoke(t -> {
159-
connectionHolder.set(null);
160-
log.unableToRecoverFromConnectionDisruption(t);
161-
}));
162-
}
163-
164158
}

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,6 @@ public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder, Co
7272
this(delegate.getDelegate(), holder, context, onNack, onAck, contentTypeOverride);
7373
}
7474

75-
IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder,
76-
RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) {
77-
this(msg, holder, holder.getContext(), onNack, onAck, contentTypeOverride);
78-
}
79-
8075
IncomingRabbitMQMessage(io.vertx.rabbitmq.RabbitMQMessage msg, ClientHolder holder, Context context,
8176
RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck, String contentTypeOverride) {
8277
this.message = msg;

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ public ClientHolder getClientHolder(RabbitMQConnectorCommonConfiguration config,
329329

330330
private ClientHolder createAndRegisterHolder(RabbitMQConnectorCommonConfiguration config,
331331
io.vertx.mutiny.core.Context context, String key, boolean shared) {
332-
ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config, vertx(), context);
332+
ClientHolder holder = new ClientHolder(RabbitMQClientHelper.createClient(this, config), config.getChannel(), vertx(),
333+
context);
333334
clientRegistrations.put(config.getChannel(), new ClientRegistration(holder, shared, key));
334335
return holder;
335336
}
@@ -351,7 +352,7 @@ private ClientHolder getOrCreateSharedHolder(RabbitMQConnectorCommonConfiguratio
351352
}
352353
return new SharedClient(name, new ClientHolder(
353354
RabbitMQClient.create(vertx(), options),
354-
config,
355+
config.getChannel(),
355356
vertx(),
356357
context), fingerprint);
357358
});

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class OutgoingRabbitMQChannel {
2626
private final Flow.Subscriber<Message<?>> subscriber;
2727
private final RabbitMQConnectorOutgoingConfiguration config;
2828
private final ClientHolder holder;
29+
private final RabbitMQMessageSender processor;
2930
private volatile RabbitMQPublisher publisher;
3031

3132
public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc,
@@ -54,10 +55,19 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut
5455
.onFailure().recoverWithNull().memoize().indefinitely();
5556

5657
// Set up a sender based on the publisher we established above
57-
final RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance);
58+
processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance);
5859

5960
// Return a SubscriberBuilder
60-
subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t)));
61+
subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t))
62+
.onTermination().call(() -> {
63+
if (publisher != null) {
64+
return publisher.stop()
65+
.ifNoItem().after(Duration.ofSeconds(oc.getReconnectInterval())).fail()
66+
.onFailure()
67+
.invoke(e -> log.infof(e, "Error terminating outgoing channel %s", config.getChannel()));
68+
}
69+
return Uni.createFrom().voidItem();
70+
}));
6171
}
6272

6373
public Flow.Subscriber<Message<?>> getSubscriber() {
@@ -95,12 +105,6 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder
95105
}
96106

97107
public void terminate() {
98-
if (publisher != null) {
99-
try {
100-
publisher.stop().await().atMost(Duration.ofMillis(config.getConnectionTimeout()));
101-
} catch (Exception e) {
102-
log.infof(e, "Error terminating outgoing channel %s", config.getChannel());
103-
}
104-
}
108+
processor.cancel();
105109
}
106110
}

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java

Lines changed: 4 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@
3232
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
3333
import io.vertx.core.json.JsonObject;
3434
import io.vertx.core.net.JksOptions;
35-
import io.vertx.core.net.KeyCertOptions;
36-
import io.vertx.core.net.PemKeyCertOptions;
37-
import io.vertx.core.net.PemTrustOptions;
38-
import io.vertx.core.net.PfxOptions;
39-
import io.vertx.core.net.TrustOptions;
4035
import io.vertx.mutiny.core.Vertx;
4136
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
4237
import io.vertx.rabbitmq.RabbitMQOptions;
@@ -80,47 +75,12 @@ public static RabbitMQOptions buildClientOptions(RabbitMQConnector connector, Ra
8075
}
8176

8277
public static String computeConnectionFingerprint(RabbitMQOptions options) {
83-
StringBuilder raw = new StringBuilder();
84-
append(raw, "uri", options.getUri());
85-
78+
JsonObject json = options.toJson();
8679
List<Address> addresses = options.getAddresses();
87-
if (addresses != null && !addresses.isEmpty()) {
88-
List<String> normalized = addresses.stream()
89-
.map(address -> address.getHost() + ":" + address.getPort())
90-
.sorted()
91-
.collect(Collectors.toList());
92-
append(raw, "addresses", String.join(",", normalized));
93-
} else {
94-
append(raw, "host", options.getHost());
95-
append(raw, "port", Integer.toString(options.getPort()));
80+
if (addresses != null) {
81+
json.put("addresses", addresses.stream().map(Address::toString).collect(Collectors.toList()));
9682
}
97-
98-
append(raw, "virtualHost", options.getVirtualHost());
99-
append(raw, "user", options.getUser());
100-
append(raw, "passwordHash", hashValue(options.getPassword()));
101-
102-
append(raw, "ssl", Boolean.toString(options.isSsl()));
103-
append(raw, "trustAll", Boolean.toString(options.isTrustAll()));
104-
append(raw, "hostnameVerificationAlgorithm", options.getHostnameVerificationAlgorithm());
105-
append(raw, "keyCertOptions", keyCertFingerprint(options.getKeyCertOptions()));
106-
append(raw, "trustOptions", trustFingerprint(options.getTrustOptions()));
107-
108-
append(raw, "connectionTimeout", Integer.toString(options.getConnectionTimeout()));
109-
append(raw, "handshakeTimeout", Integer.toString(options.getHandshakeTimeout()));
110-
append(raw, "requestedHeartbeat", Integer.toString(options.getRequestedHeartbeat()));
111-
append(raw, "requestedChannelMax", Integer.toString(options.getRequestedChannelMax()));
112-
append(raw, "networkRecoveryInterval", Long.toString(options.getNetworkRecoveryInterval()));
113-
append(raw, "automaticRecoveryEnabled", Boolean.toString(options.isAutomaticRecoveryEnabled()));
114-
append(raw, "automaticRecoveryOnInitialConnection", Boolean.toString(options.isAutomaticRecoveryOnInitialConnection()));
115-
append(raw, "useNio", Boolean.toString(options.isNioEnabled()));
116-
append(raw, "reconnectAttempts", Integer.toString(options.getReconnectAttempts()));
117-
append(raw, "reconnectInterval", Long.toString(options.getReconnectInterval()));
118-
119-
append(raw, "credentialsProvider", className(options.getCredentialsProvider()));
120-
append(raw, "credentialsRefreshService", className(options.getCredentialsRefreshService()));
121-
append(raw, "saslConfig", className(options.getSaslConfig()));
122-
123-
return sha256(raw.toString());
83+
return sha256(json.encode());
12484
}
12585

12686
static RabbitMQOptions getClientOptionsFromBean(Instance<RabbitMQOptions> options, String optionsBeanName) {
@@ -223,21 +183,6 @@ private static String resolveConnectionName(RabbitMQConnectorCommonConfiguration
223183
config instanceof RabbitMQConnectorIncomingConfiguration ? "Incoming" : "Outgoing"));
224184
}
225185

226-
private static void append(StringBuilder target, String key, String value) {
227-
target.append(key).append('=').append(value == null ? "" : value).append(';');
228-
}
229-
230-
private static String className(Object value) {
231-
return value == null ? "" : value.getClass().getName();
232-
}
233-
234-
private static String hashValue(String value) {
235-
if (value == null) {
236-
return "";
237-
}
238-
return sha256(value);
239-
}
240-
241186
private static String sha256(String value) {
242187
try {
243188
MessageDigest digest = MessageDigest.getInstance("SHA-256");
@@ -252,48 +197,6 @@ private static String sha256(String value) {
252197
}
253198
}
254199

255-
private static String keyCertFingerprint(KeyCertOptions options) {
256-
if (options == null) {
257-
return "";
258-
}
259-
if (options instanceof JksOptions) {
260-
JksOptions jks = (JksOptions) options;
261-
return String.join(":", "JKS", nullToEmpty(jks.getPath()), nullToEmpty(jks.getAlias()));
262-
}
263-
if (options instanceof PfxOptions) {
264-
PfxOptions pfx = (PfxOptions) options;
265-
return String.join(":", "PFX", nullToEmpty(pfx.getPath()), nullToEmpty(pfx.getAlias()));
266-
}
267-
if (options instanceof PemKeyCertOptions) {
268-
PemKeyCertOptions pem = (PemKeyCertOptions) options;
269-
return String.join(":", "PEM", String.join(",", pem.getKeyPaths()), String.join(",", pem.getCertPaths()));
270-
}
271-
return options.getClass().getName();
272-
}
273-
274-
private static String trustFingerprint(TrustOptions options) {
275-
if (options == null) {
276-
return "";
277-
}
278-
if (options instanceof JksOptions) {
279-
JksOptions jks = (JksOptions) options;
280-
return String.join(":", "JKS", nullToEmpty(jks.getPath()), nullToEmpty(jks.getAlias()));
281-
}
282-
if (options instanceof PfxOptions) {
283-
PfxOptions pfx = (PfxOptions) options;
284-
return String.join(":", "PFX", nullToEmpty(pfx.getPath()), nullToEmpty(pfx.getAlias()));
285-
}
286-
if (options instanceof PemTrustOptions) {
287-
PemTrustOptions pem = (PemTrustOptions) options;
288-
return String.join(":", "PEM", String.join(",", pem.getCertPaths()));
289-
}
290-
return options.getClass().getName();
291-
}
292-
293-
private static String nullToEmpty(String value) {
294-
return value == null ? "" : value;
295-
}
296-
297200
public static String serverQueueName(String name) {
298201
if (name.equals("(server.auto)")) {
299202
return "";
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.smallrye.reactive.messaging.rabbitmq;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
7+
import jakarta.enterprise.context.ApplicationScoped;
8+
9+
import org.eclipse.microprofile.reactive.messaging.Incoming;
10+
import org.eclipse.microprofile.reactive.messaging.Message;
11+
12+
import io.smallrye.mutiny.Uni;
13+
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
14+
import io.vertx.core.Context;
15+
16+
@ApplicationScoped
17+
public class DualIncomingContextBean {
18+
19+
private final CountDownLatch latch1 = new CountDownLatch(1);
20+
private final CountDownLatch latch2 = new CountDownLatch(1);
21+
private final AtomicReference<Context> context1 = new AtomicReference<>();
22+
private final AtomicReference<Context> context2 = new AtomicReference<>();
23+
private final AtomicReference<Boolean> eventLoop1 = new AtomicReference<>();
24+
private final AtomicReference<Boolean> eventLoop2 = new AtomicReference<>();
25+
26+
@Incoming("data1")
27+
public Uni<Void> consume1(Message<Integer> message) {
28+
message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> {
29+
Context context = metadata.context();
30+
context1.set(context);
31+
eventLoop1.set(context.isEventLoopContext());
32+
});
33+
latch1.countDown();
34+
return Uni.createFrom().voidItem();
35+
}
36+
37+
@Incoming("data2")
38+
public Uni<Void> consume2(Message<Integer> message) {
39+
message.getMetadata(LocalContextMetadata.class).ifPresent(metadata -> {
40+
Context context = metadata.context();
41+
context2.set(context);
42+
eventLoop2.set(context.isEventLoopContext());
43+
});
44+
latch2.countDown();
45+
return Uni.createFrom().voidItem();
46+
}
47+
48+
public boolean awaitMessages(long timeout, TimeUnit unit) throws InterruptedException {
49+
return latch1.await(timeout, unit) && latch2.await(timeout, unit);
50+
}
51+
52+
public Context getContext1() {
53+
return context1.get();
54+
}
55+
56+
public Context getContext2() {
57+
return context2.get();
58+
}
59+
60+
public boolean isEventLoop1() {
61+
Boolean value = eventLoop1.get();
62+
return value != null && value;
63+
}
64+
65+
public boolean isEventLoop2() {
66+
Boolean value = eventLoop2.get();
67+
return value != null && value;
68+
}
69+
}

smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/OutgoingBean.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import jakarta.enterprise.context.ApplicationScoped;
44

5-
import org.eclipse.microprofile.reactive.messaging.Message;
65
import org.eclipse.microprofile.reactive.messaging.Outgoing;
76

7+
import io.smallrye.mutiny.Multi;
8+
89
/**
910
* A bean that can be registered to do just enough to support the
1011
* declaration of an exchange backing an outgoing rabbitmq channel.
@@ -13,8 +14,8 @@
1314
public class OutgoingBean {
1415

1516
@Outgoing("sink")
16-
public Message<String> process() {
17-
return Message.of("test");
17+
public Multi<String> process() {
18+
return Multi.createFrom().items("test", "test2", "test3");
1819
}
1920

2021
}

0 commit comments

Comments
 (0)