14141515package com .rabbitmq .stream .impl ;
1616
17+ import static com .rabbitmq .stream .impl .Tuples .pair ;
1718import static com .rabbitmq .stream .impl .Utils .*;
1819import static java .util .stream .Collectors .toList ;
1920import static java .util .stream .Collectors .toSet ;
3031import com .rabbitmq .stream .impl .Client .PublishErrorListener ;
3132import com .rabbitmq .stream .impl .Client .Response ;
3233import com .rabbitmq .stream .impl .Client .ShutdownListener ;
34+ import com .rabbitmq .stream .impl .Tuples .Pair ;
3335import com .rabbitmq .stream .impl .Utils .ClientConnectionType ;
3436import com .rabbitmq .stream .impl .Utils .ClientFactory ;
3537import com .rabbitmq .stream .impl .Utils .ClientFactoryContext ;
@@ -219,10 +221,13 @@ List<BrokerWrapper> findCandidateNodes(String stream, boolean forceLeader) {
219221
220222 List <BrokerWrapper > candidates = new ArrayList <>();
221223 Client .Broker leader = streamMetadata .getLeader ();
222- if (leader == null && forceLeader ) {
223- throw new IllegalStateException ("Not leader available for stream " + stream );
224+ if (leader == null ) {
225+ if (forceLeader ) {
226+ throw new IllegalStateException ("Not leader available for stream " + stream );
227+ }
228+ } else {
229+ candidates .add (new BrokerWrapper (leader , true ));
224230 }
225- candidates .add (new BrokerWrapper (leader , true ));
226231
227232 if (!forceLeader && streamMetadata .hasReplicas ()) {
228233 candidates .addAll (
@@ -231,7 +236,11 @@ List<BrokerWrapper> findCandidateNodes(String stream, boolean forceLeader) {
231236 .collect (toList ()));
232237 }
233238
234- LOGGER .debug ("Candidates to publish to {}: {}" , stream , candidates );
239+ if (candidates .isEmpty ()) {
240+ throw new IllegalStateException ("No stream member available to publish for stream " + stream );
241+ } else {
242+ LOGGER .debug ("Candidates to publish to {}: {}" , stream , candidates );
243+ }
235244
236245 return List .copyOf (candidates );
237246 }
@@ -721,15 +730,20 @@ private ClientProducersManager(
721730
722731 private void assignProducersToNewManagers (
723732 Collection <AgentTracker > trackers , String stream , BackOffDelayPolicy delayPolicy ) {
724- AsyncRetry .asyncRetry (() -> findCandidateNodes (stream , forceLeader ))
733+ AsyncRetry .asyncRetry (
734+ () -> {
735+ List <BrokerWrapper > candidates = findCandidateNodes (stream , forceLeader );
736+ return pair (pickBroker (candidates ), candidates );
737+ })
725738 .description ("Candidate lookup to publish to " + stream )
726739 .scheduler (environment .scheduledExecutorService ())
727740 .retry (ex -> !(ex instanceof StreamDoesNotExistException ))
728741 .delayPolicy (delayPolicy )
729742 .build ()
730743 .thenAccept (
731- candidates -> {
732- Broker broker = pickBroker (candidates );
744+ brokerAndCandidates -> {
745+ Broker broker = brokerAndCandidates .v1 ();
746+ List <BrokerWrapper > candidates = brokerAndCandidates .v2 ();
733747 String key = keyForNode (broker );
734748 LOGGER .debug (
735749 "Assigning {} producer(s) and consumer tracker(s) to {}" , trackers .size (), key );
@@ -805,15 +819,19 @@ private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTrac
805819 tracker .identifiable () ? tracker .id () : "N/A" ,
806820 tracker .stream ());
807821 // maybe not a good candidate, let's refresh and retry for this one
808- candidates =
809- Utils .callAndMaybeRetry (
810- () -> findCandidateNodes (tracker .stream (), forceLeader ),
822+ Pair <Broker , List <BrokerWrapper >> brokerAndCandidates =
823+ callAndMaybeRetry (
824+ () -> {
825+ List <BrokerWrapper > cs = findCandidateNodes (tracker .stream (), forceLeader );
826+ return pair (pickBroker (cs ), cs );
827+ },
811828 ex -> !(ex instanceof StreamDoesNotExistException ),
812829 environment .recoveryBackOffDelayPolicy (),
813830 "Candidate lookup for %s on stream '%s'" ,
814831 tracker .type (),
815832 tracker .stream ());
816- node = pickBroker (candidates );
833+ node = brokerAndCandidates .v1 ();
834+ candidates = brokerAndCandidates .v2 ();
817835 } catch (Exception e ) {
818836 LOGGER .warn (
819837 "Error while re-assigning {} (stream '{}')" , tracker .type (), tracker .stream (), e );
0 commit comments