1414
1515-compile (export_all ).
1616
17+ -import (rabbit_ct_helpers , [eventually /3 ]).
1718-import (shovel_test_utils , [await_autodelete /2 ,
1819 set_param /3 ,
1920 set_param_nowait /3 ,
@@ -69,7 +70,11 @@ tests() ->
6970 autodelete_classic_on_confirm_no_transfer ,
7071 autodelete_quorum_on_confirm_no_transfer ,
7172 autodelete_classic_on_publish_no_transfer ,
72- autodelete_quorum_on_publish_no_transfer
73+ autodelete_quorum_on_publish_no_transfer ,
74+ autodelete_classic_on_confirm_with_rejections ,
75+ autodelete_quorum_on_confirm_with_rejections ,
76+ autodelete_classic_on_publish_with_rejections ,
77+ autodelete_quorum_on_publish_with_rejections
7378 ].
7479
7580% % -------------------------------------------------------------------
@@ -311,7 +316,80 @@ autodelete(Config, Type, AckMode, After, ExpSrc, ExpDest) ->
311316 amqp10_expect_count (Sess , Dest , ExpDest )
312317 end ).
313318
319+ <<<<<<< HEAD
314320>>>>>>> d5f9ff27b (Shovel tests : tests for autodelete common to all protocols )
321+ =======
322+ autodelete_classic_on_confirm_with_rejections (Config ) ->
323+ autodelete_with_rejections (Config , <<" classic" >>, <<" on-confirm" >>, 5 , 5 ).
324+
325+ autodelete_quorum_on_confirm_with_rejections (Config ) ->
326+ ExpSrc = fun (ExpDest ) -> 100 - ExpDest end ,
327+ autodelete_with_quorum_rejections (Config , <<" on-confirm" >>, ExpSrc ).
328+
329+ autodelete_classic_on_publish_with_rejections (Config ) ->
330+ autodelete_with_rejections (Config , <<" classic" >>, <<" on-publish" >>, 0 , 5 ).
331+
332+ autodelete_quorum_on_publish_with_rejections (Config ) ->
333+ ExpSrc = fun (_ ) -> 0 end ,
334+ autodelete_with_quorum_rejections (Config , <<" on-publish" >>, ExpSrc ).
335+
336+ autodelete_with_rejections (Config , Type , AckMode , ExpSrc , ExpDest ) ->
337+ Src = ? config (srcq , Config ),
338+ Dest = ? config (destq , Config ),
339+ with_amqp10_session (
340+ Config ,
341+ fun (Sess ) ->
342+ amqp10_declare_queue (Sess , Src , #{<<" x-queue-type" >> => {utf8 , Type }}),
343+ amqp10_declare_queue (Sess , Dest , #{<<" x-queue-type" >> => {utf8 , Type },
344+ <<" x-overflow" >> => {utf8 , <<" reject-publish" >>},
345+ <<" x-max-length" >> => {ulong , 5 }
346+ }),
347+ amqp10_publish (Sess , Src , <<" hello" >>, 10 ),
348+ ExtraArgs = [{<<" ack-mode" >>, AckMode },
349+ {<<" src-delete-after" >>, 10 }],
350+ ShovelArgs = ? config (shovel_args , Config ) ++ ExtraArgs ,
351+ set_param_nowait (Config , ? PARAM , ShovelArgs ),
352+ await_autodelete (Config , ? PARAM ),
353+ Expected = lists :sort ([[Src , integer_to_binary (ExpSrc )],
354+ [Dest , integer_to_binary (ExpDest )]]),
355+ ? awaitMatch (
356+ Expected ,
357+ lists :sort (rabbit_ct_broker_helpers :rabbitmqctl_list (
358+ Config , 0 ,
359+ [" list_queues" , " name" , " messages" , " --no-table-headers" ])),
360+ 45_000 ),
361+ amqp10_expect_count (Sess , Src , ExpSrc ),
362+ amqp10_expect_count (Sess , Dest , ExpDest )
363+ end ).
364+
365+ autodelete_with_quorum_rejections (Config , AckMode , ExpSrcFun ) ->
366+ Src = ? config (srcq , Config ),
367+ Dest = ? config (destq , Config ),
368+ Type = <<" quorum" >>,
369+ with_amqp10_session (
370+ Config ,
371+ fun (Sess ) ->
372+ amqp10_declare_queue (Sess , Src , #{<<" x-queue-type" >> => {utf8 , Type }}),
373+ amqp10_declare_queue (Sess , Dest , #{<<" x-queue-type" >> => {utf8 , Type },
374+ <<" x-overflow" >> => {utf8 , <<" reject-publish" >>},
375+ <<" x-max-length" >> => {ulong , 5 }
376+ }),
377+ amqp10_publish (Sess , Src , <<" hello" >>, 100 ),
378+ ExtraArgs = [{<<" ack-mode" >>, AckMode },
379+ {<<" src-delete-after" >>, 50 }],
380+ ShovelArgs = ? config (shovel_args , Config ) ++ ExtraArgs ,
381+ set_param_nowait (Config , ? PARAM , ShovelArgs ),
382+ await_autodelete (Config , ? PARAM ),
383+ eventually (
384+ ? _assert (
385+ list_queue_messages (Config , Dest ) >= 5 ),
386+ 1000 , 45 ),
387+ ExpDest = list_queue_messages (Config , Dest ),
388+ amqp10_expect_count (Sess , Src , ExpSrcFun (ExpDest )),
389+ amqp10_expect_count (Sess , Dest , ExpDest )
390+ end ).
391+
392+ >>>>>>> 059813 a83 (Shovel : more common testcases )
315393% %----------------------------------------------------------------------------
316394maybe_skip_local_protocol (Config ) ->
317395 [Node ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
@@ -322,3 +400,12 @@ maybe_skip_local_protocol(Config) ->
322400 _ ->
323401 {skip , " This group requires rabbitmq_4.0.0 feature flag" }
324402 end .
403+
404+ list_queue_messages (Config , QName ) ->
405+ List = rabbit_ct_broker_helpers :rabbitmqctl_list (
406+ Config , 0 ,
407+ [" list_queues" , " name" , " messages" , " --no-table-headers" ]),
408+ [[_ , Messages ]] = lists :filter (fun ([Q , _ ]) ->
409+ Q == QName
410+ end , List ),
411+ binary_to_integer (Messages ).
0 commit comments