Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Test against RabbitMQ 4.1 (PR)
name: Test against RabbitMQ stable (PR)

on:
pull_request:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Test against RabbitMQ 4.0
name: Test against RabbitMQ stable

on:
push:
Expand Down
2 changes: 1 addition & 1 deletion src/docs/asciidoc/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ This behavior is hardcoded but it is possible to cancel it thanks to a reply pos

The RPC client uses the following defaults:

* it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set.
* it uses https://www.rabbitmq.com/docs/direct-reply-to[direct reply-to] if available (RabbitMQ 4.2 or more) for replies if no reply-to queue is set (it falls back to an auto-delete, exclusive queue if direct reply-to is not available)
* it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (`{UUID}-{sequence}`, e.g. `6f839461-6b19-47e1-80b3-6be10d899d85-42`).
The prefix is different for each `RpcClient` instance and the suffix is incremented by one for each request.
* it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the reply-to queue address (defined by the user or created automatically, see above).
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/rabbitmq/client/amqp/Publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public interface Publisher extends AutoCloseable, Resource {
/**
* Create a message meant to be published by the publisher instance.
*
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should be
* not be modified or even reused.
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should not
* be modified or even reused.
*
* @return a message
*/
Expand All @@ -42,8 +42,8 @@ public interface Publisher extends AutoCloseable, Resource {
/**
* Create a message meant to be published by the publisher instance.
*
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should be
* not be modified or even reused.
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should not
* be modified or even reused.
*
* @param body message body
* @return a message with the provided body
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/rabbitmq/client/amqp/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface RpcClient extends AutoCloseable {
/**
* Create a message meant to be published by the underlying publisher instance.
*
* <p>Once published with the {@link #publish(Message)} the message instance should be not be
* <p>Once published with the {@link #publish(Message)} the message instance should not be
* modified or even reused.
*
* @return a message
Expand All @@ -39,7 +39,7 @@ public interface RpcClient extends AutoCloseable {
/**
* Create a message meant to be published by the underlying publisher instance.
*
* <p>Once published with the {@link #publish(Message)} the message instance should be not be
* <p>Once published with the {@link #publish(Message)} the message instance should not be
* modified or even reused.
*
* @param body message body
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ interface Handler {
/** Request processing context. */
interface Context {

/**
* Tell whether the requester is still able to receive the response.
*
* <p>The call assumes a reply-to queue address has been set on the request message and checks
* whether this queue still exists or not.
*
* <p>A time-consuming request handler can use this call from time to time to make sure it still
* worth keeping processing the request.
*
* @param request the incoming request
* @return true if the requester is still considered alive, false otherwise
*/
boolean isRequesterAlive(Message request);

/**
* Create a message meant to be published by the underlying publisher instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert;
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
import static com.rabbitmq.client.amqp.impl.Utils.supportDirectReplyTo;
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions;
Expand Down Expand Up @@ -118,7 +119,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final Lock instanceLock = new ReentrantLock();
private final boolean filterExpressionsSupported,
setTokenSupported,
sqlFilterExpressionsSupported;
sqlFilterExpressionsSupported,
directReplyToSupported;
private volatile ConsumerWorkService consumerWorkService;
private volatile Executor dispatchingExecutor;
private final boolean privateDispatchingExecutor;
Expand Down Expand Up @@ -216,6 +218,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
this.filterExpressionsSupported = supportFilterExpressions(brokerVersion);
this.setTokenSupported = supportSetToken(brokerVersion);
this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion);
this.directReplyToSupported = supportDirectReplyTo(brokerVersion);
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
this.state(OPEN);
this.environment.metricsCollector().openConnection();
Expand Down Expand Up @@ -868,6 +871,10 @@ boolean setTokenSupported() {
return this.setTokenSupported;
}

boolean directReplyToSupported() {
return this.directReplyToSupported;
}

long id() {
return this.id;
}
Expand Down
85 changes: 60 additions & 25 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
private final int initialCredits;
private final Long id;
private final String address;
private volatile String directReplyToAddress;
private final String queue;
private final Map<String, DescribedType> filters;
private final Map<String, Object> linkProperties;
Expand Down Expand Up @@ -96,10 +97,15 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
.connection()
.observationCollector()
.subscribe(builder.queue(), builder.messageHandler());
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
addressBuilder.queue(builder.queue());
this.address = addressBuilder.address();
this.queue = builder.queue();
if (builder.directReplyTo()) {
this.address = null;
this.queue = null;
} else {
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
addressBuilder.queue(builder.queue());
this.address = addressBuilder.address();
this.queue = builder.queue();
}
this.filters = Map.copyOf(builder.filters());
this.linkProperties = Map.copyOf(builder.properties());
this.subscriptionListener =
Expand All @@ -120,18 +126,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
this.consumerWorkService = connection.consumerWorkService();
this.consumerWorkService.register(this);
this.nativeReceiver =
this.createNativeReceiver(
createNativeReceiver(
this.sessionHandler.session(),
this.address,
this.linkProperties,
this.filters,
this.subscriptionListener,
this.nativeHandler,
this.nativeCloseHandler);
this.initStateFromNativeReceiver(this.nativeReceiver);
this.metricsCollector = this.connection.metricsCollector();
this.state(OPEN);
try {
this.directReplyToAddress = nativeReceiver.address();
this.initStateFromNativeReceiver(this.nativeReceiver);
this.metricsCollector = this.connection.metricsCollector();
this.state(OPEN);
this.nativeReceiver.addCredit(this.initialCredits);
} catch (ClientException e) {
AmqpException ex = ExceptionUtils.convert(e);
Expand Down Expand Up @@ -189,7 +196,7 @@ public void close() {

// internal API

private ClientReceiver createNativeReceiver(
private static ClientReceiver createNativeReceiver(
Session nativeSession,
String address,
Map<String, Object> properties,
Expand All @@ -201,24 +208,47 @@ private ClientReceiver createNativeReceiver(
filters = new LinkedHashMap<>(filters);
StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
subscriptionListener.preSubscribe(() -> streamOptions);
ReceiverOptions receiverOptions =
new ReceiverOptions()
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.autoAccept(false)
.autoSettle(false)
.handler(nativeHandler)
.closeHandler(closeHandler)
.creditWindow(0)
.properties(properties);
boolean directReplyTo = address == null;
ReceiverOptions receiverOptions = new ReceiverOptions();

if (directReplyTo) {
receiverOptions
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.autoAccept(true)
.autoSettle(true)
.sourceOptions()
.capabilities("rabbitmq:volatile-queue")
.expiryPolicy(ExpiryPolicy.LINK_CLOSE)
.durabilityMode(DurabilityMode.NONE);
} else {
receiverOptions
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.autoAccept(false)
.autoSettle(false);
}
receiverOptions
.handler(nativeHandler)
.closeHandler(closeHandler)
.creditWindow(0)
.properties(properties);
Map<String, Object> localSourceFilters = Collections.emptyMap();
if (!filters.isEmpty()) {
localSourceFilters = Map.copyOf(filters);
receiverOptions.sourceOptions().filters(localSourceFilters);
}
ClientReceiver receiver =
(ClientReceiver)
ExceptionUtils.wrapGet(
nativeSession.openReceiver(address, receiverOptions).openFuture());
ClientReceiver receiver;
if (directReplyTo) {
receiver =
(ClientReceiver)
ExceptionUtils.wrapGet(
nativeSession.openDynamicReceiver(receiverOptions).openFuture());
} else {
receiver =
(ClientReceiver)
ExceptionUtils.wrapGet(
nativeSession.openReceiver(address, receiverOptions).openFuture());
}

boolean filterOk = true;
if (!filters.isEmpty()) {
Map<String, String> remoteSourceFilters = receiver.source().filters();
Expand Down Expand Up @@ -298,10 +328,11 @@ void recoverAfterConnectionFailure() {
List.of(ofSeconds(1), ofSeconds(2), ofSeconds(3), BackOffDelayPolicy.TIMEOUT),
"Create AMQP receiver to address '%s'",
this.address);
this.initStateFromNativeReceiver(this.nativeReceiver);
this.pauseStatus.set(PauseStatus.UNPAUSED);
this.unsettledMessageCount.set(0);
try {
this.directReplyToAddress = this.nativeReceiver.address();
this.initStateFromNativeReceiver(this.nativeReceiver);
this.pauseStatus.set(PauseStatus.UNPAUSED);
this.unsettledMessageCount.set(0);
this.nativeReceiver.addCredit(this.initialCredits);
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
Expand Down Expand Up @@ -493,6 +524,10 @@ private void settle(
}
}

String directReplyToAddress() {
return this.directReplyToAddress;
}

@Override
public String toString() {
return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {

private final AmqpConnection connection;
private String queue;
private boolean directReplyTo = false;
private Consumer.MessageHandler messageHandler;
private int initialCredits = 100;
private final List<Resource.StateListener> listeners = new ArrayList<>();
Expand All @@ -48,6 +49,19 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
@Override
public ConsumerBuilder queue(String queue) {
this.queue = queue;
if (this.queue == null) {
this.directReplyTo = true;
} else {
this.directReplyTo = false;
}
return this;
}

ConsumerBuilder directReplyTo(boolean directReplyTo) {
this.directReplyTo = directReplyTo;
if (this.directReplyTo) {
this.queue = null;
}
return this;
}

Expand Down Expand Up @@ -102,6 +116,10 @@ String queue() {
return queue;
}

boolean directReplyTo() {
return this.directReplyTo;
}

Consumer.MessageHandler messageHandler() {
return messageHandler;
}
Expand All @@ -124,7 +142,7 @@ Map<String, DescribedType> filters() {

@Override
public Consumer build() {
if (this.queue == null || this.queue.isBlank()) {
if ((this.queue == null || this.queue.isBlank()) && !this.directReplyTo) {
throw new IllegalArgumentException("A queue must be specified");
}
if (this.messageHandler == null) {
Expand Down Expand Up @@ -442,7 +460,7 @@ public StreamFilterOptions propertySymbol(String key, String value) {

@Override
public StreamFilterOptions sql(String sql) {
if (!this.streamOptions.builder.connection.filterExpressionsSupported()) {
if (!this.streamOptions.builder.connection.sqlFilterExpressionsSupported()) {
throw new IllegalArgumentException(
"AMQP SQL filter expressions requires at least RabbitMQ 4.2.0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,11 +713,18 @@ private static class DefaultQueueInfo implements QueueInfo {

@SuppressWarnings("unchecked")
private DefaultQueueInfo(Map<String, Object> response) {
QueueType queueType;
this.name = (String) response.get("name");
this.durable = (Boolean) response.get("durable");
this.autoDelete = (Boolean) response.get("auto_delete");
this.exclusive = (Boolean) response.get("exclusive");
this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
try {
queueType = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
} catch (Exception e) {
// this happens for reply-to queues, no need to make the type public
queueType = null;
}
this.type = queueType;
this.arguments = Map.copyOf((Map<String, Object>) response.get("arguments"));
this.leader = (String) response.get("leader");
String[] members = (String[]) response.get("replicas");
Expand Down
Loading