Skip to content

Commit 390c82a

Browse files
committed
RPC test cleanup
Addresses test flakes due to using not-quite-unique queue names.
1 parent afa3c82 commit 390c82a

File tree

1 file changed

+100
-48
lines changed

1 file changed

+100
-48
lines changed

Tests/Rpc/RpcServerTests.cs

Lines changed: 100 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,45 @@ namespace Tests.Rpc
1515
{
1616
public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
1717
{
18+
private string _requestQueueName = string.Empty;
19+
private string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}";
20+
21+
public override async Task InitializeAsync()
22+
{
23+
await base.InitializeAsync();
24+
25+
Assert.NotNull(_management);
26+
27+
IQueueInfo requestQueueInfo = await _management.Queue(_queueName)
28+
.Exclusive(true)
29+
.AutoDelete(true)
30+
.DeclareAsync();
31+
32+
Assert.Equal(_queueName, requestQueueInfo.Name());
33+
_requestQueueName = requestQueueInfo.Name();
34+
}
35+
1836
[Fact]
1937
public async Task MockRpcServerPingPong()
2038
{
2139
Assert.NotNull(_connection);
22-
Assert.NotNull(_management);
23-
await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync();
2440
TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();
25-
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
41+
42+
Task<IMessage> rpcHandler(IRpcServer.IContext context, IMessage request)
2643
{
27-
var reply = context.Message("pong");
44+
IMessage reply = context.Message("pong");
2845
tcs.SetResult(reply);
2946
return Task.FromResult(reply);
30-
}).RequestQueue(_queueName).BuildAsync();
31-
Assert.NotNull(rpcServer);
32-
IPublisher p = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
47+
}
48+
49+
IRpcServer rpcServer = await _connection.RpcServerBuilder()
50+
.Handler(rpcHandler)
51+
.RequestQueue(_requestQueueName)
52+
.BuildAsync();
53+
54+
IPublisher p = await _connection.PublisherBuilder()
55+
.Queue(_requestQueueName)
56+
.BuildAsync();
3357

3458
await p.PublishAsync(new AmqpMessage("test"));
3559
IMessage m = await WhenTcsCompletes(tcs);
@@ -41,15 +65,21 @@ public async Task MockRpcServerPingPong()
4165
public async Task RpcServerValidateStateChange()
4266
{
4367
Assert.NotNull(_connection);
44-
Assert.NotNull(_management);
68+
4569
List<(State, State)> states = [];
46-
await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync();
4770
TaskCompletionSource<int> tcs = CreateTaskCompletionSource<int>();
48-
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
71+
72+
static Task<IMessage> rpcHandler(IRpcServer.IContext context, IMessage request)
4973
{
50-
var m = context.Message(request.Body());
74+
IMessage m = context.Message(request.Body());
5175
return Task.FromResult(m);
52-
}).RequestQueue(_queueName).BuildAsync();
76+
}
77+
78+
IRpcServer rpcServer = await _connection.RpcServerBuilder()
79+
.Handler(rpcHandler)
80+
.RequestQueue(_requestQueueName)
81+
.BuildAsync();
82+
5383
rpcServer.ChangeState += (sender, fromState, toState, e) =>
5484
{
5585
states.Add((fromState, toState));
@@ -58,8 +88,9 @@ public async Task RpcServerValidateStateChange()
5888
tcs.SetResult(states.Count);
5989
}
6090
};
61-
Assert.NotNull(rpcServer);
91+
6292
await rpcServer.CloseAsync();
93+
6394
int count = await WhenTcsCompletes(tcs);
6495
Assert.Equal(2, count);
6596
Assert.Equal(State.Open, states[0].Item1);
@@ -76,33 +107,44 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
76107
{
77108
Assert.NotNull(_connection);
78109
Assert.NotNull(_management);
79-
string requestQueue = _queueName;
80-
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
81-
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
110+
111+
static Task<IMessage> rpcHandler(IRpcServer.IContext context, IMessage request)
82112
{
83-
var reply = context.Message("pong");
113+
IMessage reply = context.Message("pong");
84114
return Task.FromResult(reply);
85-
}).RequestQueue(requestQueue).BuildAsync();
115+
}
116+
117+
IRpcServer rpcServer = await _connection.RpcServerBuilder()
118+
.Handler(rpcHandler)
119+
.RequestQueue(_requestQueueName)
120+
.BuildAsync();
121+
122+
IQueueSpecification replyQueueSpec = _management.Queue(_replyToName)
123+
.Exclusive(true)
124+
.AutoDelete(true);
125+
await replyQueueSpec.DeclareAsync();
86126

