1414
1515-compile (export_all ).
1616
17+ -import (rabbit_ct_helpers , [eventually /3 ]).
1718-import (shovel_test_utils , [await_autodelete /2 ,
1819 set_param /3 ,
1920 set_param_nowait /3 ,
@@ -69,7 +70,11 @@ tests() ->
6970 autodelete_classic_on_confirm_no_transfer ,
7071 autodelete_quorum_on_confirm_no_transfer ,
7172 autodelete_classic_on_publish_no_transfer ,
72- autodelete_quorum_on_publish_no_transfer
73+ autodelete_quorum_on_publish_no_transfer ,
74+ autodelete_classic_on_confirm_with_rejections ,
75+ autodelete_quorum_on_confirm_with_rejections ,
76+ autodelete_classic_on_publish_with_rejections ,
77+ autodelete_quorum_on_publish_with_rejections
7378 ].
7479
7580% % -------------------------------------------------------------------
@@ -304,6 +309,76 @@ autodelete(Config, Type, AckMode, After, ExpSrc, ExpDest) ->
304309 amqp10_expect_count (Sess , Dest , ExpDest )
305310 end ).
306311
312+ autodelete_classic_on_confirm_with_rejections (Config ) ->
313+ autodelete_with_rejections (Config , <<" classic" >>, <<" on-confirm" >>, 5 , 5 ).
314+
315+ autodelete_quorum_on_confirm_with_rejections (Config ) ->
316+ ExpSrc = fun (ExpDest ) -> 100 - ExpDest end ,
317+ autodelete_with_quorum_rejections (Config , <<" on-confirm" >>, ExpSrc ).
318+
319+ autodelete_classic_on_publish_with_rejections (Config ) ->
320+ autodelete_with_rejections (Config , <<" classic" >>, <<" on-publish" >>, 0 , 5 ).
321+
322+ autodelete_quorum_on_publish_with_rejections (Config ) ->
323+ ExpSrc = fun (_ ) -> 0 end ,
324+ autodelete_with_quorum_rejections (Config , <<" on-publish" >>, ExpSrc ).
325+
326+ autodelete_with_rejections (Config , Type , AckMode , ExpSrc , ExpDest ) ->
327+ Src = ? config (srcq , Config ),
328+ Dest = ? config (destq , Config ),
329+ with_amqp10_session (
330+ Config ,
331+ fun (Sess ) ->
332+ amqp10_declare_queue (Sess , Src , #{<<" x-queue-type" >> => {utf8 , Type }}),
333+ amqp10_declare_queue (Sess , Dest , #{<<" x-queue-type" >> => {utf8 , Type },
334+ <<" x-overflow" >> => {utf8 , <<" reject-publish" >>},
335+ <<" x-max-length" >> => {ulong , 5 }
336+ }),
337+ amqp10_publish (Sess , Src , <<" hello" >>, 10 ),
338+ ExtraArgs = [{<<" ack-mode" >>, AckMode },
339+ {<<" src-delete-after" >>, 10 }],
340+ ShovelArgs = ? config (shovel_args , Config ) ++ ExtraArgs ,
341+ set_param_nowait (Config , ? PARAM , ShovelArgs ),
342+ await_autodelete (Config , ? PARAM ),
343+ Expected = lists :sort ([[Src , integer_to_binary (ExpSrc )],
344+ [Dest , integer_to_binary (ExpDest )]]),
345+ ? awaitMatch (
346+ Expected ,
347+ lists :sort (rabbit_ct_broker_helpers :rabbitmqctl_list (
348+ Config , 0 ,
349+ [" list_queues" , " name" , " messages" , " --no-table-headers" ])),
350+ 45_000 ),
351+ amqp10_expect_count (Sess , Src , ExpSrc ),
352+ amqp10_expect_count (Sess , Dest , ExpDest )
353+ end ).
354+
355+ autodelete_with_quorum_rejections (Config , AckMode , ExpSrcFun ) ->
356+ Src = ? config (srcq , Config ),
357+ Dest = ? config (destq , Config ),
358+ Type = <<" quorum" >>,
359+ with_amqp10_session (
360+ Config ,
361+ fun (Sess ) ->
362+ amqp10_declare_queue (Sess , Src , #{<<" x-queue-type" >> => {utf8 , Type }}),
363+ amqp10_declare_queue (Sess , Dest , #{<<" x-queue-type" >> => {utf8 , Type },
364+ <<" x-overflow" >> => {utf8 , <<" reject-publish" >>},
365+ <<" x-max-length" >> => {ulong , 5 }
366+ }),
367+ amqp10_publish (Sess , Src , <<" hello" >>, 100 ),
368+ ExtraArgs = [{<<" ack-mode" >>, AckMode },
369+ {<<" src-delete-after" >>, 50 }],
370+ ShovelArgs = ? config (shovel_args , Config ) ++ ExtraArgs ,
371+ set_param_nowait (Config , ? PARAM , ShovelArgs ),
372+ await_autodelete (Config , ? PARAM ),
373+ eventually (
374+ ? _assert (
375+ list_queue_messages (Config , Dest ) >= 5 ),
376+ 1000 , 45 ),
377+ ExpDest = list_queue_messages (Config , Dest ),
378+ amqp10_expect_count (Sess , Src , ExpSrcFun (ExpDest )),
379+ amqp10_expect_count (Sess , Dest , ExpDest )
380+ end ).
381+
307382% %----------------------------------------------------------------------------
308383maybe_skip_local_protocol (Config ) ->
309384 [Node ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
@@ -314,3 +389,12 @@ maybe_skip_local_protocol(Config) ->
314389 _ ->
315390 {skip , " This group requires rabbitmq_4.0.0 feature flag" }
316391 end .
392+
393+ list_queue_messages (Config , QName ) ->
394+ List = rabbit_ct_broker_helpers :rabbitmqctl_list (
395+ Config , 0 ,
396+ [" list_queues" , " name" , " messages" , " --no-table-headers" ]),
397+ [[_ , Messages ]] = lists :filter (fun ([Q , _ ]) ->
398+ Q == QName
399+ end , List ),
400+ binary_to_integer (Messages ).
0 commit comments