diff --git a/src/docs/asciidoc/usage.adoc b/src/docs/asciidoc/usage.adoc
index 57244f8b3..4ff4aae74 100644
--- a/src/docs/asciidoc/usage.adoc
+++ b/src/docs/asciidoc/usage.adoc
@@ -26,6 +26,26 @@ include::{test-examples}/Api.java[tag=connection-settings]
<1> Use the `guest` user by default
<2> Use the `admin` user for this connection
+=== Subscription Listener
+
+The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
+This callback is meant for stream consumers: it can be used to dynamically set the offset the consumer attaches to in the stream.
+It is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection).
+
+It is possible to use the callback to get the last processed offset from an external store.
+The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
+
+.Using the subscription listener to attach to a stream
+[source,java,indent=0]
+--------
+include::{test-examples}/Api.java[tag=subscription-listener]
+--------
+<1> Set subscription listener
+<2> Get offset from external store
+<3> Set offset to use for the subscription
+<4> Get the message offset
+<5> Store the offset in the external store after processing
+
=== Metrics Collection
The library provides the {javadoc-url}/com/rabbitmq/client/amqp/metrics/MetricsCollector.html[`MetricsCollector`] abstraction to collect metrics.
diff --git a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
index 006f5523d..64ab859b7 100644
--- a/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java
@@ -74,6 +74,17 @@ public interface ConsumerBuilder {
*/
StreamOptions stream();
+ /**
+ * Set a listener to customize the subscription before the consumer is created (or recovered).
+ *
+ *
This callback is available for stream consumers.
+ *
+ * @param subscriptionListener subscription listener
+ * @return this builder instance
+ * @see SubscriptionListener
+ */
+ ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
+
/**
* Build the consumer.
*
@@ -164,4 +175,41 @@ enum StreamOffsetSpecification {
/** Very end of the stream (new chunks). */
NEXT
}
+
+ /**
+ * Callback to modify a consumer subscription before the link creation.
+ *
+ *
This allows looking up the last processed offset for a stream consumer and attaching to this
+ * offset.
+ */
+ interface SubscriptionListener {
+
+ /**
+ * Pre-subscription callback.
+ *
+ *
It is called before the link is created but also every time it recovers, e.g. after a
+ * connection failure.
+ *
+ *
Configuration set with {@link Context#streamOptions()} overrides the one set with {@link
+ * ConsumerBuilder#stream()}.
+ *
+ * @param context subscription context
+ */
+ void preSubscribe(Context context);
+
+ /** Subscription context. */
+ interface Context {
+
+ /**
+ * Stream options, to set the offset to start consuming from.
+ *
+ *
Only the {@link StreamOptions} are accessible, the {@link StreamOptions#builder()}
+ * method returns null
+ *
+ * @return the stream options
+ * @see StreamOptions
+ */
+ ConsumerBuilder.StreamOptions streamOptions();
+ }
+ }
}
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
index 048dcce65..2e6a5935d 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
@@ -18,15 +18,16 @@
package com.rabbitmq.client.amqp.impl;
import static com.rabbitmq.client.amqp.Resource.State.*;
+import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
import static java.time.Duration.ofSeconds;
+import static java.util.Optional.ofNullable;
import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
import com.rabbitmq.client.amqp.Consumer;
+import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,6 +60,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
private final String queue;
private final Map filters;
private final Map linkProperties;
+ private final ConsumerBuilder.SubscriptionListener subscriptionListener;
private final AmqpConnection connection;
private final AtomicReference pauseStatus =
new AtomicReference<>(PauseStatus.UNPAUSED);
@@ -89,11 +91,17 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
this.queue = builder.queue();
this.filters = Map.copyOf(builder.filters());
this.linkProperties = Map.copyOf(builder.properties());
+ this.subscriptionListener =
+ ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
this.connection = builder.connection();
this.sessionHandler = this.connection.createSessionHandler();
this.nativeReceiver =
this.createNativeReceiver(
- this.sessionHandler.session(), this.address, this.linkProperties, this.filters);
+ this.sessionHandler.session(),
+ this.address,
+ this.linkProperties,
+ this.filters,
+ this.subscriptionListener);
this.initStateFromNativeReceiver(this.nativeReceiver);
this.metricsCollector = this.connection.metricsCollector();
this.startReceivingLoop();
@@ -153,8 +161,12 @@ private ClientReceiver createNativeReceiver(
Session nativeSession,
String address,
Map properties,
- Map filters) {
+ Map filters,
+ SubscriptionListener subscriptionListener) {
try {
+ filters = new LinkedHashMap<>(filters);
+ StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
+ subscriptionListener.preSubscribe(() -> streamOptions);
ReceiverOptions receiverOptions =
new ReceiverOptions()
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
@@ -221,7 +233,8 @@ void recoverAfterConnectionFailure() {
this.sessionHandler.sessionNoCheck(),
this.address,
this.linkProperties,
- this.filters),
+ this.filters,
+ this.subscriptionListener),
e -> {
boolean shouldRetry =
e instanceof AmqpException.AmqpResourceClosedException
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java
index 21a1e3d29..789fa3a96 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java
@@ -27,6 +27,8 @@
class AmqpConsumerBuilder implements ConsumerBuilder {
+ static SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = ctx -> {};
+
private final AmqpConnection connection;
private String queue;
private Consumer.MessageHandler messageHandler;
@@ -35,6 +37,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
private final Map filters = new LinkedHashMap<>();
private final Map properties = new LinkedHashMap<>();
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
+ private SubscriptionListener subscriptionListener = NO_OP_SUBSCRIPTION_LISTENER;
AmqpConsumerBuilder(AmqpConnection connection) {
this.connection = connection;
@@ -79,6 +82,16 @@ public StreamOptions stream() {
return this.streamOptions;
}
+ @Override
+ public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) {
+ this.subscriptionListener = subscriptionListener;
+ return this;
+ }
+
+ SubscriptionListener subscriptionListener() {
+ return this.subscriptionListener;
+ }
+
AmqpConnection connection() {
return connection;
}
@@ -186,4 +199,8 @@ private void offsetSpecification(Object value) {
this.filters.put("rabbitmq:stream-offset-spec", value);
}
}
+
+ static StreamOptions streamOptions(Map filters) {
+ return new DefaultStreamOptions(null, filters);
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/Api.java b/src/test/java/com/rabbitmq/client/amqp/docs/Api.java
index 2157e4409..fa522ebdb 100644
--- a/src/test/java/com/rabbitmq/client/amqp/docs/Api.java
+++ b/src/test/java/com/rabbitmq/client/amqp/docs/Api.java
@@ -26,29 +26,8 @@
import io.micrometer.prometheusmetrics.PrometheusConfig;
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-
-import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
-import static com.rabbitmq.client.amqp.Publisher.Status.ACCEPTED;
-
class Api {
- void environment() {
- // tag::environment-creation[]
- Environment environment = new AmqpEnvironmentBuilder()
- .build();
- // end::environment-creation[]
- }
-
- void connection() {
- Environment environment = null;
- // tag::connection-creation[]
- Connection connection = environment.connectionBuilder()
- .build();
- // end::connection-creation[]
- }
-
void connectionSettings() {
// tag::connection-settings[]
Environment environment = new AmqpEnvironmentBuilder()
@@ -62,290 +41,31 @@ void connectionSettings() {
// end::connection-settings[]
}
-
- void publishing() {
- Connection connection = null;
- // tag::publisher-creation[]
- Publisher publisher = connection.publisherBuilder()
- .exchange("foo").key("bar")
- .build();
- // end::publisher-creation[]
-
-
- // tag::message-creation[]
- Message message = publisher
- .message("hello".getBytes(StandardCharsets.UTF_8))
- .messageId(1L);
- // end::message-creation[]
-
- // tag::message-publishing[]
- publisher.publish(message, context -> {
- if (context.status() == ACCEPTED) {
- // the broker accepted (confirmed) the message
- } else {
- // deal with possible failure
- }
- });
- // end::message-publishing[]
- }
-
- void targetAddressFormatExchangeKey() {
+ void subscriptionListener() {
Connection connection = null;
- // tag::target-address-exchange-key[]
- Publisher publisher = connection.publisherBuilder()
- .exchange("foo").key("bar") // <1>
- .build();
- // end::target-address-exchange-key[]
- }
-
- void targetAddressFormatExchange() {
- Connection connection = null;
- // tag::target-address-exchange[]
- Publisher publisher = connection.publisherBuilder()
- .exchange("foo") // <1>
- .build();
- // end::target-address-exchange[]
- }
-
- void targetAddressFormatQueue() {
- Connection connection = null;
- // tag::target-address-queue[]
- Publisher publisher = connection.publisherBuilder()
- .queue("some-queue") // <1>
- .build();
- // end::target-address-queue[]
- }
-
- void targetAddressNull() {
- Connection connection = null;
- // tag::target-address-null[]
- Publisher publisher = connection.publisherBuilder()
- .build(); // <1>
-
- Message message1 = publisher.message()
- .toAddress().exchange("foo").key("bar") // <2>
- .message();
-
- Message message2 = publisher.message()
- .toAddress().exchange("foo") // <3>
- .message();
-
- Message message3 = publisher.message()
- .toAddress().queue("my-queue") // <4>
- .message();
- // end::target-address-null[]
- }
-
- void consuming() {
- Connection connection = null;
- // tag::consumer-consume[]
- Consumer consumer = connection.consumerBuilder()
- .queue("some-queue")
- .messageHandler((context, message) -> {
- byte[] body = message.body(); // <1>
- // ... <2>
- context.accept(); // <3>
- })
- .build();
- // end::consumer-consume[]
-
- // tag::consumer-graceful-shutdown[]
- consumer.pause(); // <1>
- long unsettledMessageCount = consumer.unsettledMessageCount(); // <2>
- consumer.close(); // <3>
- // end::consumer-graceful-shutdown[]
-
- // tag::consumer-abrupt-shutdown[]
- consumer.close(); // <1>
- // end::consumer-abrupt-shutdown[]
- }
-
- void consumingStream() {
- Connection connection = null;
- // tag::consumer-consume-stream[]
- Consumer consumer = connection.consumerBuilder()
- .queue("some-stream")
- .stream() // <1>
- .offset(ConsumerBuilder.StreamOffsetSpecification.FIRST) // <2>
- .builder() // <3>
- .messageHandler((context, message) -> {
- // message processing
- })
- .build();
- // end::consumer-consume-stream[]
- }
-
- void consumingStreamFiltering() {
- Connection connection = null;
- // tag::consumer-consume-stream-filtering[]
- Consumer consumer = connection.consumerBuilder()
+ // tag::subscription-listener[]
+ connection.consumerBuilder()
.queue("some-stream")
- .stream() // <1>
- .filterValues("invoices", "orders") // <2>
- .filterMatchUnfiltered(true) // <3>
- .builder() // <4>
- .messageHandler((context, message) -> {
- // message processing
- })
- .build();
- // end::consumer-consume-stream-filtering[]
- }
-
- void management() {
- Connection connection = null;
- // tag::management[]
- Management management = connection.management();
- // end::management[]
- }
-
- void exchanges() {
- Management management = null;
- // tag::fanout-exchange[]
- management.exchange()
- .name("my-exchange")
- .type(FANOUT)
- .declare();
- // end::fanout-exchange[]
-
- // tag::delayed-message-exchange[]
- management.exchange()
- .name("my-exchange")
- .type("x-delayed-message")
- .autoDelete(false)
- .argument("x-delayed-type", "direct")
- .declare();
- // end::delayed-message-exchange[]
-
- // tag::delete-exchange[]
- management.exchangeDeletion().delete("my-exchange");
- // end::delete-exchange[]
- }
-
- void queues() {
- Management management = null;
- // tag::queue-creation[]
- management.queue()
- .name("my-queue")
- .exclusive(true)
- .autoDelete(false)
- .declare();
- // end::queue-creation[]
-
- // tag::queue-creation-with-arguments[]
- management
- .queue()
- .name("my-queue")
- .messageTtl(Duration.ofMinutes(10)) // <1>
- .maxLengthBytes(ByteCapacity.MB(100)) // <1>
- .declare();
- // end::queue-creation-with-arguments[]
-
- // tag::quorum-queue-creation[]
- management
- .queue()
- .name("my-quorum-queue")
- .quorum() // <1>
- .quorumInitialGroupSize(3)
- .deliveryLimit(3)
- .queue()
- .declare();
- // end::quorum-queue-creation[]
-
- // tag::queue-deletion[]
- management.queueDeletion().delete("my-queue");
- // end::queue-deletion[]
- }
-
- void bindings() {
- Management management = null;
- // tag::binding[]
- management.binding()
- .sourceExchange("my-exchange")
- .destinationQueue("my-queue")
- .key("foo")
- .bind();
- // end::binding[]
-
- // tag::exchange-binding[]
- management.binding()
- .sourceExchange("my-exchange")
- .destinationExchange("my-other-exchange")
- .key("foo")
- .bind();
- // end::exchange-binding[]
-
- // tag::unbinding[]
- management.unbind()
- .sourceExchange("my-exchange")
- .destinationQueue("my-queue")
- .key("foo")
- .unbind();
- // end::unbinding[]
- }
-
- void listeners() {
- Environment environment = null;
- // tag::listener-connection[]
- Connection connection = environment.connectionBuilder()
- .listeners(context -> { // <1>
- context.previousState(); // <2>
- context.currentState(); // <3>
- context.failureCause(); // <4>
- context.resource(); // <5>
- }).build();
- // end::listener-connection[]
-
- // tag::listener-publisher[]
- Publisher publisher = connection.publisherBuilder()
- .listeners(context -> { // <1>
- // ...
+ .subscriptionListener(ctx -> { // <1>
+ long offset = getOffsetFromExternalStore(); // <2>
+ ctx.streamOptions().offset(offset + 1); // <3>
})
- .exchange("foo").key("bar")
- .build();
- // end::listener-publisher[]
+ .messageHandler((ctx, msg) -> {
+ // message handling code...
- // tag::listener-consumer[]
- Consumer consumer = connection.consumerBuilder()
- .listeners(context -> { // <1>
- // ...
+ long offset = (long) msg.annotation("x-stream-offset"); // <4>
+ storeOffsetInExternalStore(offset); // <5>
})
- .queue("my-queue")
.build();
- // end::listener-consumer[]
+ // end::subscription-listener[]
}
- void connectionRecoveryBackOff() {
- Environment environment = null;
- // tag::connection-recovery-back-off[]
- Connection connection = environment.connectionBuilder()
- .recovery() // <1>
- .backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2))) // <2>
- .connectionBuilder().build();
- // end::connection-recovery-back-off[]
+ long getOffsetFromExternalStore() {
+ return 0L;
}
- void connectionRecoveryNoTopologyRecovery() {
- Environment environment = null;
- // tag::connection-recovery-no-topology-recovery[]
- Connection connection = environment.connectionBuilder()
- .recovery()
- .topology(false) // <1>
- .connectionBuilder()
- .listeners(context -> {
- // <2>
- })
- .build();
- // end::connection-recovery-no-topology-recovery[]
- }
+ void storeOffsetInExternalStore(long offset) {
- void connectionRecoveryDeactivate() {
- Environment environment = null;
- // tag::connection-recovery-deactivate[]
- Connection connection = environment.connectionBuilder()
- .recovery()
- .activated(false) // <1>
- .connectionBuilder().build();
- // end::connection-recovery-deactivate[]
}
void metricsCollectorMicrometerPrometheus() {
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConsumerTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConsumerTest.java
new file mode 100644
index 000000000..b162d20b8
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConsumerTest.java
@@ -0,0 +1,169 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.client.amqp.impl;
+
+import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
+import static com.rabbitmq.client.amqp.Resource.State.OPEN;
+import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
+import static com.rabbitmq.client.amqp.impl.Assertions.*;
+import static com.rabbitmq.client.amqp.impl.Cli.closeConnection;
+import static com.rabbitmq.client.amqp.impl.TestUtils.name;
+import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
+import static org.assertj.core.api.Assertions.*;
+
+import com.rabbitmq.client.amqp.*;
+import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(AmqpTestInfrastructureExtension.class)
+public class AmqpConsumerTest {
+
+ BackOffDelayPolicy backOffDelayPolicy = BackOffDelayPolicy.fixed(Duration.ofMillis(100));
+ Environment environment;
+ Connection connection;
+ String q;
+ String connectionName;
+
+ @BeforeEach
+ void init(TestInfo info) {
+ this.q = name(info);
+ connection.management().queue(this.q).type(STREAM).declare();
+ this.connectionName = ((AmqpConnection) connection).name();
+ }
+
+ @AfterEach
+ void tearDown() {
+ connection.management().queueDeletion().delete(this.q);
+ }
+
+ @Test
+ void subscriptionListenerShouldBeCalledOnRecovery() {
+ Sync subscriptionSync = sync();
+ Sync recoveredSync = sync();
+ connection
+ .consumerBuilder()
+ .queue(this.q)
+ .subscriptionListener(ctx -> subscriptionSync.down())
+ .listeners(recoveredListener(recoveredSync))
+ .messageHandler((ctx, msg) -> {})
+ .build();
+
+ assertThat(subscriptionSync).completes();
+ assertThat(recoveredSync).hasNotCompleted();
+ sync().reset();
+ closeConnection(this.connectionName);
+ assertThat(recoveredSync).completes();
+ assertThat(subscriptionSync).completes();
+ }
+
+ @Test
+ void streamConsumerRestartsWhereItLeftOff() {
+ Connection publisherConnection = environment.connectionBuilder().build();
+ Publisher publisher = publisherConnection.publisherBuilder().queue(this.q).build();
+ int messageCount = 100;
+ Runnable publish =
+ () -> {
+ Sync publishSync = sync(messageCount);
+ Publisher.Callback callback = ctx -> publishSync.down();
+ IntStream.range(0, messageCount)
+ .forEach(
+ ignored -> {
+ publisher.publish(publisher.message(), callback);
+ });
+ assertThat(publishSync).completes();
+ };
+
+ publish.run();
+
+ Sync consumeSync = sync(messageCount);
+ AtomicLong lastOffsetProcessed = new AtomicLong(-1);
+ AtomicInteger consumedMessageCount = new AtomicInteger(0);
+ AtomicInteger subscriptionListenerCallCount = new AtomicInteger(0);
+ Sync recoveredSync = sync();
+ ConsumerBuilder.SubscriptionListener subscriptionListener =
+ ctx -> {
+ subscriptionListenerCallCount.incrementAndGet();
+ ctx.streamOptions().offset(lastOffsetProcessed.get() + 1);
+ };
+ Consumer.MessageHandler messageHandler =
+ (ctx, msg) -> {
+ long offset = (long) msg.annotation("x-stream-offset");
+ ctx.accept();
+ lastOffsetProcessed.set(offset);
+ consumedMessageCount.incrementAndGet();
+ consumeSync.down();
+ };
+ Consumer consumer =
+ connection
+ .consumerBuilder()
+ .listeners(recoveredListener(recoveredSync))
+ .queue(this.q)
+ .subscriptionListener(subscriptionListener)
+ .messageHandler(messageHandler)
+ .build();
+
+ assertThat(subscriptionListenerCallCount).hasValue(1);
+ assertThat(consumeSync).completes();
+
+ closeConnection(this.connectionName);
+ assertThat(recoveredSync).completes();
+ assertThat(subscriptionListenerCallCount).hasValue(2);
+ assertThat(consumedMessageCount).hasValue(messageCount);
+
+ long offsetAfterRecovery = lastOffsetProcessed.get();
+ consumeSync.reset(messageCount);
+ publish.run();
+ assertThat(consumeSync).completes();
+ assertThat(consumedMessageCount).hasValue(messageCount * 2);
+ assertThat(lastOffsetProcessed).hasValueGreaterThan(offsetAfterRecovery);
+
+ consumer.close();
+
+ long offsetAfterClosing = lastOffsetProcessed.get();
+ consumeSync.reset(messageCount);
+ publish.run();
+
+ connection
+ .consumerBuilder()
+ .queue(this.q)
+ .subscriptionListener(subscriptionListener)
+ .messageHandler(messageHandler)
+ .build();
+
+ assertThat(subscriptionListenerCallCount).hasValue(3);
+ assertThat(consumeSync).completes();
+ assertThat(consumedMessageCount).hasValue(messageCount * 3);
+ assertThat(lastOffsetProcessed).hasValueGreaterThan(offsetAfterClosing);
+ }
+
+ private static Resource.StateListener recoveredListener(Sync sync) {
+ return context -> {
+ if (context.previousState() == RECOVERING && context.currentState() == OPEN) {
+ sync.down();
+ }
+ };
+ }
+}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java
index 06e9ed76f..ad57a0601 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTestInfrastructureExtension.java
@@ -17,6 +17,9 @@
// info@rabbitmq.com.
package com.rabbitmq.client.amqp.impl;
+import static com.rabbitmq.client.amqp.impl.TestUtils.name;
+
+import com.rabbitmq.client.amqp.BackOffDelayPolicy;
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Environment;
import java.lang.reflect.Field;
@@ -60,9 +63,18 @@ public void beforeEach(ExtensionContext context) throws Exception {
Field connectionField = field(context.getTestClass().get(), "connection");
if (connectionField != null) {
+ AmqpConnectionBuilder connectionBuilder = (AmqpConnectionBuilder) env.connectionBuilder();
+ Field backOffDelayPolicyField = field(context.getTestClass().get(), "backOffDelayPolicy");
+ if (backOffDelayPolicyField != null) {
+ backOffDelayPolicyField.setAccessible(true);
+ BackOffDelayPolicy backOffDelayPolicy =
+ (BackOffDelayPolicy) backOffDelayPolicyField.get(context.getTestInstance().get());
+ if (backOffDelayPolicy != null) {
+ connectionBuilder.recovery().backOffDelayPolicy(backOffDelayPolicy);
+ }
+ }
+ Connection connection = connectionBuilder.name(name(context)).build();
connectionField.setAccessible(true);
- Connection connection =
- ((AmqpConnectionBuilder) env.connectionBuilder()).name(TestUtils.name(context)).build();
connectionField.set(context.getTestInstance().get(), connection);
store(context).put("connection", connection);
}
@@ -77,7 +89,7 @@ public void afterEach(ExtensionContext context) {
}
@Override
- public void afterAll(ExtensionContext context) throws Exception {
+ public void afterAll(ExtensionContext context) {
Environment env = store(context).get("environment", Environment.class);
if (env != null) {
env.close();
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
index 899c81d7f..406a52f04 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
@@ -99,6 +99,13 @@ SyncAssert completes(Duration timeout) {
}
return this;
}
+
+ SyncAssert hasNotCompleted() {
+ if (actual.hasCompleted()) {
+ fail("Sync '%s' should not have completed", this.actual.toString());
+ }
+ return this;
+ }
}
static class QueueInfoAssert extends AbstractObjectAssert {
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
index 9674b8c23..7f29caf86 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java
@@ -525,6 +525,10 @@ void reset() {
this.reset(1);
}
+ boolean hasCompleted() {
+ return this.latch.get().getCount() == 0;
+ }
+
@Override
public String toString() {
return this.description;