Skip to content

Commit 656a4bf

Browse files
Merge pull request #14673 from rabbitmq/shovel-tests
Shovels: start local shovels with predeclared queues when queue arguments don't match
2 parents eac3d1c + d222d4c commit 656a4bf

File tree

6 files changed

+236
-93
lines changed

6 files changed

+236
-93
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,12 @@ decl_queue(QName, QArgs, VHost, User) ->
594594
Method = #'queue.declare'{queue = QName,
595595
durable = true,
596596
arguments = Args},
597-
decl_fun([Method], VHost, User).
597+
try
598+
decl_fun([#'queue.declare'{queue = QName,
599+
passive = true}], VHost, User)
600+
catch exit:{amqp_error, not_found, _, _} ->
601+
decl_fun([Method], VHost, User)
602+
end.
598603

599604
dest_check_queue(none, _, _, _) ->
600605
ok;

deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ groups() ->
4040
restart,
4141
change_definition,
4242
autodelete,
43+
autodelete_with_rejections,
4344
validation,
4445
security_validation,
4546
get_connection_name,
@@ -519,6 +520,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
519520
expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc)
520521
end.
521522

523+
autodelete_with_rejections(Config) ->
524+
Src = <<"src">>,
525+
Dest = <<"dst">>,
526+
Args = [{<<"x-max-length">>, long, 5},
527+
{<<"x-overflow">>, longstr, <<"reject-publish">>}],
528+
with_ch(Config,
529+
fun (Ch) ->
530+
amqp_channel:call(Ch, #'queue.declare'{queue = Dest,
531+
durable = true,
532+
arguments = Args}),
533+
shovel_test_utils:set_param(Config, <<"test">>,
534+
[{<<"src-protocol">>, <<"local">>},
535+
{<<"src-queue">>, Src},
536+
{<<"src-delete-after">>, 10},
537+
{<<"dest-protocol">>, <<"local">>},
538+
{<<"dest-predeclared">>, true},
539+
{<<"dest-queue">>, Dest}
540+
]),
541+
publish_count(Ch, <<>>, Src, <<"hello">>, 10),
542+
await_autodelete(Config, <<"test">>),
543+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
544+
eventually(
545+
?_assertMatch(
546+
Expected,
547+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
548+
Config, 0,
549+
["list_queues", "name", "messages_ready", "--no-table-headers"]))))
550+
end).
551+
522552
validation(Config) ->
523553
URIs = [{<<"src-uri">>, <<"amqp://">>},
524554
{<<"dest-uri">>, <<"amqp://">>}],

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

Lines changed: 89 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@
99

1010
-include_lib("common_test/include/ct.hrl").
1111
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1213
-compile(export_all).
1314

1415
-import(shovel_test_utils, [with_amqp10_session/2,
1516
amqp10_publish/3, amqp10_publish/5,
1617
amqp10_expect_empty/2,
1718
await_amqp10_event/3, amqp10_expect_one/2,
1819
amqp10_expect_count/3, amqp10_publish/4,
19-
amqp10_publish_expect/5,
20+
amqp10_publish_expect/5, amqp10_declare_queue/3,
2021
await_autodelete/2]).
2122

23+
-define(PARAM, <<"test">>).
24+
2225
all() ->
2326
[
2427
{group, non_parallel_tests},
@@ -34,6 +37,7 @@ groups() ->
3437
autodelete_amqp091_src_on_publish,
3538
autodelete_amqp091_dest_on_confirm,
3639
autodelete_amqp091_dest_on_publish,
40+
autodelete_with_rejections,
3741
simple_amqp10_dest,
3842
simple_amqp10_src,
3943
amqp091_to_amqp10_with_dead_lettering,
@@ -83,6 +87,8 @@ init_per_testcase(Testcase, Config0) ->
8387
rabbit_ct_helpers:testcase_started(Config, Testcase).
8488

8589
end_per_testcase(Testcase, Config) ->
90+
shovel_test_utils:clear_param(Config, ?PARAM),
91+
rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []),
8692
rabbit_ct_helpers:testcase_finished(Config, Testcase).
8793

