1- // Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+ // Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33//
44// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -70,6 +70,7 @@ class ConsumersCoordinator {
7070 private final AtomicLong managerIdSequence = new AtomicLong (0 );
7171 private final NavigableSet <ClientSubscriptionsManager > managers = new ConcurrentSkipListSet <>();
7272 private final AtomicLong trackerIdSequence = new AtomicLong (0 );
73+ private final Function <List <Broker >, Broker > brokerPicker ;
7374
7475 private final List <SubscriptionTracker > trackers = new CopyOnWriteArrayList <>();
7576 private final ExecutorServiceFactory executorServiceFactory =
@@ -83,16 +84,14 @@ class ConsumersCoordinator {
8384 int maxConsumersByConnection ,
8485 Function <ClientConnectionType , String > connectionNamingStrategy ,
8586 ClientFactory clientFactory ,
86- boolean forceReplica ) {
87+ boolean forceReplica ,
88+ Function <List <Broker >, Broker > brokerPicker ) {
8789 this .environment = environment ;
8890 this .clientFactory = clientFactory ;
8991 this .maxConsumersByConnection = maxConsumersByConnection ;
9092 this .connectionNamingStrategy = connectionNamingStrategy ;
9193 this .forceReplica = forceReplica ;
92- }
93-
94- private static String keyForClientSubscription (Client .Broker broker ) {
95- return broker .getHost () + ":" + broker .getPort ();
94+ this .brokerPicker = brokerPicker ;
9695 }
9796
9897 private BackOffDelayPolicy recoveryBackOffDelayPolicy () {
@@ -138,7 +137,7 @@ Runnable subscribe(
138137 flowStrategy );
139138
140139 try {
141- addToManager (newNode , subscriptionTracker , offsetSpecification , true );
140+ addToManager (newNode , candidates , subscriptionTracker , offsetSpecification , true );
142141 } catch (ConnectionStreamException e ) {
143142 // these exceptions are not public
144143 throw new StreamException (e .getMessage ());
@@ -162,6 +161,7 @@ Runnable subscribe(
162161
163162 private void addToManager (
164163 Broker node ,
164+ List <Broker > candidates ,
165165 SubscriptionTracker tracker ,
166166 OffsetSpecification offsetSpecification ,
167167 boolean isInitialSubscription ) {
@@ -189,9 +189,9 @@ private void addToManager(
189189 }
190190 }
191191 if (pickedManager == null ) {
192- String name = keyForClientSubscription (node );
192+ String name = keyForNode (node );
193193 LOGGER .debug ("Creating subscription manager on {}" , name );
194- pickedManager = new ClientSubscriptionsManager (node , clientParameters );
194+ pickedManager = new ClientSubscriptionsManager (node , candidates , clientParameters );
195195 LOGGER .debug ("Created subscription manager on {}, id {}" , name , pickedManager .id );
196196 }
197197 try {
@@ -571,6 +571,7 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
571571 private final long id ;
572572 private final Broker node ;
573573 private final Client client ;
574+ // <host>:<port> (actual or advertised)
574575 private final String name ;
575576 // the 2 data structures track the subscriptions, they must remain consistent
576577 private final Map <String , Set <SubscriptionTracker >> streamToStreamSubscriptions =
@@ -582,12 +583,12 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
582583 private volatile int trackerCount ;
583584 private final AtomicBoolean closed = new AtomicBoolean (false );
584585
585- private ClientSubscriptionsManager (Broker node , Client .ClientParameters clientParameters ) {
586+ private ClientSubscriptionsManager (
587+ Broker targetNode , List <Broker > candidates , Client .ClientParameters clientParameters ) {
586588 this .id = managerIdSequence .getAndIncrement ();
587- this .node = node ;
588- this .name = keyForClientSubscription (node );
589- LOGGER .debug ("creating subscription manager on {}" , name );
590589 this .trackerCount = 0 ;
590+ AtomicReference <String > nameReference = new AtomicReference <>();
591+
591592 AtomicBoolean clientInitializedInManager = new AtomicBoolean (false );
592593 ChunkListener chunkListener =
593594 (client , subscriptionId , offset , messageCount , dataSize ) -> {
@@ -639,7 +640,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
639640 "Could not find stream subscription {} in manager {}, node {} for message listener" ,
640641 subscriptionId ,
641642 this .id ,
642- this . name );
643+ nameReference . get () );
643644 }
644645 };
645646 MessageIgnoredListener messageIgnoredListener =
@@ -663,7 +664,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
663664 "Could not find stream subscription {} in manager {}, node {} for message ignored listener" ,
664665 subscriptionId ,
665666 this .id ,
666- this . name );
667+ nameReference . get () );
667668 }
668669 };
669670 ShutdownListener shutdownListener =
@@ -675,7 +676,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
675676 if (shutdownContext .isShutdownUnexpected ()) {
676677 LOGGER .debug (
677678 "Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment" ,
678- name );
679+ nameReference . get () );
679680 LOGGER .debug (
680681 "Subscription connection has {} consumer(s) over {} stream(s) to recover" ,
681682 this .subscriptionTrackers .stream ().filter (Objects ::nonNull ).count (),
@@ -718,7 +719,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
718719 }
719720 },
720721 "Consumers re-assignment after disconnection from %s" ,
721- name ));
722+ nameReference . get () ));
722723 }
723724 };
724725 MetadataListener metadataListener =
@@ -792,18 +793,23 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
792793 };
793794 String connectionName = connectionNamingStrategy .apply (ClientConnectionType .CONSUMER );
794795 ClientFactoryContext clientFactoryContext =
795- ClientFactoryContext .fromParameters (
796- clientParameters
797- .clientProperty ("connection_name" , connectionName )
798- .chunkListener (chunkListener )
799- .creditNotification (creditNotification )
800- .messageListener (messageListener )
801- .messageIgnoredListener (messageIgnoredListener )
802- .shutdownListener (shutdownListener )
803- .metadataListener (metadataListener )
804- .consumerUpdateListener (consumerUpdateListener ))
805- .key (name );
796+ new ClientFactoryContext (
797+ clientParameters
798+ .clientProperty ("connection_name" , connectionName )
799+ .chunkListener (chunkListener )
800+ .creditNotification (creditNotification )
801+ .messageListener (messageListener )
802+ .messageIgnoredListener (messageIgnoredListener )
803+ .shutdownListener (shutdownListener )
804+ .metadataListener (metadataListener )
805+ .consumerUpdateListener (consumerUpdateListener ),
806+ keyForNode (targetNode ),
807+ candidates );
806808 this .client = clientFactory .client (clientFactoryContext );
809+ this .node = brokerFromClient (this .client );
810+ this .name = keyForNode (this .node );
811+ nameReference .set (this .name );
812+ LOGGER .debug ("creating subscription manager on {}" , name );
807813 LOGGER .debug ("Created consumer connection '{}'" , connectionName );
808814 clientInitializedInManager .set (true );
809815 }
@@ -906,7 +912,7 @@ private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tr
906912 } else {
907913 offsetSpecification = tracker .initialOffsetSpecification ;
908914 }
909- addToManager (broker , tracker , offsetSpecification , false );
915+ addToManager (broker , candidates , tracker , offsetSpecification , false );
910916 }
911917 }
912918 } else {
0 commit comments