diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 90a6557958..37b051755b 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -1,4 +1,4 @@ -name: Test against RabbitMQ 4.1 (PR) +name: Test against RabbitMQ stable (PR) on: pull_request: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 95efcb456c..676be345c3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,4 +1,4 @@ -name: Test against RabbitMQ 4.0 +name: Test against RabbitMQ stable on: push: diff --git a/src/docs/asciidoc/usage.adoc b/src/docs/asciidoc/usage.adoc index b1f46b9a8c..0a012e4b51 100644 --- a/src/docs/asciidoc/usage.adoc +++ b/src/docs/asciidoc/usage.adoc @@ -235,7 +235,7 @@ This behavior is hardcoded but it is possible to cancel it thanks to a reply pos The RPC client uses the following defaults: -* it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set. +* it uses https://www.rabbitmq.com/docs/direct-reply-to[direct reply-to] if available (RabbitMQ 4.2 or more) for replies if no reply-to queue is set (it falls back to an auto-delete, exclusive queue if direct reply-to is not available) * it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (`{UUID}-{sequence}`, e.g. `6f839461-6b19-47e1-80b3-6be10d899d85-42`). The prefix is different for each `RpcClient` instance and the suffix is incremented by one for each request. * it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the reply-to queue address (defined by the user or created automatically, see above). diff --git a/src/main/java/com/rabbitmq/client/amqp/Publisher.java b/src/main/java/com/rabbitmq/client/amqp/Publisher.java index f36d859d17..1808956f95 100644 --- a/src/main/java/com/rabbitmq/client/amqp/Publisher.java +++ b/src/main/java/com/rabbitmq/client/amqp/Publisher.java @@ -32,8 +32,8 @@ public interface Publisher extends AutoCloseable, Resource { /** * Create a message meant to be published by the publisher instance. * - *

