Skip to content

Commit 7f25c52

Browse files
committed
Shovel: tests delete after with queue rejections
1 parent 3c4707d commit 7f25c52

File tree

3 files changed

+92
-0
lines changed

3 files changed

+92
-0
lines changed

deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ groups() ->
4040
restart,
4141
change_definition,
4242
autodelete,
43+
autodelete_with_rejections,
4344
validation,
4445
security_validation,
4546
get_connection_name,
@@ -519,6 +520,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
519520
expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc)
520521
end.
521522

523+
autodelete_with_rejections(Config) ->
524+
Src = <<"src">>,
525+
Dest = <<"dst">>,
526+
Args = [{<<"x-max-length">>, long, 5},
527+
{<<"x-overflow">>, longstr, <<"reject-publish">>}],
528+
with_ch(Config,
529+
fun (Ch) ->
530+
amqp_channel:call(Ch, #'queue.declare'{queue = Dest,
531+
durable = true,
532+
arguments = Args}),
533+
shovel_test_utils:set_param(Config, <<"test">>,
534+
[{<<"src-protocol">>, <<"local">>},
535+
{<<"src-queue">>, Src},
536+
{<<"src-delete-after">>, 10},
537+
{<<"dest-protocol">>, <<"local">>},
538+
{<<"dest-predeclared">>, true},
539+
{<<"dest-queue">>, Dest}
540+
]),
541+
publish_count(Ch, <<>>, Src, <<"hello">>, 10),
542+
await_autodelete(Config, <<"test">>),
543+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
544+
eventually(
545+
?_assertMatch(
546+
Expected,
547+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
548+
Config, 0,
549+
["list_queues", "name", "messages_ready", "--no-table-headers"]))))
550+
end).
551+
522552
validation(Config) ->
523553
URIs = [{<<"src-uri">>, <<"amqp://">>},
524554
{<<"dest-uri">>, <<"amqp://">>}],

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-include_lib("common_test/include/ct.hrl").
1111
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1213
-compile(export_all).
1314

1415
-import(shovel_test_utils, [with_amqp10_session/2,
@@ -34,6 +35,7 @@ groups() ->
3435
autodelete_amqp091_src_on_publish,
3536
autodelete_amqp091_dest_on_confirm,
3637
autodelete_amqp091_dest_on_publish,
38+
autodelete_with_rejections,
3739
simple_amqp10_dest,
3840
simple_amqp10_src,
3941
amqp091_to_amqp10_with_dead_lettering,
@@ -83,6 +85,7 @@ init_per_testcase(Testcase, Config0) ->
8385
rabbit_ct_helpers:testcase_started(Config, Testcase).
8486

8587
end_per_testcase(Testcase, Config) ->
88+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []),
8689
rabbit_ct_helpers:testcase_finished(Config, Testcase).
8790

8891
%% -------------------------------------------------------------------
@@ -349,6 +352,36 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
349352
amqp10_expect_count(Session, Src, ExpSrc)
350353
end.
351354

355+
autodelete_with_rejections(Config) ->
356+
Src = ?config(srcq, Config),
357+
Dest = ?config(destq, Config),
358+
with_session(
359+
Config,
360+
fun (Sess) ->
361+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
362+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Dest,
363+
#{arguments =>#{<<"x-max-length">> => {uint, 5},
364+
<<"x-overflow">> => {utf8, <<"reject-publish">>}}}),
365+
366+
shovel_test_utils:set_param(Config, <<"test">>,
367+
[{<<"src-protocol">>, <<"local">>},
368+
{<<"src-queue">>, Src},
369+
{<<"src-delete-after">>, 10},
370+
{<<"dest-protocol">>, <<"local">>},
371+
{<<"dest-predeclared">>, true},
372+
{<<"dest-queue">>, Dest}
373+
]),
374+
publish_count(Sess, Src, <<"hello">>, 10),
375+
await_autodelete(Config, <<"test">>),
376+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
377+
?awaitMatch(
378+
Expected,
379+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
380+
Config, 0,
381+
["list_queues", "name", "messages_ready", "--no-table-headers"])),
382+
30_000)
383+
end).
384+
352385
test_amqp10_delete_after_queue_length(Config) ->
353386
Src = ?config(srcq, Config),
354387
Dest = ?config(destq, Config),

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ groups() ->
5454
local_to_local_delete_after_queue_length,
5555
local_to_local_delete_after_queue_length_zero,
5656
local_to_local_delete_after_number,
57+
local_to_local_delete_after_with_rejections,
5758
local_to_local_no_ack,
5859
local_to_local_quorum_no_ack,
5960
local_to_local_stream_no_ack,
@@ -586,6 +587,34 @@ local_to_local_delete_after_number(Config) ->
586587
amqp10_expect_empty(Sess, Dest)
587588
end).
588589

590+
local_to_local_delete_after_with_rejections(Config) ->
591+
Src = ?config(srcq, Config),
592+
Dest = ?config(destq, Config),
593+
VHost = <<"/">>,
594+
declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5},
595+
{<<"x-overflow">>, longstr, <<"reject-publish">>}]),
596+
with_session(Config,
597+
fun (Sess) ->
598+
shovel_test_utils:set_param(Config, ?PARAM,
599+
[{<<"src-protocol">>, <<"local">>},
600+
{<<"src-queue">>, Src},
601+
{<<"src-delete-after">>, 10},
602+
{<<"dest-protocol">>, <<"local">>},
603+
{<<"dest-predeclared">>, true},
604+
{<<"dest-queue">>, Dest}
605+
]),
606+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
607+
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
608+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
609+
?awaitMatch(
610+
Expected,
611+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
612+
Config, 0,
613+
["list_queues", "name", "messages_ready", "--no-table-headers"])),
614+
30_000)
615+
616+
end).
617+
589618
local_to_local_no_ack(Config) ->
590619
Src = ?config(srcq, Config),
591620
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)