Skip to content

Commit cdaf625

Browse files
authored
Merge pull request #248 from rabbitmq/rpc-direct-reply-to
Support direct reply-to in RPC client support class
2 parents d6ace80 + b4ce6d4 commit cdaf625

File tree

18 files changed

+402
-82
lines changed

18 files changed

+402
-82
lines changed

.github/workflows/test-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Test against RabbitMQ 4.1 (PR)
1+
name: Test against RabbitMQ stable (PR)
22

33
on:
44
pull_request:

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Test against RabbitMQ 4.0
1+
name: Test against RabbitMQ stable
22

33
on:
44
push:

src/docs/asciidoc/usage.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ This behavior is hardcoded but it is possible to cancel it thanks to a reply pos
235235

236236
The RPC client uses the following defaults:
237237

238-
* it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set.
238+
* it uses https://www.rabbitmq.com/docs/direct-reply-to[direct reply-to] if available (RabbitMQ 4.2 or more) for replies if no reply-to queue is set (it falls back to an auto-delete, exclusive queue if direct reply-to is not available)
239239
* it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (`{UUID}-{sequence}`, e.g. `6f839461-6b19-47e1-80b3-6be10d899d85-42`).
240240
The prefix is different for each `RpcClient` instance and the suffix is incremented by one for each request.
241241
* it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the reply-to queue address (defined by the user or created automatically, see above).

