2525 amqp10_publish_msg /4 ,
2626 amqp10_expect_one /2 ,
2727 amqp10_expect_count /3 ,
28- make_uri /3 ,
28+ make_uri /2 , make_uri /3 ,
29+ make_uri /5 ,
2930 await_no_shovel /2
3031 ]).
3132
@@ -66,6 +67,16 @@ tests() ->
6667 simple_quorum_no_ack ,
6768 simple_quorum_on_confirm ,
6869 simple_quorum_on_publish ,
70+ % % Credit flow tests are just simple tests that publish a high
71+ % % number of messages, on the attempt to trigger the different
72+ % % credit flow mechanisms. Having the same test twice (simple/credit)
73+ % % helps to isolate the problem.
74+ credit_flow_classic_no_ack ,
75+ credit_flow_classic_on_confirm ,
76+ credit_flow_classic_on_publish ,
77+ credit_flow_quorum_no_ack ,
78+ credit_flow_quorum_on_confirm ,
79+ credit_flow_quorum_on_publish ,
6980 autodelete_classic_on_confirm ,
7081 autodelete_quorum_on_confirm ,
7182 autodelete_classic_on_publish ,
@@ -84,8 +95,10 @@ tests() ->
8495 autodelete_classic_on_publish_with_rejections ,
8596 autodelete_quorum_on_publish_with_rejections ,
8697 no_vhost_access ,
98+ no_user_access ,
8799 application_properties ,
88- delete_src_queue
100+ delete_src_queue ,
101+ shovel_status
89102 ].
90103
91104% % -------------------------------------------------------------------
@@ -238,24 +251,42 @@ simple(Config) ->
238251 end ).
239252
240253simple_classic_no_ack (Config ) ->
241- simple_queue_type_ack_mode (Config , <<" classic" >>, <<" no-ack" >>).
254+ simple_queue_type_ack_mode (Config , <<" classic" >>, <<" no-ack" >>, 10 ).
242255
243256simple_classic_on_confirm (Config ) ->
244- simple_queue_type_ack_mode (Config , <<" classic" >>, <<" on-confirm" >>).
257+ simple_queue_type_ack_mode (Config , <<" classic" >>, <<" on-confirm" >>, 10 ).
245258
246259simple_classic_on_publish (Config ) ->
247- simple_queue_type_ack_mode (Config , <<" classic" >>, <<" on-publish" >>).
260+ simple_queue_type_ack_mode (Config , <<" classic" >>, <<" on-publish" >>, 10 ).
248261
249262simple_quorum_no_ack (Config ) ->
250- simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" no-ack" >>).
263+ simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" no-ack" >>, 10 ).
251264
252265simple_quorum_on_confirm (Config ) ->
253- simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" on-confirm" >>).
266+ simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" on-confirm" >>, 10 ).
254267
255268simple_quorum_on_publish (Config ) ->
256- simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" on-publish" >>).
269+ simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" on-publish" >>, 10 ).
257270
258- simple_queue_type_ack_mode (Config , Type , AckMode ) ->
271+ credit_flow_classic_no_ack (Config ) ->
272+ simple_queue_type_ack_mode (Config , <<" classic" >>, <<" no-ack" >>, 5000 ).
273+
274+ credit_flow_classic_on_confirm (Config ) ->
275+ simple_queue_type_ack_mode (Config , <<" classic" >>, <<" on-confirm" >>, 5000 ).
276+
277+ credit_flow_classic_on_publish (Config ) ->
278+ simple_queue_type_ack_mode (Config , <<" classic" >>, <<" on-publish" >>, 5000 ).
279+
280+ credit_flow_quorum_no_ack (Config ) ->
281+ simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" no-ack" >>, 5000 ).
282+
283+ credit_flow_quorum_on_confirm (Config ) ->
284+ simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" on-confirm" >>, 5000 ).
285+
286+ credit_flow_quorum_on_publish (Config ) ->
287+ simple_queue_type_ack_mode (Config , <<" quorum" >>, <<" on-publish" >>, 5000 ).
288+
289+ simple_queue_type_ack_mode (Config , Type , AckMode , NMsgs ) ->
259290 Src = ? config (srcq , Config ),
260291 Dest = ? config (destq , Config ),
261292 with_amqp10_session (
@@ -266,7 +297,7 @@ simple_queue_type_ack_mode(Config, Type, AckMode) ->
266297 ExtraArgs = [{<<" ack-mode" >>, AckMode }],
267298 ShovelArgs = ? config (shovel_args , Config ) ++ ExtraArgs ,
268299 set_param (Config , ? PARAM , ShovelArgs ),
269- amqp10_publish_expect (Sess , Src , Dest , <<" hello" >>, 10 )
300+ amqp10_publish_expect (Sess , Src , Dest , <<" hello" >>, NMsgs )
270301 end ).
271302
272303autodelete_classic_on_confirm_no_transfer (Config ) ->
@@ -297,7 +328,7 @@ autodelete_no_ack(Config) ->
297328 ExtraArgs = [{<<" ack-mode" >>, <<" no-ack" >>},
298329 {<<" src-delete-after" >>, 100 }],
299330 ShovelArgs = ? config (shovel_args , Config ) ++ ExtraArgs ,
300- Uri = shovel_test_utils : make_uri (Config , 0 ),
331+ Uri = make_uri (Config , 0 ),
301332 ? assertMatch ({error_string , _ },
302333 rabbit_ct_broker_helpers :rpc (
303334 Config , 0 , rabbit_runtime_parameters , set ,
@@ -405,6 +436,16 @@ no_vhost_access(Config) ->
405436 [<<" /" >>, <<" shovel" >>, ? PARAM , ShovelArgs , none ]),
406437 await_no_shovel (Config , ? PARAM ).
407438
439+ no_user_access (Config ) ->
440+ Uri = make_uri (
441+ Config , 0 , <<" guest" >>, <<" forgotmypassword" >>, <<" %2F" >>),
442+ ShovelArgs = [{<<" src-uri" >>, Uri },
443+ {<<" dest-uri" >>, [Uri ]}] ++ ? config (shovel_args , Config ),
444+ ok = rabbit_ct_broker_helpers :rpc (
445+ Config , 0 , rabbit_runtime_parameters , set ,
446+ [<<" /" >>, <<" shovel" >>, ? PARAM , ShovelArgs , none ]),
447+ await_no_shovel (Config , ? PARAM ).
448+
408449application_properties (Config ) ->
409450 Src = ? config (srcq , Config ),
410451 Dest = ? config (destq , Config ),
@@ -445,13 +486,31 @@ delete_src_queue(Config) ->
445486 Config , 0 ,
446487 [" list_queues" , " name" , " messages" , " --no-table-headers" ])),
447488 45_000 ),
448- ? awaitMatch ([{_Name , dynamic , {running , _ }, _ , _ }],
489+ ? awaitMatch ([{_Name , dynamic , {running , _ }, #{ forwarded : = 0 } , _ }],
449490 rabbit_ct_broker_helpers :rpc (Config , 0 ,
450491 rabbit_shovel_status , status , []),
451492 30000 ),
452493 _ = amqp10_publish_expect (Sess , Src , Dest , <<" hello" >>, 1 )
453494 end ).
454495
496+ shovel_status (Config ) ->
497+ Src = ? config (srcq , Config ),
498+ Dest = ? config (destq , Config ),
499+ SrcProtocol = ? config (src_protocol , Config ),
500+ DestProtocol = ? config (dest_protocol , Config ),
501+ set_param (Config , ? PARAM , ? config (shovel_args , Config )),
502+ Status = rabbit_ct_broker_helpers :rpc (Config , 0 ,
503+ rabbit_shovel_status , status , []),
504+ ? assertMatch ([{_ , dynamic , {running , _ }, _ , _ }], Status ),
505+ [{_ , dynamic , {running , Info }, _ , _ }] = Status ,
506+ ? assertMatch (SrcProtocol , proplists :get_value (src_protocol , Info )),
507+ ? assertMatch (DestProtocol , proplists :get_value (dest_protocol , Info )),
508+ SrcAddress = binary_to_atom (binary :replace (? config (src_address , Config ), <<" -" >>, <<" _" >>)),
509+ DestAddress = binary_to_atom (binary :replace (? config (dest_address , Config ), <<" -" >>, <<" _" >>)),
510+ ? assertMatch (Src , proplists :get_value (SrcAddress , Info )),
511+ ? assertMatch (Dest , proplists :get_value (DestAddress , Info )),
512+ ok .
513+
455514% %----------------------------------------------------------------------------
456515maybe_skip_local_protocol (Config ) ->
457516 [Node ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
0 commit comments