8894
%% -------------------------------------------------------------------
@@ -113,11 +119,9 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
113119
TmpQ = <<"tmp">>,
114120
with_amqp10_session(Config,
115121
fun (Sess) ->
116-
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
117-
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
118-
#{arguments =>#{<<"x-max-length">> => {uint, 0},
119-
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
120-
<<"x-dead-letter-routing-key">> => {utf8, Src}}}),
122+
amqp10_declare_queue(Sess, TmpQ, #{<<"x-max-length">> => {uint, 0},
123+
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
124+
<<"x-dead-letter-routing-key">> => {utf8, Src}}),
121125
{ok, Sender} = amqp10_client:attach_sender_link(Sess,
122126
<<"sender-tmp">>,
123127
<<"/queues/", TmpQ/binary>>,
@@ -132,7 +136,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
132136

133137
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
134138
MapConfig = ?config(map_config, Config),
135-
shovel_test_utils:set_param(Config, <<"test">>,
139+
shovel_test_utils:set_param(Config, ?PARAM,
136140
[{<<"src-protocol">>, Protocol},
137141
{ProtocolSrc, Src},
138142
{<<"dest-protocol">>, <<"amqp10">>},
@@ -186,18 +190,18 @@ simple_amqp10_src(Config) ->
186190
fun (Sess) ->
187191
shovel_test_utils:set_param(
188192
Config,
189-
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
190-
{<<"src-address">>, Src},
191-
{<<"dest-protocol">>, <<"amqp091">>},
192-
{<<"dest-queue">>, Dest},
193-
{<<"add-forward-headers">>, true},
194-
{<<"dest-add-timestamp-header">>, true},
195-
{<<"publish-properties">>,
196-
case MapConfig of
197-
true -> #{<<"cluster_id">> => <<"x">>};
198-
_ -> [{<<"cluster_id">>, <<"x">>}]
199-
end}
200-
]),
193+
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
194+
{<<"src-address">>, Src},
195+
{<<"dest-protocol">>, <<"amqp091">>},
196+
{<<"dest-queue">>, Dest},
197+
{<<"add-forward-headers">>, true},
198+
{<<"dest-add-timestamp-header">>, true},
199+
{<<"publish-properties">>,
200+
case MapConfig of
201+
true -> #{<<"cluster_id">> => <<"x">>};
202+
_ -> [{<<"cluster_id">>, <<"x">>}]
203+
end}
204+
]),
201205
_Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
202206
% the fidelity loss is quite high when consuming using the amqp10
203207
% plugin. For example custom headers aren't current translated.
@@ -213,18 +217,18 @@ amqp10_to_amqp091_application_properties(Config) ->
213217
fun (Sess) ->
214218
shovel_test_utils:set_param(
215219
Config,
216-
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
217-
{<<"src-address">>, Src},
218-
{<<"dest-protocol">>, <<"amqp091">>},
219-
{<<"dest-queue">>, Dest},
220-
{<<"add-forward-headers">>, true},
221-
{<<"dest-add-timestamp-header">>, true},
222-
{<<"publish-properties">>,
223-
case MapConfig of
224-
true -> #{<<"cluster_id">> => <<"x">>};
225-
_ -> [{<<"cluster_id">>, <<"x">>}]
226-
end}
227-
]),
220+
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
221+
{<<"src-address">>, Src},
222+
{<<"dest-protocol">>, <<"amqp091">>},
223+
{<<"dest-queue">>, Dest},
224+
{<<"add-forward-headers">>, true},
225+
{<<"dest-add-timestamp-header">>, true},
226+
{<<"publish-properties">>,
227+
case MapConfig of
228+
true -> #{<<"cluster_id">> => <<"x">>};
229+
_ -> [{<<"cluster_id">>, <<"x">>}]
230+
end}
231+
]),
228232