87-
Assert.NotNull(rpcServer);
88-
string queueReplyTo = $"queueReplyTo-{Now}";
89-
IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true);
90-
await spec.DeclareAsync();
91127
TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();
92128

93-
IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler(
94-
(context, message) =>
95-
{
96-
context.Accept();
97-
tcs.SetResult(message);
98-
return Task.CompletedTask;
99-
}).BuildAndStartAsync();
129+
Task messageHandler(IContext context, IMessage message)
130+
{
131+
context.Accept();
132+
tcs.SetResult(message);
133+
return Task.CompletedTask;
134+
}
100135

101-
IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync();
102-
Assert.NotNull(publisher);
103-
AddressBuilder addressBuilder = new();
136+
IConsumer consumer = await _connection.ConsumerBuilder()
137+
.Queue(replyQueueSpec)
138+
.MessageHandler(messageHandler)
139+
.BuildAndStartAsync();
140+
141+
IPublisher publisher = await _connection.PublisherBuilder()
142+
.Queue(_requestQueueName)
143+
.BuildAsync();
104144

105-
IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address());
145+
AddressBuilder addressBuilder = new();
146+
string replyToAddress = addressBuilder.Queue(replyQueueSpec).Address();
147+
IMessage message = new AmqpMessage("test").ReplyTo(replyToAddress);
106148
PublishResult pr = await publisher.PublishAsync(message);
107149
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
108150

@@ -154,23 +196,30 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
154196
Assert.NotNull(_connection);
155197
Assert.NotNull(_management);
156198
string requestQueue = _queueName;
157-
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
158-
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
199+
string correlationId = $"my-correlation-id-{Guid.NewGuid()}";
200+
201+
await _management.Queue(requestQueue)
202+
.Exclusive(true)
203+
.AutoDelete(true)
204+
.DeclareAsync();
205+
206+
IRpcServer rpcServer = await _connection.RpcServerBuilder()
207+
.Handler((context, request) =>
159208
{
160-
var reply = context.Message("pong");
209+
IMessage reply = context.Message("pong");
161210
return Task.FromResult(reply);
162211
}).RequestQueue(_queueName)
163212
.BuildAsync();
164213
Assert.NotNull(rpcServer);
165214

166-
// custom replyTo queue
167-
IQueueInfo replyTo =
168-
await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync();
169-
170-
// custom correlationId supplier
171-
const string correlationId = "my-correlation-id";
215+
string replyToName = $"replyTo-{Now}-{Guid.NewGuid()}";
216+
IQueueInfo replyTo = await _management.Queue(replyToName)
217+
.Exclusive(true)
218+
.AutoDelete(true)
219+
.DeclareAsync();
172220

173-
IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
221+
IRpcClient rpcClient = await _connection.RpcClientBuilder()
222+
.RequestAddress()
174223
.Queue(requestQueue)
175224
.RpcClient()
176225
.CorrelationIdSupplier(() => correlationId)
@@ -200,10 +249,12 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
200249
Assert.NotNull(_connection);
201250
Assert.NotNull(_management);
202251
string requestQueue = _queueName;
252+
string correlationId = $"my-correlation-id-{Guid.NewGuid()}";
253+
203254
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
204255
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
205256
{
206-
var reply = context.Message("pong");
257+
IMessage reply = context.Message("pong");
207258
return Task.FromResult(reply);
208259
}).RequestQueue(_queueName)
209260
//come from the client
@@ -214,11 +265,12 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
214265
.BuildAsync();
215266
Assert.NotNull(rpcServer);
216267

217-
IQueueInfo replyTo =
218-
await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync();
268+
string replyToName = $"replyTo-{Now}-{Guid.NewGuid()}";
269+
IQueueInfo replyTo = await _management.Queue(replyToName)
270+
.Exclusive(true)
271+
.AutoDelete(true)
272+
.DeclareAsync();
219273

220-
// custom correlationId supplier
221-
const string correlationId = "my-correlation-id";
222274
int correlationIdCounter = 0;
223275

224276
IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()

0 commit comments

Comments
 (0)