66import static io .smallrye .reactive .messaging .rabbitmq .i18n .RabbitMQExceptions .ex ;
77import static io .smallrye .reactive .messaging .rabbitmq .i18n .RabbitMQLogging .log ;
88
9- import java .util .ArrayList ;
109import java .util .List ;
1110import java .util .Map ;
1211import java .util .NoSuchElementException ;
1312import java .util .concurrent .ConcurrentHashMap ;
1413import java .util .concurrent .CopyOnWriteArrayList ;
1514import java .util .concurrent .Flow ;
16- import java .util .concurrent .atomic .AtomicInteger ;
1715
1816import jakarta .annotation .Priority ;
1917import jakarta .enterprise .context .ApplicationScoped ;
@@ -169,10 +167,11 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H
169167 @ Inject
170168 @ Any
171169 Instance <RabbitMQFailureHandler .Factory > failureHandlerFactories ;
172- private List <IncomingRabbitMQChannel > incomings = new CopyOnWriteArrayList <>();
173- private List <OutgoingRabbitMQChannel > outgoings = new CopyOnWriteArrayList <>();
174- private Map <String , ClientRegistration > clientRegistrations = new ConcurrentHashMap <>();
175- private Map <String , SharedClient > sharedClients = new ConcurrentHashMap <>();
170+ private final List <IncomingRabbitMQChannel > incomings = new CopyOnWriteArrayList <>();
171+ private final List <OutgoingRabbitMQChannel > outgoings = new CopyOnWriteArrayList <>();
172+ private final Map <String , ClientHolder > clients = new ConcurrentHashMap <>();
173+ // connection-name to fingerprint map to check against same connection-name but different options
174+ private final Map <String , String > connectionFingerprints = new ConcurrentHashMap <>();
176175
177176 @ Inject
178177 @ Any
@@ -271,11 +270,11 @@ public void terminate(
271270 outgoing .terminate ();
272271 }
273272
274- List <String > registeredChannels = new ArrayList <>(clientRegistrations .keySet ());
275- for (String channel : registeredChannels ) {
276- releaseClient (channel );
273+ for (Map .Entry <String , ClientHolder > entry : clients .entrySet ()) {
274+ stopClient (entry .getValue ().client (), true );
277275 }
278- sharedClients .clear ();
276+ clients .clear ();
277+ connectionFingerprints .clear ();
279278 }
280279
281280 public Vertx vertx () {
@@ -284,16 +283,7 @@ public Vertx vertx() {
284283
285284 public void reportIncomingFailure (String channel , Throwable reason ) {
286285 log .failureReported (channel , reason );
287- ClientRegistration registration = clientRegistrations .remove (channel );
288- if (registration == null ) {
289- return ;
290- }
291-
292- if (registration .shared ) {
293- releaseSharedClient (registration .key , false );
294- } else {
295- stopClient (registration .holder .client (), false );
296- }
286+ releaseClient (channel , false );
297287 }
298288
299289 public Instance <RabbitMQFailureHandler .Factory > failureHandlerFactories () {
@@ -316,71 +306,30 @@ public Instance<CredentialsProvider> credentialsProviders() {
316306 return configMaps ;
317307 }
318308
319- public ClientHolder getClientHolder (RabbitMQConnectorCommonConfiguration config , io .vertx .mutiny .core .Context context ) {
320- ClientRegistration existing = clientRegistrations .get (config .getChannel ());
321- if (existing != null ) {
322- return existing .holder ;
323- }
324-
325- return config .getSharedConnectionName ()
326- .map (name -> getOrCreateSharedHolder (config , context , name ))
327- .orElseGet (() -> createAndRegisterHolder (config , context , config .getChannel (), false ));
328- }
329-
330- private ClientHolder createAndRegisterHolder (RabbitMQConnectorCommonConfiguration config ,
331- io .vertx .mutiny .core .Context context , String key , boolean shared ) {
332- ClientHolder holder = new ClientHolder (RabbitMQClientHelper .createClient (this , config ), config .getChannel (), vertx (),
333- context );
334- clientRegistrations .put (config .getChannel (), new ClientRegistration (holder , shared , key ));
335- return holder ;
336- }
337-
338- private ClientHolder getOrCreateSharedHolder (RabbitMQConnectorCommonConfiguration config ,
339- io .vertx .mutiny .core .Context context , String name ) {
309+ public ClientHolder getClientHolder (RabbitMQConnectorCommonConfiguration config ) {
310+ String channel = config .getChannel ();
340311 RabbitMQOptions options = RabbitMQClientHelper .buildClientOptions (this , config );
312+ String connectionName = options .getConnectionName ();
341313 String fingerprint = RabbitMQClientHelper .computeConnectionFingerprint (options );
342- SharedClient shared = sharedClients .compute (name , (key , existing ) -> {
343- if (existing != null ) {
344- if (!existing .fingerprint .equals (fingerprint )) {
345- throw ex .illegalStateSharedConnectionConfigMismatch (name );
346- }
347- existing .retain ();
348- if (context != null ) {
349- existing .holder .ensureContext (context );
350- }
351- return existing ;
352- }
353- return new SharedClient (name , new ClientHolder (
354- RabbitMQClient .create (vertx (), options ),
355- config .getChannel (),
356- vertx (),
357- context ), fingerprint );
358- });
359- clientRegistrations .put (config .getChannel (), new ClientRegistration (shared .holder , true , name ));
360- return shared .holder ;
361- }
362-
363- public void releaseClient (String channel ) {
364- ClientRegistration registration = clientRegistrations .remove (channel );
365- if (registration == null ) {
366- return ;
367- }
368-
369- if (registration .shared ) {
370- releaseSharedClient (registration .key , true );
371- } else {
372- stopClient (registration .holder .client (), true );
314+ String existing = connectionFingerprints .putIfAbsent (connectionName , fingerprint );
315+ if (existing != null && !existing .equals (fingerprint )) {
316+ throw ex .illegalStateSharedConnectionConfigMismatch (connectionName );
373317 }
318+ return clients .compute (fingerprint ,
319+ (key , current ) -> (current == null ? new ClientHolder (RabbitMQClient .create (vertx (), options )) : current )
320+ .retain (channel ));
374321 }
375322
376- private void releaseSharedClient (String sharedName , boolean await ) {
377- SharedClient shared = sharedClients .get (sharedName );
378- if (shared == null ) {
379- return ;
380- }
381- if (shared .release ()) {
382- sharedClients .remove (sharedName , shared );
383- stopClient (shared .holder .client (), await );
323+ public void releaseClient (String channel , boolean await ) {
324+ for (var e : clients .entrySet ()) {
325+ ClientHolder shared = e .getValue ();
326+ if (shared .channels ().contains (channel )) {
327+ if (clients .computeIfPresent (e .getKey (), (k , c ) -> c .release (channel ) ? null : c ) == null ) {
328+ connectionFingerprints .values ().remove (e .getKey ());
329+ stopClient (shared .client (), await );
330+ }
331+ return ;
332+ }
384333 }
385334 }
386335
@@ -395,36 +344,4 @@ private void stopClient(RabbitMQClient client, boolean await) {
395344 }
396345 }
397346
398- private static final class ClientRegistration {
399- final ClientHolder holder ;
400- final boolean shared ;
401- final String key ;
402-
403- private ClientRegistration (ClientHolder holder , boolean shared , String key ) {
404- this .holder = holder ;
405- this .shared = shared ;
406- this .key = key ;
407- }
408- }
409-
410- private static final class SharedClient {
411- final String name ;
412- final ClientHolder holder ;
413- final String fingerprint ;
414- final AtomicInteger references = new AtomicInteger (1 );
415-
416- private SharedClient (String name , ClientHolder holder , String fingerprint ) {
417- this .name = name ;
418- this .holder = holder ;
419- this .fingerprint = fingerprint ;
420- }
421-
422- private void retain () {
423- references .incrementAndGet ();
424- }
425-
426- private boolean release () {
427- return references .decrementAndGet () == 0 ;
428- }
429- }
430347}
0 commit comments