@@ -15,21 +15,45 @@ namespace Tests.Rpc
1515{
1616 public class RpcServerTests ( ITestOutputHelper testOutputHelper ) : IntegrationTest ( testOutputHelper )
1717 {
18+ private string _requestQueueName = string . Empty ;
19+ private string _replyToName = $ "queueReplyTo-{ Now } -{ Guid . NewGuid ( ) } ";
20+ private string _correlationId = $ "my-correlation-id-{ Guid . NewGuid ( ) } ";
21+
22+ public override async Task InitializeAsync ( )
23+ {
24+ await base . InitializeAsync ( ) ;
25+
26+ Assert . NotNull ( _management ) ;
27+
28+ IQueueInfo requestQueueInfo = await _management . Queue ( )
29+ . Exclusive ( true )
30+ . AutoDelete ( true )
31+ . DeclareAsync ( ) ;
32+
33+ _requestQueueName = requestQueueInfo . Name ( ) ;
34+ }
35+
1836 [ Fact ]
1937 public async Task MockRpcServerPingPong ( )
2038 {
2139 Assert . NotNull ( _connection ) ;
22- Assert . NotNull ( _management ) ;
23- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
2440 TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
25- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
41+
42+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
2643 {
27- var reply = context . Message ( "pong" ) ;
44+ IMessage reply = context . Message ( "pong" ) ;
2845 tcs . SetResult ( reply ) ;
2946 return Task . FromResult ( reply ) ;
30- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
31- Assert . NotNull ( rpcServer ) ;
32- IPublisher p = await _connection . PublisherBuilder ( ) . Queue ( _queueName ) . BuildAsync ( ) ;
47+ }
48+
49+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
50+ . Handler ( RpcHandler )
51+ . RequestQueue ( _requestQueueName )
52+ . BuildAsync ( ) ;
53+
54+ IPublisher p = await _connection . PublisherBuilder ( )
55+ . Queue ( _requestQueueName )
56+ . BuildAsync ( ) ;
3357
3458 await p . PublishAsync ( new AmqpMessage ( "test" ) ) ;
3559 IMessage m = await WhenTcsCompletes ( tcs ) ;
@@ -41,15 +65,21 @@ public async Task MockRpcServerPingPong()
4165 public async Task RpcServerValidateStateChange ( )
4266 {
4367 Assert . NotNull ( _connection ) ;
44- Assert . NotNull ( _management ) ;
68+
4569 List < ( State , State ) > states = [ ] ;
46- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
4770 TaskCompletionSource < int > tcs = CreateTaskCompletionSource < int > ( ) ;
48- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
71+
72+ static Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
4973 {
50- var m = context . Message ( request . Body ( ) ) ;
74+ IMessage m = context . Message ( request . Body ( ) ) ;
5175 return Task . FromResult ( m ) ;
52- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
76+ }
77+
78+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
79+ . Handler ( RpcHandler )
80+ . RequestQueue ( _requestQueueName )
81+ . BuildAsync ( ) ;
82+
5383 rpcServer . ChangeState += ( sender , fromState , toState , e ) =>
5484 {
5585 states . Add ( ( fromState , toState ) ) ;
@@ -58,8 +88,9 @@ public async Task RpcServerValidateStateChange()
5888 tcs . SetResult ( states . Count ) ;
5989 }
6090 } ;
61- Assert . NotNull ( rpcServer ) ;
91+
6292 await rpcServer . CloseAsync ( ) ;
93+
6394 int count = await WhenTcsCompletes ( tcs ) ;
6495 Assert . Equal ( 2 , count ) ;
6596 Assert . Equal ( State . Open , states [ 0 ] . Item1 ) ;
@@ -76,33 +107,38 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
76107 {
77108 Assert . NotNull ( _connection ) ;
78109 Assert . NotNull ( _management ) ;
79- string requestQueue = _queueName ;
80- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
81- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
82- {
83- var reply = context . Message ( "pong" ) ;
84- return Task . FromResult ( reply ) ;
85- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
86110
87- Assert . NotNull ( rpcServer ) ;
88- string queueReplyTo = $ "queueReplyTo-{ Now } ";
89- IQueueSpecification spec = _management . Queue ( queueReplyTo ) . Exclusive ( true ) . AutoDelete ( true ) ;
90- await spec . DeclareAsync ( ) ;
111+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
112+ . Handler ( PongRpcHandler )
113+ . RequestQueue ( _requestQueueName )
114+ . BuildAsync ( ) ;
115+
116+ IQueueSpecification replyQueueSpec = _management . Queue ( _replyToName )
117+ . Exclusive ( true )
118+ . AutoDelete ( true ) ;
119+ await replyQueueSpec . DeclareAsync ( ) ;
120+
91121 TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
92122
93- IConsumer consumer = await _connection . ConsumerBuilder ( ) . Queue ( queueReplyTo ) . MessageHandler (
94- ( context , message ) =>
95- {
96- context . Accept ( ) ;
97- tcs . SetResult ( message ) ;
98- return Task . CompletedTask ;
99- } ) . BuildAndStartAsync ( ) ;
123+ Task MessageHandler ( IContext context , IMessage message )
124+ {
125+ context . Accept ( ) ;
126+ tcs . SetResult ( message ) ;
127+ return Task . CompletedTask ;
128+ }
100129
101- IPublisher publisher = await _connection . PublisherBuilder ( ) . Queue ( requestQueue ) . BuildAsync ( ) ;
102- Assert . NotNull ( publisher ) ;
103- AddressBuilder addressBuilder = new ( ) ;
130+ IConsumer consumer = await _connection . ConsumerBuilder ( )
131+ . Queue ( replyQueueSpec )
132+ . MessageHandler ( MessageHandler )
133+ . BuildAndStartAsync ( ) ;
134+
135+ IPublisher publisher = await _connection . PublisherBuilder ( )
136+ . Queue ( _requestQueueName )
137+ . BuildAsync ( ) ;
104138
105- IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( addressBuilder . Queue ( queueReplyTo ) . Address ( ) ) ;
139+ AddressBuilder addressBuilder = new ( ) ;
140+ string replyToAddress = addressBuilder . Queue ( replyQueueSpec ) . Address ( ) ;
141+ IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( replyToAddress ) ;
106142 PublishResult pr = await publisher . PublishAsync ( message ) ;
107143 Assert . Equal ( OutcomeState . Accepted , pr . Outcome . State ) ;
108144
@@ -123,17 +159,15 @@ public async Task RpcServerClientPingPongWithDefault()
123159 {
124160 Assert . NotNull ( _connection ) ;
125161 Assert . NotNull ( _management ) ;
126- string requestQueue = _queueName ;
127- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
128- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
129- {
130- var reply = context . Message ( "pong" ) ;
131- return Task . FromResult ( reply ) ;
132- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
133- Assert . NotNull ( rpcServer ) ;
134162
135- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
136- . Queue ( requestQueue )
163+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
164+ . Handler ( PongRpcHandler )
165+ . RequestQueue ( _requestQueueName )
166+ . BuildAsync ( ) ;
167+
168+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
169+ . RequestAddress ( )
170+ . Queue ( _requestQueueName )
137171 . RpcClient ( )
138172 . BuildAsync ( ) ;
139173
@@ -153,27 +187,22 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
153187 {
154188 Assert . NotNull ( _connection ) ;
155189 Assert . NotNull ( _management ) ;
156- string requestQueue = _queueName ;
157- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
158- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
159- {
160- var reply = context . Message ( "pong" ) ;
161- return Task . FromResult ( reply ) ;
162- } ) . RequestQueue ( _queueName )
163- . BuildAsync ( ) ;
164- Assert . NotNull ( rpcServer ) ;
165190
166- // custom replyTo queue
167- IQueueInfo replyTo =
168- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
191+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
192+ . Handler ( PongRpcHandler )
193+ . RequestQueue ( _requestQueueName )
194+ . BuildAsync ( ) ;
169195
170- // custom correlationId supplier
171- const string correlationId = "my-correlation-id" ;
196+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
197+ . Exclusive ( true )
198+ . AutoDelete ( true )
199+ . DeclareAsync ( ) ;
172200
173- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
174- . Queue ( requestQueue )
201+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
202+ . RequestAddress ( )
203+ . Queue ( _requestQueueName )
175204 . RpcClient ( )
176- . CorrelationIdSupplier ( ( ) => correlationId )
205+ . CorrelationIdSupplier ( ( ) => _correlationId )
177206 . CorrelationIdExtractor ( message => message . CorrelationId ( ) )
178207 . ReplyToQueue ( replyTo . Name ( ) )
179208 . BuildAsync ( ) ;
@@ -182,7 +211,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
182211
183212 IMessage response = await rpcClient . PublishAsync ( message ) ;
184213 Assert . Equal ( "pong" , response . Body ( ) ) ;
185- Assert . Equal ( correlationId , response . CorrelationId ( ) ) ;
214+ Assert . Equal ( _correlationId , response . CorrelationId ( ) ) ;
186215 await rpcClient . CloseAsync ( ) ;
187216 await rpcServer . CloseAsync ( ) ;
188217 }
@@ -199,34 +228,29 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
199228 {
200229 Assert . NotNull ( _connection ) ;
201230 Assert . NotNull ( _management ) ;
202- string requestQueue = _queueName ;
203- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
204- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
205- {
206- var reply = context . Message ( "pong" ) ;
207- return Task . FromResult ( reply ) ;
208- } ) . RequestQueue ( _queueName )
209- //come from the client
231+
232+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
233+ . Handler ( PongRpcHandler )
234+ . RequestQueue ( _requestQueueName )
210235 . CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
211- // replace the correlation id location with Application properties
212236 . ReplyPostProcessor ( ( reply , replyCorrelationId ) => reply . Property ( "correlationId" ,
213237 replyCorrelationId . ToString ( ) ?? throw new InvalidOperationException ( ) ) )
214238 . BuildAsync ( ) ;
215- Assert . NotNull ( rpcServer ) ;
216239
217- IQueueInfo replyTo =
218- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
240+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
241+ . Exclusive ( true )
242+ . AutoDelete ( true )
243+ . DeclareAsync ( ) ;
219244
220- // custom correlationId supplier
221- const string correlationId = "my-correlation-id" ;
222245 int correlationIdCounter = 0 ;
223246
224- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
225- . Queue ( requestQueue )
247+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
248+ . RequestAddress ( )
249+ . Queue ( _requestQueueName )
226250 . RpcClient ( )
227251 . ReplyToQueue ( replyTo . Name ( ) )
228252 // replace the correlation id creation with a custom function
229- . CorrelationIdSupplier ( ( ) => $ "{ correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
253+ . CorrelationIdSupplier ( ( ) => $ "{ _correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
230254 // The server will reply with the correlation id in application properties
231255 . CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
232256 // The client will use application properties to set the correlation id
@@ -244,8 +268,8 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
244268 IMessage response = await rpcClient . PublishAsync ( message ) ;
245269 Assert . Equal ( "pong" , response . Body ( ) ) ;
246270 // the server replies with the correlation id in the application properties
247- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
248- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
271+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
272+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
249273 Assert . Single ( response . Properties ( ) ) ;
250274 i ++ ;
251275 }
@@ -259,18 +283,16 @@ public async Task RpcClientMultiThreadShouldBeSafe()
259283 {
260284 Assert . NotNull ( _connection ) ;
261285 Assert . NotNull ( _management ) ;
262-
263- string requestQueue = _queueName ;
264-
265- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
266286 const int messagesToSend = 99 ;
287+
267288 TaskCompletionSource < bool > tcs = CreateTaskCompletionSource ( ) ;
268289 List < IMessage > messagesReceived = [ ] ;
269- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
290+
291+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
270292 {
271293 try
272294 {
273- var reply = context . Message ( "pong" ) ;
295+ IMessage reply = context . Message ( "pong" ) ;
274296 messagesReceived . Add ( request ) ;
275297 return Task . FromResult ( reply ) ;
276298 }
@@ -281,17 +303,19 @@ public async Task RpcClientMultiThreadShouldBeSafe()
281303 tcs . SetResult ( true ) ;
282304 }
283305 }
284- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
306+ }
285307
286- Assert . NotNull ( rpcServer ) ;
308+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
309+ . Handler ( RpcHandler )
310+ . RequestQueue ( _requestQueueName )
311+ . BuildAsync ( ) ;
287312
288313 IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
289- . Queue ( requestQueue )
314+ . Queue ( _requestQueueName )
290315 . RpcClient ( )
291316 . BuildAsync ( ) ;
292317
293318 List < Task > tasks = [ ] ;
294-
295319 // we simulate a multi-thread environment
296320 // where multiple threads send messages to the server
297321 // and the server replies to each message in a consistent way
@@ -332,25 +356,29 @@ public async Task RpcClientShouldRaiseTimeoutError()
332356 {
333357 Assert . NotNull ( _connection ) ;
334358 Assert . NotNull ( _management ) ;
335- string requestQueue = _queueName ;
336- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
337- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( async ( context , request ) =>
359+
360+ static async Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
338361 {
339362 IMessage reply = context . Message ( "pong" ) ;
340363 object millisecondsToWait = request . Property ( "wait" ) ;
341364 await Task . Delay ( TimeSpan . FromMilliseconds ( ( int ) millisecondsToWait ) ) ;
342365 return reply ;
343- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
344- Assert . NotNull ( rpcServer ) ;
366+ }
345367
346- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
347- . Queue ( requestQueue )
368+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
369+ . Handler ( RpcHandler )
370+ . RequestQueue ( _requestQueueName )
371+ . BuildAsync ( ) ;
372+
373+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
374+ . RequestAddress ( )
375+ . Queue ( _requestQueueName )
348376 . RpcClient ( )
349377 . Timeout ( TimeSpan . FromMilliseconds ( 300 ) )
350378 . BuildAsync ( ) ;
351379
352- IMessage reply = await rpcClient . PublishAsync (
353- new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ) ;
380+ IMessage msg = new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ;
381+ IMessage reply = await rpcClient . PublishAsync ( msg ) ;
354382 Assert . Equal ( "pong" , reply . Body ( ) ) ;
355383
356384 await Assert . ThrowsAsync < TimeoutException > ( ( ) => rpcClient . PublishAsync (
@@ -359,5 +387,11 @@ await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
359387 await rpcClient . CloseAsync ( ) ;
360388 await rpcServer . CloseAsync ( ) ;
361389 }
390+
391+ private static Task < IMessage > PongRpcHandler ( IRpcServer . IContext context , IMessage request )
392+ {
393+ IMessage reply = context . Message ( "pong" ) ;
394+ return Task . FromResult ( reply ) ;
395+ }
362396 }
363397}
0 commit comments