Skip to content

Commit 7b9ba92

Browse files
Implement Direct Reply-To. Rename RpcClient/RpcServer to Requester/Responder (#135)
* This PR renames RPC-related classes and interfaces to more descriptive names (RpcClient/RpcServer → Requester/Responder) and adds support for RabbitMQ's DirectReply-To feature for improved RPC performance. The DirectReply-To feature allows the server to send replies directly to clients without creating explicit reply queues when supported by RabbitMQ 4.2+. * Fixes #133 --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent aee17f7 commit 7b9ba92

25 files changed

+681
-363
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then

.ci/ubuntu/cluster/rmq/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.1.0-beta.4-management-alpine
1+
ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.2-management-alpine
22

33
FROM ${RABBITMQ_DOCKER_TAG}
44

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

1111

12-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
12+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"
1313

1414

1515

.ci/windows/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "27.2",
3-
"rabbitmq": "4.1.0"
3+
"rabbitmq": "4.2.0"
44
}

RabbitMQ.AMQP.Client/FeatureFlags.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ public class FeatureFlags
2424
/// </summary>
2525
public bool IsBrokerCompatible { get; internal set; } = false;
2626

27+
/// <summary>
28+
/// Check if Direct Reply-To is supported.
29+
/// Direct Reply-To is available in RabbitMQ 4.2 and later.
30+
/// see: https://www.rabbitmq.com/docs/direct-reply-to
31+
/// </summary>
32+
public bool IsDirectReplyToSupported { get; internal set; } = false;
33+
2734
public void Validate()
2835
{
2936
if (!IsBrokerCompatible)

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@ public interface IConnection : ILifeCycle
2828
IConsumerBuilder ConsumerBuilder();
2929

3030
/// <summary>
31-
/// Create an <see cref="IRpcServerBuilder"/> instance for this connection.
31+
/// Create an <see cref="IResponderBuilder"/> instance for this connection.
3232
/// </summary>
33-
/// <returns><see cref="IRpcServerBuilder"/> instance for this connection.</returns>
34-
IRpcServerBuilder RpcServerBuilder();
33+
/// <returns><see cref="IResponderBuilder"/> instance for this connection.</returns>
34+
IResponderBuilder ResponderBuilder();
3535

3636
/// <summary>
37-
/// Create an <see cref="IRpcClientBuilder"/> instance for this connection.
37+
/// Create an <see cref="IRequesterBuilder"/> instance for this connection.
3838
/// </summary>
39-
/// <returns><see cref="IRpcClientBuilder"/> instance for this connection.</returns>
40-
IRpcClientBuilder RpcClientBuilder();
39+
/// <returns><see cref="IRequesterBuilder"/> instance for this connection.</returns>
40+
IRequesterBuilder RequesterBuilder();
4141

4242
/// <summary>
4343
/// Get the properties for this connection.

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ public interface IConsumer : ILifeCycle
3737
/// Returns the number of unsettled messages.
3838
/// </summary>
3939
long UnsettledMessageCount { get; }
40+
41+
/// <summary>
42+
/// Returns queue name the consumer is consuming from.
43+
/// The Queue name is usually configured by the user via the <see cref="IConsumerBuilder"/>,
44+
/// but can also be generated by the client the special direct-reply-to queue.
45+
/// </summary>
46+
string Queue { get; }
4047
}
4148

