Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,11 @@ private Map<String, String> peerProperties() {
if (request.error() == null) {
return request.response.get();
} else {
throw new StreamException("Error when establishing stream connection", request.error());
if (request.error() instanceof StreamException) {
throw (StreamException) request.error();
} else {
throw new StreamException("Error when establishing stream connection", request.error());
}
}
} catch (StreamException e) {
this.handleRpcError(correlationId, e);
Expand Down
189 changes: 95 additions & 94 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/CoordinatorUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamNotAvailableException;
import java.nio.channels.ClosedChannelException;
import java.util.function.Predicate;

final class CoordinatorUtils {

private static final Predicate<Throwable> REFRESH_CANDIDATES =
e ->
e instanceof ConnectionStreamException
|| e instanceof ClientClosedException
|| e instanceof StreamNotAvailableException
|| e instanceof ClosedChannelException;

private CoordinatorUtils() {}

static boolean shouldRefreshCandidates(Throwable e) {
return REFRESH_CANDIDATES.test(e) || REFRESH_CANDIDATES.test(e.getCause());
}

static class ClientClosedException extends StreamException {

public ClientClosedException() {
super("Client already closed");
}
}
}
102 changes: 47 additions & 55 deletions src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.CoordinatorUtils.ClientClosedException;
import static com.rabbitmq.stream.impl.CoordinatorUtils.shouldRefreshCandidates;
import static com.rabbitmq.stream.impl.Tuples.pair;
import static com.rabbitmq.stream.impl.Utils.AVAILABLE_PROCESSORS;
import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry;
Expand All @@ -31,7 +33,6 @@
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamNotAvailableException;
import com.rabbitmq.stream.impl.Client.Broker;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.MetadataListener;
Expand Down Expand Up @@ -193,22 +194,18 @@ private void addToManager(Broker node, List<BrokerWrapper> candidates, AgentTrac
this.managers.add(pickedManager);
} catch (IllegalStateException e) {
pickedManager = null;
} catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) {
// manager connection is dead or stream not available
// scheduling manager closing if necessary in another thread to avoid blocking this one
if (pickedManager.isEmpty()) {
ClientProducersManager manager = pickedManager;
this.environment.execute(
() -> {
manager.closeIfEmpty();
},
"Producer manager closing after timeout, producer %d on stream '%s'",
tracker.uniqueId(),
tracker.stream());
}
throw e;
} catch (RuntimeException e) {
if (pickedManager != null) {
if (shouldRefreshCandidates(e)) {
// manager connection is dead or stream not available
// scheduling manager closing if necessary in another thread to avoid blocking this one
if (pickedManager.isEmpty()) {
this.environment.execute(
pickedManager::closeIfEmpty,
"Producer manager closing after timeout, producer %d on stream '%s'",
tracker.uniqueId(),
tracker.stream());
}
} else {
pickedManager.closeIfEmpty();
}
throw e;
Expand Down Expand Up @@ -337,12 +334,14 @@ public String toString() {
managerBuilder.append(
m.producers.values().stream()
.map(
p -> {
StringBuilder producerBuilder = new StringBuilder("{");
producerBuilder.append(jsonField("stream", p.stream())).append(",");
producerBuilder.append(jsonField("producer_id", p.publisherId));
return producerBuilder.append("}").toString();
})
p ->
"{"
+ jsonField("stream", p.stream())
+ ","
+ jsonField("producer_id", p.publisherId)
+ ","
+ jsonField("state", p.producer.state())
+ "}")
.collect(Collectors.joining(",")));
managerBuilder.append("],");
managerBuilder.append("\"tracking_consumers\" : [");
Expand Down Expand Up @@ -832,33 +831,33 @@ private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTrac
tracker.stream());
}
reassignmentCompleted = true;
} catch (ConnectionStreamException
| ClientClosedException
| StreamNotAvailableException e) {
LOGGER.debug(
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
+ "refreshing candidate leader and retrying",
tracker.type(),
tracker.identifiable() ? tracker.id() : "N/A",
tracker.stream());
// maybe not a good candidate, let's refresh and retry for this one
Pair<Broker, List<BrokerWrapper>> brokerAndCandidates =
callAndMaybeRetry(
() -> {
List<BrokerWrapper> cs = findCandidateNodes(tracker.stream(), forceLeader);
return pair(pickBroker(cs), cs);
},
ex -> !(ex instanceof StreamDoesNotExistException),
environment.recoveryBackOffDelayPolicy(),
"Candidate lookup for %s on stream '%s'",
tracker.type(),
tracker.stream());
node = brokerAndCandidates.v1();
candidates = brokerAndCandidates.v2();
} catch (Exception e) {
LOGGER.warn(
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
reassignmentCompleted = true;
if (shouldRefreshCandidates(e)) {
LOGGER.debug(
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
+ "refreshing candidate leader and retrying",
tracker.type(),
tracker.identifiable() ? tracker.id() : "N/A",
tracker.stream());
// maybe not a good candidate, let's refresh and retry for this one
Pair<Broker, List<BrokerWrapper>> brokerAndCandidates =
callAndMaybeRetry(
() -> {
List<BrokerWrapper> cs = findCandidateNodes(tracker.stream(), forceLeader);
return pair(pickBroker(cs), cs);
},
ex -> !(ex instanceof StreamDoesNotExistException),
environment.recoveryBackOffDelayPolicy(),
"Candidate lookup for %s on stream '%s'",
tracker.type(),
tracker.stream());
node = brokerAndCandidates.v1();
candidates = brokerAndCandidates.v2();
} else {
LOGGER.warn(
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
reassignmentCompleted = true;
}
}
}
}
Expand Down Expand Up @@ -1022,13 +1021,6 @@ public int hashCode() {
private static final Predicate<Exception> RETRY_ON_TIMEOUT =
e -> e instanceof TimeoutStreamException;

private static class ClientClosedException extends StreamException {

public ClientClosedException() {
super("Client already closed");
}
}

static <T> int pickSlot(ConcurrentMap<Byte, T> map, T tracker, AtomicInteger sequence) {
int index = -1;
T previousValue = tracker;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public String toString() {
Client subscriptionClient = this.subscriptionClient;
Client trackingClient = this.trackingClient;
return "{ "
+ "\"id\" : "
+ "\"consumer_id\" : "
+ id
+ ","
+ "\"stream\" : \""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import com.rabbitmq.stream.oauth2.CredentialsManager;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
Expand Down Expand Up @@ -99,7 +98,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamEnvironment implements Environment {
final class StreamEnvironment implements Environment {

private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);

Expand Down Expand Up @@ -130,7 +129,6 @@ class StreamEnvironment implements Environment {
private final ObservationCollector<?> observationCollector;
private final Duration rpcTimeout;

@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
StreamEnvironment(
ScheduledExecutorService scheduledExecutorService,
Client.ClientParameters clientParametersPrototype,
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ public int hashCode() {
public String toString() {
Client client = this.client;
return "{ "
+ "\"id\" : "
+ "\"producer_id\" : "
+ id
+ ","
+ "\"stream\" : \""
Expand Down Expand Up @@ -677,4 +677,8 @@ private void executeInLock(Runnable action) {
this.unlock();
}
}

long id() {
return this.id;
}
}
4 changes: 4 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ static String jsonField(String name, Number value) {
return quote(name) + " : " + value.longValue();
}

static String jsonField(String name, Enum<?> value) {
return quote(name) + " : " + (value == null ? "null" : value.name());
}

static String jsonField(String name, String value) {
return quote(name) + " : " + quote(value);
}
Expand Down
Loading