diff --git a/README.md b/README.md index c57ac3ae..142b944f 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,26 @@ # RabbitMQ AMQP 1.0 .NET Client This library is meant to be used with RabbitMQ 4.0. -Still work in progress suitable for testing in pre-production environments +Suitable for testing in pre-production environments -## How to Run -- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. -- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"` -- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop` +## Install -## Getting Started +The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/). -You can find an example in: `docs/Examples/GettingStarted` +## Examples -## Install +Inside the `docs/Examples` directory you can find examples of how to use the client. -The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/). ## Documentation +- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) - [API](https://rabbitmq.github.io/rabbitmq-amqp-dotnet-client/api/RabbitMQ.AMQP.Client.html) + + +## How to Run + +- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. +- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"` +- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop` diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRpcClient.cs index 62aca52f..00b89b8b 100644 --- a/RabbitMQ.AMQP.Client/IRpcClient.cs +++ b/RabbitMQ.AMQP.Client/IRpcClient.cs @@ -10,23 +10,89 @@ public interface IRpcClientAddressBuilder : IAddressBuilder + /// IRpcClientBuilder is the interface for creating an RPC client. + /// See also and + /// public interface IRpcClientBuilder { + /// + /// Request address where the client sends requests. + /// The server consumes requests from this address. + /// + /// IRpcClientAddressBuilder RequestAddress(); + + /// + /// The queue from which requests are consumed. + /// if not set the client will create a temporary queue. + /// + /// The queue name + /// IRpcClientBuilder ReplyToQueue(string replyToQueueName); IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue); + + /// + /// Extracts the correlation id from the request message. + /// each message has a correlation id that is used to match the request with the response. + /// There are default implementations for the correlation id extractor. + /// With this method, you can provide a custom implementation. + /// + /// + /// IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor); + /// + /// Post processes the reply message before sending it to the server. + /// The object parameter is the correlation id extracted from the request message. + /// There are default implementations for the reply post processor that use the correlationId() field + /// to set the correlation id of the reply message. + /// With this method, you can provide a custom implementation. + /// + /// + /// IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor); + /// + /// Client and Server must agree on the correlation id. + /// The client will provide the correlation id to send to the server. + /// If the default correlation id is not suitable, you can provide a custom correlation id supplier. + /// Be careful to provide a unique correlation id for each request. + /// + /// + /// + IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier); + + /// + /// The time to wait for a reply from the server. + /// + /// + /// IRpcClientBuilder Timeout(TimeSpan timeout); + /// + /// Build and return the RPC client. + /// + /// Task BuildAsync(); } + /// + /// IRpcClient is the interface for an RPC client. + /// See also and + /// public interface IRpcClient : ILifeCycle { + /// + /// PublishAsync sends a request message to the server and blocks the thread until the response is received. + /// The PublishAsync is thread-safe and can be called from multiple threads. + /// The Function returns the response message. + /// If the server does not respond within the timeout, the function throws a TimeoutException. + /// + /// The request message + /// Cancellation token + /// Task PublishAsync(IMessage message, CancellationToken cancellationToken = default); } } diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index 65d8f297..a96a4730 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -3,8 +3,12 @@ namespace RabbitMQ.AMQP.Client { - public delegate Task RpcHandler(IRpcServer.IContext context, IMessage request); + /// + /// IRpcServerBuilder is the interface for creating an RPC server. + /// The RPC server consumes requests from a queue and sends replies to a reply queue. + /// See also and + /// public interface IRpcServerBuilder { /// @@ -45,9 +49,25 @@ public interface IRpcServerBuilder /// IRpcServerBuilder Handler(RpcHandler handler); + /// + /// Build and return the RPC server. + /// + /// Task BuildAsync(); } + /// + /// Event handler for handling RPC requests. + /// + public delegate Task RpcHandler(IRpcServer.IContext context, IMessage request); + + /// + /// IRpcServer interface for creating an RPC server. + /// The RPC is simulated by sending a request message and receiving a reply message. + /// Where the client sends the queue where wants to receive the reply. + /// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client + /// See also + /// public interface IRpcServer : ILifeCycle { diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs index 2e557b2c..8ad98b57 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -188,12 +188,6 @@ public override async Task CloseAsync() } } - /// - /// PublishAsync sends a request message to the server and blocks the thread until the response is received. - /// - /// The request message - /// Cancellation token - /// public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) { await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index 7aa38543..05fecfc6 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -151,7 +151,7 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () => { Trace.WriteLine(TraceLevel.Error, $"Failed to send reply, retrying in {span}"); } - }, 3).ConfigureAwait(false); + }, 5).ConfigureAwait(false); } }) .Queue(_configuration.RequestQueue).BuildAndStartAsync() diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 366ede1e..86ce2064 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -318,5 +318,41 @@ public async Task RpcClientMultiThreadShouldBeSafe() await rpcServer.CloseAsync(); await rpcClient.CloseAsync(); } + + /// + /// The RPC client `PublishAsync` should raise a timeout exception if the server does not reply within the timeout + /// + [Fact] + public async Task RpcClientShouldRaiseTimeoutError() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + object millisecondsToWait = request.ApplicationProperty("wait"); + Thread.Sleep(TimeSpan.FromMilliseconds((int)millisecondsToWait)); + return Task.FromResult(reply); + }).RequestQueue(_queueName).BuildAsync(); + Assert.NotNull(rpcServer); + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .Timeout(TimeSpan.FromMilliseconds(300)) + .BuildAsync(); + + IMessage reply = await rpcClient.PublishAsync( + new AmqpMessage("ping").ApplicationProperty("wait", 1)); + Assert.Equal("pong", reply.Body()); + + await Assert.ThrowsAsync(() => rpcClient.PublishAsync( + new AmqpMessage("ping").ApplicationProperty("wait", 700))); + + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } } } diff --git a/docs/Examples/README.md b/docs/Examples/README.md new file mode 100644 index 00000000..afed97c2 --- /dev/null +++ b/docs/Examples/README.md @@ -0,0 +1,8 @@ +# RabbitMQ AMQP 1.0 .NET Client examples + +This directory contains examples of how to use the RabbitMQ AMQP 1.0 .NET client. + +- Getting Started with the Client [here](./GettingStarted/) +- RPC Server and Client [here](./Rpc/) +- How to write a reliable client [here](./HAClient/) +- Performance Test [here](./PerformanceTest/). You can tune some parameters in the `Program.cs` file.