229233
MsgSent = amqp10_msg:set_application_properties(
230234
#{<<"key">> => <<"value">>},
@@ -247,13 +251,13 @@ change_definition(Config) ->
247251
Dest2 = ?config(destq2, Config),
248252
with_amqp10_session(Config,
249253
fun (Sess) ->
250-
shovel_test_utils:set_param(Config, <<"test">>,
254+
shovel_test_utils:set_param(Config, ?PARAM,
251255
[{<<"src-address">>, Src},
252256
{<<"src-protocol">>, <<"amqp10">>},
253257
{<<"dest-protocol">>, <<"amqp10">>},
254258
{<<"dest-address">>, Dest}]),
255259
amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1),
256-
shovel_test_utils:set_param(Config, <<"test">>,
260+
shovel_test_utils:set_param(Config, ?PARAM,
257261
[{<<"src-address">>, Src},
258262
{<<"src-protocol">>, <<"amqp10">>},
259263
{<<"dest-protocol">>, <<"amqp10">>},
@@ -296,14 +300,14 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
296300
amqp10_publish(Session, Src, <<"hello">>, 100),
297301
shovel_test_utils:set_param_nowait(
298302
Config,
299-
<<"test">>, [{<<"src-address">>, Src},
300-
{<<"src-protocol">>, <<"amqp10">>},
301-
{<<"src-delete-after">>, After},
302-
{<<"src-prefetch-count">>, 5},
303-
{<<"dest-address">>, Dest},
304-
{<<"dest-protocol">>, <<"amqp10">>},
305-
{<<"ack-mode">>, AckMode}
306-
]),
303+
?PARAM, [{<<"src-address">>, Src},
304+
{<<"src-protocol">>, <<"amqp10">>},
305+
{<<"src-delete-after">>, After},
306+
{<<"src-prefetch-count">>, 5},
307+
{<<"dest-address">>, Dest},
308+
{<<"dest-protocol">>, <<"amqp10">>},
309+
{<<"ack-mode">>, AckMode}
310+
]),
307311
await_autodelete(Config, <<"test">>),
308312
amqp10_expect_count(Session, Dest, ExpDest),
309313
amqp10_expect_count(Session, Src, ExpSrc)
@@ -316,14 +320,14 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
316320
amqp10_publish(Session, Src, <<"hello">>, 100),
317321
shovel_test_utils:set_param_nowait(
318322
Config,
319-
<<"test">>, [{<<"src-queue">>, Src},
320-
{<<"src-protocol">>, <<"amqp091">>},
321-
{<<"src-delete-after">>, After},
322-
{<<"src-prefetch-count">>, 5},
323-
{<<"dest-address">>, Dest},
324-
{<<"dest-protocol">>, <<"amqp10">>},
325-
{<<"ack-mode">>, AckMode}
326-
]),
323+
?PARAM, [{<<"src-queue">>, Src},
324+
{<<"src-protocol">>, <<"amqp091">>},
325+
{<<"src-delete-after">>, After},
326+
{<<"src-prefetch-count">>, 5},
327+
{<<"dest-address">>, Dest},
328+
{<<"dest-protocol">>, <<"amqp10">>},
329+
{<<"ack-mode">>, AckMode}
330+
]),
327331
await_autodelete(Config, <<"test">>),
328332
amqp10_expect_count(Session, Dest, ExpDest),
329333
amqp10_expect_count(Session, Src, ExpSrc)
@@ -336,19 +340,47 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
336340
amqp10_publish(Session, Src, <<"hello">>, 100),
337341
shovel_test_utils:set_param_nowait(
338342
Config,
339-
<<"test">>, [{<<"src-address">>, Src},
340-
{<<"src-protocol">>, <<"amqp10">>},
341-
{<<"src-delete-after">>, After},
342-
{<<"src-prefetch-count">>, 5},
343-
{<<"dest-queue">>, Dest},
344-
{<<"dest-protocol">>, <<"amqp091">>},
345-
{<<"ack-mode">>, AckMode}
346-
]),
343+
?PARAM, [{<<"src-address">>, Src},
344+
{<<"src-protocol">>, <<"amqp10">>},
345+
{<<"src-delete-after">>, After},
346+
{<<"src-prefetch-count">>, 5},
347+
{<<"dest-queue">>, Dest},
348+
{<<"dest-protocol">>, <<"amqp091">>},
349+
{<<"ack-mode">>, AckMode}
350+
]),
347351
await_autodelete(Config, <<"test">>),
348352
amqp10_expect_count(Session, Dest, ExpDest),
349353
amqp10_expect_count(Session, Src, ExpSrc)
350354
end.
351355

356+
autodelete_with_rejections(Config) ->
357+
Src = ?config(srcq, Config),
358+
Dest = ?config(destq, Config),
359+
with_amqp10_session(
360+
Config,
361+
fun (Sess) ->
362+
amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5},
363+
<<"x-overflow">> => {utf8, <<"reject-publish">>}}),
364+
365+
shovel_test_utils:set_param(Config, ?PARAM,
366+
[{<<"src-protocol">>, <<"local">>},
367+
{<<"src-queue">>, Src},
368+
{<<"src-delete-after">>, 10},
369+
{<<"dest-protocol">>, <<"local">>},
370+
{<<"dest-predeclared">>, true},
371+
{<<"dest-queue">>, Dest}
372+
]),
373+
amqp10_publish(Sess, Src, <<"hello">>, 10),
374+
await_autodelete(Config, <<"test">>),
375+
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
376+
?awaitMatch(
377+
Expected,
378+
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
379+
Config, 0,
380+
["list_queues", "name", "messages_ready", "--no-table-headers"])),
381+
30_000)
382+
end).
383+
352384
test_amqp10_delete_after_queue_length(Config) ->
353385
Src = ?config(srcq, Config),
354386
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)