Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions src/docs/asciidoc/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -165,106 +165,106 @@ rabbitmq_amqp_published_released_total 1.0
rabbitmq_amqp_publishers 1.0
------

=== Remote Procedure Call (RPC)
=== Request/Response

Remote procedure call with RabbitMQ consists in a client sending a request message and a server replying with a response message.
Both the RPC client and server are _client applications_ and the messages flow through the broker.
The RPC client must send a reply-to queue address with the request.
The RPC server uses this reply-to queue address to send the response.
There must also be a way to correlate a request with its response, this is usually handled with a header that the RPC client and server agree on.
Request/response with RabbitMQ consists in a requester sending a request message and a responder replying with a response message.
Both the requester and responder are _client applications_ and the messages flow through the broker.
The requester must send a reply-to queue address with the request.
The responder uses this reply-to queue address to send the response.
There must also be a way to correlate a request with its response, this is usually handled with a header that the requester and responder agree on.

The library provides RPC client and server support classes.
The library provides requester and responder support classes.
They use sensible defaults and some of the internal mechanics are configurable.
They should meet the requirements of most RPC use cases.
It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the RPC support classes do.
They should meet the requirements of most request/response use cases.
It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the request/response support classes do.

Here is how to create an RPC server instance:
Here is how to create a responder instance:

.Creating an RPC server
.Creating a responder
[source,java,indent=0]
--------
include::{test-examples}/RpcApi.java[tag=rpc-server-creation]
include::{test-examples}/RequestResponseApi.java[tag=responder-creation]
--------
<1> Use builder from connection
<2> Set the queue to consume requests from (it must exist)
<3> Define the processing logic
<4> Create the reply message

Note the RPC server does not create the queue it waits requests on.
Note the responder does not create the queue it waits requests on.
It must be created beforehand.

Here is how to create an RPC client:
Here is how to create a requester:

.Creating an RPC client
.Creating a requester
[source,java,indent=0]
--------
include::{test-examples}/RpcApi.java[tag=rpc-client-creation]
include::{test-examples}/RequestResponseApi.java[tag=requester-creation]
--------
<1> Use builder from connection
<2> Set the address to send request messages to

The RPC client will send its request to the configured destination.
The requester will send its request to the configured destination.
It can be an exchange or a queue, like in the example above.

Here is how to send a request:

.Sending a request
[source,java,indent=0]
--------
include::{test-examples}/RpcApi.java[tag=rpc-client-request]
include::{test-examples}/RequestResponseApi.java[tag=requester-request]
--------
<1> Create the message request
<2> Send the request
<3> Wait for the reply (synchronously)

The `RpcClient#publish(Message)` method returns a `CompletableFuture<Message>` that holds the reply message.
The `Requester#publish(Message)` method returns a `CompletableFuture<Message>` that holds the reply message.
It is then possible to wait for the reply asynchronously or synchronously.

The RPC server has the following behavior:
The responder has the following behavior:

* when receiving a message request, it calls the processing logic (handler), extracts the correlation ID, calls a reply post-processor if defined, and sends the reply message.
* if all these operations succeed, the server accepts the request message (settles it with the `ACCEPTED` outcome).
* if any of these operations throws an exception, the server discards the request message (the message is removed from the request queue and is https://www.rabbitmq.com/client-libraries/amqp-client-libraries#message-processing-result-outcome[dead-lettered] if configured).
* if all these operations succeed, the responder accepts the request message (settles it with the `ACCEPTED` outcome).
* if any of these operations throws an exception, the responder discards the request message (the message is removed from the request queue and is https://www.rabbitmq.com/client-libraries/amqp-client-libraries#message-processing-result-outcome[dead-lettered] if configured).

The RPC server uses the following defaults:
The responder uses the following defaults:

* it uses the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] for the correlation ID.
* it assigns the correlation ID (so the _request_ `message-id` by default) to the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`correlation-id` property].
* it assigns the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`to` property] if it is defined.
This behavior is hardcoded but it is possible to cancel it thanks to a reply post-processor.
This behavior is hardcoded, but it is possible to override it thanks to a reply post-processor.

The RPC client uses the following defaults:
The requester uses the following defaults:

