Skip to content

Commit 376b9df

Browse files
authored
Merge pull request #847 from rabbitmq/consumer-recovery
Refresh candidate nodes in recovery in case of connection reset
2 parents d3d507e + baa1f23 commit 376b9df

File tree

9 files changed

+270
-176
lines changed

9 files changed

+270
-176
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,11 @@ private Map<String, String> peerProperties() {
573573
if (request.error() == null) {
574574
return request.response.get();
575575
} else {
576-
throw new StreamException("Error when establishing stream connection", request.error());
576+
if (request.error() instanceof StreamException) {
577+
throw (StreamException) request.error();
578+
} else {
579+
throw new StreamException("Error when establishing stream connection", request.error());
580+
}
577581
}
578582
} catch (StreamException e) {
579583
this.handleRpcError(correlationId, e);

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 95 additions & 94 deletions
Large diffs are not rendered by default.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import com.rabbitmq.stream.StreamException;
18+
import com.rabbitmq.stream.StreamNotAvailableException;
19+
import java.nio.channels.ClosedChannelException;
20+
import java.util.function.Predicate;
21+
22+
final class CoordinatorUtils {
23+
24+
private static final Predicate<Throwable> REFRESH_CANDIDATES =
25+
e ->
26+
e instanceof ConnectionStreamException
27+
|| e instanceof ClientClosedException
28+
|| e instanceof StreamNotAvailableException
29+
|| e instanceof ClosedChannelException;
30+
31+
private CoordinatorUtils() {}
32+
33+
static boolean shouldRefreshCandidates(Throwable e) {
34+
return REFRESH_CANDIDATES.test(e) || REFRESH_CANDIDATES.test(e.getCause());
35+
}
36+
37+
static class ClientClosedException extends StreamException {
38+
39+
public ClientClosedException() {
40+
super("Client already closed");
41+
}
42+
}
43+
}

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 47 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.CoordinatorUtils.ClientClosedException;
18+
import static com.rabbitmq.stream.impl.CoordinatorUtils.shouldRefreshCandidates;
1719
import static com.rabbitmq.stream.impl.Tuples.pair;
1820
import static com.rabbitmq.stream.impl.Utils.AVAILABLE_PROCESSORS;
1921
import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry;
@@ -31,7 +33,6 @@
3133
import com.rabbitmq.stream.Constants;
3234
import com.rabbitmq.stream.StreamDoesNotExistException;
3335
import com.rabbitmq.stream.StreamException;
34-
import com.rabbitmq.stream.StreamNotAvailableException;
3536
import com.rabbitmq.stream.impl.Client.Broker;
3637
import com.rabbitmq.stream.impl.Client.ClientParameters;
3738
import com.rabbitmq.stream.impl.Client.MetadataListener;
@@ -193,22 +194,18 @@ private void addToManager(Broker node, List<BrokerWrapper> candidates, AgentTrac
193194
this.managers.add(pickedManager);
194195
} catch (IllegalStateException e) {
195196
pickedManager = null;
196-
} catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) {
197-
// manager connection is dead or stream not available
198-
// scheduling manager closing if necessary in another thread to avoid blocking this one
199-
if (pickedManager.isEmpty()) {
200-
ClientProducersManager manager = pickedManager;
201-
this.environment.execute(
202-
() -> {
203-
manager.closeIfEmpty();
204-
},
205-
"Producer manager closing after timeout, producer %d on stream '%s'",
206-
tracker.uniqueId(),
207-
tracker.stream());
208-
}
209-
throw e;
210197
} catch (RuntimeException e) {
211-
if (pickedManager != null) {
198+
if (shouldRefreshCandidates(e)) {
199+
// manager connection is dead or stream not available
200+
// scheduling manager closing if necessary in another thread to avoid blocking this one
201+
if (pickedManager.isEmpty()) {
202+
this.environment.execute(
203+
pickedManager::closeIfEmpty,
204+
"Producer manager closing after timeout, producer %d on stream '%s'",
205+
tracker.uniqueId(),
206+
tracker.stream());
207+
}
208+
} else {
212209
pickedManager.closeIfEmpty();
213210
}
214211
throw e;
@@ -337,12 +334,14 @@ public String toString() {
337334
managerBuilder.append(
338335
m.producers.values().stream()
339336
.map(
340-
p -> {
341-
StringBuilder producerBuilder = new StringBuilder("{");
342-
producerBuilder.append(jsonField("stream", p.stream())).append(",");
343-
producerBuilder.append(jsonField("producer_id", p.publisherId));
344-
return producerBuilder.append("}").toString();
345-
})
337+
p ->
338+
"{"
339+
+ jsonField("stream", p.stream())
340+
+ ","
341+
+ jsonField("producer_id", p.publisherId)
342+
+ ","
343+
+ jsonField("state", p.producer.state())
344+
+ "}")
346345
.collect(Collectors.joining(",")));
347346
managerBuilder.append("],");
348347
managerBuilder.append("\"tracking_consumers\" : [");
@@ -832,33 +831,33 @@ private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTrac
832831
tracker.stream());
833832
}
834833
reassignmentCompleted = true;
835-
} catch (ConnectionStreamException
836-
| ClientClosedException
837-
| StreamNotAvailableException e) {
838-
LOGGER.debug(
839-
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
840-
+ "refreshing candidate leader and retrying",
841-
tracker.type(),
842-
tracker.identifiable() ? tracker.id() : "N/A",
843-
tracker.stream());
844-
// maybe not a good candidate, let's refresh and retry for this one
845-
Pair<Broker, List<BrokerWrapper>> brokerAndCandidates =
846-
callAndMaybeRetry(
847-
() -> {
848-
List<BrokerWrapper> cs = findCandidateNodes(tracker.stream(), forceLeader);
849-
return pair(pickBroker(cs), cs);
850-
},
851-
ex -> !(ex instanceof StreamDoesNotExistException),
852-
environment.recoveryBackOffDelayPolicy(),
853-
"Candidate lookup for %s on stream '%s'",
854-
tracker.type(),
855-
tracker.stream());
856-
node = brokerAndCandidates.v1();
857-
candidates = brokerAndCandidates.v2();
858834
} catch (Exception e) {
859-
LOGGER.warn(
860-
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
861-
reassignmentCompleted = true;
835+
if (shouldRefreshCandidates(e)) {
836+
LOGGER.debug(
837+
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
838+
+ "refreshing candidate leader and retrying",
839+
tracker.type(),
840+
tracker.identifiable() ? tracker.id() : "N/A",
841+
tracker.stream());
842+
// maybe not a good candidate, let's refresh and retry for this one
843+
Pair<Broker, List<BrokerWrapper>> brokerAndCandidates =
844+
callAndMaybeRetry(
845+
() -> {
846+
List<BrokerWrapper> cs = findCandidateNodes(tracker.stream(), forceLeader);
847+
return pair(pickBroker(cs), cs);
848+
},
849+
ex -> !(ex instanceof StreamDoesNotExistException),
850+
environment.recoveryBackOffDelayPolicy(),
851+
"Candidate lookup for %s on stream '%s'",
852+
tracker.type(),
853+
tracker.stream());
854+
node = brokerAndCandidates.v1();
855+
candidates = brokerAndCandidates.v2();
856+
} else {
857+
LOGGER.warn(
858+
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
859+
reassignmentCompleted = true;
860+
}
862861
}
863862
}
864863
}
@@ -1022,13 +1021,6 @@ public int hashCode() {
10221021
private static final Predicate<Exception> RETRY_ON_TIMEOUT =
10231022
e -> e instanceof TimeoutStreamException;
10241023

1025-
private static class ClientClosedException extends StreamException {
1026-
1027-
public ClientClosedException() {
1028-
super("Client already closed");
1029-
}
1030-
}
1031-
10321024
static <T> int pickSlot(ConcurrentMap<Byte, T> map, T tracker, AtomicInteger sequence) {
10331025
int index = -1;
10341026
T previousValue = tracker;

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ public String toString() {
579579
Client subscriptionClient = this.subscriptionClient;
580580
Client trackingClient = this.trackingClient;
581581
return "{ "
582-
+ "\"id\" : "
582+
+ "\"consumer_id\" : "
583583
+ id
584584
+ ","
585585
+ "\"stream\" : \""

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import com.rabbitmq.stream.oauth2.CredentialsManager;
6262
import com.rabbitmq.stream.sasl.CredentialsProvider;
6363
import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider;
64-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
6564
import io.netty.buffer.ByteBufAllocator;
6665
import io.netty.channel.EventLoopGroup;
6766
import io.netty.handler.ssl.SslContext;
@@ -99,7 +98,7 @@
9998
import org.slf4j.Logger;
10099
import org.slf4j.LoggerFactory;
101100

102-
class StreamEnvironment implements Environment {
101+
final class StreamEnvironment implements Environment {
103102

104103
private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);
105104

@@ -130,7 +129,6 @@ class StreamEnvironment implements Environment {
130129
private final ObservationCollector<?> observationCollector;
131130
private final Duration rpcTimeout;
132131

133-
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
134132
StreamEnvironment(
135133
ScheduledExecutorService scheduledExecutorService,
136134
Client.ClientParameters clientParametersPrototype,

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ public int hashCode() {
608608
public String toString() {
609609
Client client = this.client;
610610
return "{ "
611-
+ "\"id\" : "
611+
+ "\"producer_id\" : "
612612
+ id
613613
+ ","
614614
+ "\"stream\" : \""
@@ -677,4 +677,8 @@ private void executeInLock(Runnable action) {
677677
this.unlock();
678678
}
679679
}
680+
681+
long id() {
682+
return this.id;
683+
}
680684
}

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,10 @@ static String jsonField(String name, Number value) {
599599
return quote(name) + " : " + value.longValue();
600600
}
601601

602+
static String jsonField(String name, Enum<?> value) {
603+
return quote(name) + " : " + (value == null ? "null" : value.name());
604+
}
605+
602606
static String jsonField(String name, String value) {
603607
return quote(name) + " : " + quote(value);
604608
}

0 commit comments

Comments
 (0)