src/main/java/com/rabbitmq/client/amqp/Publisher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public interface Publisher extends AutoCloseable, Resource {
3232
/**
3333
* Create a message meant to be published by the publisher instance.
3434
*
35-
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should be
36-
* not be modified or even reused.
35+
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should not
36+
* be modified or even reused.
3737
*
3838
* @return a message
3939
*/
@@ -42,8 +42,8 @@ public interface Publisher extends AutoCloseable, Resource {
4242
/**
4343
* Create a message meant to be published by the publisher instance.
4444
*
45-
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should be
46-
* not be modified or even reused.
45+
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should not
46+
* be modified or even reused.
4747
*
4848
* @param body message body
4949
* @return a message with the provided body

src/main/java/com/rabbitmq/client/amqp/RpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public interface RpcClient extends AutoCloseable {
2929
/**
3030
* Create a message meant to be published by the underlying publisher instance.
3131
*
32-
* <p>Once published with the {@link #publish(Message)} the message instance should be not be
32+
* <p>Once published with the {@link #publish(Message)} the message instance should not be
3333
* modified or even reused.
3434
*
3535
* @return a message
@@ -39,7 +39,7 @@ public interface RpcClient extends AutoCloseable {
3939
/**
4040
* Create a message meant to be published by the underlying publisher instance.
4141
*
42-
* <p>Once published with the {@link #publish(Message)} the message instance should be not be
42+
* <p>Once published with the {@link #publish(Message)} the message instance should not be
4343
* modified or even reused.
4444
*
4545
* @param body message body

src/main/java/com/rabbitmq/client/amqp/RpcServer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ interface Handler {
4141
/** Request processing context. */
4242
interface Context {
4343

44+
/**
45+
* Tell whether the requester is still able to receive the response.
46+
*
47+
* <p>The call assumes a reply-to queue address has been set on the request message and checks
48+
* whether this queue still exists or not.
49+
*
50+
* <p>A time-consuming request handler can use this call from time to time to make sure it still
51+
* worth keeping processing the request.
52+
*
53+
* @param request the incoming request
54+
* @return true if the requester is still considered alive, false otherwise
55+
*/
56+
boolean isRequesterAlive(Message request);
57+
4458
/**
4559
* Create a message meant to be published by the underlying publisher instance.
4660
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
2525
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert;
2626
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
27+
import static com.rabbitmq.client.amqp.impl.Utils.supportDirectReplyTo;
2728
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2829
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
2930
import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions;
@@ -118,7 +119,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
118119
private final Lock instanceLock = new ReentrantLock();
119120
private final boolean filterExpressionsSupported,
120121
setTokenSupported,
121-
sqlFilterExpressionsSupported;
122+
sqlFilterExpressionsSupported,
123+
directReplyToSupported;
122124
private volatile ConsumerWorkService consumerWorkService;
123125
private volatile Executor dispatchingExecutor;
124126
private final boolean privateDispatchingExecutor;
@@ -216,6 +218,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
216218
this.filterExpressionsSupported = supportFilterExpressions(brokerVersion);
217219
this.setTokenSupported = supportSetToken(brokerVersion);
218220
this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion);
221+
this.directReplyToSupported = supportDirectReplyTo(brokerVersion);
219222
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
220223
this.state(OPEN);
221224
this.environment.metricsCollector().openConnection();
@@ -868,6 +871,10 @@ boolean setTokenSupported() {
868871
return this.setTokenSupported;
869872
}
870873

874+
boolean directReplyToSupported() {
875+
return this.directReplyToSupported;
876+
}
877+
871878
long id() {
872879
return this.id;
873880
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
6565
private final int initialCredits;
6666
private final Long id;
6767
private final String address;
68+
private volatile String directReplyToAddress;
6869
private final String queue;
6970
private final Map<String, DescribedType> filters;
7071
private final Map<String, Object> linkProperties;
@@ -96,10 +97,15 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
9697
.connection()
9798
.observationCollector()
9899
.subscribe(builder.queue(), builder.messageHandler());
99-
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
100-
addressBuilder.queue(builder.queue());
101-
this.address = addressBuilder.address();
102-
this.queue = builder.queue();
100+
if (builder.directReplyTo()) {
101+
this.address = null;
102+
this.queue = null;
103+
} else {
104+
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
105+
addressBuilder.queue(builder.queue());
106+
this.address = addressBuilder.address();
107+
this.queue = builder.queue();
108+
}
103109
this.filters = Map.copyOf(builder.filters());
104110
this.linkProperties = Map.copyOf(builder.properties());
105111
this.subscriptionListener =
@@ -120,18 +126,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
120126
this.consumerWorkService = connection.consumerWorkService();
121127
this.consumerWorkService.register(this);
122128
this.nativeReceiver =
123-
this.createNativeReceiver(
129+
createNativeReceiver(
124130
this.sessionHandler.session(),
125131
this.address,
126132
this.linkProperties,
127133
this.filters,
128134
this.subscriptionListener,
129135
this.nativeHandler,
130136
this.nativeCloseHandler);
131-
this.initStateFromNativeReceiver(this.nativeReceiver);
132-
this.metricsCollector = this.connection.metricsCollector();
133-
this.state(OPEN);
134137
try {
138+
this.directReplyToAddress = nativeReceiver.address();
139+
this.initStateFromNativeReceiver(this.nativeReceiver);
140+
this.metricsCollector = this.connection.metricsCollector();
141+
this.state(OPEN);
135142
this.nativeReceiver.addCredit(this.initialCredits);
136143
} catch (ClientException e) {
137144
AmqpException ex = ExceptionUtils.convert(e);
@@ -189,7 +196,7 @@ public void close() {
189196

190197
// internal API
191198

192-
private ClientReceiver createNativeReceiver(
199+
private static ClientReceiver createNativeReceiver(
193200
Session nativeSession,
194201
String address,
195202
Map<String, Object> properties,
@@ -201,24 +208,47 @@ private ClientReceiver createNativeReceiver(
201208
filters = new LinkedHashMap<>(filters);
202209
StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
203210
subscriptionListener.preSubscribe(() -> streamOptions);
204-
ReceiverOptions receiverOptions =
205-
new ReceiverOptions()
206-
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
207-
.autoAccept(false)
208-
.autoSettle(false)
209-
.handler(nativeHandler)
210-
.closeHandler(closeHandler)
211-
.creditWindow(0)
212-
.properties(properties);
211+
boolean directReplyTo = address == null;
212+
ReceiverOptions receiverOptions = new ReceiverOptions();
213+
214+
if (directReplyTo) {
215+
receiverOptions
216+
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
217+
.autoAccept(true)
218+
.autoSettle(true)
219+
.sourceOptions()
220+
.capabilities("rabbitmq:volatile-queue")
221+
.expiryPolicy(ExpiryPolicy.LINK_CLOSE)
222+
.durabilityMode(DurabilityMode.NONE);
223+
} else {
224+
receiverOptions
225+
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
226+
.autoAccept(false)
227+
.autoSettle(false);
228+
}
229+
receiverOptions
230+
.handler(nativeHandler)
231+
.closeHandler(closeHandler)
232+
.creditWindow(0)
233+
.properties(properties);
213234
Map<String, Object> localSourceFilters = Collections.emptyMap();
214235
if (!filters.isEmpty()) {
215236
localSourceFilters = Map.copyOf(filters);
216237
receiverOptions.sourceOptions().filters(localSourceFilters);
217238
}
218-
ClientReceiver receiver =
219-
(ClientReceiver)
220-
ExceptionUtils.wrapGet(
221-
nativeSession.openReceiver(address, receiverOptions).openFuture());
239+
ClientReceiver receiver;
240+
if (directReplyTo) {
241+
receiver =
242+
(ClientReceiver)
243+
ExceptionUtils.wrapGet(
244+
nativeSession.openDynamicReceiver(receiverOptions).openFuture());
245+
} else {
246+
receiver =
247+
(ClientReceiver)
248+
ExceptionUtils.wrapGet(
249+
nativeSession.openReceiver(address, receiverOptions).openFuture());
250+
}
251+
222252
boolean filterOk = true;
223253
if (!filters.isEmpty()) {
224254
Map<String, String> remoteSourceFilters = receiver.source().filters();
@@ -298,10 +328,11 @@ void recoverAfterConnectionFailure() {
298328
List.of(ofSeconds(1), ofSeconds(2), ofSeconds(3), BackOffDelayPolicy.TIMEOUT),
299329
"Create AMQP receiver to address '%s'",
300330
this.address);
301-
this.initStateFromNativeReceiver(this.nativeReceiver);
302-
this.pauseStatus.set(PauseStatus.UNPAUSED);
303-
this.unsettledMessageCount.set(0);
304331
try {
332+
this.directReplyToAddress = this.nativeReceiver.address();
333+
this.initStateFromNativeReceiver(this.nativeReceiver);
334+
this.pauseStatus.set(PauseStatus.UNPAUSED);
335+
this.unsettledMessageCount.set(0);
305336
this.nativeReceiver.addCredit(this.initialCredits);
306337
} catch (ClientException e) {
307338
throw ExceptionUtils.convert(e);
@@ -493,6 +524,10 @@ private void settle(
493524
}
494525
}
495526

527+
String directReplyToAddress() {
528+
return this.directReplyToAddress;
529+
}
530+
496531
@Override
497532
public String toString() {
498533
return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}';

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3333

3434
private final AmqpConnection connection;
3535
private String queue;
36+
private boolean directReplyTo = false;
3637
private Consumer.MessageHandler messageHandler;
3738
private int initialCredits = 100;
3839
private final List<Resource.StateListener> listeners = new ArrayList<>();
@@ -48,6 +49,19 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
4849
@Override
4950
public ConsumerBuilder queue(String queue) {
5051
this.queue = queue;
52+
if (this.queue == null) {
53+
this.directReplyTo = true;
54+
} else {
55+
this.directReplyTo = false;
56+
}
57+
return this;
58+
}
59+
60+
ConsumerBuilder directReplyTo(boolean directReplyTo) {
61+
this.directReplyTo = directReplyTo;
62+
if (this.directReplyTo) {
63+
this.queue = null;
64+
}
5165
return this;
5266
}
5367

@@ -102,6 +116,10 @@ String queue() {
102116
return queue;
103117
}
104118

119+
boolean directReplyTo() {
120+
return this.directReplyTo;
121+
}
122+
105123
Consumer.MessageHandler messageHandler() {
106124
return messageHandler;
107125
}
@@ -124,7 +142,7 @@ Map<String, DescribedType> filters() {
124142

125143
@Override
126144
public Consumer build() {
127-
if (this.queue == null || this.queue.isBlank()) {
145+
if ((this.queue == null || this.queue.isBlank()) && !this.directReplyTo) {
128146
throw new IllegalArgumentException("A queue must be specified");
129147
}
130148
if (this.messageHandler == null) {
@@ -442,7 +460,7 @@ public StreamFilterOptions propertySymbol(String key, String value) {
442460

443461
@Override
444462
public StreamFilterOptions sql(String sql) {
445-
if (!this.streamOptions.builder.connection.filterExpressionsSupported()) {
463+
if (!this.streamOptions.builder.connection.sqlFilterExpressionsSupported()) {
446464
throw new IllegalArgumentException(
447465
"AMQP SQL filter expressions requires at least RabbitMQ 4.2.0");
448466
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,11 +713,18 @@ private static class DefaultQueueInfo implements QueueInfo {
713713

714714
@SuppressWarnings("unchecked")
715715
private DefaultQueueInfo(Map<String, Object> response) {
716+
QueueType queueType;
716717
this.name = (String) response.get("name");
717718
this.durable = (Boolean) response.get("durable");
718719
this.autoDelete = (Boolean) response.get("auto_delete");
719720
this.exclusive = (Boolean) response.get("exclusive");
720-
this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
721+
try {
722+
queueType = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
723+
} catch (Exception e) {
724+
// this happens for reply-to queues, no need to make the type public
725+
queueType = null;
726+
}
727+
this.type = queueType;
721728
this.arguments = Map.copyOf((Map<String, Object>) response.get("arguments"));
722729
this.leader = (String) response.get("leader");
723730
String[] members = (String[]) response.get("replicas");

0 commit comments

Comments
 (0)