Once published with the {@link #publish(Message, Callback)} the message instance should be - * not be modified or even reused. + *

Once published with the {@link #publish(Message, Callback)} the message instance should not + * be modified or even reused. * * @return a message */ @@ -42,8 +42,8 @@ public interface Publisher extends AutoCloseable, Resource { /** * Create a message meant to be published by the publisher instance. * - *

Once published with the {@link #publish(Message, Callback)} the message instance should be - * not be modified or even reused. + *

Once published with the {@link #publish(Message, Callback)} the message instance should not + * be modified or even reused. * * @param body message body * @return a message with the provided body diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcClient.java b/src/main/java/com/rabbitmq/client/amqp/RpcClient.java index b043d46299..bd30f89301 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcClient.java +++ b/src/main/java/com/rabbitmq/client/amqp/RpcClient.java @@ -29,7 +29,7 @@ public interface RpcClient extends AutoCloseable { /** * Create a message meant to be published by the underlying publisher instance. * - *

Once published with the {@link #publish(Message)} the message instance should be not be + *

Once published with the {@link #publish(Message)} the message instance should not be * modified or even reused. * * @return a message @@ -39,7 +39,7 @@ public interface RpcClient extends AutoCloseable { /** * Create a message meant to be published by the underlying publisher instance. * - *

Once published with the {@link #publish(Message)} the message instance should be not be + *

Once published with the {@link #publish(Message)} the message instance should not be * modified or even reused. * * @param body message body diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcServer.java b/src/main/java/com/rabbitmq/client/amqp/RpcServer.java index 9637aca28a..773e2f0012 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcServer.java +++ b/src/main/java/com/rabbitmq/client/amqp/RpcServer.java @@ -41,6 +41,20 @@ interface Handler { /** Request processing context. */ interface Context { + /** + * Tell whether the requester is still able to receive the response. + * + *

The call assumes a reply-to queue address has been set on the request message and checks + * whether this queue still exists or not. + * + *

A time-consuming request handler can use this call from time to time to make sure it still + * worth keeping processing the request. + * + * @param request the incoming request + * @return true if the requester is still considered alive, false otherwise + */ + boolean isRequesterAlive(Message request); + /** * Create a message meant to be published by the underlying publisher instance. * diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index 1ab4d99636..d9a77108b4 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -24,6 +24,7 @@ import static com.rabbitmq.client.amqp.Resource.State.RECOVERING; import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert; import static com.rabbitmq.client.amqp.impl.Tuples.pair; +import static com.rabbitmq.client.amqp.impl.Utils.supportDirectReplyTo; import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions; import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken; import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions; @@ -118,7 +119,8 @@ final class AmqpConnection extends ResourceBase implements Connection { private final Lock instanceLock = new ReentrantLock(); private final boolean filterExpressionsSupported, setTokenSupported, - sqlFilterExpressionsSupported; + sqlFilterExpressionsSupported, + directReplyToSupported; private volatile ConsumerWorkService consumerWorkService; private volatile Executor dispatchingExecutor; private final boolean privateDispatchingExecutor; @@ -216,6 +218,7 @@ final class AmqpConnection extends ResourceBase implements Connection { this.filterExpressionsSupported = supportFilterExpressions(brokerVersion); this.setTokenSupported = supportSetToken(brokerVersion); this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion); + this.directReplyToSupported = supportDirectReplyTo(brokerVersion); LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename()); this.state(OPEN); this.environment.metricsCollector().openConnection(); @@ -868,6 +871,10 @@ boolean setTokenSupported() { return this.setTokenSupported; } + boolean directReplyToSupported() { + return this.directReplyToSupported; + } + long id() { return this.id; } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 60936c50c1..f10358baa0 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -65,6 +65,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer { private final int initialCredits; private final Long id; private final String address; + private volatile String directReplyToAddress; private final String queue; private final Map filters; private final Map linkProperties; @@ -96,10 +97,15 @@ final class AmqpConsumer extends ResourceBase implements Consumer { .connection() .observationCollector() .subscribe(builder.queue(), builder.messageHandler()); - DefaultAddressBuilder addressBuilder = Utils.addressBuilder(); - addressBuilder.queue(builder.queue()); - this.address = addressBuilder.address(); - this.queue = builder.queue(); + if (builder.directReplyTo()) { + this.address = null; + this.queue = null; + } else { + DefaultAddressBuilder addressBuilder = Utils.addressBuilder(); + addressBuilder.queue(builder.queue()); + this.address = addressBuilder.address(); + this.queue = builder.queue(); + } this.filters = Map.copyOf(builder.filters()); this.linkProperties = Map.copyOf(builder.properties()); this.subscriptionListener = @@ -120,7 +126,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer { this.consumerWorkService = connection.consumerWorkService(); this.consumerWorkService.register(this); this.nativeReceiver = - this.createNativeReceiver( + createNativeReceiver( this.sessionHandler.session(), this.address, this.linkProperties, @@ -128,10 +134,11 @@ final class AmqpConsumer extends ResourceBase implements Consumer { this.subscriptionListener, this.nativeHandler, this.nativeCloseHandler); - this.initStateFromNativeReceiver(this.nativeReceiver); - this.metricsCollector = this.connection.metricsCollector(); - this.state(OPEN); try { + this.directReplyToAddress = nativeReceiver.address(); + this.initStateFromNativeReceiver(this.nativeReceiver); + this.metricsCollector = this.connection.metricsCollector(); + this.state(OPEN); this.nativeReceiver.addCredit(this.initialCredits); } catch (ClientException e) { AmqpException ex = ExceptionUtils.convert(e); @@ -189,7 +196,7 @@ public void close() { // internal API - private ClientReceiver createNativeReceiver( + private static ClientReceiver createNativeReceiver( Session nativeSession, String address, Map properties, @@ -201,24 +208,47 @@ private ClientReceiver createNativeReceiver( filters = new LinkedHashMap<>(filters); StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters); subscriptionListener.preSubscribe(() -> streamOptions); - ReceiverOptions receiverOptions = - new ReceiverOptions() - .deliveryMode(DeliveryMode.AT_LEAST_ONCE) - .autoAccept(false) - .autoSettle(false) - .handler(nativeHandler) - .closeHandler(closeHandler) - .creditWindow(0) - .properties(properties); + boolean directReplyTo = address == null; + ReceiverOptions receiverOptions = new ReceiverOptions(); + + if (directReplyTo) { + receiverOptions + .deliveryMode(DeliveryMode.AT_MOST_ONCE) + .autoAccept(true) + .autoSettle(true) + .sourceOptions() + .capabilities("rabbitmq:volatile-queue") + .expiryPolicy(ExpiryPolicy.LINK_CLOSE) + .durabilityMode(DurabilityMode.NONE); + } else { + receiverOptions + .deliveryMode(DeliveryMode.AT_LEAST_ONCE) + .autoAccept(false) + .autoSettle(false); + } + receiverOptions + .handler(nativeHandler) + .closeHandler(closeHandler) + .creditWindow(0) + .properties(properties); Map localSourceFilters = Collections.emptyMap(); if (!filters.isEmpty()) { localSourceFilters = Map.copyOf(filters); receiverOptions.sourceOptions().filters(localSourceFilters); } - ClientReceiver receiver = - (ClientReceiver) - ExceptionUtils.wrapGet( - nativeSession.openReceiver(address, receiverOptions).openFuture()); + ClientReceiver receiver; + if (directReplyTo) { + receiver = + (ClientReceiver) + ExceptionUtils.wrapGet( + nativeSession.openDynamicReceiver(receiverOptions).openFuture()); + } else { + receiver = + (ClientReceiver) + ExceptionUtils.wrapGet( + nativeSession.openReceiver(address, receiverOptions).openFuture()); + } + boolean filterOk = true; if (!filters.isEmpty()) { Map remoteSourceFilters = receiver.source().filters(); @@ -298,10 +328,11 @@ void recoverAfterConnectionFailure() { List.of(ofSeconds(1), ofSeconds(2), ofSeconds(3), BackOffDelayPolicy.TIMEOUT), "Create AMQP receiver to address '%s'", this.address); - this.initStateFromNativeReceiver(this.nativeReceiver); - this.pauseStatus.set(PauseStatus.UNPAUSED); - this.unsettledMessageCount.set(0); try { + this.directReplyToAddress = this.nativeReceiver.address(); + this.initStateFromNativeReceiver(this.nativeReceiver); + this.pauseStatus.set(PauseStatus.UNPAUSED); + this.unsettledMessageCount.set(0); this.nativeReceiver.addCredit(this.initialCredits); } catch (ClientException e) { throw ExceptionUtils.convert(e); @@ -493,6 +524,10 @@ private void settle( } } + String directReplyToAddress() { + return this.directReplyToAddress; + } + @Override public String toString() { return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}'; diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java index 85f8c7dd06..746cc5335f 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java @@ -33,6 +33,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder { private final AmqpConnection connection; private String queue; + private boolean directReplyTo = false; private Consumer.MessageHandler messageHandler; private int initialCredits = 100; private final List listeners = new ArrayList<>(); @@ -48,6 +49,19 @@ class AmqpConsumerBuilder implements ConsumerBuilder { @Override public ConsumerBuilder queue(String queue) { this.queue = queue; + if (this.queue == null) { + this.directReplyTo = true; + } else { + this.directReplyTo = false; + } + return this; + } + + ConsumerBuilder directReplyTo(boolean directReplyTo) { + this.directReplyTo = directReplyTo; + if (this.directReplyTo) { + this.queue = null; + } return this; } @@ -102,6 +116,10 @@ String queue() { return queue; } + boolean directReplyTo() { + return this.directReplyTo; + } + Consumer.MessageHandler messageHandler() { return messageHandler; } @@ -124,7 +142,7 @@ Map filters() { @Override public Consumer build() { - if (this.queue == null || this.queue.isBlank()) { + if ((this.queue == null || this.queue.isBlank()) && !this.directReplyTo) { throw new IllegalArgumentException("A queue must be specified"); } if (this.messageHandler == null) { @@ -442,7 +460,7 @@ public StreamFilterOptions propertySymbol(String key, String value) { @Override public StreamFilterOptions sql(String sql) { - if (!this.streamOptions.builder.connection.filterExpressionsSupported()) { + if (!this.streamOptions.builder.connection.sqlFilterExpressionsSupported()) { throw new IllegalArgumentException( "AMQP SQL filter expressions requires at least RabbitMQ 4.2.0"); } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java index 24eaac0eac..a59a39c1ec 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java @@ -713,11 +713,18 @@ private static class DefaultQueueInfo implements QueueInfo { @SuppressWarnings("unchecked") private DefaultQueueInfo(Map response) { + QueueType queueType; this.name = (String) response.get("name"); this.durable = (Boolean) response.get("durable"); this.autoDelete = (Boolean) response.get("auto_delete"); this.exclusive = (Boolean) response.get("exclusive"); - this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH)); + try { + queueType = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH)); + } catch (Exception e) { + // this happens for reply-to queues, no need to make the type public + queueType = null; + } + this.type = queueType; this.arguments = Map.copyOf((Map) response.get("arguments")); this.leader = (String) response.get("leader"); String[] members = (String[]) response.get("replicas"); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java index d6e9f6af71..279afd3245 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java @@ -18,7 +18,6 @@ package com.rabbitmq.client.amqp.impl; import com.rabbitmq.client.amqp.AmqpException; -import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.Management; import com.rabbitmq.client.amqp.Message; import com.rabbitmq.client.amqp.Publisher; @@ -39,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class AmqpRpcClient implements RpcClient { +final class AmqpRpcClient implements RpcClient { private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcClient.class); @@ -48,7 +47,7 @@ class AmqpRpcClient implements RpcClient { private final AmqpConnection connection; private final Clock clock; private final Publisher publisher; - private final Consumer consumer; + private final AmqpConsumer consumer; private final Map outstandingRequests = new ConcurrentHashMap<>(); private final Supplier correlationIdSupplier; private final BiFunction requestPostProcessor; @@ -67,30 +66,39 @@ class AmqpRpcClient implements RpcClient { this.publisher = publisherBuilder.build(); String replyTo = builder.replyToQueue(); + boolean directReplyTo; if (replyTo == null) { - Management.QueueInfo queueInfo = - this.connection.management().queue().exclusive(true).autoDelete(true).declare(); - replyTo = queueInfo.name(); + directReplyTo = connection.directReplyToSupported(); + if (!directReplyTo) { + Management.QueueInfo queueInfo = + this.connection.management().queue().exclusive(true).autoDelete(true).declare(); + replyTo = queueInfo.name(); + } + } else { + directReplyTo = false; } if (builder.correlationIdExtractor() == null) { this.correlationIdExtractor = Message::correlationId; } else { this.correlationIdExtractor = builder.correlationIdExtractor(); } + AmqpConsumerBuilder consumerBuilder = (AmqpConsumerBuilder) this.connection.consumerBuilder(); + LOGGER.debug("Using direct reply-to: {}", this.connection.directReplyToSupported()); this.consumer = - this.connection - .consumerBuilder() - .queue(replyTo) - .messageHandler( - (ctx, msg) -> { - ctx.accept(); - OutstandingRequest request = - this.outstandingRequests.remove(this.correlationIdExtractor.apply(msg)); - if (request != null) { - request.future.complete(msg); - } - }) - .build(); + (AmqpConsumer) + consumerBuilder + .directReplyTo(directReplyTo) + .queue(replyTo) + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + OutstandingRequest request = + this.outstandingRequests.remove(this.correlationIdExtractor.apply(msg)); + if (request != null) { + request.future.complete(msg); + } + }) + .build(); if (builder.correlationIdSupplier() == null) { String correlationIdPrefix = UUID.randomUUID().toString(); @@ -102,14 +110,25 @@ class AmqpRpcClient implements RpcClient { } if (builder.requestPostProcessor() == null) { - DefaultAddressBuilder addressBuilder = Utils.addressBuilder(); - addressBuilder.queue(replyTo); - String replyToAddress = addressBuilder.address(); - // HTTP over AMQP 1.0 extension specification, 5.1: - // To associate a response with a request, the correlation-id value of the response properties - // MUST be set to the message-id value of the request properties. - this.requestPostProcessor = - (request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId); + if (directReplyTo) { + // HTTP over AMQP 1.0 extension specification, 5.1: + // To associate a response with a request, the correlation-id value of the response + // properties + // MUST be set to the message-id value of the request properties. + this.requestPostProcessor = + (request, correlationId) -> + request.replyTo(consumer.directReplyToAddress()).messageId(correlationId); + } else { + DefaultAddressBuilder addressBuilder = Utils.addressBuilder(); + addressBuilder.queue(replyTo); + String replyToAddress = addressBuilder.address(); + // HTTP over AMQP 1.0 extension specification, 5.1: + // To associate a response with a request, the correlation-id value of the response + // properties + // MUST be set to the message-id value of the request properties. + this.requestPostProcessor = + (request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId); + } } else { this.requestPostProcessor = builder.requestPostProcessor(); } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java index d29ec9dfb0..4b73c213dd 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java @@ -31,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class AmqpRpcServer implements RpcServer { +final class AmqpRpcServer implements RpcServer { private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcServer.class); @@ -66,6 +66,30 @@ class AmqpRpcServer implements RpcServer { Context context = new Context() { + + @Override + public boolean isRequesterAlive(Message message) { + String replyToAddr = message.replyTo(); + String replyToQueue = Utils.extractQueueName(replyToAddr); + boolean replyToOk = true; + if (replyToQueue != null) { + try { + connection.management().queueInfo(replyToQueue); + } catch (AmqpException.AmqpEntityDoesNotExistException e) { + replyToOk = false; + } catch (Exception e) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn( + "Error while checking reply queue '{}' ({}): {}", + replyToQueue, + replyToAddr, + e.getMessage()); + } + } + } + return replyToOk; + } + @Override public Message message() { return publisher.message(); @@ -95,6 +119,7 @@ public Message message(byte[] body) { } else { this.replyPostProcessor = builder.replyPostProcessor(); } + this.consumer = this.connection .consumerBuilder() @@ -104,8 +129,9 @@ public Message message(byte[] body) { Object correlationId = null; try { Message reply = handler.handle(context, msg); - if (reply != null && msg.replyTo() != null) { - reply.to(msg.replyTo()); + String replyToAddr = msg.replyTo(); + if (reply != null && replyToAddr != null) { + reply.to(replyToAddr); } correlationId = correlationIdExtractor.apply(msg); reply = replyPostProcessor.apply(reply, correlationId); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/UriUtils.java b/src/main/java/com/rabbitmq/client/amqp/impl/UriUtils.java index 2682eecab0..e8a0363ae0 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/UriUtils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/UriUtils.java @@ -106,4 +106,38 @@ private static String encode(String value, BitSet safeCharacters) { } return buf.toString(); } + + static String decode(final CharSequence content) { + return decode(content, false); + } + + private static String decode(final CharSequence content, final boolean plusAsBlank) { + if (content == null) { + return null; + } + final ByteBuffer bb = ByteBuffer.allocate(content.length()); + final CharBuffer cb = CharBuffer.wrap(content); + while (cb.hasRemaining()) { + final char c = cb.get(); + if (c == '%' && cb.remaining() >= 2) { + final char uc = cb.get(); + final char lc = cb.get(); + final int u = Character.digit(uc, RADIX); + final int l = Character.digit(lc, RADIX); + if (u != -1 && l != -1) { + bb.put((byte) ((u << 4) + l)); + } else { + bb.put((byte) '%'); + bb.put((byte) uc); + bb.put((byte) lc); + } + } else if (plusAsBlank && c == '+') { + bb.put((byte) ' '); + } else { + bb.put((byte) c); + } + } + bb.flip(); + return StandardCharsets.UTF_8.decode(bb).toString(); + } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index 3078eec173..e4ea091eab 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -228,26 +228,20 @@ static int versionCompare(String str1, String str2) { } static boolean is4_0_OrMore(String brokerVersion) { - try { - return versionCompare(currentVersion(brokerVersion), "4.0.0") >= 0; - } catch (Exception e) { - LOGGER.debug("Unable to parse broker version {}", brokerVersion, e); - return true; - } + return atLeastVersion(brokerVersion, "4.0.0"); + } + + static boolean is4_1_OrMore(String brokerVersion) { + return atLeastVersion(brokerVersion, "4.1.0"); } static boolean is4_2_OrMore(String brokerVersion) { - try { - return versionCompare(currentVersion(brokerVersion), "4.2.0") >= 0; - } catch (Exception e) { - LOGGER.debug("Unable to parse broker version {}", brokerVersion, e); - return true; - } + return atLeastVersion(brokerVersion, "4.2.0"); } - static boolean is4_1_OrMore(String brokerVersion) { + private static boolean atLeastVersion(String brokerVersion, String expectedVersion) { try { - return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0; + return versionCompare(currentVersion(brokerVersion), expectedVersion) >= 0; } catch (Exception e) { LOGGER.debug("Unable to parse broker version {}", brokerVersion, e); return true; @@ -266,6 +260,10 @@ static boolean supportSqlFilterExpressions(String brokerVersion) { return is4_2_OrMore(brokerVersion); } + static boolean supportDirectReplyTo(String brokerVersion) { + return is4_2_OrMore(brokerVersion); + } + static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo { private final String address; @@ -365,4 +363,12 @@ interface RunnableWithException { void run() throws Exception; } + + static String extractQueueName(String address) { + if (address == null || !address.startsWith("/queues/")) { + return null; + } else { + return UriUtils.decode(address.replaceFirst("/queues/", "")); + } + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java index a80ab7c1e8..bfbc376d2e 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java @@ -24,6 +24,7 @@ import static java.nio.charset.StandardCharsets.*; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.qpid.protonj2.client.DeliveryMode.AT_LEAST_ONCE; +import static org.apache.qpid.protonj2.client.DeliveryMode.AT_MOST_ONCE; import static org.apache.qpid.protonj2.client.DeliveryState.released; import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat; @@ -491,6 +492,57 @@ void refuseLinkReceiverToInvalidAddressShouldReturnInvalidField() throws Excepti } } + @Test + @BrokerVersionAtLeast(RABBITMQ_4_2_0) + void requestReplyVolatileQueue() throws Exception { + try (Client client = client()) { + org.apache.qpid.protonj2.client.Connection serverC = connection(client); + Session serverS = serverC.openSession(); + Sender serverSnd = serverS.openAnonymousSender(null); + Receiver serverR = + serverS.openReceiver( + "/queues/" + q, + new ReceiverOptions() + .deliveryMode(AT_LEAST_ONCE) + .autoSettle(false) + .autoAccept(false)); + + org.apache.qpid.protonj2.client.Connection clientC = connection(client); + Session clientS = clientC.openSession(); + ReceiverOptions receiverOptions = new ReceiverOptions().deliveryMode(AT_MOST_ONCE); + receiverOptions + .sourceOptions() + .capabilities("rabbitmq:volatile-queue") + .expiryPolicy(ExpiryPolicy.LINK_CLOSE) + .durabilityMode(DurabilityMode.NONE); + Receiver clientR = clientS.openDynamicReceiver(receiverOptions); + clientR.openFuture().get(); + assertThat(clientR.address()).isNotNull(); + + Sender clientSnd = clientC.openSender("/queues/" + q); + String body = UUID.randomUUID().toString(); + String corrId = UUID.randomUUID().toString(); + Message request = Message.create(body).replyTo(clientR.address()).messageId(corrId); + clientSnd.send(request); + + Delivery delivery = serverR.receive(10, SECONDS); + assertThat(delivery).isNotNull(); + request = delivery.message(); + Message response = + Message.create("*** " + request.body() + " ***") + .to(request.replyTo()) + .correlationId(request.messageId()); + serverSnd.send(response); + delivery.disposition(DeliveryState.accepted(), true); + + delivery = clientR.receive(10, SECONDS); + assertThat(delivery).isNotNull(); + response = delivery.message(); + assertThat(response.correlationId()).isEqualTo(corrId); + assertThat(response.body()).isEqualTo("*** " + body + " ***"); + } + } + private static void checkSession(Session s) throws Exception { ReceiverOptions receiverOptions = new ReceiverOptions(); receiverOptions.sourceOptions().capabilities("temporary-queue"); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java index 4ec728b933..0285f898b4 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java @@ -107,6 +107,87 @@ void rpcWithDefaults() { } } + @Test + void rpcIsRequesterAliveShouldReturnTrueIfPublisherStillOpen() throws Exception { + try (Connection clientConnection = environment.connectionBuilder().build(); + Connection serverConnection = environment.connectionBuilder().build()) { + + String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); + + RpcClient rpcClient = + clientConnection + .rpcClientBuilder() + .requestAddress() + .queue(requestQueue) + .rpcClient() + .build(); + + List calls = new CopyOnWriteArrayList<>(); + serverConnection + .rpcServerBuilder() + .requestQueue(requestQueue) + .handler( + (ctx, request) -> { + calls.add(ctx.isRequesterAlive(request)); + return HANDLER.handle(ctx, request); + }) + .build(); + + String request = UUID.randomUUID().toString(); + CompletableFuture responseFuture = + rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + Message response = responseFuture.get(10, TimeUnit.SECONDS); + assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); + assertThat(calls).containsExactly(true); + } + } + + @Test + void rpcIsRequesterAliveShouldReturnFalseIfPublisherClosed() { + try (Connection clientConnection = environment.connectionBuilder().build(); + Connection serverConnection = environment.connectionBuilder().build()) { + + String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); + + RpcClient rpcClient = + clientConnection + .rpcClientBuilder() + .requestAddress() + .queue(requestQueue) + .rpcClient() + .build(); + + Sync requestReceivedSync = sync(); + Sync requesterClosedSync = sync(); + + List calls = new CopyOnWriteArrayList<>(); + serverConnection + .rpcServerBuilder() + .requestQueue(requestQueue) + .handler( + (ctx, request) -> { + calls.add(ctx.isRequesterAlive(request)); + requestReceivedSync.down(); + requesterClosedSync.await(Duration.ofSeconds(20)); + calls.add(ctx.isRequesterAlive(request)); + return null; + }) + .build(); + + String request = UUID.randomUUID().toString(); + CompletableFuture responseFuture = + rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + assertThat(requestReceivedSync).completes(); + assertThat(calls).containsExactly(true); + rpcClient.close(); + assertThat(responseFuture).completesExceptionallyWithin(Duration.ofSeconds(10)); + requesterClosedSync.down(); + waitAtMost(() -> calls.size() == 2); + assertThat(calls).containsExactly(true, false); + waitAtMost(() -> serverConnection.management().queueInfo(requestQueue).messageCount() == 0); + } + } + @Test void rpcWithCustomSettings() { try (Connection clientConnection = environment.connectionBuilder().build(); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/UriUtilsTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/UriUtilsTest.java index 149423e18d..9bc2b3567a 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/UriUtilsTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/UriUtilsTest.java @@ -55,4 +55,13 @@ void encodeParamTest(String param, String expected) { void encodeNonUnreservedTest(String param, String expected) { assertThat(encodeNonUnreserved(param)).isEqualTo(expected); } + + @ParameterizedTest + @CsvSource({ + "test,test", + "amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8%3D.0T5a3Sa%2BQ7ZRPeMFMi%2BJ0A%3D%3D,amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8=.0T5a3Sa+Q7ZRPeMFMi+J0A==" + }) + void decodeTest(String in, String expected) { + assertThat(decode(in)).isEqualTo(expected); + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/UtilsTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/UtilsTest.java index f19d58cae4..177530f158 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/UtilsTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/UtilsTest.java @@ -18,12 +18,14 @@ package com.rabbitmq.client.amqp.impl; import static com.rabbitmq.client.amqp.impl.Utils.checkMessageAnnotations; +import static com.rabbitmq.client.amqp.impl.Utils.extractQueueName; import static java.util.Map.of; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; public class UtilsTest { @@ -100,4 +102,14 @@ void validateBrokerVersionParsing41AndLater(String brokerVersion) { assertThat(Utils.is4_1_OrMore(brokerVersion)).isTrue(); assertThat(Utils.supportFilterExpressions(brokerVersion)).isTrue(); } + + @ParameterizedTest + @CsvSource({ + "/queues/q1,q1", + "/exchanges/foo,", + "/queues/amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8%3D.0T5a3Sa%2BQ7ZRPeMFMi%2BJ0A%3D%3D,amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8=.0T5a3Sa+Q7ZRPeMFMi+J0A==" + }) + void extractQueueNameTest(String address, String expected) { + assertThat(extractQueueName(address)).isEqualTo(expected); + } }