Skip to content

Commit bb1216f

Browse files
committed
reply to wip
fixes #133 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent ea0ae19 commit bb1216f

File tree

4 files changed

+42
-29
lines changed

4 files changed

+42
-29
lines changed

RabbitMQ.AMQP.Client/FeatureFlags.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ public class FeatureFlags
2424
/// </summary>
2525
public bool IsBrokerCompatible { get; internal set; } = false;
2626

27+
/// <summary>
28+
/// Check if Direct Reply-To is supported.
29+
/// Direct Reply-To is available in RabbitMQ 4.2 and later.
30+
/// see: https://www.rabbitmq.com/docs/direct-reply-to
31+
/// </summary>
32+
public bool IsDirectReplyToSupported { get; internal set; } = false;
33+
2734
public void Validate()
2835
{
2936
if (!IsBrokerCompatible)

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3939
private readonly IMetricsReporter? _metricsReporter;
4040

4141
private readonly Dictionary<string, object> _connectionProperties = new();
42-
internal readonly FeatureFlags _featureFlags = new FeatureFlags();
42+
internal readonly FeatureFlags _featureFlags = new();
4343

4444
/// <summary>
4545
/// _publishersDict contains all the publishers created by the connection.
@@ -670,6 +670,8 @@ private void HandleProperties(Fields properties)
670670
// check if the broker supports filter expressions
671671
// this is a feature that was introduced in RabbitMQ 4.2.0
672672
_featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion);
673+
674+
_featureFlags.IsDirectReplyToSupported = Utils.Is4_2_OrMore(brokerVersion);
673675

674676
_featureFlags.IsFilterFeatureEnabled = Utils.SupportsFilterExpressions(brokerVersion);
675677
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ RabbitMQ.AMQP.Client.ExchangeType.TOPIC = 2 -> RabbitMQ.AMQP.Client.ExchangeType
116116
RabbitMQ.AMQP.Client.FeatureFlags
117117
RabbitMQ.AMQP.Client.FeatureFlags.FeatureFlags() -> void
118118
RabbitMQ.AMQP.Client.FeatureFlags.IsBrokerCompatible.get -> bool
119+
RabbitMQ.AMQP.Client.FeatureFlags.IsDirectReplyToSupported.get -> bool
119120
RabbitMQ.AMQP.Client.FeatureFlags.IsFilterFeatureEnabled.get -> bool
120121
RabbitMQ.AMQP.Client.FeatureFlags.IsSqlFeatureEnabled.get -> bool
121122
RabbitMQ.AMQP.Client.FeatureFlags.Validate() -> void

Tests/Rpc/ResponderTests.cs

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,8 @@ public async Task MockResponderPingPong()
3939
Assert.NotNull(_connection);
4040
TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();
4141

42-
Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request)
43-
{
44-
IMessage reply = context.Message("pong");
45-
tcs.SetResult(reply);
46-
return Task.FromResult(reply);
47-
}
48-
4942
IResponder responder = await _connection.ResponderBuilder()
50-
.Handler(RpcHandler)
43+
.Handler(Handler)
5144
.RequestQueue(_requestQueueName)
5245
.BuildAsync();
5346

@@ -59,6 +52,14 @@ Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request)
5952
IMessage m = await WhenTcsCompletes(tcs);
6053
Assert.Equal("pong", m.BodyAsString());
6154
await responder.CloseAsync();
55+
return;
56+
57+
Task<IMessage> Handler(IResponder.IContext context, IMessage request)
58+
{
59+
IMessage reply = context.Message("pong");
60+
tcs.SetResult(reply);
61+
return Task.FromResult(reply);
62+
}
6263
}
6364

6465
[Fact]
@@ -69,14 +70,8 @@ public async Task ResponderValidateStateChange()
6970
List<(State, State)> states = [];
7071
TaskCompletionSource<int> tcs = CreateTaskCompletionSource<int>();
7172

