diff --git a/docs/amqp.md b/docs/amqp.md
index 7f00eb13a..115d307d7 100644
--- a/docs/amqp.md
+++ b/docs/amqp.md
@@ -307,7 +307,6 @@ This section lists features that RabbitMQ supports exclusively in AMQP 1.0, whic
 ### AMQP 0.9.1 Features
 This section lists features that RabbitMQ supports exclusively in AMQP 0.9.1, which are currently not available in AMQP 1.0:
-* **[Direct Reply-to](./direct-reply-to)**: While AMQP 1.0 clients can perform Remote Procedure Calls (RPCs) by declaring a reply queue (as [described](https://rabbitmq.github.io/rabbitmq-amqp-java-client/stable/htmlsingle/#remote-procedure-call-rpc) in the RabbitMQ AMQP 1.0 Java client), the Direct Reply-to feature is only available in AMQP 0.9.1.
 * **[AMQP 0.9.1 Channel Interceptor](https://github.com/rabbitmq/internals/blob/master/interceptors.md)**: Plugins, such as the [Sharding Plugin](https://github.com/rabbitmq/rabbitmq-server/tree/main/deps/rabbitmq_sharding#rabbitmq-sharding-plugin), that intercept and modify frames are [currently](https://github.com/rabbitmq/rabbitmq-server/issues/10051) only supported for AMQP 0.9.1.
 * **Message rates in the Management UI**: Message rates for exchanges and queues are shown in the Management UI for AMQP 0.9.1 connections. These rates are not available for AMQP 1.0 connections.
 * **[Transactions](./semantics)**: AMQP 0.9.1 provides limited support, whereas AMQP 1.0 currently does not support transactions (as listed in the [limitations](#limitations)).
diff --git a/docs/confirms.md b/docs/confirms.md
index b2a6ccd54..155cddb2d 100644
--- a/docs/confirms.md
+++ b/docs/confirms.md
@@ -81,7 +81,7 @@ so the delivered message can be marked for future deletion.
 Sometimes publishing and consuming applications need to communicate via requests and responses that need an explicit acknowledgement from the peer.
 [RabbitMQ tutorial #6](/tutorials) demonstrates the basics of how that's done, and [Direct Reply-to](./direct-reply-to) provides
-a way to do it without declaring a lot of short-lived temporary response queues.
+a way to do it without creating response queues.
 This type of communication, however, is not covered in this guide, and is mentioned only to contrast it with the much more focussed messaging protocol features described in this guide.
diff --git a/docs/direct-reply-to.md b/docs/direct-reply-to.md
index dc71c4e1b..0c63481cf 100644
--- a/docs/direct-reply-to.md
+++ b/docs/direct-reply-to.md
@@ -1,5 +1,5 @@
 ---
-title: Direct Reply-to
+title: Direct Reply-To
 ---
-# Direct Reply-to
-
-## Overview {#overview}
-
-Direct reply-to is a feature that allows RPC (request/reply) clients with a design
-similar to that demonstrated in [tutorial 6](/tutorials) without requiring the creation
-of a reply queue.
-
-:::important
-
-Request-reply implementations where clients use explicitly declared queues, both
-long-lived client named and connection-specific exclusive queues, are
-just as valid as Direct Reply-to, and have their benefits, in particular
-for workloads with long-running tasks
-
-:::
-
-## Motivation {#motivation}
-
-RPC (request/reply) is a popular pattern to implement with a messaging broker
-like RabbitMQ. [Tutorial 6](/tutorials) demonstrates its implementation
-with a variety of clients. The typical way to do this is for RPC clients to
-send requests that are routed to a long lived (known) server queue. The RPC server(s)
-consume requests from this queue and then send replies to each client
-using the queue named by the client in the reply-to
-header.
-
-Where does the client's queue come from? The client can
-declare a single-use queue for each request-response pair. But
-this is inefficient; even an unreplicated queue can be
-expensive to create and then delete (compared with the cost of
-sending a message). This is especially true in a cluster as all
-cluster nodes need to agree that the queue has been created,
-agree on its type, replication parameters, and other metadata.
-
-Therefore, the client should create a single reply queue for multiple RPC requests.
-
-The [properties](queues#properties) of this reply queue depend on the use case:
-
-* **[Exclusive](queues#exclusive-queues) queues** are commonly used when replies are consumed by a single client and deleted upon disconnection
-* **Non-exclusive long-lived queues** are better suited for long-running tasks, ensuring replies persist even if the client disconnects temporarily
-
-Direct reply-to eliminates the need for a reply queue. This benefits the request-reply
-implementations with short-lived queues and transient responses at the cost
-of giving up all control over how the responses are stored.
-
-With Direct Reply-to, RPC clients will receive replies directly from their RPC server,
-without going through a reply queue. "Directly" here still means going through the same channel
-and a RabbitMQ node; there is no direct network connection between RPC client and RPC server processes.
-
-## How to Use Direct Reply-to {#usage}
-
-To use direct reply-to, an RPC client should:
-
-
-
-The RPC server will then see a reply-to property
-with a generated name. It should publish to the default exchange
-(``""``) with the routing key set to this value (i.e. just as if
-it were sending to a reply queue as usual). The message will
-then be sent straight to the client consumer.
-
-If the RPC server is going to perform some expensive computation
-it might wish to check if the client has gone away. To do this
-the server can declare the generated reply name first on a
-disposable channel in order to determine whether it still
-exists. Note that even if you declare the "queue" with
-passive=false there is no way to create it; the
-declare will just succeed (with 0 messages ready and 1 consumer)
-or fail.
-
-## Caveats and Limitations {#limitations}
-
-
-
-## When to Use Direct Reply-to
-
-While clients should use long lived connections, direct reply-to is ideal for workloads with
-[high connection churn](connections#high-connection-churn), where clients establish a connection
-for a single RPC and disconnect immediately after.
-By avoiding the creation of queue metadata in the [metadata store](metadata-store), direct
-reply-to can reduce overhead and latency.
-
-For workloads with long-lived connections where clients perform multiple RPCs, the performance
-benefits of direct reply-to are not significant compared to [classic queues](classic-queues).
-Modern RabbitMQ versions have optimized classic queues for low latency and minimal resource usage,
-making them similarly efficient in such scenarios.
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Direct Reply-To
+
+## Overview
+
+Direct Reply-To lets you implement RPC (request/reply) patterns like those in [tutorial 6](/tutorials#6-rpc) without creating a dedicated reply [queue](./queues).
+
+## Motivation
+
+RPC (request/reply) is a common pattern with brokers such as RabbitMQ.
+[Tutorial 6](/tutorials#6-rpc) shows several client implementations. Typically, the requester (RPC client) sends requests that are routed to a long-lived, known request queue. The responder (RPC server) consumes from that queue and sends replies using the queue name supplied in the request message’s `reply-to` property.
+
+Where does the requester’s reply queue come from?
+A requester could declare a single-use queue for each request–reply pair, but that’s inefficient: even an unreplicated queue is relatively expensive to create and delete compared to the cost of receiving a reply.
+In clusters the overhead is higher because all nodes must agree on creation, type, replication parameters, and other metadata.
+
+A better approach is to create a single reply queue per requester and reuse it across requests.
+The [properties](queues#properties) of that queue depend on the use case:
+* [Exclusive](queues#exclusive-queues) queues are common when a single client consumes replies and the queue is deleted on disconnect.
+* Non-exclusive, long-lived queues suit long-running tasks where replies should survive brief client disconnects.
+
+**Direct Reply-To** is a RabbitMQ-specific alternative that completely **eliminates the reply queue**. That means:
+* No queue metadata are written to the [metadata store](./metadata-store) (Khepri).
+* No queue buffers or persists reply messages.
+* No separate Erlang process exists for the reply queue.
+
+Main benefits:
+* Less load on the [metadata store](./metadata-store) (no insert/delete of queue metadata).
+* Less load on the [Management HTTP API](./management#http-api): fewer queues to list in the [Management UI](./management#external-monitoring) and fewer metrics to emit.
+* Fewer Erlang processes on the broker.
+
+With Direct Reply-To, on the broker side, the responder’s AMQP 1.0 [session](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-sessions) or AMQP 0.9.1 [channel](./channels) process delivers the reply directly to the requester’s session/channel process without going through an actual queue.
+
+“Directly” still means via the broker; there is no point-to-point network connection between the two client applications.
+
+## When to Use Direct Reply-To {#when-to-use}
+
+The main use case is scale: many (tens of thousands of) clients performing request/reply.
+
+While clients should prefer long-lived connections, Direct Reply-To also works well with [high connection churn](connections#high-connection-churn), where a client connects for a single RPC and disconnects immediately afterward. Avoiding queue create/delete reduces overhead and latency.
+
+Since Direct Reply-To has **at-most-once** delivery semantics for replies, use it only when losing a reply is acceptable. For example, if a reply isn’t received within a timeout, the requester is expected to resend the request.
+
+## When to Avoid Direct Reply-To {#when-to-avoid}
+
+Avoid Direct Reply-To if any of the following apply:
+* You require **at-least-once** guarantees for replies (i.e., losing a reply is unacceptable).
+* Replies must be durably buffered by the broker.
+* You need high throughput to the same requester (e.g., hundreds+ of messages per second). Queues exist to buffer when a consumer can’t keep up.
+
+For workloads with long-lived connections and multiple RPCs, the benefits of Direct Reply-To are smaller relative to using [classic queues](classic-queues).
+Modern RabbitMQ versions optimize classic queues for low latency and low resource usage, so they can be similarly efficient in these scenarios.
+Conventional request-reply using explicitly declared classic queues is equally valid and can be preferable for long-running tasks.
+
+## Broker Implementation Details
+
+Internally, RabbitMQ implements Direct Reply-To using the `rabbit_volatile_queue` queue type.
+“Volatile” describes the semantics: non-durable, zero-buffer, at-most-once, may drop, and not stored in the metadata store.
+
+You will see `rabbit_volatile_queue` only in a few places. Instances do not appear in the Management UI or in `rabbitmqctl list_queues`.
+
+One place is in [Prometheus](./prometheus) metrics, for example:
+* `rabbitmq_global_messages_delivered_total{protocol="amqp10",queue_type="rabbit_volatile_queue"}`
+  Number of messages (“direct replies”) delivered to AMQP 1.0 requesters.
+* `rabbitmq_global_messages_delivered_total{protocol="amqp091",queue_type="rabbit_volatile_queue"}`
+  Number of messages (“direct replies”) delivered to AMQP 0.9.1 requesters.
+* `rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"}`
+  Number of messages (“direct replies”) dropped by RabbitMQ.
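Reviewer sketch (not part of the patch): one way the counters listed above could be consumed is to sum every `rabbit_volatile_queue` sample from a Prometheus text scrape. The class name `VolatileQueueMetrics`, the method `sumVolatile`, and all sample values are hypothetical, and the parser assumes one `name{labels} value` sample per line with no trailing timestamp.

```java
import java.util.Arrays;

public class VolatileQueueMetrics {
    // Sums the values of all samples of `metric` whose label set contains
    // queue_type="rabbit_volatile_queue". Assumption: each sample is a single
    // line of the form  name{labels} value  (no timestamp column).
    static long sumVolatile(String scrape, String metric) {
        return Arrays.stream(scrape.split("\n"))
            .map(String::trim)
            .filter(l -> l.startsWith(metric + "{"))
            .filter(l -> l.contains("queue_type=\"rabbit_volatile_queue\""))
            .mapToLong(l -> (long) Double.parseDouble(l.substring(l.lastIndexOf(' ') + 1)))
            .sum();
    }

    public static void main(String[] args) {
        // Made-up scrape output; only the metric and label names come from the docs.
        String scrape = String.join("\n",
            "# TYPE rabbitmq_global_messages_delivered_total counter",
            "rabbitmq_global_messages_delivered_total{protocol=\"amqp10\",queue_type=\"rabbit_volatile_queue\"} 120",
            "rabbitmq_global_messages_delivered_total{protocol=\"amqp091\",queue_type=\"rabbit_volatile_queue\"} 30",
            "rabbitmq_global_messages_delivered_total{protocol=\"amqp091\",queue_type=\"rabbit_classic_queue\"} 999",
            "rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type=\"rabbit_volatile_queue\",dead_letter_strategy=\"disabled\"} 5");
        long delivered = sumVolatile(scrape, "rabbitmq_global_messages_delivered_total");
        long dropped = sumVolatile(scrape, "rabbitmq_global_messages_dead_lettered_maxlen_total");
        System.out.println("delivered=" + delivered + " dropped=" + dropped);
    }
}
```

Comparing the dropped counter against the delivered counters gives a rough view of how often direct replies are lost; in practice this comparison would more likely be written as a Prometheus query than as client code.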
+
+## Usage
+
+Direct Reply-To is supported for AMQP 1.0 and AMQP 0.9.1.
+It also works across protocols (e.g., AMQP 1.0 requester with AMQP 0.9.1 responder, or vice versa).
+
+### Usage in AMQP 1.0 {#usage-amqp}
+
+The requester first attaches a link to receive reply messages.
+The [attach](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-attach) (with a [source](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source)) must set specific fields.
+If your [AMQP 1.0 client library](/client-libraries/amqp-client-libraries) supports Direct Reply-To, it will set them for you (see [examples](#examples-amqp)).
+Otherwise:
+* Set `snd-settle-mode` to `settled` since RabbitMQ sends all replies settled.
+  There is no queue to return a reply to if the requester disconnects or rejects it.
+* Leave `address` unset; RabbitMQ will generate the address.
+* Leave `durable` unset or set it to `none`; no state is kept in the RabbitMQ metadata store.
+* Set `expiry-policy` to `link-detach`.
+* Leave `timeout` unset or set it to `0`.
+* Include `rabbitmq:volatile-queue` in `capabilities`.
+
+RabbitMQ returns a broker-generated pseudo-queue address in the `address` field of the `attach` response.
+It starts with `/queues/amq.rabbitmq.reply-to.` followed by a broker-generated suffix that is not meaningful to clients.
+
+Before sending the first request, the requester must grant link credit to this pseudo-queue.
+
+For each request, set the following message [properties](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties):
+1. `message-id`: a globally unique value. (The responder will set the reply’s `correlation-id` to this value.)
+2. `reply-to`: the address received in the `attach` response.
+
+The responder reads the request’s `reply-to` and sends the reply to that address via one of two options:
+1. Attach a sending link to the anonymous terminus (null target address) as described in [Target Address v2](./amqp#target-address-v2), and set the reply address in the message’s `to` property.
+   Useful when replying to many different requesters (no per-requester link).
+2. Create a sending link directly to the provided address.
+   In this case, RabbitMQ checks whether the requester is still connected; if not, RabbitMQ refuses the link.
+
+If the responder will perform expensive work, it can proactively check whether the requester is still present by issuing an HTTP GET over AMQP. A `200` status indicates the requester is still connected (see [examples](#examples-amqp)).
+
+#### AMQP 1.0 Caveats and Limitations {#caveats-amqp}
+
+* The requester must receive replies settled.
+  There is no queue to return a reply if the requester disconnects or rejects it.
+* The requester can receive replies only on the same connection and session where it attached its receiving link.
+  Consumption from this pseudo-queue on another session is not supported.
+* The responder should send replies settled.
+  If it sends replies unsettled, RabbitMQ immediately settles them with the [accepted](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-accepted) outcome, even if the reply may subsequently be dropped.
+* Replies sent via Direct Reply-To are not fault-tolerant.
+  Because this pseudo-queue does not buffer, RabbitMQ drops replies when:
+  * The AMQP 1.0 requester runs out of link credit.
+    It’s the requester’s responsibility to grant sufficient link credit.
+  * AMQP 1.0 [session flow control](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-session-flow-control) disallows delivery.
+    The requester must keep its `incoming-window` large enough.
+  * The broker’s AMQP 1.0 writer process cannot send replies fast enough to the requester.
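Reviewer sketch (not part of the patch): because replies can be dropped in the cases listed above, a requester would typically pair each request with a timeout and resend on expiry, matching replies to requests through `message-id` / `correlation-id`. The plain-Java sketch below illustrates that pattern only; `PendingReplies`, the `send` callback standing in for the broker, and the retry parameters are hypothetical names and not part of any RabbitMQ client library.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.function.BiConsumer;

public class PendingReplies {
    // Outstanding requests keyed by message-id; a reply's correlation-id selects the entry.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Invoked for every received reply. Replies with an unknown (or already
    // timed-out) correlation id are silently ignored.
    public void onReply(String correlationId, String body) {
        CompletableFuture<String> f = pending.remove(correlationId);
        if (f != null) f.complete(body);
    }

    // Sends the request through the hypothetical transport callback
    // (message-id, body) and resends with a fresh message-id whenever no
    // reply arrives within timeoutMs, up to `attempts` times.
    public String call(BiConsumer<String, String> send, String body,
                       int attempts, long timeoutMs) throws Exception {
        for (int i = 0; i < attempts; i++) {
            String messageId = UUID.randomUUID().toString();
            CompletableFuture<String> f = new CompletableFuture<>();
            pending.put(messageId, f);
            send.accept(messageId, body);
            try {
                return f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                pending.remove(messageId); // reply lost or late: give up on this attempt
            }
        }
        throw new TimeoutException("no reply after " + attempts + " attempts");
    }

    public static void main(String[] args) throws Exception {
        PendingReplies rpc = new PendingReplies();
        int[] sent = {0};
        // Fake transport: loses the first request, answers the second one.
        BiConsumer<String, String> send = (id, body) -> {
            if (++sent[0] > 1) rpc.onReply(id, "*** " + body + " ***");
        };
        System.out.println(rpc.call(send, "hello", 3, 100));
    }
}
```

Using a new `message-id` per attempt means a late reply to an earlier attempt is discarded; reusing the same id across attempts is an equally plausible choice when the responder deduplicates requests.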
+
+If message loss is unacceptable, use [classic queues](classic-queues) instead of Direct Reply-To.
+
+#### Examples: AMQP 1.0 {#examples-amqp}
+
+
+
+```java
+String requestQueue = "request-queue";
+
+// create the responder
+Responder responder = connection.responderBuilder()
+    .requestQueue(requestQueue)
+    .handler((ctx, req) -> {
+      // check whether the requester is still connected (optional)
+      if (ctx.isRequesterAlive(req)) {
+        String in = new String(req.body(), UTF_8);
+        String out = "*** " + in + " ***";
+        return ctx.message(out.getBytes(UTF_8));
+      } else {
+        return null;
+      }
+    }).build();
+
+// create the requester, it uses direct reply-to by default
+Requester requester = connection.requesterBuilder()
+    .requestAddress().queue(requestQueue)
+    .requester()
+    .build();
+
+// create the request message
+Message request = requester.message("hello".getBytes(UTF_8));
+// send the request
+CompletableFuture<Message> responseFuture = requester.publish(request);
+// wait for the response
+Message response = responseFuture.get(10, TimeUnit.SECONDS);
+```
+
+
+```erlang
+%% 1. Requester attaches its receiving link.
+OpnConfRequester = OpnConfRequester0#{notify_with_performative => true},
+{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
+{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
+Source = #{address => undefined,
+           durable => none,
+           expiry_policy => <<"link-detach">>,
+           dynamic => true,
+           capabilities => [<<"rabbitmq:volatile-queue">>]},
+AttachArgs = #{name => <<"receiver requester">>,
+               role => {receiver, Source, self()},
+               snd_settle_mode => settled,
+               rcv_settle_mode => first},
+{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, AttachArgs),
+
+%% Requester learns the broker-generated reply address.
+Addr = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
+                   #'v1_0.attach'{
+                      source = #'v1_0.source'{
+                                  address = {utf8, Addr0}}} = Attach,
+                   Addr0
+       end,
+
+%% Requester must grant link credit before sending the first request.
+ok = amqp10_client:flow_link_credit(ReceiverRequester, 1000, 500),
+
+%% 2. Requester sends the request.
+ok = amqp10_client:send_msg(
+       SenderRequester,
+       amqp10_msg:set_properties(
+         #{message_id => RpcId,
+           reply_to => Addr},
+         amqp10_msg:new(DeliveryTag, RequestPayload, true))),
+
+%% 3. Responder receives the request and reads relevant properties.
+...
+#{message_id := RpcId,
+  reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
+
+%% Optionally, the responder checks whether the requester is still connected.
+{ok, #{queue := ReplyQueue}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
+case rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQueue) of
+    {ok, #{}} ->
+        %% requester is still there
+        ok;
+    _ ->
+        throw(requester_absent)
+end,
+
+%% 4. Responder replies (attached to the anonymous terminus).
+ok = amqp10_client:send_msg(
+       SenderResponder,
+       amqp10_msg:set_properties(
+         #{to => ReplyToAddr,
+           correlation_id => RpcId},
+         amqp10_msg:new(Tag, ReplyPayload, true))),
+
+%% 5. Requester receives the reply.
+receive {amqp10_msg, ReceiverRequester, ReplyMsg} ->
+            %% process reply here...
+            ok
+end.
+```
+
+
+
+
+### Usage in AMQP 0.9.1 {#usage-amqp091}
+
+To use Direct Reply-To, a requester must:
+1. Consume from the pseudo-queue `amq.rabbitmq.reply-to` in no-ack mode.
+   There is no need to declare this "queue" first (though the client may).
+2. Set the request message’s `reply-to` to `amq.rabbitmq.reply-to`.
+
+When forwarding the request, RabbitMQ transparently rewrites `reply-to` to `amq.rabbitmq.reply-to.` followed by a generated suffix that is not meaningful to clients.
+The responder then publishes the reply to the default exchange (`""`) using that value as the routing key.
+
+If the responder will perform expensive work, it can check whether the client has gone away by passively declaring the generated reply queue name on a disposable channel.
+Even with `passive=false` there is no way to create it; the declare either succeeds (0 ready messages, 1 consumer) or fails.
+
+#### AMQP 0.9.1 Caveats and Limitations {#caveats-amqp091}
+
+* The requester must consume in [automatic acknowledgement mode](./confirms#acknowledgement-modes).
+  There is no queue to return a reply if the requester disconnects or rejects it.
+* The requester must use the same connection and [channel](./channels) both to consume from `amq.rabbitmq.reply-to` and to publish the request.
+* Replies sent via Direct Reply-To are not fault-tolerant; they are dropped if the client that published the request disconnects.
+  The requester is expected to reconnect and resubmit the request.
+* The name `amq.rabbitmq.reply-to` is used in `basic.consume` and the `reply-to` property as if it were a queue; however it is not.
+  It cannot be deleted and does not appear in the management plugin or `rabbitmqctl list_queues`.
+* If the responder publishes with the `mandatory` flag, `amq.rabbitmq.reply-to.*` is treated as a queue for routing.
+  Whether the requester is still present is not checked at routing time.
+  In other words, a message routed solely to this name is considered "routed", and RabbitMQ will not send a `basic.return`.
+* The requester can create at most one Direct Reply-To consumer (`basic.consume`) per channel.
+
+#### Examples: AMQP 0.9.1 {#examples-amqp091}
+
+
+
+```erlang
+%% 1. Requester consumes from pseudo-queue in no-ack mode.
+amqp_channel:subscribe(RequesterChan,
+                       #'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
+                                        no_ack = true},
+                       self()),
+CTagRequester = receive #'basic.consume_ok'{consumer_tag = CTag} -> CTag
+                end,
+
+%% 2. Requester sends the request.
+amqp_channel:cast(
+  RequesterChan,
+  #'basic.publish'{routing_key = RequestQueue},
+  #amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
+                               message_id = RpcId},
+            payload = RequestPayload}),
+
+%% 3. Responder receives the request.
+{ReplyTo, RpcId} =
+    receive {#'basic.deliver'{consumer_tag = CTagResponder},
+             #amqp_msg{payload = RequestPayload,
+                       props = #'P_basic'{reply_to = ReplyTo0,
+                                          message_id = RpcId0}}} ->
+                {ReplyTo0, RpcId0}
+    end,
+
+%% 4. Responder replies.
+amqp_channel:cast(
+  ResponderChan,
+  #'basic.publish'{routing_key = ReplyTo},
+  #amqp_msg{props = #'P_basic'{correlation_id = RpcId},
+            payload = ReplyPayload}),
+
+%% 5. Requester receives the reply
+receive {#'basic.deliver'{consumer_tag = CTagRequester},
+         #amqp_msg{payload = ReplyPayload,
+                   props = #'P_basic'{correlation_id = RpcId}}} ->
+            %% process reply here...
+            ok
+end.
+```
+
+
diff --git a/docs/extensions.md b/docs/extensions.md
index 69d3f0d2c..10123febd 100644
--- a/docs/extensions.md
+++ b/docs/extensions.md
@@ -41,8 +41,7 @@ extension points such as [optional queue arguments](./queues#optional-arguments)
  * [Consumer Cancellation Notifications](./consumer-cancel) let a consumer know if it has been cancelled by the server.
  * [`basic.nack`](./nack) extends `basic.reject` to support rejecting multiple messages at once.
  * [Consumer Priorities](./consumer-priority) allow you to send messages to higher priority consumers first.
- * [Direct reply-to](./direct-reply-to) allows RPC clients to receive replies to their queries without needing
-   to declare a temporary queue.
+ * [Direct Reply-To](./direct-reply-to) allows RPC clients to receive replies to their queries without needing to declare a queue.
 ## Message Routing
diff --git a/sidebarsDocs.js b/sidebarsDocs.js
index f9003a40b..d3c26bc12 100644
--- a/sidebarsDocs.js
+++ b/sidebarsDocs.js
@@ -164,7 +164,7 @@ const sidebars = {
         {
           type: 'doc',
           id: 'direct-reply-to',
-          label: 'Direct reply-to',
+          label: 'Direct Reply-To',
         },
         {
           type: 'doc',