4249
public interface IContext

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,17 @@ public enum StreamOffsetSpecification
1919
public interface IConsumerBuilder
2020
{
2121
IConsumerBuilder Queue(IQueueSpecification queueSpecification);
22-
IConsumerBuilder Queue(string queueName);
22+
IConsumerBuilder Queue(string? queueName);
23+
24+
/// <summary>
25+
/// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
26+
/// The server must also support direct reply-to.
27+
/// This feature allows the server to send the reply directly to the client without going through a reply queue.
28+
/// This can improve performance and reduce latency.
29+
/// Default is false.
30+
/// https://www.rabbitmq.com/docs/direct-reply-to
31+
/// </summary>
32+
IConsumerBuilder DirectReplyTo(bool directReplyTo);
2333

2434
IConsumerBuilder MessageHandler(MessageHandler handler);
2535

RabbitMQ.AMQP.Client/IRpcClient.cs renamed to RabbitMQ.AMQP.Client/IRequester.cs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,33 @@
88

99
namespace RabbitMQ.AMQP.Client
1010
{
11-
12-
public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBuilder>
11+
public interface IRequesterAddressBuilder : IAddressBuilder<IRequesterAddressBuilder>
1312
{
14-
IRpcClientBuilder RpcClient();
13+
IRequesterBuilder Requester();
1514
}
1615

1716
/// <summary>
1817
/// IRpcClientBuilder is the interface for creating an RPC client.
19-
/// See also <seealso cref="IRpcClient"/> and <seealso cref="IRpcServerBuilder"/>
18+
/// See also <seealso cref="IRequester"/> and <seealso cref="IResponderBuilder"/>
2019
/// </summary>
21-
public interface IRpcClientBuilder
20+
public interface IRequesterBuilder
2221
{
2322
/// <summary>
2423
/// Request address where the client sends requests.
2524
/// The server consumes requests from this address.
2625
/// </summary>
2726
/// <returns></returns>
28-
IRpcClientAddressBuilder RequestAddress();
27+
IRequesterAddressBuilder RequestAddress();
2928

3029
/// <summary>
3130
/// The queue from which requests are consumed.
3231
/// if not set the client will create a temporary queue.
3332
/// </summary>
3433
/// <param name="replyToQueueName"> The queue name</param>
3534
/// <returns></returns>
36-
IRpcClientBuilder ReplyToQueue(string replyToQueueName);
35+
IRequesterBuilder ReplyToQueue(string replyToQueueName);
3736

38-
IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);
37+
IRequesterBuilder ReplyToQueue(IQueueSpecification replyToQueue);
3938

4039
/// <summary>
4140
/// Extracts the correlation id from the request message.
@@ -45,7 +44,7 @@ public interface IRpcClientBuilder
4544
/// </summary>
4645
/// <param name="correlationIdExtractor"></param>
4746
/// <returns></returns>
48-
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
47+
IRequesterBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
4948

5049
/// <summary>
5150
/// Post processes the reply message before sending it to the server.
@@ -56,7 +55,7 @@ public interface IRpcClientBuilder
5655
/// </summary>
5756
/// <param name="requestPostProcessor"></param>
5857
/// <returns></returns>
59-
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);
58+
IRequesterBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);
6059

6160
/// <summary>
6261
/// Client and Server must agree on the correlation id.
@@ -66,27 +65,27 @@ public interface IRpcClientBuilder
6665
/// </summary>
6766
/// <param name="correlationIdSupplier"></param>
6867
/// <returns></returns>
69-
70-
IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
68+
IRequesterBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
7169

7270
/// <summary>
7371
/// The time to wait for a reply from the server.
7472
/// </summary>
7573
/// <param name="timeout"></param>
7674
/// <returns></returns>
77-
IRpcClientBuilder Timeout(TimeSpan timeout);
75+
IRequesterBuilder Timeout(TimeSpan timeout);
76+
7877
/// <summary>
7978
/// Build and return the RPC client.
8079
/// </summary>
8180
/// <returns></returns>
82-
Task<IRpcClient> BuildAsync();
81+
Task<IRequester> BuildAsync();
8382
}
8483