* it uses https://www.rabbitmq.com/docs/direct-reply-to[direct reply-to] if available (RabbitMQ 4.2 or more) for replies if no reply-to queue is set (it falls back to an auto-delete, exclusive queue if direct reply-to is not available)
* it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (`{UUID}-{sequence}`, e.g. `6f839461-6b19-47e1-80b3-6be10d899d85-42`).
The prefix is different for each `RpcClient` instance and the suffix is incremented by one for each request.
The prefix is different for each `Requester` instance and the suffix is incremented by one for each request.
* it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the reply-to queue address (defined by the user or created automatically, see above).
* it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] to the generated correlation ID.
* it extracts the correlation ID from the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`correlation-id` property] to correlate a reply with the appropriate request.

Let's see how to customize some of the RPC support mechanics.
Let's see how to customize some of the request/response support mechanics.
Imagine the request `message-id` property is a critical piece of information and we do not want to use it as the correlation ID.
The request can use the `correlation-id` property and the RPC server just has to extract the correlation ID from this property (instead of the `message-id` property by default).
The request can use the `correlation-id` property and the responder just has to extract the correlation ID from this property (instead of the `message-id` property by default).
Let's also use a random UUID for the correlation ID generation (avoids doing this in production: this is OK in terms of uniqueness but not optimal in terms of performance because randomness is not cheap).

Here is how to declare the RPC client:
Here is how to declare the requester:

.Customizing the RPC client
.Customizing the requester
[source,java,indent=0]
--------
include::{test-examples}/RpcApi.java[tag=rpc-custom-client-creation]
include::{test-examples}/RequestResponseApi.java[tag=custom-requester-creation]
--------
<1> Declare the reply-to queue
<2> Use a random UUID as correlation ID
<3> Use the `correlation-id` property for the request
<4> Set the `reply-to` property
<5> Set the address to send request messages to

We just have to tell the RPC server to get the correlation ID from the request `correlation-id` property:
We just have to tell the responder to get the correlation ID from the request `correlation-id` property:

