diff --git a/src/docs/asciidoc/usage.adoc b/src/docs/asciidoc/usage.adoc index 0a012e4b51..d69b9de0c5 100644 --- a/src/docs/asciidoc/usage.adoc +++ b/src/docs/asciidoc/usage.adoc @@ -165,45 +165,45 @@ rabbitmq_amqp_published_released_total 1.0 rabbitmq_amqp_publishers 1.0 ------ -=== Remote Procedure Call (RPC) +=== Request/Response -Remote procedure call with RabbitMQ consists in a client sending a request message and a server replying with a response message. -Both the RPC client and server are _client applications_ and the messages flow through the broker. -The RPC client must send a reply-to queue address with the request. -The RPC server uses this reply-to queue address to send the response. -There must also be a way to correlate a request with its response, this is usually handled with a header that the RPC client and server agree on. +Request/response with RabbitMQ consists in a requester sending a request message and a responder replying with a response message. +Both the requester and responder are _client applications_ and the messages flow through the broker. +The requester must send a reply-to queue address with the request. +The responder uses this reply-to queue address to send the response. +There must also be a way to correlate a request with its response, this is usually handled with a header that the requester and responder agree on. -The library provides RPC client and server support classes. +The library provides requester and responder support classes. They use sensible defaults and some of the internal mechanics are configurable. -They should meet the requirements of most RPC use cases. -It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the RPC support classes do. +They should meet the requirements of most request/response use cases. +It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the request/response support classes do. -Here is how to create an RPC server instance: +Here is how to create a responder instance: -.Creating an RPC server +.Creating a responder [source,java,indent=0] -------- -include::{test-examples}/RpcApi.java[tag=rpc-server-creation] +include::{test-examples}/RequestResponseApi.java[tag=responder-creation] -------- <1> Use builder from connection <2> Set the queue to consume requests from (it must exist) <3> Define the processing logic <4> Create the reply message -Note the RPC server does not create the queue it waits requests on. +Note the responder does not create the queue it waits requests on. It must be created beforehand. -Here is how to create an RPC client: +Here is how to create a requester: -.Creating an RPC client +.Creating a requester [source,java,indent=0] -------- -include::{test-examples}/RpcApi.java[tag=rpc-client-creation] +include::{test-examples}/RequestResponseApi.java[tag=requester-creation] -------- <1> Use builder from connection <2> Set the address to send request messages to -The RPC client will send its request to the configured destination. +The requester will send its request to the configured destination. It can be an exchange or a queue, like in the example above. Here is how to send a request: @@ -211,48 +211,48 @@ Here is how to send a request: .Sending a request [source,java,indent=0] -------- -include::{test-examples}/RpcApi.java[tag=rpc-client-request] +include::{test-examples}/RequestResponseApi.java[tag=requester-request] -------- <1> Create the message request <2> Send the request <3> Wait for the reply (synchronously) -The `RpcClient#publish(Message)` method returns a `CompletableFuture` that holds the reply message. +The `Requester#publish(Message)` method returns a `CompletableFuture` that holds the reply message. It is then possible to wait for the reply asynchronously or synchronously. -The RPC server has the following behavior: +The responder has the following behavior: * when receiving a message request, it calls the processing logic (handler), extracts the correlation ID, calls a reply post-processor if defined, and sends the reply message. -* if all these operations succeed, the server accepts the request message (settles it with the `ACCEPTED` outcome). -* if any of these operations throws an exception, the server discards the request message (the message is removed from the request queue and is https://www.rabbitmq.com/client-libraries/amqp-client-libraries#message-processing-result-outcome[dead-lettered] if configured). +* if all these operations succeed, the responder accepts the request message (settles it with the `ACCEPTED` outcome). +* if any of these operations throws an exception, the responder discards the request message (the message is removed from the request queue and is https://www.rabbitmq.com/client-libraries/amqp-client-libraries#message-processing-result-outcome[dead-lettered] if configured). -The RPC server uses the following defaults: +The responder uses the following defaults: * it uses the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] for the correlation ID. * it assigns the correlation ID (so the _request_ `message-id` by default) to the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`correlation-id` property]. * it assigns the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`to` property] if it is defined. -This behavior is hardcoded but it is possible to cancel it thanks to a reply post-processor. +This behavior is hardcoded, but it is possible to override it thanks to a reply post-processor. -The RPC client uses the following defaults: +The requester uses the following defaults: * it uses https://www.rabbitmq.com/docs/direct-reply-to[direct reply-to] if available (RabbitMQ 4.2 or more) for replies if no reply-to queue is set (it falls back to an auto-delete, exclusive queue if direct reply-to is not available) * it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (`{UUID}-{sequence}`, e.g. `6f839461-6b19-47e1-80b3-6be10d899d85-42`). -The prefix is different for each `RpcClient` instance and the suffix is incremented by one for each request. +The prefix is different for each `Requester` instance and the suffix is incremented by one for each request. * it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the reply-to queue address (defined by the user or created automatically, see above). * it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] to the generated correlation ID. * it extracts the correlation ID from the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`correlation-id` property] to correlate a reply with the appropriate request. -Let's see how to customize some of the RPC support mechanics. +Let's see how to customize some of the request/response support mechanics. Imagine the request `message-id` property is a critical piece of information and we do not want to use it as the correlation ID. -The request can use the `correlation-id` property and the RPC server just has to extract the correlation ID from this property (instead of the `message-id` property by default). +The request can use the `correlation-id` property and the responder just has to extract the correlation ID from this property (instead of the `message-id` property by default). Let's also use a random UUID for the correlation ID generation (avoids doing this in production: this is OK in terms of uniqueness but not optimal in terms of performance because randomness is not cheap). -Here is how to declare the RPC client: +Here is how to declare the requester: -.Customizing the RPC client +.Customizing the requester [source,java,indent=0] -------- -include::{test-examples}/RpcApi.java[tag=rpc-custom-client-creation] +include::{test-examples}/RequestResponseApi.java[tag=custom-requester-creation] -------- <1> Declare the reply-to queue <2> Use a random UUID as correlation ID @@ -260,11 +260,11 @@ include::{test-examples}/RpcApi.java[tag=rpc-custom-client-creation] <4> Set the `reply-to` property <5> Set the address to send request messages to -We just have to tell the RPC server to get the correlation ID from the request `correlation-id` property: +We just have to tell the responder to get the correlation ID from the request `correlation-id` property: -.Customizing the RPC server +.Customizing the responder [source,java,indent=0] -------- -include::{test-examples}/RpcApi.java[tag=rpc-custom-server-creation] +include::{test-examples}/RequestResponseApi.java[tag=custom-responder-creation] -------- <1> Get the correlation ID from the request `correlation-id` property \ No newline at end of file diff --git a/src/main/java/com/rabbitmq/client/amqp/Connection.java b/src/main/java/com/rabbitmq/client/amqp/Connection.java index a51d2d86aa..97fb2965a5 100644 --- a/src/main/java/com/rabbitmq/client/amqp/Connection.java +++ b/src/main/java/com/rabbitmq/client/amqp/Connection.java @@ -44,18 +44,18 @@ public interface Connection extends Closeable, Resource { ConsumerBuilder consumerBuilder(); /** - * Create a builder to configure and create a {@link RpcClientBuilder}. + * Create a builder to configure and create a {@link RequesterBuilder}. * - * @return RPC client builder + * @return requester builder */ - RpcClientBuilder rpcClientBuilder(); + RequesterBuilder requesterBuilder(); /** - * Create a builder to configure and create a {@link RpcServerBuilder}. + * Create a builder to configure and create a {@link Responder}. * - * @return RPC server builder + * @return responder builder */ - RpcServerBuilder rpcServerBuilder(); + ResponderBuilder responderBuilder(); /** Close the connection and its resources */ @Override diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcClient.java b/src/main/java/com/rabbitmq/client/amqp/Requester.java similarity index 90% rename from src/main/java/com/rabbitmq/client/amqp/RpcClient.java rename to src/main/java/com/rabbitmq/client/amqp/Requester.java index bd30f89301..deaf61efa1 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcClient.java +++ b/src/main/java/com/rabbitmq/client/amqp/Requester.java @@ -20,11 +20,11 @@ import java.util.concurrent.CompletableFuture; /** - * Client support class for RPC. + * Requester support class for request/response interaction. * - * @see RpcClientBuilder + * @see RequesterBuilder */ -public interface RpcClient extends AutoCloseable { +public interface Requester extends AutoCloseable { /** * Create a message meant to be published by the underlying publisher instance. @@ -55,7 +55,7 @@ public interface RpcClient extends AutoCloseable { */ CompletableFuture publish(Message message); - /** Close the RPC client and its resources. */ + /** Close the requester and its resources. */ @Override void close(); } diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcClientBuilder.java b/src/main/java/com/rabbitmq/client/amqp/RequesterBuilder.java similarity index 77% rename from src/main/java/com/rabbitmq/client/amqp/RpcClientBuilder.java rename to src/main/java/com/rabbitmq/client/amqp/RequesterBuilder.java index fb21ba843e..3419ae1d0a 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcClientBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/RequesterBuilder.java @@ -22,27 +22,28 @@ import java.util.function.Function; import java.util.function.Supplier; -/** API to configure and create a {@link RpcClient}. */ -public interface RpcClientBuilder { +/** API to configure and create a {@link Requester}. */ +public interface RequesterBuilder { /** * Builder for the request address. * * @return the request address builder */ - RpcClientAddressBuilder requestAddress(); + RequesterAddressBuilder requestAddress(); /** * The queue the client expects responses on. * *

The queue must exist if it is set. * - *

The RPC client will create an exclusive, auto-delete queue if it is not set. + *

The requester will a direct reply-to queue (RabbitMQ 4.2 or more) or create an exclusive, + * auto-delete queue if this parameter is not set. * * @param replyToQueue reply queue * @return this builder instance */ - RpcClientBuilder replyToQueue(String replyToQueue); + RequesterBuilder replyToQueue(String replyToQueue); /** * The generator for correlation ID. @@ -53,7 +54,7 @@ public interface RpcClientBuilder { * @param correlationIdSupplier correlation ID generator * @return the this builder instance */ - RpcClientBuilder correlationIdSupplier(Supplier correlationIdSupplier); + RequesterBuilder correlationIdSupplier(Supplier correlationIdSupplier); /** * A callback before sending a request message. @@ -67,7 +68,7 @@ public interface RpcClientBuilder { * @param requestPostProcessor logic to post-process request message * @return this builder instance */ - RpcClientBuilder requestPostProcessor(BiFunction requestPostProcessor); + RequesterBuilder requestPostProcessor(BiFunction requestPostProcessor); /** * Callback to extract the correlation ID from a reply message. @@ -80,7 +81,7 @@ public interface RpcClientBuilder { * @param correlationIdExtractor correlation ID extractor * @return this builder instance */ - RpcClientBuilder correlationIdExtractor(Function correlationIdExtractor); + RequesterBuilder correlationIdExtractor(Function correlationIdExtractor); /** * Timeout before failing outstanding requests. @@ -88,23 +89,23 @@ public interface RpcClientBuilder { * @param timeout timeout * @return the builder instance */ - RpcClientBuilder requestTimeout(Duration timeout); + RequesterBuilder requestTimeout(Duration timeout); /** * Build the configured instance. * * @return the configured instance */ - RpcClient build(); + Requester build(); /** Builder for the request address. */ - interface RpcClientAddressBuilder extends AddressBuilder { + interface RequesterAddressBuilder extends AddressBuilder { /** - * Go back to the RPC client builder. + * Go back to the requester builder. * - * @return the RPC client builder + * @return the requester builder */ - RpcClientBuilder rpcClient(); + RequesterBuilder requester(); } } diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcServer.java b/src/main/java/com/rabbitmq/client/amqp/Responder.java similarity index 93% rename from src/main/java/com/rabbitmq/client/amqp/RpcServer.java rename to src/main/java/com/rabbitmq/client/amqp/Responder.java index 773e2f0012..cc4176d88e 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcServer.java +++ b/src/main/java/com/rabbitmq/client/amqp/Responder.java @@ -18,11 +18,11 @@ package com.rabbitmq.client.amqp; /** - * Client server class for RPC. + * Responder class for request/response interaction. * - * @see RpcServerBuilder + * @see ResponderBuilder */ -public interface RpcServer extends AutoCloseable { +public interface Responder extends AutoCloseable { /** Contract to process a request message and return a reply message. */ @FunctionalInterface @@ -87,7 +87,7 @@ interface Context { /** Request to receive messages again. */ void unpause(); - /** Close the RPC server and its resources. */ + /** Close the responder and its resources. */ @Override void close(); } diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/ResponderBuilder.java similarity index 85% rename from src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java rename to src/main/java/com/rabbitmq/client/amqp/ResponderBuilder.java index 20e7eba787..05679669bc 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/ResponderBuilder.java @@ -21,8 +21,8 @@ import java.util.function.BiFunction; import java.util.function.Function; -/** API to configure and create a {@link RpcServer}. */ -public interface RpcServerBuilder { +/** API to configure and create a {@link Responder}. */ +public interface ResponderBuilder { /** * The queue to wait for requests on. @@ -30,7 +30,7 @@ public interface RpcServerBuilder { * @param requestQueue request queue * @return this builder instance */ - RpcServerBuilder requestQueue(String requestQueue); + ResponderBuilder requestQueue(String requestQueue); /** * The logic to process requests and issue replies. @@ -38,7 +38,7 @@ public interface RpcServerBuilder { * @param handler handler * @return this builder instance */ - RpcServerBuilder handler(RpcServer.Handler handler); + ResponderBuilder handler(Responder.Handler handler); /** * Logic to extract the correlation ID from a request message. @@ -48,7 +48,7 @@ public interface RpcServerBuilder { * @param correlationIdExtractor logic to extract the correlation ID * @return this builder instance */ - RpcServerBuilder correlationIdExtractor(Function correlationIdExtractor); + ResponderBuilder correlationIdExtractor(Function correlationIdExtractor); /** * A callback called after request processing but before sending the reply message. @@ -61,7 +61,7 @@ public interface RpcServerBuilder { * @param replyPostProcessor logic to post-process reply message * @return this builder instance */ - RpcServerBuilder replyPostProcessor(BiFunction replyPostProcessor); + ResponderBuilder replyPostProcessor(BiFunction replyPostProcessor); /** * The time the server waits for all outstanding requests to be processed before closing. @@ -73,12 +73,12 @@ public interface RpcServerBuilder { * @param closeTimeout close timeout * @return this builder instance */ - RpcServerBuilder closeTimeout(Duration closeTimeout); + ResponderBuilder closeTimeout(Duration closeTimeout); /** * Create the configured instance. * * @return the configured instance */ - RpcServer build(); + Responder build(); } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index d9a77108b4..f92436d5b2 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -41,10 +41,10 @@ import com.rabbitmq.client.amqp.ObservationCollector; import com.rabbitmq.client.amqp.Publisher; import com.rabbitmq.client.amqp.PublisherBuilder; -import com.rabbitmq.client.amqp.RpcClient; -import com.rabbitmq.client.amqp.RpcClientBuilder; -import com.rabbitmq.client.amqp.RpcServer; -import com.rabbitmq.client.amqp.RpcServerBuilder; +import com.rabbitmq.client.amqp.Requester; +import com.rabbitmq.client.amqp.RequesterBuilder; +import com.rabbitmq.client.amqp.Responder; +import com.rabbitmq.client.amqp.ResponderBuilder; import com.rabbitmq.client.amqp.impl.Tuples.Pair; import com.rabbitmq.client.amqp.impl.Utils.RunnableWithException; import com.rabbitmq.client.amqp.impl.Utils.StopWatch; @@ -106,8 +106,8 @@ final class AmqpConnection extends ResourceBase implements Connection { private volatile Session nativeSession; private final List publishers = new CopyOnWriteArrayList<>(); private final List consumers = new CopyOnWriteArrayList<>(); - private final List rpcClients = new CopyOnWriteArrayList<>(); - private final List rpcServers = new CopyOnWriteArrayList<>(); + private final List requesters = new CopyOnWriteArrayList<>(); + private final List responders = new CopyOnWriteArrayList<>(); private final TopologyListener topologyListener; private final EntityRecovery entityRecovery; private final AtomicBoolean recoveringConnection = new AtomicBoolean(false); @@ -253,13 +253,13 @@ public ConsumerBuilder consumerBuilder() { } @Override - public RpcClientBuilder rpcClientBuilder() { - return new RpcSupport.AmqpRpcClientBuilder(this); + public RequesterBuilder requesterBuilder() { + return new RequestResponseSupport.AmqpRequesterBuilder(this); } @Override - public RpcServerBuilder rpcServerBuilder() { - return new RpcSupport.AmqpRpcServerBuilder(this); + public ResponderBuilder responderBuilder() { + return new RequestResponseSupport.AmqpResponderBuilder(this); } @Override @@ -802,24 +802,24 @@ void removeConsumer(AmqpConsumer consumer) { this.topologyListener.consumerDeleted(consumer.id(), consumer.queue()); } - RpcClient createRpcClient(RpcSupport.AmqpRpcClientBuilder builder) { - RpcClient rpcClient = new AmqpRpcClient(builder); - this.rpcClients.add(rpcClient); - return rpcClient; + Requester createRequester(RequestResponseSupport.AmqpRequesterBuilder builder) { + Requester requester = new AmqpRequester(builder); + this.requesters.add(requester); + return requester; } - void removeRpcClient(RpcClient rpcClient) { - this.rpcClients.remove(rpcClient); + void removeRequester(Requester requester) { + this.requesters.remove(requester); } - RpcServer createRpcServer(RpcSupport.AmqpRpcServerBuilder builder) { - RpcServer rpcServer = new AmqpRpcServer(builder); - this.rpcServers.add(rpcServer); - return rpcServer; + Responder createResponder(RequestResponseSupport.AmqpResponderBuilder builder) { + Responder responder = new AmqpResponder(builder); + this.responders.add(responder); + return responder; } - void removeRpcServer(RpcServer rpcServer) { - this.rpcServers.remove(rpcServer); + void removeResponder(Responder responder) { + this.responders.remove(responder); } private void changeStateOfPublishers(State newState, Throwable failure) { @@ -900,11 +900,11 @@ private void close(Throwable cause) { } safeClose.accept("management", this::closeManagement); - for (RpcClient rpcClient : this.rpcClients) { - safeClose.accept("RPC client", rpcClient::close); + for (Requester requester : this.requesters) { + safeClose.accept("requester", requester::close); } - for (RpcServer rpcServer : this.rpcServers) { - safeClose.accept("RPC server", rpcServer::close); + for (Responder responder : this.responders) { + safeClose.accept("responder", responder::close); } for (AmqpPublisher publisher : this.publishers) { safeClose.accept("publisher", () -> publisher.close(cause)); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRequester.java similarity index 94% rename from src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java rename to src/main/java/com/rabbitmq/client/amqp/impl/AmqpRequester.java index 279afd3245..c3dfbaf984 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRequester.java @@ -21,7 +21,7 @@ import com.rabbitmq.client.amqp.Management; import com.rabbitmq.client.amqp.Message; import com.rabbitmq.client.amqp.Publisher; -import com.rabbitmq.client.amqp.RpcClient; +import com.rabbitmq.client.amqp.Requester; import java.time.Duration; import java.util.Iterator; import java.util.Map; @@ -38,9 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class AmqpRpcClient implements RpcClient { +final class AmqpRequester implements Requester { - private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRequester.class); private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {}; @@ -56,7 +56,7 @@ final class AmqpRpcClient implements RpcClient { private final Duration requestTimeout; private final ScheduledFuture requestTimeoutFuture; - AmqpRpcClient(RpcSupport.AmqpRpcClientBuilder builder) { + AmqpRequester(RequestResponseSupport.AmqpRequesterBuilder builder) { this.connection = builder.connection(); this.clock = this.connection.clock(); @@ -176,21 +176,21 @@ public CompletableFuture publish(Message message) { @Override public void close() { if (this.closed.compareAndSet(false, true)) { - this.connection.removeRpcClient(this); + this.connection.removeRequester(this); this.requestTimeoutFuture.cancel(true); try { this.publisher.close(); } catch (Exception e) { - LOGGER.warn("Error while closing RPC client publisher: {}", e.getMessage()); + LOGGER.warn("Error while closing requester publisher: {}", e.getMessage()); } try { this.consumer.close(); } catch (Exception e) { - LOGGER.warn("Error while closing RPC client consumer: {}", e.getMessage()); + LOGGER.warn("Error while closing requester consumer: {}", e.getMessage()); } this.outstandingRequests .values() - .forEach(r -> r.future.completeExceptionally(new AmqpException("RPC client is closed"))); + .forEach(r -> r.future.completeExceptionally(new AmqpException("Requester is closed"))); } } @@ -209,7 +209,7 @@ Runnable requestTimeoutTask() { } catch (Exception e) { LOGGER.warn("Error while pruning timed out request: {}", e.getMessage()); } - request.future.completeExceptionally(new AmqpException("RPC request timed out")); + request.future.completeExceptionally(new AmqpException("Request timed out")); } } }; @@ -217,7 +217,7 @@ Runnable requestTimeoutTask() { private void checkOpen() { if (this.closed.get()) { - throw new AmqpException("RPC client is closed"); + throw new AmqpException("Requester is closed"); } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpResponder.java similarity index 90% rename from src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java rename to src/main/java/com/rabbitmq/client/amqp/impl/AmqpResponder.java index 4b73c213dd..0d42401b36 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpResponder.java @@ -21,7 +21,7 @@ import com.rabbitmq.client.amqp.Consumer; import com.rabbitmq.client.amqp.Message; import com.rabbitmq.client.amqp.Publisher; -import com.rabbitmq.client.amqp.RpcServer; +import com.rabbitmq.client.amqp.Responder; import java.time.Duration; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,9 +31,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class AmqpRpcServer implements RpcServer { +final class AmqpResponder implements Responder { - private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcServer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AmqpResponder.class); private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {}; @@ -57,7 +57,7 @@ final class AmqpRpcServer implements RpcServer { private final AtomicBoolean closed = new AtomicBoolean(false); private final Duration closeTimeout; - AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder builder) { + AmqpResponder(RequestResponseSupport.AmqpResponderBuilder builder) { this.connection = builder.connection(); this.closeTimeout = builder.closeTimeout(); Handler handler = builder.handler(); @@ -141,7 +141,7 @@ public Message message(byte[] body) { ctx.accept(); } catch (Exception e) { LOGGER.info( - "Error while processing RPC request (correlation ID {}): {}", + "Error while processing request (correlation ID {}): {}", correlationId, e.getMessage()); ctx.discard(); @@ -165,25 +165,25 @@ public void unpause() { @Override public void close() { if (this.closed.compareAndSet(false, true)) { - this.connection.removeRpcServer(this); + this.connection.removeResponder(this); try { this.maybeWaitForUnsettledMessages(); } catch (Exception e) { - LOGGER.warn("Error while waiting for unsettled messages in RPC server: {}", e.getMessage()); + LOGGER.warn("Error while waiting for unsettled messages in responder: {}", e.getMessage()); } try { long unsettledMessageCount = this.consumer.unsettledMessageCount(); if (unsettledMessageCount > 0) { - LOGGER.info("Closing RPC server with {} unsettled message(s)", unsettledMessageCount); + LOGGER.info("Closing responder with {} unsettled message(s)", unsettledMessageCount); } this.consumer.close(); } catch (Exception e) { - LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage()); + LOGGER.warn("Error while closing responder consumer: {}", e.getMessage()); } try { this.publisher.close(); } catch (Exception e) { - LOGGER.warn("Error while closing RPC server publisher: {}", e.getMessage()); + LOGGER.warn("Error while closing responder publisher: {}", e.getMessage()); } } } @@ -197,9 +197,9 @@ private void sendReply(Message reply) { }, RESPONSE_SENDING_EXCEPTION_PREDICATE, RESPONSE_SENDING_RETRY_WAIT_TIMES, - "RPC Server Response"); + "Responder Response"); } catch (Exception e) { - LOGGER.info("Error while processing RPC request: {}", e.getMessage()); + LOGGER.info("Error while processing request: {}", e.getMessage()); } } @@ -221,7 +221,7 @@ private void maybeWaitForUnsettledMessages() { private void checkOpen() { if (this.closed.get()) { - throw new AmqpException.AmqpResourceClosedException("RPC server is closed"); + throw new AmqpException.AmqpResourceClosedException("Responder is closed"); } } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java b/src/main/java/com/rabbitmq/client/amqp/impl/RequestResponseSupport.java similarity index 70% rename from src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java rename to src/main/java/com/rabbitmq/client/amqp/impl/RequestResponseSupport.java index 6c1d1f2938..8b9b509242 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/RequestResponseSupport.java @@ -18,70 +18,70 @@ package com.rabbitmq.client.amqp.impl; import com.rabbitmq.client.amqp.Message; -import com.rabbitmq.client.amqp.RpcClient; -import com.rabbitmq.client.amqp.RpcClientBuilder; -import com.rabbitmq.client.amqp.RpcServer; -import com.rabbitmq.client.amqp.RpcServerBuilder; +import com.rabbitmq.client.amqp.Requester; +import com.rabbitmq.client.amqp.RequesterBuilder; +import com.rabbitmq.client.amqp.Responder; +import com.rabbitmq.client.amqp.ResponderBuilder; import java.time.Duration; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; -abstract class RpcSupport { +final class RequestResponseSupport { - private RpcSupport() {} + private RequestResponseSupport() {} - static class AmqpRpcClientBuilder implements RpcClientBuilder { + static class AmqpRequesterBuilder implements RequesterBuilder { private static final Duration REQUEST_TIMEOUT_MIN = Duration.ofSeconds(1); private final AmqpConnection connection; - private final DefaultRpcClientAddressBuilder requestAddressBuilder = - new DefaultRpcClientAddressBuilder(this); + private final DefaultRequesterAddressBuilder requestAddressBuilder = + new DefaultRequesterAddressBuilder(this); private String replyToQueue; private Supplier correlationIdSupplier; private BiFunction requestPostProcessor; private Function correlationIdExtractor; private Duration requestTimeout = Duration.ofSeconds(30); - AmqpRpcClientBuilder(AmqpConnection connection) { + AmqpRequesterBuilder(AmqpConnection connection) { this.connection = connection; } @Override - public RpcClientAddressBuilder requestAddress() { + public RequesterAddressBuilder requestAddress() { return this.requestAddressBuilder; } @Override - public RpcClientBuilder replyToQueue(String replyToQueue) { + public RequesterBuilder replyToQueue(String replyToQueue) { this.replyToQueue = replyToQueue; return this; } @Override - public RpcClientBuilder correlationIdSupplier(Supplier correlationIdSupplier) { + public RequesterBuilder correlationIdSupplier(Supplier correlationIdSupplier) { this.correlationIdSupplier = correlationIdSupplier; return this; } @Override - public RpcClientBuilder requestPostProcessor( + public RequesterBuilder requestPostProcessor( BiFunction requestPostProcessor) { this.requestPostProcessor = requestPostProcessor; return this; } @Override - public RpcClientBuilder correlationIdExtractor( + public RequesterBuilder correlationIdExtractor( Function correlationIdExtractor) { this.correlationIdExtractor = correlationIdExtractor; return this; } @Override - public RpcClientBuilder requestTimeout(Duration timeout) { + public RequesterBuilder requestTimeout(Duration timeout) { if (timeout == null) { throw new IllegalArgumentException("Request timeout cannot be null"); } @@ -98,8 +98,8 @@ Function correlationIdExtractor() { } @Override - public RpcClient build() { - return this.connection.createRpcClient(this); + public Requester build() { + return this.connection.createRequester(this); } AmqpConnection connection() { @@ -123,77 +123,77 @@ Duration requestTimeout() { } } - private static class DefaultRpcClientAddressBuilder - extends DefaultAddressBuilder - implements RpcClientBuilder.RpcClientAddressBuilder { + private static class DefaultRequesterAddressBuilder + extends DefaultAddressBuilder + implements RequesterBuilder.RequesterAddressBuilder { - private final AmqpRpcClientBuilder builder; + private final AmqpRequesterBuilder builder; - private DefaultRpcClientAddressBuilder(AmqpRpcClientBuilder builder) { + private DefaultRequesterAddressBuilder(AmqpRequesterBuilder builder) { super(null); this.builder = builder; } @Override - RpcClientBuilder.RpcClientAddressBuilder result() { + RequesterBuilder.RequesterAddressBuilder result() { return this; } @Override - public RpcClientBuilder rpcClient() { + public RequesterBuilder requester() { return this.builder; } } - static class AmqpRpcServerBuilder implements RpcServerBuilder { + static class AmqpResponderBuilder implements ResponderBuilder { private final AmqpConnection connection; private String requestQueue; - private RpcServer.Handler handler; + private Responder.Handler handler; private Function correlationIdExtractor; private BiFunction replyPostProcessor; private Duration closeTimeout = Duration.ofSeconds(60); - AmqpRpcServerBuilder(AmqpConnection connection) { + AmqpResponderBuilder(AmqpConnection connection) { this.connection = connection; } @Override - public RpcServerBuilder requestQueue(String requestQueue) { + public ResponderBuilder requestQueue(String requestQueue) { this.requestQueue = requestQueue; return this; } @Override - public RpcServerBuilder handler(RpcServer.Handler handler) { + public ResponderBuilder handler(Responder.Handler handler) { this.handler = handler; return this; } @Override - public RpcServerBuilder correlationIdExtractor( + public ResponderBuilder correlationIdExtractor( Function correlationIdExtractor) { this.correlationIdExtractor = correlationIdExtractor; return this; } @Override - public RpcServerBuilder replyPostProcessor( + public ResponderBuilder replyPostProcessor( BiFunction replyPostProcessor) { this.replyPostProcessor = replyPostProcessor; return this; } @Override - public RpcServerBuilder closeTimeout(Duration closeTimeout) { + public ResponderBuilder closeTimeout(Duration closeTimeout) { this.closeTimeout = closeTimeout; return this; } @Override - public RpcServer build() { - return this.connection.createRpcServer(this); + public Responder build() { + return this.connection.createResponder(this); } AmqpConnection connection() { @@ -204,7 +204,7 @@ String requestQueue() { return this.requestQueue; } - RpcServer.Handler handler() { + Responder.Handler handler() { return this.handler; } diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/Api.java b/src/test/java/com/rabbitmq/client/amqp/docs/Api.java index ae13fc2f7e..3ae6685149 100644 --- a/src/test/java/com/rabbitmq/client/amqp/docs/Api.java +++ b/src/test/java/com/rabbitmq/client/amqp/docs/Api.java @@ -40,10 +40,10 @@ import com.rabbitmq.client.amqp.Publisher; import com.rabbitmq.client.amqp.PublisherBuilder; import com.rabbitmq.client.amqp.Resource; -import com.rabbitmq.client.amqp.RpcClient; -import com.rabbitmq.client.amqp.RpcClientBuilder; -import com.rabbitmq.client.amqp.RpcServer; -import com.rabbitmq.client.amqp.RpcServerBuilder; +import com.rabbitmq.client.amqp.Requester; +import com.rabbitmq.client.amqp.RequesterBuilder; +import com.rabbitmq.client.amqp.Responder; +import com.rabbitmq.client.amqp.ResponderBuilder; import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider; import com.rabbitmq.client.amqp.observation.micrometer.MicrometerObservationCollectorBuilder; import io.micrometer.observation.ObservationRegistry; diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/RpcApi.java b/src/test/java/com/rabbitmq/client/amqp/docs/RequestResponseApi.java similarity index 54% rename from src/test/java/com/rabbitmq/client/amqp/docs/RpcApi.java rename to src/test/java/com/rabbitmq/client/amqp/docs/RequestResponseApi.java index fec6c10c0a..d9e6a9d51f 100644 --- a/src/test/java/com/rabbitmq/client/amqp/docs/RpcApi.java +++ b/src/test/java/com/rabbitmq/client/amqp/docs/RequestResponseApi.java @@ -1,9 +1,9 @@ package com.rabbitmq.client.amqp.docs; -import com.rabbitmq.client.amqp.RpcClient; +import com.rabbitmq.client.amqp.Requester; import com.rabbitmq.client.amqp.Connection; import com.rabbitmq.client.amqp.Message; -import com.rabbitmq.client.amqp.RpcServer; +import com.rabbitmq.client.amqp.Responder; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -11,60 +11,60 @@ import static java.nio.charset.StandardCharsets.UTF_8; -public class RpcApi { +public class RequestResponseApi { - void rpcWithDefaults() throws Exception { + void withDefaults() throws Exception { Connection connection = null; - // tag::rpc-server-creation[] - RpcServer rpcServer = connection.rpcServerBuilder() // <1> - .requestQueue("rpc-server") // <2> + // tag::responder-creation[] + Responder responder = connection.responderBuilder() // <1> + .requestQueue("request-queue") // <2> .handler((ctx, req) -> { // <3> String in = new String(req.body(), UTF_8); String out = "*** " + in + " ***"; return ctx.message(out.getBytes(UTF_8)); // <4> }).build(); - // end::rpc-server-creation[] + // end::responder-creation[] - // tag::rpc-client-creation[] - RpcClient rpcClient = connection.rpcClientBuilder() // <1> - .requestAddress().queue("rpc-server") // <2> - .rpcClient() + // tag::requester-creation[] + Requester requester = connection.requesterBuilder() // <1> + .requestAddress().queue("request-queue") // <2> + .requester() .build(); - // end::rpc-client-creation[] + // end::requester-creation[] - // tag::rpc-client-request[] - Message request = rpcClient.message("hello".getBytes(UTF_8)); // <1> - CompletableFuture replyFuture = rpcClient.publish(request); // <2> + // tag::requester-request[] + Message request = requester.message("hello".getBytes(UTF_8)); // <1> + CompletableFuture replyFuture = requester.publish(request); // <2> Message reply = replyFuture.get(10, TimeUnit.SECONDS); // <3> - // end::rpc-client-request[] + // end::requester-request[] } - void rpcWithCustomSettings() throws Exception { + void withCustomSettings() { Connection connection = null; - // tag::rpc-custom-client-creation[] + // tag::custom-requester-creation[] String replyToQueue = connection.management().queue() .autoDelete(true).exclusive(true) .declare().name(); // <1> - RpcClient rpcClient = connection.rpcClientBuilder() + Requester requester = connection.requesterBuilder() .correlationIdSupplier(UUID::randomUUID) // <2> .requestPostProcessor((msg, corrId) -> msg.correlationId(corrId) // <3> .replyToAddress().queue(replyToQueue).message()) // <4> .replyToQueue(replyToQueue) - .requestAddress().queue("rpc-server") // <5> - .rpcClient() + .requestAddress().queue("request-queue") // <5> + .requester() .build(); - // end::rpc-custom-client-creation[] + // end::custom-requester-creation[] - // tag::rpc-custom-server-creation[] - RpcServer rpcServer = connection.rpcServerBuilder() + // tag::custom-responder-creation[] + Responder responder = connection.responderBuilder() .correlationIdExtractor(Message::correlationId) // <1> - .requestQueue("rpc-server") + .requestQueue("request-queue") .handler((ctx, req) -> { String in = new String(req.body(), UTF_8); String out = "*** " + in + " ***"; return ctx.message(out.getBytes(UTF_8)); }).build(); - // end::rpc-custom-server-creation[] + // end::custom-responder-creation[] } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/RequestResponseTest.java similarity index 88% rename from src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java rename to src/test/java/com/rabbitmq/client/amqp/impl/RequestResponseTest.java index 0285f898b4..425e002a9a 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/RequestResponseTest.java @@ -34,9 +34,9 @@ import com.rabbitmq.client.amqp.Environment; import com.rabbitmq.client.amqp.Management; import com.rabbitmq.client.amqp.Message; +import com.rabbitmq.client.amqp.Requester; import com.rabbitmq.client.amqp.Resource; -import com.rabbitmq.client.amqp.RpcClient; -import com.rabbitmq.client.amqp.RpcServer; +import com.rabbitmq.client.amqp.Responder; import com.rabbitmq.client.amqp.impl.TestUtils.Sync; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -50,9 +50,9 @@ import org.junit.jupiter.params.provider.ValueSource; @AmqpTestInfrastructure -public class RpcTest { +public class RequestResponseTest { - private static final RpcServer.Handler HANDLER = + private static final Responder.Handler HANDLER = (ctx, request) -> { String in = new String(request.body(), UTF_8); return ctx.message(process(in).getBytes(UTF_8)); @@ -72,21 +72,21 @@ static void tearDownAll() { } @Test - void rpcWithDefaults() { + void withDefaults() { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); - serverConnection.rpcServerBuilder().requestQueue(requestQueue).handler(HANDLER).build(); + serverConnection.responderBuilder().requestQueue(requestQueue).handler(HANDLER).build(); int requestCount = 100; Sync sync = sync(requestCount); @@ -97,7 +97,7 @@ void rpcWithDefaults() { () -> { String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); Message response = responseFuture.get(10, TimeUnit.SECONDS); assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); sync.down(); @@ -108,23 +108,23 @@ void rpcWithDefaults() { } @Test - void rpcIsRequesterAliveShouldReturnTrueIfPublisherStillOpen() throws Exception { + void isRequesterAliveShouldReturnTrueIfPublisherStillOpen() throws Exception { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); List calls = new CopyOnWriteArrayList<>(); serverConnection - .rpcServerBuilder() + .responderBuilder() .requestQueue(requestQueue) .handler( (ctx, request) -> { @@ -135,7 +135,7 @@ void rpcIsRequesterAliveShouldReturnTrueIfPublisherStillOpen() throws Exception String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); Message response = responseFuture.get(10, TimeUnit.SECONDS); assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); assertThat(calls).containsExactly(true); @@ -143,18 +143,18 @@ void rpcIsRequesterAliveShouldReturnTrueIfPublisherStillOpen() throws Exception } @Test - void rpcIsRequesterAliveShouldReturnFalseIfPublisherClosed() { + void isRequesterAliveShouldReturnFalseIfPublisherClosed() { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); Sync requestReceivedSync = sync(); @@ -162,7 +162,7 @@ void rpcIsRequesterAliveShouldReturnFalseIfPublisherClosed() { List calls = new CopyOnWriteArrayList<>(); serverConnection - .rpcServerBuilder() + .responderBuilder() .requestQueue(requestQueue) .handler( (ctx, request) -> { @@ -176,10 +176,10 @@ void rpcIsRequesterAliveShouldReturnFalseIfPublisherClosed() { String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); assertThat(requestReceivedSync).completes(); assertThat(calls).containsExactly(true); - rpcClient.close(); + requester.close(); assertThat(responseFuture).completesExceptionallyWithin(Duration.ofSeconds(10)); requesterClosedSync.down(); waitAtMost(() -> calls.size() == 2); @@ -189,7 +189,7 @@ void rpcIsRequesterAliveShouldReturnFalseIfPublisherClosed() { } @Test - void rpcWithCustomSettings() { + void withCustomSettings() { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { @@ -200,9 +200,9 @@ void rpcWithCustomSettings() { // we are using application properties for the correlation ID and the reply-to queue // (instead of the standard properties) - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .correlationIdSupplier(correlationIdSequence::getAndIncrement) .requestPostProcessor( (msg, corrId) -> @@ -212,11 +212,11 @@ void rpcWithCustomSettings() { .replyToQueue(replyToQueue) .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); serverConnection - .rpcServerBuilder() + .responderBuilder() .correlationIdExtractor(msg -> msg.property("message-id")) .replyPostProcessor((msg, corrId) -> msg.property("correlation-id", (Long) corrId)) .requestQueue(requestQueue) @@ -239,7 +239,7 @@ void rpcWithCustomSettings() { () -> { String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); Message response = responseFuture.get(10, TimeUnit.SECONDS); org.assertj.core.api.Assertions.assertThat(response.body()) .asString(UTF_8) @@ -252,7 +252,7 @@ void rpcWithCustomSettings() { } @Test - void rpcUseCorrelationIdRequestProperty() { + void useCorrelationIdRequestProperty() { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { @@ -260,9 +260,9 @@ void rpcUseCorrelationIdRequestProperty() { String replyToQueue = clientConnection.management().queue().autoDelete(true).exclusive(true).declare().name(); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .correlationIdSupplier(UUID::randomUUID) .requestPostProcessor( (msg, corrId) -> @@ -270,11 +270,11 @@ void rpcUseCorrelationIdRequestProperty() { .replyToQueue(replyToQueue) .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); serverConnection - .rpcServerBuilder() + .responderBuilder() .correlationIdExtractor(Message::correlationId) .requestQueue(requestQueue) .handler(HANDLER) @@ -289,7 +289,7 @@ void rpcUseCorrelationIdRequestProperty() { () -> { String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); Message response = responseFuture.get(10, TimeUnit.SECONDS); assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); sync.down(); @@ -301,7 +301,7 @@ void rpcUseCorrelationIdRequestProperty() { @ParameterizedTest @ValueSource(booleans = {true, false}) - void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources) + void shouldRecoverAfterConnectionIsClosed(boolean isolateResources) throws ExecutionException, InterruptedException, TimeoutException { String clientConnectionName = UUID.randomUUID().toString(); Sync clientConnectionSync = sync(); @@ -328,41 +328,41 @@ void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources) .backOffDelayPolicy(backOffDelayPolicy) .connectionBuilder() .build()) { - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); - serverConnection.rpcServerBuilder().requestQueue(requestQueue).handler(HANDLER).build(); + serverConnection.responderBuilder().requestQueue(requestQueue).handler(HANDLER).build(); byte[] requestBody = request(UUID.randomUUID().toString()); CompletableFuture response = - rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID())); + requester.publish(requester.message(requestBody).messageId(UUID.randomUUID())); assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody)); Cli.closeConnection(clientConnectionName); requestBody = request(UUID.randomUUID().toString()); try { - rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID())); + requester.publish(requester.message(requestBody).messageId(UUID.randomUUID())); fail("Client connection is recovering, the call should have failed"); } catch (AmqpException e) { // OK } assertThat(clientConnectionSync).completes(); requestBody = request(UUID.randomUUID().toString()); - response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID())); + response = requester.publish(requester.message(requestBody).messageId(UUID.randomUUID())); assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody)); Cli.closeConnection(serverConnectionName); requestBody = request(UUID.randomUUID().toString()); - response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID())); + response = requester.publish(requester.message(requestBody).messageId(UUID.randomUUID())); assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody)); assertThat(serverConnectionSync).completes(); requestBody = request(UUID.randomUUID().toString()); - response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID())); + response = requester.publish(requester.message(requestBody).messageId(UUID.randomUUID())); assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody)); } finally { serverConnection.management().queueDelete(requestQueue); @@ -378,7 +378,7 @@ void poisonRequestsShouldTimeout() { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); serverConnection - .rpcServerBuilder() + .responderBuilder() .requestQueue(requestQueue) .handler( (ctx, msg) -> { @@ -393,13 +393,13 @@ void poisonRequestsShouldTimeout() { .build(); Duration requestTimeout = Duration.ofSeconds(1); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestTimeout(requestTimeout) .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); int requestCount = 100; @@ -421,7 +421,7 @@ void poisonRequestsShouldTimeout() { executorService.submit( () -> { CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); responseFuture.handle( (msg, ex) -> { if (ex != null) { @@ -438,14 +438,14 @@ void poisonRequestsShouldTimeout() { } @Test - void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() { + void outstandingRequestsShouldCompleteExceptionallyOnRequesterClosing() { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); serverConnection - .rpcServerBuilder() + .responderBuilder() .requestQueue(requestQueue) .handler( (ctx, msg) -> { @@ -459,12 +459,12 @@ void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() { .replyPostProcessor((r, corrId) -> r == null ? null : r.correlationId(corrId)) .build(); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); int requestCount = 100; @@ -487,7 +487,7 @@ void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() { executorService.submit( () -> { CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); responseFuture.handle( (msg, ex) -> { if (ex == null) { @@ -503,7 +503,7 @@ void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() { assertThat(allRequestSubmitted).completes(); waitAtMost(() -> completedRequestCount.get() == requestCount - expectedPoisonCount.get()); assertThat(timedOutRequestCount).hasValue(0); - rpcClient.close(); + requester.close(); assertThat(timedOutRequestCount).hasPositiveValue().hasValue(expectedPoisonCount.get()); } } @@ -525,17 +525,17 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info) management.queue().exclusive(true).deadLetterExchange(dlx).declare().name(); Duration requestTimeout = Duration.ofSeconds(1); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestTimeout(requestTimeout) .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); serverConnection - .rpcServerBuilder() + .responderBuilder() .requestQueue(requestQueue) .handler( (ctx, request) -> { @@ -549,7 +549,7 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info) String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); Message response = responseFuture.get(10, TimeUnit.SECONDS); assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); @@ -557,7 +557,7 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info) request = "poison"; CompletableFuture poisonFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); waitAtMost(() -> management.queueInfo(dlq).messageCount() == 1); assertThatThrownBy( () -> poisonFuture.get(requestTimeout.multipliedBy(3).toMillis(), MILLISECONDS)) @@ -567,25 +567,25 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info) } @Test - void rpcServerShouldWaitForAllOutstandingMessagesToBeProcessedBeforeClosingInternalConsumer() + void responderShouldWaitForAllOutstandingMessagesToBeProcessedBeforeClosingInternalConsumer() throws ExecutionException, InterruptedException, TimeoutException { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); Sync receivedSync = sync(); - RpcServer rpcServer = + Responder responder = serverConnection - .rpcServerBuilder() + .responderBuilder() .requestQueue(requestQueue) .handler( (ctx, request) -> { @@ -601,35 +601,35 @@ void rpcServerShouldWaitForAllOutstandingMessagesToBeProcessedBeforeClosingInter String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); assertThat(receivedSync).completes(); - rpcServer.close(); + responder.close(); Message response = responseFuture.get(10, TimeUnit.SECONDS); assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); } } @Test - void outstandingRequestShouldTimeOutWhenRpcServerDoesNotCloseConsumerGracefully() { + void outstandingRequestShouldTimeOutWhenResponderDoesNotCloseConsumerGracefully() { try (Connection clientConnection = environment.connectionBuilder().build(); Connection serverConnection = environment.connectionBuilder().build()) { String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); Duration requestTimeout = Duration.ofSeconds(1); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestTimeout(requestTimeout) .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); Sync receivedSync = sync(); - RpcServer rpcServer = + Responder responder = serverConnection - .rpcServerBuilder() + .responderBuilder() .closeTimeout(Duration.ZERO) // close the consumer immediately .requestQueue(requestQueue) .handler( @@ -646,9 +646,9 @@ void outstandingRequestShouldTimeOutWhenRpcServerDoesNotCloseConsumerGracefully( String request = UUID.randomUUID().toString(); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); assertThat(receivedSync).completes(); - rpcServer.close(); + responder.close(); assertThatThrownBy( () -> responseFuture.get(requestTimeout.multipliedBy(3).toMillis(), MILLISECONDS)) .isInstanceOf(ExecutionException.class) @@ -665,18 +665,18 @@ void pauseUnpauseServer() throws ExecutionException, InterruptedException, Timeo String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); Duration requestTimeout = Duration.ofSeconds(1); - RpcClient rpcClient = + Requester requester = clientConnection - .rpcClientBuilder() + .requesterBuilder() .requestTimeout(requestTimeout) .requestAddress() .queue(requestQueue) - .rpcClient() + .requester() .build(); - RpcServer rpcServer = + Responder responder = serverConnection - .rpcServerBuilder() + .responderBuilder() .closeTimeout(Duration.ZERO) // close the consumer immediately .requestQueue(requestQueue) .handler(HANDLER) @@ -690,7 +690,7 @@ void pauseUnpauseServer() throws ExecutionException, InterruptedException, Timeo String request = UUID.randomUUID().toString(); expectedReplies.add(process(request)); CompletableFuture responseFuture = - rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + requester.publish(requester.message(request.getBytes(UTF_8))); futures.add(responseFuture); }; @@ -701,7 +701,7 @@ void pauseUnpauseServer() throws ExecutionException, InterruptedException, Timeo expectedReplies.clear(); futures.clear(); - rpcServer.pause(); + responder.pause(); int requestCount = 10; IntStream.range(0, requestCount).forEach(ignored -> sendRequest.run()); waitAtMost( @@ -710,7 +710,7 @@ void pauseUnpauseServer() throws ExecutionException, InterruptedException, Timeo futures.forEach(f -> assertThat(f).isNotCompleted()); - rpcServer.unpause(); + responder.unpause(); futures.forEach(f -> assertThat(getAndExtract(f)).isIn(expectedReplies));