8584
/// <summary>
8685
/// IRpcClient is the interface for an RPC client.
87-
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
86+
/// See also <seealso cref="IResponder"/> and <seealso cref="IRequesterBuilder"/>
8887
/// </summary>
89-
public interface IRpcClient : ILifeCycle
88+
public interface IRequester : ILifeCycle
9089
{
9190
/// <summary>
9291
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
@@ -98,5 +97,14 @@ public interface IRpcClient : ILifeCycle
9897
/// <param name="cancellationToken">Cancellation token</param>
9998
/// <returns></returns>
10099
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
100+
101+
/// <summary>
102+
/// The ReplyTo queue address can be created by:
103+
/// - the client providing a specific queue name
104+
/// - the client creating a temporary queue
105+
/// - The server uses this address to send the reply message. with direct-reply-to
106+
/// </summary>
107+
/// <returns></returns>
108+
public string GetReplyToQueue();
101109
}
102110
}

RabbitMQ.AMQP.Client/IRpcServer.cs renamed to RabbitMQ.AMQP.Client/IResponder.cs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,22 @@
77

88
namespace RabbitMQ.AMQP.Client
99
{
10-
1110
/// <summary>
1211
/// IRpcServerBuilder is the interface for creating an RPC server.
1312
/// The RPC server consumes requests from a queue and sends replies to a reply queue.
14-
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
13+
/// See also <seealso cref="IResponder"/> and <seealso cref="IRequesterBuilder"/>
1514
/// </summary>
16-
public interface IRpcServerBuilder
15+
public interface IResponderBuilder
1716
{
1817
/// <summary>
1918
/// The queue from which requests are consumed.
2019
/// The client sends requests to this queue and the server consumes them.
2120
/// </summary>
2221
/// <param name="requestQueue"></param>
2322
/// <returns></returns>
24-
IRpcServerBuilder RequestQueue(string requestQueue);
25-
IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);
23+
IResponderBuilder RequestQueue(string requestQueue);
24+
25+
IResponderBuilder RequestQueue(IQueueSpecification requestQueue);
2626

2727
/// <summary>
2828
/// Extracts the correlation id from the request message.
@@ -32,8 +32,7 @@ public interface IRpcServerBuilder
3232
/// </summary>
3333
/// <param name="correlationIdExtractor"></param>
3434
/// <returns></returns>
35-
36-
IRpcServerBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
35+
IResponderBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
3736

3837
/// <summary>
3938
/// Post processes the reply message before sending it to the client.
@@ -44,38 +43,37 @@ public interface IRpcServerBuilder
4443
/// </summary>
4544
/// <param name="replyPostProcessor"></param>
4645
/// <returns></returns>
47-
IRpcServerBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);
46+
IResponderBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);
4847

4948
/// <summary>
5049
/// Handle the request message and return the reply message.
5150
/// </summary>
5251
/// <param name="handler"></param>
5352
/// <returns></returns>
54-
IRpcServerBuilder Handler(RpcHandler handler);
53+
IResponderBuilder Handler(RpcHandler handler);
5554

5655
/// <summary>
5756
/// Build and return the RPC server.
5857
/// </summary>
5958
/// <returns></returns>
60-
Task<IRpcServer> BuildAsync();
59+
Task<IResponder> BuildAsync();
6160
}
6261

6362
/// <summary>
6463
/// Event handler for handling RPC requests.
6564
/// </summary>
6665
// TODO cancellation token
67-
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);
66+
public delegate Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request);
6867

6968
/// <summary>
70-
/// IRpcServer interface for creating an RPC server.
69+
/// IResponder interface for creating an RPC server.
7170
/// The RPC is simulated by sending a request message and receiving a reply message.
7271
/// Where the client sends the queue where wants to receive the reply.
7372
/// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client
74-
/// See also <seealso cref="IRpcClient"/>
73+
/// See also <seealso cref="IRequester"/>
7574
/// </summary>
76-
public interface IRpcServer : ILifeCycle
75+
public interface IResponder : ILifeCycle
7776
{
78-
7977
public interface IContext
8078
{
8179
IMessage Message(byte[] body);

0 commit comments

Comments
 (0)