@@ -15,21 +15,45 @@ namespace Tests.Rpc
1515{
1616 public class RpcServerTests ( ITestOutputHelper testOutputHelper ) : IntegrationTest ( testOutputHelper )
1717 {
18+ private string _requestQueueName = string . Empty ;
19+ private string _replyToName = $ "queueReplyTo-{ Now } -{ Guid . NewGuid ( ) } ";
20+ private string _correlationId = $ "my-correlation-id-{ Guid . NewGuid ( ) } ";
21+
22+ public override async Task InitializeAsync ( )
23+ {
24+ await base . InitializeAsync ( ) ;
25+
26+ Assert . NotNull ( _management ) ;
27+
28+ IQueueInfo requestQueueInfo = await _management . Queue ( )
29+ . Exclusive ( true )
30+ . AutoDelete ( true )
31+ . DeclareAsync ( ) ;
32+
33+ _requestQueueName = requestQueueInfo . Name ( ) ;
34+ }
35+
1836 [ Fact ]
1937 public async Task MockRpcServerPingPong ( )
2038 {
2139 Assert . NotNull ( _connection ) ;
22- Assert . NotNull ( _management ) ;
23- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
2440 TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
25- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
41+
42+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
2643 {
27- var reply = context . Message ( "pong" ) ;
44+ IMessage reply = context . Message ( "pong" ) ;
2845 tcs . SetResult ( reply ) ;
2946 return Task . FromResult ( reply ) ;
30- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
31- Assert . NotNull ( rpcServer ) ;
32- IPublisher p = await _connection . PublisherBuilder ( ) . Queue ( _queueName ) . BuildAsync ( ) ;
47+ }
48+
49+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
50+ . Handler ( RpcHandler )
51+ . RequestQueue ( _requestQueueName )
52+ . BuildAsync ( ) ;
53+
54+ IPublisher p = await _connection . PublisherBuilder ( )
55+ . Queue ( _requestQueueName )
56+ . BuildAsync ( ) ;
3357
3458 await p . PublishAsync ( new AmqpMessage ( "test" ) ) ;
3559 IMessage m = await WhenTcsCompletes ( tcs ) ;
@@ -41,15 +65,21 @@ public async Task MockRpcServerPingPong()
4165 public async Task RpcServerValidateStateChange ( )
4266 {
4367 Assert . NotNull ( _connection ) ;
44- Assert . NotNull ( _management ) ;
68+
4569 List < ( State , State ) > states = [ ] ;
46- await _management . Queue ( _queueName ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
4770 TaskCompletionSource < int > tcs = CreateTaskCompletionSource < int > ( ) ;
48- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
71+
72+ static Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
4973 {
50- var m = context . Message ( request . Body ( ) ) ;
74+ IMessage m = context . Message ( request . Body ( ) ) ;
5175 return Task . FromResult ( m ) ;
52- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
76+ }
77+
78+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
79+ . Handler ( RpcHandler )
80+ . RequestQueue ( _requestQueueName )
81+ . BuildAsync ( ) ;
82+
5383 rpcServer . ChangeState += ( sender , fromState , toState , e ) =>
5484 {
5585 states . Add ( ( fromState , toState ) ) ;
@@ -58,8 +88,9 @@ public async Task RpcServerValidateStateChange()
5888 tcs . SetResult ( states . Count ) ;
5989 }
6090 } ;
61- Assert . NotNull ( rpcServer ) ;
91+
6292 await rpcServer . CloseAsync ( ) ;
93+
6394 int count = await WhenTcsCompletes ( tcs ) ;
6495 Assert . Equal ( 2 , count ) ;
6596 Assert . Equal ( State . Open , states [ 0 ] . Item1 ) ;
@@ -76,33 +107,38 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
76107 {
77108 Assert . NotNull ( _connection ) ;
78109 Assert . NotNull ( _management ) ;
79- string requestQueue = _queueName ;
80- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
81- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
82- {
83- var reply = context . Message ( "pong" ) ;
84- return Task . FromResult ( reply ) ;
85- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
86110
87- Assert . NotNull ( rpcServer ) ;
88- string queueReplyTo = $ "queueReplyTo-{ Now } ";
89- IQueueSpecification spec = _management . Queue ( queueReplyTo ) . Exclusive ( true ) . AutoDelete ( true ) ;
90- await spec . DeclareAsync ( ) ;
111+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
112+ . Handler ( PongRpcHandler )
113+ . RequestQueue ( _requestQueueName )
114+ . BuildAsync ( ) ;
115+
116+ IQueueSpecification replyQueueSpec = _management . Queue ( _replyToName )
117+ . Exclusive ( true )
118+ . AutoDelete ( true ) ;
119+ await replyQueueSpec . DeclareAsync ( ) ;
120+
91121 TaskCompletionSource < IMessage > tcs = CreateTaskCompletionSource < IMessage > ( ) ;
92122
93- IConsumer consumer = await _connection . ConsumerBuilder ( ) . Queue ( queueReplyTo ) . MessageHandler (
94- ( context , message ) =>
95- {
96- context . Accept ( ) ;
97- tcs . SetResult ( message ) ;
98- return Task . CompletedTask ;
99- } ) . BuildAndStartAsync ( ) ;
123+ Task MessageHandler ( IContext context , IMessage message )
124+ {
125+ context . Accept ( ) ;
126+ tcs . SetResult ( message ) ;
127+ return Task . CompletedTask ;
128+ }
100129
101- IPublisher publisher = await _connection . PublisherBuilder ( ) . Queue ( requestQueue ) . BuildAsync ( ) ;
102- Assert . NotNull ( publisher ) ;
103- AddressBuilder addressBuilder = new ( ) ;
130+ IConsumer consumer = await _connection . ConsumerBuilder ( )
131+ . Queue ( replyQueueSpec )
132+ . MessageHandler ( MessageHandler )
133+ . BuildAndStartAsync ( ) ;
104134
105- IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( addressBuilder . Queue ( queueReplyTo ) . Address ( ) ) ;
135+ IPublisher publisher = await _connection . PublisherBuilder ( )
136+ . Queue ( _requestQueueName )
137+ . BuildAsync ( ) ;
138+
139+ AddressBuilder addressBuilder = new ( ) ;
140+ string replyToAddress = addressBuilder . Queue ( replyQueueSpec ) . Address ( ) ;
141+ IMessage message = new AmqpMessage ( "test" ) . ReplyTo ( replyToAddress ) ;
106142 PublishResult pr = await publisher . PublishAsync ( message ) ;
107143 Assert . Equal ( OutcomeState . Accepted , pr . Outcome . State ) ;
108144
@@ -122,18 +158,15 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
122158 public async Task RpcServerClientPingPongWithDefault ( )
123159 {
124160 Assert . NotNull ( _connection ) ;
125- Assert . NotNull ( _management ) ;
126- string requestQueue = _queueName ;
127- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
128- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
129- {
130- var reply = context . Message ( "pong" ) ;
131- return Task . FromResult ( reply ) ;
132- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
133- Assert . NotNull ( rpcServer ) ;
134161
135- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
136- . Queue ( requestQueue )
162+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
163+ . Handler ( PongRpcHandler )
164+ . RequestQueue ( _requestQueueName )
165+ . BuildAsync ( ) ;
166+
167+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
168+ . RequestAddress ( )
169+ . Queue ( _requestQueueName )
137170 . RpcClient ( )
138171 . BuildAsync ( ) ;
139172
@@ -153,27 +186,22 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
153186 {
154187 Assert . NotNull ( _connection ) ;
155188 Assert . NotNull ( _management ) ;
156- string requestQueue = _queueName ;
157- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
158- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
159- {
160- var reply = context . Message ( "pong" ) ;
161- return Task . FromResult ( reply ) ;
162- } ) . RequestQueue ( _queueName )
163- . BuildAsync ( ) ;
164- Assert . NotNull ( rpcServer ) ;
165189
166- // custom replyTo queue
167- IQueueInfo replyTo =
168- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
190+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
191+ . Handler ( PongRpcHandler )
192+ . RequestQueue ( _requestQueueName )
193+ . BuildAsync ( ) ;
169194
170- // custom correlationId supplier
171- const string correlationId = "my-correlation-id" ;
195+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
196+ . Exclusive ( true )
197+ . AutoDelete ( true )
198+ . DeclareAsync ( ) ;
172199
173- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
174- . Queue ( requestQueue )
200+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
201+ . RequestAddress ( )
202+ . Queue ( _requestQueueName )
175203 . RpcClient ( )
176- . CorrelationIdSupplier ( ( ) => correlationId )
204+ . CorrelationIdSupplier ( ( ) => _correlationId )
177205 . CorrelationIdExtractor ( message => message . CorrelationId ( ) )
178206 . ReplyToQueue ( replyTo . Name ( ) )
179207 . BuildAsync ( ) ;
@@ -182,7 +210,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
182210
183211 IMessage response = await rpcClient . PublishAsync ( message ) ;
184212 Assert . Equal ( "pong" , response . Body ( ) ) ;
185- Assert . Equal ( correlationId , response . CorrelationId ( ) ) ;
213+ Assert . Equal ( _correlationId , response . CorrelationId ( ) ) ;
186214 await rpcClient . CloseAsync ( ) ;
187215 await rpcServer . CloseAsync ( ) ;
188216 }
@@ -199,34 +227,29 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
199227 {
200228 Assert . NotNull ( _connection ) ;
201229 Assert . NotNull ( _management ) ;
202- string requestQueue = _queueName ;
203- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
204- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
205- {
206- var reply = context . Message ( "pong" ) ;
207- return Task . FromResult ( reply ) ;
208- } ) . RequestQueue ( _queueName )
209- //come from the client
230+
231+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
232+ . Handler ( PongRpcHandler )
233+ . RequestQueue ( _requestQueueName )
210234 . CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
211- // replace the correlation id location with Application properties
212235 . ReplyPostProcessor ( ( reply , replyCorrelationId ) => reply . Property ( "correlationId" ,
213236 replyCorrelationId . ToString ( ) ?? throw new InvalidOperationException ( ) ) )
214237 . BuildAsync ( ) ;
215- Assert . NotNull ( rpcServer ) ;
216238
217- IQueueInfo replyTo =
218- await _management . Queue ( $ "replyTo-{ Now } ") . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
239+ IQueueInfo replyTo = await _management . Queue ( _replyToName )
240+ . Exclusive ( true )
241+ . AutoDelete ( true )
242+ . DeclareAsync ( ) ;
219243
220- // custom correlationId supplier
221- const string correlationId = "my-correlation-id" ;
222244 int correlationIdCounter = 0 ;
223245
224- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
225- . Queue ( requestQueue )
246+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
247+ . RequestAddress ( )
248+ . Queue ( _requestQueueName )
226249 . RpcClient ( )
227250 . ReplyToQueue ( replyTo . Name ( ) )
228251 // replace the correlation id creation with a custom function
229- . CorrelationIdSupplier ( ( ) => $ "{ correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
252+ . CorrelationIdSupplier ( ( ) => $ "{ _correlationId } _{ Interlocked . Increment ( ref correlationIdCounter ) } ")
230253 // The server will reply with the correlation id in application properties
231254 . CorrelationIdExtractor ( message => message . Property ( "correlationId" ) )
232255 // The client will use application properties to set the correlation id
@@ -244,8 +267,8 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
244267 IMessage response = await rpcClient . PublishAsync ( message ) ;
245268 Assert . Equal ( "pong" , response . Body ( ) ) ;
246269 // the server replies with the correlation id in the application properties
247- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
248- Assert . Equal ( $ "{ correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
270+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Property ( "correlationId" ) ) ;
271+ Assert . Equal ( $ "{ _correlationId } _{ i } ", response . Properties ( ) [ "correlationId" ] ) ;
249272 Assert . Single ( response . Properties ( ) ) ;
250273 i ++ ;
251274 }
@@ -258,19 +281,16 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
258281 public async Task RpcClientMultiThreadShouldBeSafe ( )
259282 {
260283 Assert . NotNull ( _connection ) ;
261- Assert . NotNull ( _management ) ;
262-
263- string requestQueue = _queueName ;
264-
265- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
266284 const int messagesToSend = 99 ;
285+
267286 TaskCompletionSource < bool > tcs = CreateTaskCompletionSource ( ) ;
268287 List < IMessage > messagesReceived = [ ] ;
269- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( ( context , request ) =>
288+
289+ Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
270290 {
271291 try
272292 {
273- var reply = context . Message ( "pong" ) ;
293+ IMessage reply = context . Message ( "pong" ) ;
274294 messagesReceived . Add ( request ) ;
275295 return Task . FromResult ( reply ) ;
276296 }
@@ -281,17 +301,19 @@ public async Task RpcClientMultiThreadShouldBeSafe()
281301 tcs . SetResult ( true ) ;
282302 }
283303 }
284- } ) . RequestQueue ( requestQueue ) . BuildAsync ( ) ;
304+ }
285305
286- Assert . NotNull ( rpcServer ) ;
306+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
307+ . Handler ( RpcHandler )
308+ . RequestQueue ( _requestQueueName )
309+ . BuildAsync ( ) ;
287310
288311 IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
289- . Queue ( requestQueue )
312+ . Queue ( _requestQueueName )
290313 . RpcClient ( )
291314 . BuildAsync ( ) ;
292315
293316 List < Task > tasks = [ ] ;
294-
295317 // we simulate a multi-thread environment
296318 // where multiple threads send messages to the server
297319 // and the server replies to each message in a consistent way
@@ -331,26 +353,29 @@ public async Task RpcClientMultiThreadShouldBeSafe()
331353 public async Task RpcClientShouldRaiseTimeoutError ( )
332354 {
333355 Assert . NotNull ( _connection ) ;
334- Assert . NotNull ( _management ) ;
335- string requestQueue = _queueName ;
336- await _management . Queue ( requestQueue ) . Exclusive ( true ) . AutoDelete ( true ) . DeclareAsync ( ) ;
337- IRpcServer rpcServer = await _connection . RpcServerBuilder ( ) . Handler ( async ( context , request ) =>
356+
357+ static async Task < IMessage > RpcHandler ( IRpcServer . IContext context , IMessage request )
338358 {
339359 IMessage reply = context . Message ( "pong" ) ;
340360 object millisecondsToWait = request . Property ( "wait" ) ;
341361 await Task . Delay ( TimeSpan . FromMilliseconds ( ( int ) millisecondsToWait ) ) ;
342362 return reply ;
343- } ) . RequestQueue ( _queueName ) . BuildAsync ( ) ;
344- Assert . NotNull ( rpcServer ) ;
363+ }
345364
346- IRpcClient rpcClient = await _connection . RpcClientBuilder ( ) . RequestAddress ( )
347- . Queue ( requestQueue )
365+ IRpcServer rpcServer = await _connection . RpcServerBuilder ( )
366+ . Handler ( RpcHandler )
367+ . RequestQueue ( _requestQueueName )
368+ . BuildAsync ( ) ;
369+
370+ IRpcClient rpcClient = await _connection . RpcClientBuilder ( )
371+ . RequestAddress ( )
372+ . Queue ( _requestQueueName )
348373 . RpcClient ( )
349374 . Timeout ( TimeSpan . FromMilliseconds ( 300 ) )
350375 . BuildAsync ( ) ;
351376
352- IMessage reply = await rpcClient . PublishAsync (
353- new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ) ;
377+ IMessage msg = new AmqpMessage ( "ping" ) . Property ( "wait" , 1 ) ;
378+ IMessage reply = await rpcClient . PublishAsync ( msg ) ;
354379 Assert . Equal ( "pong" , reply . Body ( ) ) ;
355380
356381 await Assert . ThrowsAsync < TimeoutException > ( ( ) => rpcClient . PublishAsync (
@@ -359,5 +384,11 @@ await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
359384 await rpcClient . CloseAsync ( ) ;
360385 await rpcServer . CloseAsync ( ) ;
361386 }
387+
388+ private static Task < IMessage > PongRpcHandler ( IRpcServer . IContext context , IMessage request )
389+ {
390+ IMessage reply = context . Message ( "pong" ) ;
391+ return Task . FromResult ( reply ) ;
392+ }
362393 }
363394}
0 commit comments