diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index b0a582a6d6..c04cdb65ac 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -573,7 +573,11 @@ private Map peerProperties() { if (request.error() == null) { return request.response.get(); } else { - throw new StreamException("Error when establishing stream connection", request.error()); + if (request.error() instanceof StreamException) { + throw (StreamException) request.error(); + } else { + throw new StreamException("Error when establishing stream connection", request.error()); + } } } catch (StreamException e) { this.handleRpcError(correlationId, e); diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index c64d0db45f..07fdb59959 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -15,6 +15,7 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS; +import static com.rabbitmq.stream.impl.CoordinatorUtils.shouldRefreshCandidates; import static com.rabbitmq.stream.impl.Utils.AVAILABLE_PROCESSORS; import static com.rabbitmq.stream.impl.Utils.brokerFromClient; import static com.rabbitmq.stream.impl.Utils.convertCodeToException; @@ -38,7 +39,6 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamDoesNotExistException; import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.StreamNotAvailableException; import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext; import com.rabbitmq.stream.impl.Client.Broker; @@ -51,6 +51,7 @@ import com.rabbitmq.stream.impl.Client.MetadataListener; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.Client.ShutdownListener; +import com.rabbitmq.stream.impl.CoordinatorUtils.ClientClosedException; import com.rabbitmq.stream.impl.Utils.BrokerWrapper; import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import com.rabbitmq.stream.impl.Utils.ClientFactory; @@ -200,6 +201,7 @@ private void addToManager( .host(node.getHost()) .port(node.getPort()); ClientSubscriptionsManager pickedManager = null; + LOGGER.debug("Finding a manager for consumer {}", tracker.consumer.id()); while (pickedManager == null) { Iterator iterator = this.managers.iterator(); while (iterator.hasNext()) { @@ -225,27 +227,27 @@ private void addToManager( try { pickedManager.add(tracker, offsetSpecification, isInitialSubscription); LOGGER.debug( - "Assigned tracker {} to manager {} (node {}), subscription ID {}", + "Assigned tracker {} to manager {} (node {}), subscription ID {}, consumer {}", tracker.label(), pickedManager.id, pickedManager.name, - tracker.subscriptionIdInClient); + tracker.subscriptionIdInClient, + tracker.consumer.id()); this.managers.add(pickedManager); } catch (IllegalStateException e) { pickedManager = null; - } catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) { - // manager connection is dead or stream not available - // scheduling manager closing if necessary in another thread to avoid blocking this one - if (pickedManager.isEmpty()) { - ConsumersCoordinator.this.environment.execute( - pickedManager::closeIfEmpty, - "Consumer manager closing after timeout, consumer %d on stream '%s'", - tracker.consumer.id(), - tracker.stream); - } - throw e; } catch (RuntimeException e) { - if (pickedManager != null) { + if (shouldRefreshCandidates(e)) { + // manager connection is dead or stream not available + // scheduling manager closing if necessary in another thread to avoid blocking this one + if (pickedManager.isEmpty()) { + ConsumersCoordinator.this.environment.execute( + pickedManager::closeIfEmpty, + "Consumer manager closing after timeout, consumer %d on stream '%s'", + tracker.consumer.id(), + tracker.stream); + } + } else { pickedManager.closeIfEmpty(); } throw e; @@ -357,6 +359,10 @@ public void close() { public String toString() { StringBuilder builder = new StringBuilder("{"); builder.append(jsonField("client_count", this.managers.size())).append(", "); + builder + .append( + jsonField("consumer_count", this.managers.stream().mapToInt(m -> m.trackerCount).sum())) + .append(","); builder.append(quote("clients")).append(" : ["); builder.append( this.managers.stream() @@ -376,14 +382,16 @@ public String toString() { trackers.stream() .filter(Objects::nonNull) .map( - t -> { - StringBuilder trackerBuilder = new StringBuilder("{"); - trackerBuilder.append(jsonField("stream", t.stream)).append(","); - trackerBuilder.append(jsonField("id", t.id)).append(","); - trackerBuilder.append( - jsonField("subscription_id", t.subscriptionIdInClient)); - return trackerBuilder.append("}").toString(); - }) + t -> + "{" + + jsonField("stream", t.stream) + + "," + + jsonField("id", t.id) + + "," + + jsonField("subscription_id", t.subscriptionIdInClient) + + "," + + jsonField("state", t.consumer.state()) + + "}") .collect(Collectors.joining(","))); managerBuilder.append("]"); return managerBuilder.append("}").toString(); @@ -548,7 +556,7 @@ private void markRecovering() { String label() { return String.format( - "[id=%d, stream=%s, name=%s, consumer=%d]", + "[id %d, stream %s, name %s, consumer %d]", this.id, this.stream, this.offsetTrackingReference, this.consumer.id()); } } @@ -990,34 +998,36 @@ private void recoverSubscription(List candidates, SubscriptionTra tracker.stream); } reassignmentCompleted = true; - } catch (ConnectionStreamException - | ClientClosedException - | StreamNotAvailableException e) { - LOGGER.debug( - "Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, " - + "refreshing candidates and retrying", - tracker.consumer.id(), - tracker.stream); - // maybe not a good candidate, let's refresh and retry for this one - candidates = - Utils.callAndMaybeRetry( - findCandidateNodes(tracker.stream), - ex -> !(ex instanceof StreamDoesNotExistException), - recoveryBackOffDelayPolicy(), - "Candidate lookup to consume from '%s' (subscription recovery)", - tracker.stream); } catch (StreamException e) { - LOGGER.warn( - "Stream error while re-assigning subscription from stream {} (name {})", - tracker.stream, - tracker.offsetTrackingReference, - e); - if (e.getCode() == RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS) { - LOGGER.debug("Subscription ID already existing, retrying"); - } else { + if (shouldRefreshCandidates(e)) { LOGGER.debug( - "Not re-assigning consumer '{}' because of '{}'", tracker.label(), e.getMessage()); - reassignmentCompleted = true; + "Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, " + + "refreshing candidates and retrying", + tracker.consumer.id(), + tracker.stream); + // maybe not a good candidate, let's refresh and retry for this one + candidates = + Utils.callAndMaybeRetry( + findCandidateNodes(tracker.stream), + ex -> !(ex instanceof StreamDoesNotExistException), + recoveryBackOffDelayPolicy(), + "Candidate lookup to consume from '%s' (subscription recovery)", + tracker.stream); + } else { + LOGGER.warn( + "Stream error while re-assigning subscription from stream {} (name {})", + tracker.stream, + tracker.offsetTrackingReference, + e); + if (e.getCode() == RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS) { + LOGGER.debug("Subscription ID already existing, retrying"); + } else { + LOGGER.debug( + "Not re-assigning consumer '{}' because of '{}'", + tracker.label(), + e.getMessage()); + reassignmentCompleted = true; + } } } catch (Exception e) { LOGGER.warn( @@ -1039,21 +1049,19 @@ private void checkNotClosed() { } void add( - SubscriptionTracker subscriptionTracker, + SubscriptionTracker tracker, OffsetSpecification offsetSpecification, boolean isInitialSubscription) { this.subscriptionManagerLock.lock(); try { if (this.isFull()) { LOGGER.debug( - "Cannot add subscription tracker for stream '{}', manager is full", - subscriptionTracker.stream); + "Cannot add subscription tracker for stream '{}', manager is full", tracker.stream); throw new IllegalStateException("Cannot add subscription tracker, the manager is full"); } if (this.isClosed()) { LOGGER.debug( - "Cannot add subscription tracker for stream '{}', manager is closed", - subscriptionTracker.stream); + "Cannot add subscription tracker for stream '{}', manager is closed", tracker.stream); throw new IllegalStateException("Cannot add subscription tracker, the manager is closed"); } @@ -1066,32 +1074,32 @@ void add( LOGGER.debug( "Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}, " - + "subscription ID is {}", - subscriptionTracker.stream, + + "subscription ID is {}, consumer {}", + tracker.stream, offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification, - subscriptionTracker.offsetTrackingReference, - subscriptionTracker.subscriptionProperties, - subscriptionId); + tracker.offsetTrackingReference, + tracker.subscriptionProperties, + subscriptionId, + tracker.consumer.id()); try { // updating data structures before subscribing // (to make sure they are up-to-date in case message would arrive super fast) - subscriptionTracker.assign(subscriptionId, this); + tracker.assign(subscriptionId, this); streamToStreamSubscriptions - .computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet()) - .add(subscriptionTracker); - this.setSubscriptionTrackers( - update(previousSubscriptions, subscriptionId, subscriptionTracker)); + .computeIfAbsent(tracker.stream, s -> ConcurrentHashMap.newKeySet()) + .add(tracker); + this.setSubscriptionTrackers(update(previousSubscriptions, subscriptionId, tracker)); - String offsetTrackingReference = subscriptionTracker.offsetTrackingReference; + String offsetTrackingReference = tracker.offsetTrackingReference; if (offsetTrackingReference != null) { checkNotClosed(); QueryOffsetResponse queryOffsetResponse = Utils.callAndMaybeRetry( - () -> client.queryOffset(offsetTrackingReference, subscriptionTracker.stream), + () -> client.queryOffset(offsetTrackingReference, tracker.stream), RETRY_ON_TIMEOUT, "Offset query for consumer %s on stream '%s' (reference %s)", - subscriptionTracker.consumer.id(), - subscriptionTracker.stream, + tracker.consumer.id(), + tracker.stream, offsetTrackingReference); if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0) { if (offsetSpecification != null && isInitialSubscription) { @@ -1107,7 +1115,7 @@ void add( "Using offset {} to start consuming from {} with consumer {} " + "(instead of {})", queryOffsetResponse.getOffset(), - subscriptionTracker.stream, + tracker.stream, offsetTrackingReference, offsetSpecification); offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1); @@ -1120,8 +1128,8 @@ void add( // TODO consider using/emulating ConsumerUpdateListener, to have only one API, not 2 // even when the consumer is not a SAC. SubscriptionContext subscriptionContext = - new DefaultSubscriptionContext(offsetSpecification, subscriptionTracker.stream); - subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext); + new DefaultSubscriptionContext(offsetSpecification, tracker.stream); + tracker.subscriptionListener.preSubscribe(subscriptionContext); LOGGER.info( "Computed offset specification {}, offset specification used after subscription listener {}", offsetSpecification, @@ -1133,18 +1141,18 @@ void add( () -> client.subscribe( subscriptionId, - subscriptionTracker.stream, + tracker.stream, subscriptionContext.offsetSpecification(), - subscriptionTracker.flowStrategy.initialCredits(), - subscriptionTracker.subscriptionProperties), + tracker.flowStrategy.initialCredits(), + tracker.subscriptionProperties), RETRY_ON_TIMEOUT, - "Subscribe request for consumer %s on stream '%s'", - subscriptionTracker.consumer.id(), - subscriptionTracker.stream); + "Subscribe request for consumer %d on stream '%s'", + tracker.consumer.id(), + tracker.stream); if (!subscribeResponse.isOk()) { String message = "Subscription to stream " - + subscriptionTracker.stream + + tracker.stream + " failed with code " + formatConstant(subscribeResponse.getResponseCode()); LOGGER.debug(message); @@ -1162,20 +1170,20 @@ void add( } } throw convertCodeToException( - subscribeResponse.getResponseCode(), subscriptionTracker.stream, () -> message); + subscribeResponse.getResponseCode(), tracker.stream, () -> message); } } catch (RuntimeException e) { - subscriptionTracker.assign((byte) -1, null); + tracker.assign((byte) -1, null); this.setSubscriptionTrackers(previousSubscriptions); streamToStreamSubscriptions - .computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet()) - .remove(subscriptionTracker); - maybeCleanStreamToStreamSubscriptions(subscriptionTracker.stream); + .computeIfAbsent(tracker.stream, s -> ConcurrentHashMap.newKeySet()) + .remove(tracker); + maybeCleanStreamToStreamSubscriptions(tracker.stream); throw e; } - subscriptionTracker.state(SubscriptionState.ACTIVE); - subscriptionTracker.markOpen(); - LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream); + tracker.state(SubscriptionState.ACTIVE); + tracker.markOpen(); + LOGGER.debug("Subscribed to '{}'", tracker.stream); } finally { this.subscriptionManagerLock.unlock(); } @@ -1378,13 +1386,6 @@ public String toString() { private static final Predicate RETRY_ON_TIMEOUT = e -> e instanceof TimeoutStreamException; - private static class ClientClosedException extends StreamException { - - public ClientClosedException() { - super("Client already closed"); - } - } - private static class DefaultConsumerFlowStrategyContext implements ConsumerFlowStrategy.Context { private final byte subscriptionId; diff --git a/src/main/java/com/rabbitmq/stream/impl/CoordinatorUtils.java b/src/main/java/com/rabbitmq/stream/impl/CoordinatorUtils.java new file mode 100644 index 0000000000..3611c0a266 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/CoordinatorUtils.java @@ -0,0 +1,43 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.StreamNotAvailableException; +import java.nio.channels.ClosedChannelException; +import java.util.function.Predicate; + +final class CoordinatorUtils { + + private static final Predicate REFRESH_CANDIDATES = + e -> + e instanceof ConnectionStreamException + || e instanceof ClientClosedException + || e instanceof StreamNotAvailableException + || e instanceof ClosedChannelException; + + private CoordinatorUtils() {} + + static boolean shouldRefreshCandidates(Throwable e) { + return REFRESH_CANDIDATES.test(e) || REFRESH_CANDIDATES.test(e.getCause()); + } + + static class ClientClosedException extends StreamException { + + public ClientClosedException() { + super("Client already closed"); + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 4ad28f4043..befd21bd4b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -14,6 +14,8 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.CoordinatorUtils.ClientClosedException; +import static com.rabbitmq.stream.impl.CoordinatorUtils.shouldRefreshCandidates; import static com.rabbitmq.stream.impl.Tuples.pair; import static com.rabbitmq.stream.impl.Utils.AVAILABLE_PROCESSORS; import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry; @@ -31,7 +33,6 @@ import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.StreamDoesNotExistException; import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.StreamNotAvailableException; import com.rabbitmq.stream.impl.Client.Broker; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.Client.MetadataListener; @@ -193,22 +194,18 @@ private void addToManager(Broker node, List candidates, AgentTrac this.managers.add(pickedManager); } catch (IllegalStateException e) { pickedManager = null; - } catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) { - // manager connection is dead or stream not available - // scheduling manager closing if necessary in another thread to avoid blocking this one - if (pickedManager.isEmpty()) { - ClientProducersManager manager = pickedManager; - this.environment.execute( - () -> { - manager.closeIfEmpty(); - }, - "Producer manager closing after timeout, producer %d on stream '%s'", - tracker.uniqueId(), - tracker.stream()); - } - throw e; } catch (RuntimeException e) { - if (pickedManager != null) { + if (shouldRefreshCandidates(e)) { + // manager connection is dead or stream not available + // scheduling manager closing if necessary in another thread to avoid blocking this one + if (pickedManager.isEmpty()) { + this.environment.execute( + pickedManager::closeIfEmpty, + "Producer manager closing after timeout, producer %d on stream '%s'", + tracker.uniqueId(), + tracker.stream()); + } + } else { pickedManager.closeIfEmpty(); } throw e; @@ -337,12 +334,14 @@ public String toString() { managerBuilder.append( m.producers.values().stream() .map( - p -> { - StringBuilder producerBuilder = new StringBuilder("{"); - producerBuilder.append(jsonField("stream", p.stream())).append(","); - producerBuilder.append(jsonField("producer_id", p.publisherId)); - return producerBuilder.append("}").toString(); - }) + p -> + "{" + + jsonField("stream", p.stream()) + + "," + + jsonField("producer_id", p.publisherId) + + "," + + jsonField("state", p.producer.state()) + + "}") .collect(Collectors.joining(","))); managerBuilder.append("],"); managerBuilder.append("\"tracking_consumers\" : ["); @@ -832,33 +831,33 @@ private void recoverAgent(Broker node, List candidates, AgentTrac tracker.stream()); } reassignmentCompleted = true; - } catch (ConnectionStreamException - | ClientClosedException - | StreamNotAvailableException e) { - LOGGER.debug( - "{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, " - + "refreshing candidate leader and retrying", - tracker.type(), - tracker.identifiable() ? tracker.id() : "N/A", - tracker.stream()); - // maybe not a good candidate, let's refresh and retry for this one - Pair> brokerAndCandidates = - callAndMaybeRetry( - () -> { - List cs = findCandidateNodes(tracker.stream(), forceLeader); - return pair(pickBroker(cs), cs); - }, - ex -> !(ex instanceof StreamDoesNotExistException), - environment.recoveryBackOffDelayPolicy(), - "Candidate lookup for %s on stream '%s'", - tracker.type(), - tracker.stream()); - node = brokerAndCandidates.v1(); - candidates = brokerAndCandidates.v2(); } catch (Exception e) { - LOGGER.warn( - "Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e); - reassignmentCompleted = true; + if (shouldRefreshCandidates(e)) { + LOGGER.debug( + "{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, " + + "refreshing candidate leader and retrying", + tracker.type(), + tracker.identifiable() ? tracker.id() : "N/A", + tracker.stream()); + // maybe not a good candidate, let's refresh and retry for this one + Pair> brokerAndCandidates = + callAndMaybeRetry( + () -> { + List cs = findCandidateNodes(tracker.stream(), forceLeader); + return pair(pickBroker(cs), cs); + }, + ex -> !(ex instanceof StreamDoesNotExistException), + environment.recoveryBackOffDelayPolicy(), + "Candidate lookup for %s on stream '%s'", + tracker.type(), + tracker.stream()); + node = brokerAndCandidates.v1(); + candidates = brokerAndCandidates.v2(); + } else { + LOGGER.warn( + "Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e); + reassignmentCompleted = true; + } } } } @@ -1022,13 +1021,6 @@ public int hashCode() { private static final Predicate RETRY_ON_TIMEOUT = e -> e instanceof TimeoutStreamException; - private static class ClientClosedException extends StreamException { - - public ClientClosedException() { - super("Client already closed"); - } - } - static int pickSlot(ConcurrentMap map, T tracker, AtomicInteger sequence) { int index = -1; T previousValue = tracker; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 4d1e00a03c..2c457dd790 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -579,7 +579,7 @@ public String toString() { Client subscriptionClient = this.subscriptionClient; Client trackingClient = this.trackingClient; return "{ " - + "\"id\" : " + + "\"consumer_id\" : " + id + "," + "\"stream\" : \"" diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index d5f91124b8..2632151be5 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -61,7 +61,6 @@ import com.rabbitmq.stream.oauth2.CredentialsManager; import com.rabbitmq.stream.sasl.CredentialsProvider; import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; import io.netty.handler.ssl.SslContext; @@ -99,7 +98,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StreamEnvironment implements Environment { +final class StreamEnvironment implements Environment { private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class); @@ -130,7 +129,6 @@ class StreamEnvironment implements Environment { private final ObservationCollector observationCollector; private final Duration rpcTimeout; - @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") StreamEnvironment( ScheduledExecutorService scheduledExecutorService, Client.ClientParameters clientParametersPrototype, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 22b5fd4a5d..57916bfdf3 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -608,7 +608,7 @@ public int hashCode() { public String toString() { Client client = this.client; return "{ " - + "\"id\" : " + + "\"producer_id\" : " + id + "," + "\"stream\" : \"" @@ -677,4 +677,8 @@ private void executeInLock(Runnable action) { this.unlock(); } } + + long id() { + return this.id; + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index bb252a7143..9824070022 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -599,6 +599,10 @@ static String jsonField(String name, Number value) { return quote(name) + " : " + value.longValue(); } + static String jsonField(String name, Enum value) { + return quote(name) + " : " + (value == null ? "null" : value.name()); + } + static String jsonField(String name, String value) { return quote(name) + " : " + quote(value); } diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 98b6ef742a..3315963881 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -40,6 +40,7 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.ProducerBuilder; +import com.rabbitmq.stream.Resource; import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; @@ -55,6 +56,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -190,13 +192,39 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru .maxProducersByConnection(producerCount / 4) .maxConsumersByConnection(consumerCount / 4) .build(); + String streamPrefix = "rec-" + new Random().nextInt(1000); List streams = - range(0, streamCount) - .mapToObj(i -> TestUtils.streamName(testInfo) + "-" + i) - .collect(toList()); - streams.forEach(s -> environment.streamCreator().stream(s).create()); + range(0, streamCount).mapToObj(i -> streamPrefix + "-" + i).collect(toList()); + streams.forEach( + s -> { + try { + environment.deleteStream(s); + } catch (Exception e) { + // ok + } + environment.streamCreator().stream(s).create(); + }); List producers = Collections.emptyList(); List consumers = Collections.emptyList(); + java.util.function.Consumer> producerInfo = + ps -> { + ps.forEach( + p -> + LOGGER.info( + "Producer {} to stream '{}', state {} (last exception: '{}')", + p.id(), + p.stream(), + p.state(), + p.lastException)); + }; + java.util.function.Consumer> consumerInfo = + cs -> { + cs.forEach( + c -> { + LOGGER.info( + "Consumer {} from stream '{}', state {}", c.id(), c.stream(), c.state()); + }); + }; try { producers = range(0, producerCount) @@ -224,30 +252,34 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList()); syncs.forEach(s -> assertThat(s).completes()); + LOGGER.info("Producer information before cluster restart:"); + producerInfo.accept(producers); + + LOGGER.info("Consumer information before cluster restart:"); + consumerInfo.accept(consumers); + restartCluster(); Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); - List> streamsSyncs = - producers.stream() - .map(p -> pair(p.stream(), p.waitForNewMessages(1000))) - .collect(toList()); - streamsSyncs.forEach( + List> pSyncs = + producers.stream().map(p -> pair(p, p.waitForNewMessages(1000))).collect(toList()); + pSyncs.forEach( p -> { - LOGGER.info("Checking publisher to {} still publishes", p.v1()); + LOGGER.info( + "Checking publisher {} to {} still publishes", p.v1().id(), p.v1().stream()); assertThat(p.v2()).completes(ASSERTION_TIMEOUT); LOGGER.info("Publisher to {} still publishes", p.v1()); }); - streamsSyncs = - consumers.stream() - .map(c -> pair(c.stream(), c.waitForNewMessages(1000))) - .collect(toList()); - streamsSyncs.forEach( + List> cSyncs = + consumers.stream().map(c -> pair(c, c.waitForNewMessages(1000))).collect(toList()); + cSyncs.forEach( p -> { - LOGGER.info("Checking consumer from {} still consumes", p.v1()); + LOGGER.info( + "Checking consumer {} from {} still consumes", p.v1().id(), p.v1().stream()); assertThat(p.v2()).completes(ASSERTION_TIMEOUT); - LOGGER.info("Consumer from {} still consumes", p.v1()); + LOGGER.info("Consumer {} from {} still consumes", p.v1().id(), p.v1().stream()); }); Map committedChunkIdPerStream = new LinkedHashMap<>(streamCount); @@ -270,10 +302,10 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru System.out.println(TestUtils.jsonPrettyPrint(environment.toString())); LOGGER.info("Producer information:"); - producers.forEach( - p -> { - LOGGER.info("Producer to '{}' (last exception: '{}')", p.stream(), p.lastException); - }); + producerInfo.accept(producers); + + LOGGER.info("Consumer information:"); + consumerInfo.accept(consumers); LOGGER.info("Closing producers"); producers.forEach( @@ -510,6 +542,14 @@ String stream() { return this.stream; } + long id() { + return ((StreamProducer) this.producer).id(); + } + + Resource.State state() { + return ((StreamProducer) this.producer).state(); + } + String lastException() { if (this.lastException.get() == null) { return "no exception"; @@ -576,6 +616,14 @@ String stream() { return this.stream; } + long id() { + return ((StreamConsumer) this.consumer).id(); + } + + Resource.State state() { + return ((StreamConsumer) this.consumer).state(); + } + @Override public void close() { this.consumer.close();