@@ -15,21 +15,45 @@ namespace Tests.Rpc
15
15
{
16
16
public class RpcServerTests ( ITestOutputHelper testOutputHelper ) : IntegrationTest ( testOutputHelper )
17
17
{
18
+ private string _requestQueueName = string . Empty ;
19
+ private string _replyToName = $ "queueReplyTo-{ Now } -{ Guid . NewGuid ( ) } ";
20
+ private string _correlationId = $ "my-correlation-id-{ Guid . NewGuid ( ) } ";
21
+
22
+ public override async Task InitializeAsync ( )
23
+ {
24
+ await base . InitializeAsync ( ) ;
25
+
26
+ Assert . NotNull ( _management ) ;
27
+
28
+ IQueueInfo requestQueueInfo = await _management . Queue ( )
29
+ . Exclusive ( true )
30
+ . AutoDelete ( true )
31
+ . DeclareAsync ( ) ;
32
+
33
+ _requestQueueName = requestQueueInfo . Name ( ) ;
34
+ }
35
+
18
36
[ Fact ]
19
37
public async Task MockRpcServerPingPong ( )
20
38
{
21
39
Assert . NotNull ( _connection ) ;
22
- Assert . NotNull ( _management ) ;
23
- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
24
40
TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
25
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
41
+
42
+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
26
43
{
27
- var reply = context . Message ( "pong" ) ;
44
+ IMessage reply = context . Message ( "pong" ) ;
28
45
tcs . SetResult ( reply ) ;
29
46
return Task . FromResult ( reply ) ;
30
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
31
- Assert . NotNull ( rpcServer ) ;
32
- IPublisher p = await _connection . PublisherBuilder ( ) . Queue ( _queueName ) . BuildAsync ( ) ;
47
+ }
48
+
49
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
50
+ . Handler ( RpcHandler )
51
+ . RequestQueue ( _requestQueueName )
52
+ . BuildAsync ( ) ;
53
+
54
+ IPublisher p = await _connection . PublisherBuilder ( )
55
+ . Queue ( _requestQueueName )
56
+ . BuildAsync ( ) ;
33
57
34
58
await p . PublishAsync ( new AmqpMessage ( "test" ) ) ;
35
59
IMessage m = await WhenTcsCompletes ( tcs ) ;
@@ -41,15 +65,21 @@ public async Task MockRpcServerPingPong()
41
65
public async Task RpcServerValidateStateChange ( )
42
66
{
43
67
Assert . NotNull ( _connection ) ;
44
- Assert . NotNull ( _management ) ;
68
+
45
69
List < ( State , State ) > states = [ ] ;
46
- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
47
70
TaskCompletionSource < int > tcs = CreateTaskCompletionSource < int > ( ) ;
48
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
71
+
72
+ static Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
49
73
{
50
- var m = context . Message ( request . Body ( ) ) ;
74
+ IMessage m = context . Message ( request . Body ( ) ) ;
51
75
return Task . FromResult ( m ) ;
52
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
76
+ }
77
+
78
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
79
+ . Handler ( RpcHandler )
80
+ . RequestQueue ( _requestQueueName )
81
+ . BuildAsync ( ) ;
82
+
53
83
rpcServer . ChangeState += ( sender , fromState , toState , e ) =>
54
84
{
55
85
states . Add ( ( fromState , toState ) ) ;
@@ -58,8 +88,9 @@ public async Task RpcServerValidateStateChange()
58
88
tcs . SetResult ( states . Count ) ;
59
89
}
60
90
} ;
61
- Assert . NotNull ( rpcServer ) ;
91
+
62
92
await rpcServer . CloseAsync ( ) ;
93
+
63
94
int count = await WhenTcsCompletes ( tcs ) ;
64
95
Assert . Equal ( 2 , count ) ;
65
96
Assert . Equal ( State . Open , states [ 0 ] . Item1 ) ;
@@ -76,33 +107,38 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
76
107
{
77
108
Assert . NotNull ( _connection ) ;
78
109
Assert . NotNull ( _management ) ;
79
- string requestQueue = _queueName ;
80
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
81
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
82
- {
83
- var reply = context . Message ( "pong" ) ;
84
- return Task . FromResult ( reply ) ;
85
- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
86
110
87
- Assert . NotNull ( rpcServer ) ;
88
- string queueReplyTo = $ "queueReplyTo-{ Now } ";
89
- IQueueSpecification spec = _management . Queue ( queueReplyTo ) . Exclusive ( true ) . AutoDelete ( true ) ;
90
- await spec . DeclareAsync ( ) ;
111
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
112
+ . Handler ( PongRpcHandler )
113
+ . RequestQueue ( _requestQueueName )
114
+ . BuildAsync ( ) ;
115
+
116
+ IQueueSpecification replyQueueSpec = _management . Queue ( _replyToName )
117
+ . Exclusive ( true )
118
+ . AutoDelete ( true ) ;
119
+ await replyQueueSpec . DeclareAsync ( ) ;
120
+
91
121
TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
92
122
93
- IConsumer consumer = await _connection . ConsumerBuilder ( ) . Queue ( queueReplyTo ) . MessageHandler (
94
- ( context , message ) =>
95
- {
96
- context . Accept ( ) ;
97
- tcs . SetResult ( message ) ;
98
- return Task . CompletedTask ;
99
- } ) . BuildAndStartAsync ( ) ;
123
+ Task MessageHandler ( IContext context , IMessage message )
124
+ {
125
+ context . Accept ( ) ;
126
+ tcs . SetResult ( message ) ;
127
+ return Task . CompletedTask ;
128
+ }
100
129
101
- IPublisher publisher = await _connection . PublisherBuilder ( ) . Queue ( requestQueue ) . BuildAsync ( ) ;
102
- Assert . NotNull ( publisher ) ;
103
- AddressBuilder addressBuilder = new ( ) ;
130
+ IConsumer consumer = await _connection . ConsumerBuilder ( )
131
+ . Queue ( replyQueueSpec )
132
+ . MessageHandler ( MessageHandler )
133
+ . BuildAndStartAsync ( ) ;
104
134
105
- IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( addressBuilder . Queue ( queueReplyTo ) . Address ( ) ) ;
135
+ IPublisher publisher = await _connection . PublisherBuilder ( )
136
+ . Queue ( _requestQueueName )
137
+ . BuildAsync ( ) ;
138
+
139
+ AddressBuilder addressBuilder = new ( ) ;
140
+ string replyToAddress = addressBuilder . Queue ( replyQueueSpec ) . Address ( ) ;
141
+ IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( replyToAddress ) ;
106
142
PublishResult pr = await publisher . PublishAsync ( message ) ;
107
143
Assert . Equal ( OutcomeState . Accepted , pr . Outcome . State ) ;
108
144
@@ -122,18 +158,15 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
122
158
public async Task RpcServerClientPingPongWithDefault ( )
123
159
{
124
160
Assert . NotNull ( _connection ) ;
125
- Assert . NotNull ( _management ) ;
126
- string requestQueue = _queueName ;
127
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
128
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
129
- {
130
- var reply = context . Message ( "pong" ) ;
131
- return Task . FromResult ( reply ) ;
132
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
133
- Assert . NotNull ( rpcServer ) ;
134
161
135
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
136
- . Queue ( requestQueue )
162
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
163
+ . Handler ( PongRpcHandler )
164
+ . RequestQueue ( _requestQueueName )
165
+ . BuildAsync ( ) ;
166
+
167
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
168
+ . RequestAddress ( )
169
+ . Queue ( _requestQueueName )
137
170
. RpcClient ( )
138
171
. BuildAsync ( ) ;
139
172
@@ -153,27 +186,22 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
153
186
{
154
187
Assert . NotNull ( _connection ) ;
155
188
Assert . NotNull ( _management ) ;
156
- string requestQueue = _queueName ;
157
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
158
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
159
- {
160
- var reply = context . Message ( "pong" ) ;
161
- return Task . FromResult ( reply ) ;
162
- } ) . RequestQueue ( _queueName )
163
- . BuildAsync ( ) ;
164
- Assert . NotNull ( rpcServer ) ;
165
189
166
- // custom replyTo queue
167
- IQueueInfo replyTo =
168
- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
190
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
191
+ . Handler ( PongRpcHandler )
192
+ . RequestQueue ( _requestQueueName )
193
+ . BuildAsync ( ) ;
169
194
170
- // custom correlationId supplier
171
- const string correlationId = "my-correlation-id" ;
195
+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
196
+ . Exclusive ( true )
197
+ . AutoDelete ( true )
198
+ . DeclareAsync ( ) ;
172
199
173
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
174
- . Queue ( requestQueue )
200
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
201
+ . RequestAddress ( )
202
+ . Queue ( _requestQueueName )
175
203
. RpcClient ( )
176
- . CorrelationIdSupplier ( ( ) => correlationId )
204
+ . CorrelationIdSupplier ( ( ) => _correlationId )
177
205
. CorrelationIdExtractor ( message => message . CorrelationId ( ) )
178
206
. ReplyToQueue ( replyTo . Name ( ) )
179
207
. BuildAsync ( ) ;
@@ -182,7 +210,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
182
210
183
211
IMessage response = await rpcClient . PublishAsync ( message ) ;
184
212
Assert . Equal ( "pong" , response . Body ( ) ) ;
185
- Assert . Equal ( correlationId , response . CorrelationId ( ) ) ;
213
+ Assert . Equal ( _correlationId , response . CorrelationId ( ) ) ;
186
214
await rpcClient . CloseAsync ( ) ;
187
215
await rpcServer . CloseAsync ( ) ;
188
216
}
@@ -199,34 +227,29 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
199
227
{
200
228
Assert . NotNull ( _connection ) ;
201
229
Assert . NotNull ( _management ) ;
202
- string requestQueue = _queueName ;
203
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
204
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
205
- {
206
- var reply = context . Message ( "pong" ) ;
207
- return Task . FromResult ( reply ) ;
208
- } ) . RequestQueue ( _queueName )
209
- //come from the client
230
+
231
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
232
+ . Handler ( PongRpcHandler )
233
+ . RequestQueue ( _requestQueueName )
210
234
. CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
211
- // replace the correlation id location with Application properties
212
235
. ReplyPostProcessor ( ( reply , replyCorrelationId ) => reply . Property ( "correlationId" ,
213
236
replyCorrelationId . ToString ( ) ?? throw new InvalidOperationException ( ) ) )
214
237
. BuildAsync ( ) ;
215
- Assert . NotNull ( rpcServer ) ;
216
238
217
- IQueueInfo replyTo =
218
- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
239
+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
240
+ . Exclusive ( true )
241
+ . AutoDelete ( true )
242
+ . DeclareAsync ( ) ;
219
243
220
- // custom correlationId supplier
221
- const string correlationId = "my-correlation-id" ;
222
244
int correlationIdCounter = 0 ;
223
245
224
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
225
- . Queue ( requestQueue )
246
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
247
+ . RequestAddress ( )
248
+ . Queue ( _requestQueueName )
226
249
. RpcClient ( )
227
250
. ReplyToQueue ( replyTo . Name ( ) )
228
251
// replace the correlation id creation with a custom function
229
- . CorrelationIdSupplier ( ( ) => $ "{ correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
252
+ . CorrelationIdSupplier ( ( ) => $ "{ _correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
230
253
// The server will reply with the correlation id in application properties
231
254
. CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
232
255
// The client will use application properties to set the correlation id
@@ -244,8 +267,8 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
244
267
IMessage response = await rpcClient . PublishAsync ( message ) ;
245
268
Assert . Equal ( "pong" , response . Body ( ) ) ;
246
269
// the server replies with the correlation id in the application properties
247
- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
248
- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
270
+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
271
+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
249
272
Assert . Single ( response . Properties ( ) ) ;
250
273
i ++ ;
251
274
}
@@ -258,19 +281,16 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
258
281
public async Task RpcClientMultiThreadShouldBeSafe ( )
259
282
{
260
283
Assert . NotNull ( _connection ) ;
261
- Assert . NotNull ( _management ) ;
262
-
263
- string requestQueue = _queueName ;
264
-
265
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
266
284
const int messagesToSend = 99 ;
285
+
267
286
TaskCompletionSource < bool > tcs = CreateTaskCompletionSource ( ) ;
268
287
List < IMessage > messagesReceived = [ ] ;
269
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
288
+
289
+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
270
290
{
271
291
try
272
292
{
273
- var reply = context . Message ( "pong" ) ;
293
+ IMessage reply = context . Message ( "pong" ) ;
274
294
messagesReceived . Add ( request ) ;
275
295
return Task . FromResult ( reply ) ;
276
296
}
@@ -281,17 +301,19 @@ public async Task RpcClientMultiThreadShouldBeSafe()
281
301
tcs . SetResult ( true ) ;
282
302
}
283
303
}
284
- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
304
+ }
285
305
286
- Assert . NotNull ( rpcServer ) ;
306
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
307
+ . Handler ( RpcHandler )
308
+ . RequestQueue ( _requestQueueName )
309
+ . BuildAsync ( ) ;
287
310
288
311
IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
289
- . Queue ( requestQueue )
312
+ . Queue ( _requestQueueName )
290
313
. RpcClient ( )
291
314
. BuildAsync ( ) ;
292
315
293
316
List < Task > tasks = [ ] ;
294
-
295
317
// we simulate a multi-thread environment
296
318
// where multiple threads send messages to the server
297
319
// and the server replies to each message in a consistent way
@@ -331,26 +353,29 @@ public async Task RpcClientMultiThreadShouldBeSafe()
331
353
public async Task RpcClientShouldRaiseTimeoutError ( )
332
354
{
333
355
Assert . NotNull ( _connection ) ;
334
- Assert . NotNull ( _management ) ;
335
- string requestQueue = _queueName ;
336
- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
337
- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( async ( context , request ) =>
356
+
357
+ static async Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
338
358
{
339
359
IMessage reply = context . Message ( "pong" ) ;
340
360
object millisecondsToWait = request . Property ( "wait" ) ;
341
361
await Task . Delay ( TimeSpan . FromMilliseconds ( ( int ) millisecondsToWait ) ) ;
342
362
return reply ;
343
- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
344
- Assert . NotNull ( rpcServer ) ;
363
+ }
345
364
346
- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
347
- . Queue ( requestQueue )
365
+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
366
+ . Handler ( RpcHandler )
367
+ . RequestQueue ( _requestQueueName )
368
+ . BuildAsync ( ) ;
369
+
370
+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
371
+ . RequestAddress ( )
372
+ . Queue ( _requestQueueName )
348
373
. RpcClient ( )
349
374
. Timeout ( TimeSpan . FromMilliseconds ( 300 ) )
350
375
. BuildAsync ( ) ;
351
376
352
- IMessage reply = await rpcClient . PublishAsync (
353
- new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ) ;
377
+ IMessage msg = new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ;
378
+ IMessage reply = await rpcClient . PublishAsync ( msg ) ;
354
379
Assert . Equal ( "pong" , reply . Body ( ) ) ;
355
380
356
381
await Assert . ThrowsAsync < TimeoutException > ( ( ) => rpcClient . PublishAsync (
@@ -359,5 +384,11 @@ await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
359
384
await rpcClient . CloseAsync ( ) ;
360
385
await rpcServer . CloseAsync ( ) ;
361
386
}
387
+
388
+ private static Task < IMessage > PongRpcHandler ( IRpcServer . IContext context , IMessage request )
389
+ {
390
+ IMessage reply = context . Message ( "pong" ) ;
391
+ return Task . FromResult ( reply ) ;
392
+ }
362
393
}
363
394
}
0 commit comments