1111-include_lib (" eunit/include/eunit.hrl" ).
1212-compile (export_all ).
1313
14- -import (shovel_test_utils , [await_credit /1 ]).
14+ -import (shovel_test_utils , [with_amqp10_session /2 ,
15+ amqp10_publish /3 , amqp10_publish /5 ,
16+ amqp10_expect_empty /2 ,
17+ await_amqp10_event /3 , amqp10_expect_one /2 ,
18+ amqp10_expect_count /3 , amqp10_publish /4 ,
19+ amqp10_publish_expect /5 ,
20+ await_autodelete /2 ]).
1521
1622all () ->
1723 [
@@ -86,7 +92,7 @@ end_per_testcase(Testcase, Config) ->
8692simple (Config ) ->
8793 Src = ? config (srcq , Config ),
8894 Dest = ? config (destq , Config ),
89- with_session (Config ,
95+ with_amqp10_session (Config ,
9096 fun (Sess ) ->
9197 test_amqp10_destination (Config , Src , Dest , Sess , <<" amqp10" >>,
9298 <<" src-address" >>)
@@ -95,7 +101,7 @@ simple(Config) ->
95101simple_amqp10_dest (Config ) ->
96102 Src = ? config (srcq , Config ),
97103 Dest = ? config (destq , Config ),
98- with_session (Config ,
104+ with_amqp10_session (Config ,
99105 fun (Sess ) ->
100106 test_amqp10_destination (Config , Src , Dest , Sess , <<" amqp091" >>,
101107 <<" src-queue" >>)
@@ -105,7 +111,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
105111 Dest = ? config (destq , Config ),
106112 Src = ? config (srcq , Config ),
107113 TmpQ = <<" tmp" >>,
108- with_session (Config ,
114+ with_amqp10_session (Config ,
109115 fun (Sess ) ->
110116 {ok , LinkPair } = rabbitmq_amqp_client :attach_management_link_pair_sync (Sess , <<" my link pair" >>),
111117 {ok , _ } = rabbitmq_amqp_client :declare_queue (LinkPair , TmpQ ,
@@ -118,10 +124,10 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
118124 unsettled ,
119125 unsettled_state ),
120126 ok = await_amqp10_event (link , Sender , attached ),
121- expect_empty (Sess , TmpQ ),
127+ amqp10_expect_empty (Sess , TmpQ ),
122128 test_amqp10_destination (Config , Src , Dest , Sess , <<" amqp091" >>, <<" src-queue" >>),
123129 % % publish to tmp, it should be dead-lettered to src and then shovelled to dest
124- _ = publish_expect (Sess , TmpQ , Dest , <<" tag1 " >>, << " hello " >> )
130+ _ = amqp10_publish_expect (Sess , TmpQ , Dest , <<" hello " >>, 1 )
125131 end ).
126132
127133test_amqp10_destination (Config , Src , Dest , Sess , Protocol , ProtocolSrc ) ->
@@ -156,7 +162,7 @@ test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
156162 [{<<" x-message-ann-key" >>,
157163 <<" message-ann-value" >>}]
158164 end }]),
159- Msg = publish_expect (Sess , Src , Dest , <<" tag1 " >>, << " hello " >> ),
165+ [ Msg ] = amqp10_publish_expect (Sess , Src , Dest , <<" hello " >>, 1 ),
160166 AppProps = amqp10_msg :application_properties (Msg ),
161167 Anns = amqp10_msg :message_annotations (Msg ),
162168 % % We no longer add/override properties, application properties or
@@ -176,7 +182,7 @@ simple_amqp10_src(Config) ->
176182 MapConfig = ? config (map_config , Config ),
177183 Src = ? config (srcq , Config ),
178184 Dest = ? config (destq , Config ),
179- with_session (Config ,
185+ with_amqp10_session (Config ,
180186 fun (Sess ) ->
181187 shovel_test_utils :set_param (
182188 Config ,
@@ -192,8 +198,7 @@ simple_amqp10_src(Config) ->
192198 _ -> [{<<" cluster_id" >>, <<" x" >>}]
193199 end }
194200 ]),
195- _Msg = publish_expect (Sess , Src , Dest , <<" tag1" >>,
196- <<" hello" >>),
201+ _Msg = amqp10_publish_expect (Sess , Src , Dest , <<" hello" >>, 1 ),
197202 % the fidelity loss is quite high when consuming using the amqp10
198203 % plugin. For example custom headers aren't current translated.
199204 % This isn't due to the shovel though.
@@ -204,7 +209,7 @@ amqp10_to_amqp091_application_properties(Config) ->
204209 MapConfig = ? config (map_config , Config ),
205210 Src = ? config (srcq , Config ),
206211 Dest = ? config (destq , Config ),
207- with_session (Config ,
212+ with_amqp10_session (Config ,
208213 fun (Sess ) ->
209214 shovel_test_utils :set_param (
210215 Config ,
@@ -240,25 +245,25 @@ change_definition(Config) ->
240245 Src = ? config (srcq , Config ),
241246 Dest = ? config (destq , Config ),
242247 Dest2 = ? config (destq2 , Config ),
243- with_session (Config ,
248+ with_amqp10_session (Config ,
244249 fun (Sess ) ->
245250 shovel_test_utils :set_param (Config , <<" test" >>,
246251 [{<<" src-address" >>, Src },
247252 {<<" src-protocol" >>, <<" amqp10" >>},
248253 {<<" dest-protocol" >>, <<" amqp10" >>},
249254 {<<" dest-address" >>, Dest }]),
250- publish_expect (Sess , Src , Dest , <<" tag2 " >>,<< " hello " >> ),
255+ amqp10_publish_expect (Sess , Src , Dest , <<" hello1 " >>, 1 ),
251256 shovel_test_utils :set_param (Config , <<" test" >>,
252257 [{<<" src-address" >>, Src },
253258 {<<" src-protocol" >>, <<" amqp10" >>},
254259 {<<" dest-protocol" >>, <<" amqp10" >>},
255260 {<<" dest-address" >>, Dest2 }]),
256- publish_expect (Sess , Src , Dest2 , <<" tag3 " >>, << " hello " >> ),
257- expect_empty (Sess , Dest ),
261+ amqp10_publish_expect (Sess , Src , Dest2 , <<" hello2 " >>, 1 ),
262+ amqp10_expect_empty (Sess , Dest ),
258263 shovel_test_utils :clear_param (Config , <<" test" >>),
259- publish_expect (Sess , Src , Src , <<" tag4 " >>, << " hello2 " >> ),
260- expect_empty (Sess , Dest ),
261- expect_empty (Sess , Dest2 )
264+ amqp10_publish_expect (Sess , Src , Src , <<" hello3 " >>, 1 ),
265+ amqp10_expect_empty (Sess , Dest ),
266+ amqp10_expect_empty (Sess , Dest2 )
262267 end ).
263268
264269autodelete_amqp091_src_on_confirm (Config ) ->
@@ -282,13 +287,13 @@ autodelete_amqp091_dest_on_publish(Config) ->
282287 ok .
283288
284289autodelete_case (Config , Args , CaseFun ) ->
285- with_session (Config , CaseFun (Config , Args )).
290+ with_amqp10_session (Config , CaseFun (Config , Args )).
286291
287292autodelete_do (Config , {AckMode , After , ExpSrc , ExpDest }) ->
288293 Src = ? config (srcq , Config ),
289294 Dest = ? config (destq , Config ),
290295 fun (Session ) ->
291- publish_count (Session , Src , <<" hello" >>, 100 ),
296+ amqp10_publish (Session , Src , <<" hello" >>, 100 ),
292297 shovel_test_utils :set_param_nowait (
293298 Config ,
294299 <<" test" >>, [{<<" src-address" >>, Src },
@@ -300,15 +305,15 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
300305 {<<" ack-mode" >>, AckMode }
301306 ]),
302307 await_autodelete (Config , <<" test" >>),
303- expect_count (Session , Dest , ExpDest ),
304- expect_count (Session , Src , ExpSrc )
308+ amqp10_expect_count (Session , Dest , ExpDest ),
309+ amqp10_expect_count (Session , Src , ExpSrc )
305310 end .
306311
307312autodelete_amqp091_src (Config , {AckMode , After , ExpSrc , ExpDest }) ->
308313 Src = ? config (srcq , Config ),
309314 Dest = ? config (destq , Config ),
310315 fun (Session ) ->
311- publish_count (Session , Src , <<" hello" >>, 100 ),
316+ amqp10_publish (Session , Src , <<" hello" >>, 100 ),
312317 shovel_test_utils :set_param_nowait (
313318 Config ,
314319 <<" test" >>, [{<<" src-queue" >>, Src },
@@ -320,15 +325,15 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
320325 {<<" ack-mode" >>, AckMode }
321326 ]),
322327 await_autodelete (Config , <<" test" >>),
323- expect_count (Session , Dest , ExpDest ),
324- expect_count (Session , Src , ExpSrc )
328+ amqp10_expect_count (Session , Dest , ExpDest ),
329+ amqp10_expect_count (Session , Src , ExpSrc )
325330 end .
326331
327332autodelete_amqp091_dest (Config , {AckMode , After , ExpSrc , ExpDest }) ->
328333 Src = ? config (srcq , Config ),
329334 Dest = ? config (destq , Config ),
330335 fun (Session ) ->
331- publish_count (Session , Src , <<" hello" >>, 100 ),
336+ amqp10_publish (Session , Src , <<" hello" >>, 100 ),
332337 shovel_test_utils :set_param_nowait (
333338 Config ,
334339 <<" test" >>, [{<<" src-address" >>, Src },
@@ -340,8 +345,8 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
340345 {<<" ack-mode" >>, AckMode }
341346 ]),
342347 await_autodelete (Config , <<" test" >>),
343- expect_count (Session , Dest , ExpDest ),
344- expect_count (Session , Src , ExpSrc )
348+ amqp10_expect_count (Session , Dest , ExpDest ),
349+ amqp10_expect_count (Session , Src , ExpSrc )
345350 end .
346351
347352test_amqp10_delete_after_queue_length (Config ) ->
@@ -364,27 +369,6 @@ test_amqp10_delete_after_queue_length(Config) ->
364369 ? assertMatch (match , re :run (Msg , " Validation failed.*" , [{capture , none }])).
365370
366371% %----------------------------------------------------------------------------
367-
368- with_session (Config , Fun ) ->
369- Hostname = ? config (rmq_hostname , Config ),
370- Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
371- {ok , Conn } = amqp10_client :open_connection (Hostname , Port ),
372- {ok , Sess } = amqp10_client :begin_session (Conn ),
373- Fun (Sess ),
374- amqp10_client :close_connection (Conn ),
375- ok .
376-
377- publish (Sender , Tag , Payload ) when is_binary (Payload ) ->
378- Headers = #{durable => true },
379- Msg = amqp10_msg :set_headers (Headers ,
380- amqp10_msg :new (Tag , Payload , false )),
381- ok = amqp10_client :send_msg (Sender , Msg ),
382- receive
383- {amqp10_disposition , {accepted , Tag }} -> ok
384- after 3000 ->
385- exit (publish_disposition_not_received )
386- end .
387-
388372publish (Sender , Msg ) ->
389373 ok = amqp10_client :send_msg (Sender , Msg ),
390374 Tag = amqp10_msg :delivery_tag (Msg ),
@@ -394,121 +378,16 @@ publish(Sender, Msg) ->
394378 exit (publish_disposition_not_received )
395379 end .
396380
397- publish_expect (Session , Source , Dest , Tag , Payload ) ->
398- LinkName = <<" dynamic-sender-" , Dest /binary >>,
399- {ok , Sender } = amqp10_client :attach_sender_link (Session , LinkName , Source ,
400- unsettled , unsettled_state ),
401- ok = await_amqp10_event (link , Sender , attached ),
402- await_credit (Sender ),
403- publish (Sender , Tag , Payload ),
404- amqp10_client :detach_link (Sender ),
405- expect_one (Session , Dest ).
406-
407381publish_expect_msg (Session , Source , Dest , Msg ) ->
408382 LinkName = <<" dynamic-sender-" , Dest /binary >>,
409383 {ok , Sender } = amqp10_client :attach_sender_link (Session , LinkName , Source ,
410384 unsettled , unsettled_state ),
411385 ok = await_amqp10_event (link , Sender , attached ),
412386 publish (Sender , Msg ),
413387 amqp10_client :detach_link (Sender ),
414- expect_one (Session , Dest ).
415-
416- await_amqp10_event (On , Ref , Evt ) ->
417- receive
418- {amqp10_event , {On , Ref , Evt }} -> ok
419- after 5000 ->
420- exit ({amqp10_event_timeout , On , Ref , Evt })
421- end .
422-
423- expect_one (Session , Dest ) ->
424- LinkName = <<" dynamic-receiver-" , Dest /binary >>,
425- {ok , Receiver } = amqp10_client :attach_receiver_link (Session , LinkName ,
426- Dest , settled ,
427- unsettled_state ),
428- ok = amqp10_client :flow_link_credit (Receiver , 1 , never ),
429- Msg = expect (Receiver ),
430- amqp10_client :detach_link (Receiver ),
431- Msg .
432-
433- expect (Receiver ) ->
434- receive
435- {amqp10_msg , Receiver , InMsg } ->
436- InMsg
437- after 4000 ->
438- throw (timeout_in_expect_waiting_for_delivery )
439- end .
440-
441- expect_empty (Session , Dest ) ->
442- {ok , Receiver } = amqp10_client :attach_receiver_link (Session ,
443- <<" dynamic-receiver" >>,
444- Dest , settled ,
445- unsettled_state ),
446- % probably good enough given we don't currently have a means of
447- % echoing flow state
448- {error , timeout } = amqp10_client :get_msg (Receiver , 250 ),
449- amqp10_client :detach_link (Receiver ).
450-
451- publish_count (Session , Address , Payload , Count ) ->
452- LinkName = <<" dynamic-sender-" , Address /binary >>,
453- {ok , Sender } = amqp10_client :attach_sender_link (Session , LinkName ,
454- Address , unsettled ,
455- unsettled_state ),
456- ok = await_amqp10_event (link , Sender , attached ),
457- [begin
458- Tag = rabbit_data_coercion :to_binary (I ),
459- publish (Sender , Tag , <<Payload /binary , Tag /binary >>)
460- end || I <- lists :seq (1 , Count )],
461- amqp10_client :detach_link (Sender ).
462-
463- expect_count (Session , Address , Count ) ->
464- {ok , Receiver } = amqp10_client :attach_receiver_link (Session ,
465- <<" dynamic-receiver" ,
466- Address /binary >>,
467- Address , settled ,
468- unsettled_state ),
469- ok = amqp10_client :flow_link_credit (Receiver , Count , never ),
470- [begin
471- expect (Receiver )
472- end || _ <- lists :seq (1 , Count )],
473- expect_empty (Session , Address ),
474- amqp10_client :detach_link (Receiver ).
475-
476-
477- invalid_param (Config , Value , User ) ->
478- {error_string , _ } = rabbit_ct_broker_helpers :rpc (Config , 0 ,
479- rabbit_runtime_parameters , set ,
480- [<<" /" >>, <<" shovel" >>, <<" invalid" >>, Value , User ]).
481-
482- valid_param (Config , Value , User ) ->
483- rabbit_ct_broker_helpers :rpc (Config , 0 ,
484- ? MODULE , valid_param1 , [Config , Value , User ]).
485-
486- valid_param1 (_Config , Value , User ) ->
487- ok = rabbit_runtime_parameters :set (
488- <<" /" >>, <<" shovel" >>, <<" a" >>, Value , User ),
489- ok = rabbit_runtime_parameters :clear (<<" /" >>, <<" shovel" >>, <<" a" >>, <<" acting-user" >>).
490-
491- invalid_param (Config , Value ) -> invalid_param (Config , Value , none ).
492- valid_param (Config , Value ) -> valid_param (Config , Value , none ).
388+ amqp10_expect_one (Session , Dest ).
493389
494390lookup_user (Config , Name ) ->
495391 {ok , User } = rabbit_ct_broker_helpers :rpc (Config , 0 ,
496392 rabbit_access_control , check_user_login , [Name , []]),
497393 User .
498-
499- await_autodelete (Config , Name ) ->
500- rabbit_ct_broker_helpers :rpc (Config , 0 ,
501- ? MODULE , await_autodelete1 , [Config , Name ], 10000 ).
502-
503- await_autodelete1 (_Config , Name ) ->
504- shovel_test_utils :await (
505- fun () -> not lists :member (Name , shovels_from_parameters ()) end ),
506- shovel_test_utils :await (
507- fun () ->
508- not lists :member (Name ,
509- shovel_test_utils :shovels_from_status ())
510- end ).
511-
512- shovels_from_parameters () ->
513- L = rabbit_runtime_parameters :list (<<" /" >>, <<" shovel" >>),
514- [rabbit_misc :pget (name , Shovel ) || Shovel <- L ].
0 commit comments