72-
static Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request)
73-
{
74-
IMessage m = context.Message(request.Body());
75-
return Task.FromResult(m);
76-
}
77-
7873
IResponder responder = await _connection.ResponderBuilder()
79-
.Handler(RpcHandler)
74+
.Handler(Handler)
8075
.RequestQueue(_requestQueueName)
8176
.BuildAsync();
8277

@@ -97,6 +92,13 @@ static Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request)
9792
Assert.Equal(State.Closing, states[0].Item2);
9893
Assert.Equal(State.Closing, states[1].Item1);
9994
Assert.Equal(State.Closed, states[1].Item2);
95+
return;
96+
97+
static Task<IMessage> Handler(IResponder.IContext context, IMessage request)
98+
{
99+
IMessage m = context.Message(request.Body());
100+
return Task.FromResult(m);
101+
}
100102
}
101103

102104
/// <summary>
@@ -120,13 +122,6 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
120122

121123
TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();
122124

123-
Task MessageHandler(IContext context, IMessage message)
124-
{
125-
context.Accept();
126-
tcs.SetResult(message);
127-
return Task.CompletedTask;
128-
}
129-
130125
IConsumer consumer = await _connection.ConsumerBuilder()
131126
.Queue(replyQueueSpec)
132127
.MessageHandler(MessageHandler)
@@ -148,14 +143,22 @@ Task MessageHandler(IContext context, IMessage message)
148143
await responder.CloseAsync();
149144
await consumer.CloseAsync();
150145
await publisher.CloseAsync();
146+
return;
147+
148+
Task MessageHandler(IContext context, IMessage msg)
149+
{
150+
context.Accept();
151+
tcs.SetResult(msg);
152+
return Task.CompletedTask;
153+
}
151154
}
152155

153156
/// <summary>
154157
/// In this test the client has to create a reply queue since is not provided by the user
155158
/// with the ReplyToQueue method
156159
/// </summary>
157160
[Fact]
158-
public async Task ResponderClientPingPongWithDefault()
161+
public async Task ResponderRequesterPingPongWithDefault()
159162
{
160163
Assert.NotNull(_connection);
161164

@@ -179,10 +182,10 @@ public async Task ResponderClientPingPongWithDefault()
179182
}
180183

181184
/// <summary>
182-
/// In this test the client has to use the ReplyToQueue provided by the user
185+
/// In this test the Requester has to use the ReplyToQueue provided by the user
183186
/// </summary>
184187
[Fact]
185-
public async Task ResponderClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier()
188+
public async Task ResponderRequesterPingPongWithCustomReplyToQueueAndCorrelationIdSupplier()
186189
{
187190
Assert.NotNull(_connection);
188191
Assert.NotNull(_management);
@@ -223,7 +226,7 @@ public async Task ResponderClientPingPongWithCustomReplyToQueueAndCorrelationIdS
223226
/// </summary>
224227
/// <exception cref="InvalidOperationException"></exception>
225228
[Fact]
226-
public async Task ResponderClientOverridingTheRequestAndResponsePostProcessor()
229+
public async Task ResponderRequesterOverridingTheRequestAndResponsePostProcessor()
227230
{
228231
Assert.NotNull(_connection);
229232
Assert.NotNull(_management);
@@ -278,7 +281,7 @@ public async Task ResponderClientOverridingTheRequestAndResponsePostProcessor()
278281
}
279282

280283
[Fact]
281-
public async Task RpcClientMultiThreadShouldBeSafe()
284+
public async Task RequesterMultiThreadShouldBeSafe()
282285
{
283286
Assert.NotNull(_connection);
284287
const int messagesToSend = 99;
@@ -350,7 +353,7 @@ Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request)
350353
/// The RPC client `PublishAsync` should raise a timeout exception if the server does not reply within the timeout
351354
/// </summary>
352355
[Fact]
353-
public async Task RpcClientShouldRaiseTimeoutError()
356+
public async Task RequesterShouldRaiseTimeoutError()
354357
{
355358
Assert.NotNull(_connection);
356359

0 commit comments

Comments
 (0)