.Customizing the RPC server
.Customizing the responder
[source,java,indent=0]
--------
include::{test-examples}/RpcApi.java[tag=rpc-custom-server-creation]
include::{test-examples}/RequestResponseApi.java[tag=custom-responder-creation]
--------
<1> Get the correlation ID from the request `correlation-id` property
12 changes: 6 additions & 6 deletions src/main/java/com/rabbitmq/client/amqp/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ public interface Connection extends Closeable, Resource {
ConsumerBuilder consumerBuilder();

/**
* Create a builder to configure and create a {@link RpcClientBuilder}.
* Create a builder to configure and create a {@link RequesterBuilder}.
*
* @return RPC client builder
* @return requester builder
*/
RpcClientBuilder rpcClientBuilder();
RequesterBuilder requesterBuilder();

/**
* Create a builder to configure and create a {@link RpcServerBuilder}.
* Create a builder to configure and create a {@link Responder}.
*
* @return RPC server builder
* @return responder builder
*/
RpcServerBuilder rpcServerBuilder();
ResponderBuilder responderBuilder();

/** Close the connection and its resources */
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import java.util.concurrent.CompletableFuture;

/**
* Client support class for RPC.
* Requester support class for request/response interaction.
*
* @see RpcClientBuilder
* @see RequesterBuilder
*/
public interface RpcClient extends AutoCloseable {
public interface Requester extends AutoCloseable {

/**
* Create a message meant to be published by the underlying publisher instance.
Expand Down Expand Up @@ -55,7 +55,7 @@ public interface RpcClient extends AutoCloseable {
*/
CompletableFuture<Message> publish(Message message);

/** Close the RPC client and its resources. */
/** Close the requester and its resources. */
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@
import java.util.function.Function;
import java.util.function.Supplier;

/** API to configure and create a {@link RpcClient}. */
public interface RpcClientBuilder {
/** API to configure and create a {@link Requester}. */
public interface RequesterBuilder {

/**
* Builder for the request address.
*
* @return the request address builder
*/
RpcClientAddressBuilder requestAddress();
RequesterAddressBuilder requestAddress();

/**
* The queue the client expects responses on.
*
* <p>The queue <b>must</b> exist if it is set.
*
* <p>The RPC client will create an exclusive, auto-delete queue if it is not set.
* <p>The requester will a direct reply-to queue (RabbitMQ 4.2 or more) or create an exclusive,
* auto-delete queue if this parameter is not set.
*
* @param replyToQueue reply queue
* @return this builder instance
*/
RpcClientBuilder replyToQueue(String replyToQueue);
RequesterBuilder replyToQueue(String replyToQueue);

/**
* The generator for correlation ID.
Expand All @@ -53,7 +54,7 @@ public interface RpcClientBuilder {
* @param correlationIdSupplier correlation ID generator
* @return the this builder instance
*/
RpcClientBuilder correlationIdSupplier(Supplier<Object> correlationIdSupplier);
RequesterBuilder correlationIdSupplier(Supplier<Object> correlationIdSupplier);

/**
* A callback before sending a request message.
Expand All @@ -67,7 +68,7 @@ public interface RpcClientBuilder {
* @param requestPostProcessor logic to post-process request message
* @return this builder instance
*/
RpcClientBuilder requestPostProcessor(BiFunction<Message, Object, Message> requestPostProcessor);
RequesterBuilder requestPostProcessor(BiFunction<Message, Object, Message> requestPostProcessor);

/**
* Callback to extract the correlation ID from a reply message.
Expand All @@ -80,31 +81,31 @@ public interface RpcClientBuilder {
* @param correlationIdExtractor correlation ID extractor
* @return this builder instance
*/
RpcClientBuilder correlationIdExtractor(Function<Message, Object> correlationIdExtractor);
RequesterBuilder correlationIdExtractor(Function<Message, Object> correlationIdExtractor);

/**
* Timeout before failing outstanding requests.
*
* @param timeout timeout
* @return the builder instance
*/
RpcClientBuilder requestTimeout(Duration timeout);
RequesterBuilder requestTimeout(Duration timeout);

/**
* Build the configured instance.
*
* @return the configured instance
*/
RpcClient build();
Requester build();

/** Builder for the request address. */
interface RpcClientAddressBuilder extends AddressBuilder<RpcClientAddressBuilder> {
interface RequesterAddressBuilder extends AddressBuilder<RequesterAddressBuilder> {

/**
* Go back to the RPC client builder.
* Go back to the requester builder.
*
* @return the RPC client builder
* @return the requester builder
*/
RpcClientBuilder rpcClient();
RequesterBuilder requester();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package com.rabbitmq.client.amqp;

/**
* Client server class for RPC.
* Responder class for request/response interaction.
*
* @see RpcServerBuilder
* @see ResponderBuilder
*/
public interface RpcServer extends AutoCloseable {
public interface Responder extends AutoCloseable {

/** Contract to process a request message and return a reply message. */
@FunctionalInterface
Expand Down Expand Up @@ -87,7 +87,7 @@ interface Context {
/** Request to receive messages again. */
void unpause();

/** Close the RPC server and its resources. */
/** Close the responder and its resources. */
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@
import java.util.function.BiFunction;
import java.util.function.Function;

/** API to configure and create a {@link RpcServer}. */
public interface RpcServerBuilder {
/** API to configure and create a {@link Responder}. */
public interface ResponderBuilder {

/**
* The queue to wait for requests on.
*
* @param requestQueue request queue
* @return this builder instance
*/
RpcServerBuilder requestQueue(String requestQueue);
ResponderBuilder requestQueue(String requestQueue);

/**
* The logic to process requests and issue replies.
*
* @param handler handler
* @return this builder instance
*/
RpcServerBuilder handler(RpcServer.Handler handler);
ResponderBuilder handler(Responder.Handler handler);

/**
* Logic to extract the correlation ID from a request message.
Expand All @@ -48,7 +48,7 @@ public interface RpcServerBuilder {
* @param correlationIdExtractor logic to extract the correlation ID
* @return this builder instance
*/
RpcServerBuilder correlationIdExtractor(Function<Message, Object> correlationIdExtractor);
ResponderBuilder correlationIdExtractor(Function<Message, Object> correlationIdExtractor);

/**
* A callback called after request processing but before sending the reply message.
Expand All @@ -61,7 +61,7 @@ public interface RpcServerBuilder {
* @param replyPostProcessor logic to post-process reply message
* @return this builder instance
*/
RpcServerBuilder replyPostProcessor(BiFunction<Message, Object, Message> replyPostProcessor);
ResponderBuilder replyPostProcessor(BiFunction<Message, Object, Message> replyPostProcessor);

/**
* The time the server waits for all outstanding requests to be processed before closing.
Expand All @@ -73,12 +73,12 @@ public interface RpcServerBuilder {
* @param closeTimeout close timeout
* @return this builder instance
*/
RpcServerBuilder closeTimeout(Duration closeTimeout);
ResponderBuilder closeTimeout(Duration closeTimeout);

/**
* Create the configured instance.
*
* @return the configured instance
*/
RpcServer build();
Responder build();
}
Loading