Skip to content

Commit c4138d5

Browse files
committed
Shovel tests: refactor and new tests for all protocols
1 parent 7f25c52 commit c4138d5

File tree

4 files changed

+148
-102
lines changed

4 files changed

+148
-102
lines changed

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

Lines changed: 64 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
amqp10_expect_empty/2,
1818
await_amqp10_event/3, amqp10_expect_one/2,
1919
amqp10_expect_count/3, amqp10_publish/4,
20-
amqp10_publish_expect/5,
20+
amqp10_publish_expect/5, amqp10_declare_queue/3,
2121
await_autodelete/2]).
2222

23+
-define(PARAM, <<"test">>).
24+
2325
all() ->
2426
[
2527
{group, non_parallel_tests},
@@ -85,7 +87,8 @@ init_per_testcase(Testcase, Config0) ->
8587
rabbit_ct_helpers:testcase_started(Config, Testcase).
8688

8789
end_per_testcase(Testcase, Config) ->
88-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []),
90+
shovel_test_utils:clear_param(Config, ?PARAM),
91+
rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []),
8992
rabbit_ct_helpers:testcase_finished(Config, Testcase).
9093

9194
%% -------------------------------------------------------------------
@@ -116,11 +119,9 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
116119
TmpQ = <<"tmp">>,
117120
with_amqp10_session(Config,
118121
fun (Sess) ->
119-
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
120-
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
121-
#{arguments =>#{<<"x-max-length">> => {uint, 0},
122-
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
123-
<<"x-dead-letter-routing-key">> => {utf8, Src}}}),
122+
amqp10_declare_queue(Sess, TmpQ, #{<<"x-max-length">> => {uint, 0},
123+
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
124+
<<"x-dead-letter-routing-key">> => {utf8, Src}}),
124125
{ok, Sender} = amqp10_client:attach_sender_link(Sess,
125126
<<"sender-tmp">>,
126127
<<"/queues/", TmpQ/binary>>,
@@ -135,7 +136,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
135136

136137
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
137138
MapConfig = ?config(map_config, Config),
138-
shovel_test_utils:set_param(Config, <<"test">>,
139+
shovel_test_utils:set_param(Config, ?PARAM,
139140
[{<<"src-protocol">>, Protocol},
140141
{ProtocolSrc, Src},
141142
{<<"dest-protocol">>, <<"amqp10">>},
@@ -189,18 +190,18 @@ simple_amqp10_src(Config) ->
189190
fun (Sess) ->
190191
shovel_test_utils:set_param(
191192
Config,
192-
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
193-
{<<"src-address">>, Src},
194-
{<<"dest-protocol">>, <<"amqp091">>},
195-
{<<"dest-queue">>, Dest},
196-
{<<"add-forward-headers">>, true},
197-
{<<"dest-add-timestamp-header">>, true},
198-
{<<"publish-properties">>,
199-
case MapConfig of
200-
true -> #{<<"cluster_id">> => <<"x">>};
201-
_ -> [{<<"cluster_id">>, <<"x">>}]
202-
end}
203-
]),
193+
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
194+
{<<"src-address">>, Src},
195+
{<<"dest-protocol">>, <<"amqp091">>},
196+
{<<"dest-queue">>, Dest},
197+
{<<"add-forward-headers">>, true},
198+
{<<"dest-add-timestamp-header">>, true},
199+
{<<"publish-properties">>,
200+
case MapConfig of
201+
true -> #{<<"cluster_id">> => <<"x">>};
202+
_ -> [{<<"cluster_id">>, <<"x">>}]
203+
end}
204+
]),
204205
_Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
205206
% the fidelity loss is quite high when consuming using the amqp10
206207
% plugin. For example custom headers aren't current translated.
@@ -216,18 +217,18 @@ amqp10_to_amqp091_application_properties(Config) ->
216217
fun (Sess) ->
217218
shovel_test_utils:set_param(
218219
Config,
219-
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
220-
{<<"src-address">>, Src},
221-
{<<"dest-protocol">>, <<"amqp091">>},
222-
{<<"dest-queue">>, Dest},
223-
{<<"add-forward-headers">>, true},
224-
{<<"dest-add-timestamp-header">>, true},
225-
{<<"publish-properties">>,
226-
case MapConfig of
227-
true -> #{<<"cluster_id">> => <<"x">>};
228-
_ -> [{<<"cluster_id">>, <<"x">>}]
229-
end}
230-
]),
220+
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
221+
{<<"src-address">>, Src},
222+
{<<"dest-protocol">>, <<"amqp091">>},
223+
{<<"dest-queue">>, Dest},
224+
{<<"add-forward-headers">>, true},
225+
{<<"dest-add-timestamp-header">>, true},
226+
{<<"publish-properties">>,
227+
case MapConfig of
228+
true -> #{<<"cluster_id">> => <<"x">>};
229+
_ -> [{<<"cluster_id">>, <<"x">>}]
230+
end}
231+
]),
231232

232233
MsgSent = amqp10_msg:set_application_properties(
233234
#{<<"key">> => <<"value">>},
@@ -250,13 +251,13 @@ change_definition(Config) ->
250251
Dest2 = ?config(destq2, Config),
251252
with_amqp10_session(Config,
252253
fun (Sess) ->
253-
shovel_test_utils:set_param(Config, <<"test">>,
254+
shovel_test_utils:set_param(Config, ?PARAM,
254255
[{<<"src-address">>, Src},
255256
{<<"src-protocol">>, <<"amqp10">>},
256257
{<<"dest-protocol">>, <<"amqp10">>},
257258
{<<"dest-address">>, Dest}]),
258259
amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1),
259-
shovel_test_utils:set_param(Config, <<"test">>,
260+
shovel_test_utils:set_param(Config, ?PARAM,
260261
[{<<"src-address">>, Src},
261262
{<<"src-protocol">>, <<"amqp10">>},
262263
{<<"dest-protocol">>, <<"amqp10">>},
@@ -299,14 +300,14 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
299300
amqp10_publish(Session, Src, <<"hello">>, 100),
300301
shovel_test_utils:set_param_nowait(
301302
Config,
302-
<<"test">>, [{<<"src-address">>, Src},
303-
{<<"src-protocol">>, <<"amqp10">>},
304-
{<<"src-delete-after">>, After},
305-
{<<"src-prefetch-count">>, 5},
306-
{<<"dest-address">>, Dest},
307-
{<<"dest-protocol">>, <<"amqp10">>},
308-
{<<"ack-mode">>, AckMode}
309-
]),
303+
?PARAM, [{<<"src-address">>, Src},
304+
{<<"src-protocol">>, <<"amqp10">>},
305+
{<<"src-delete-after">>, After},
306+
{<<"src-prefetch-count">>, 5},
307+
{<<"dest-address">>, Dest},
308+
{<<"dest-protocol">>, <<"amqp10">>},
309+
{<<"ack-mode">>, AckMode}
310+
]),
310311
await_autodelete(Config, <<"test">>),
311312
amqp10_expect_count(Session, Dest, ExpDest),
312313
amqp10_expect_count(Session, Src, ExpSrc)
@@ -319,14 +320,14 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
319320
amqp10_publish(Session, Src, <<"hello">>, 100),
320321
shovel_test_utils:set_param_nowait(
321322
Config,
322-
<<"test">>, [{<<"src-queue">>, Src},
323-
{<<"src-protocol">>, <<"amqp091">>},
324-
{<<"src-delete-after">>, After},
325-
{<<"src-prefetch-count">>, 5},
326-
{<<"dest-address">>, Dest},
327-
{<<"dest-protocol">>, <<"amqp10">>},
328-
{<<"ack-mode">>, AckMode}
329-
]),
323+
?PARAM, [{<<"src-queue">>, Src},
324+
{<<"src-protocol">>, <<"amqp091">>},
325+
{<<"src-delete-after">>, After},
326+
{<<"src-prefetch-count">>, 5},
327+
{<<"dest-address">>, Dest},
328+
{<<"dest-protocol">>, <<"amqp10">>},
329+
{<<"ack-mode">>, AckMode}
330+
]),
330331
await_autodelete(Config, <<"test">>),
331332
amqp10_expect_count(Session, Dest, ExpDest),
332333
amqp10_expect_count(Session, Src, ExpSrc)
@@ -339,14 +340,14 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
339340
amqp10_publish(Session, Src, <<"hello">>, 100),
340341
shovel_test_utils:set_param_nowait(
341342
Config,
342-
<<"test">>, [{<<"src-address">>, Src},
343-
{<<"src-protocol">>, <<"amqp10">>},
344-
{<<"src-delete-after">>, After},
345-
{<<"src-prefetch-count">>, 5},
346-
{<<"dest-queue">>, Dest},
347-
{<<"dest-protocol">>, <<"amqp091">>},
348-
{<<"ack-mode">>, AckMode}
349-
]),
343+
?PARAM, [{<<"src-address">>, Src},
344+
{<<"src-protocol">>, <<"amqp10">>},
345+
{<<"src-delete-after">>, After},
346+
{<<"src-prefetch-count">>, 5},
347+
{<<"dest-queue">>, Dest},
348+
{<<"dest-protocol">>, <<"amqp091">>},
349+
{<<"ack-mode">>, AckMode}
350+
]),
350351
await_autodelete(Config, <<"test">>),
351352
amqp10_expect_count(Session, Dest, ExpDest),
352353
amqp10_expect_count(Session, Src, ExpSrc)
@@ -355,23 +356,21 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
355356
autodelete_with_rejections(Config) ->
356357
Src = ?config(srcq, Config),
357358
Dest = ?config(destq, Config),
358-
with_session(
359+
with_amqp10_session(
359360
Config,
360361
fun (Sess) ->
361-
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
362-
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Dest,
363-
#{arguments =>#{<<"x-max-length">> => {uint, 5},
364-
<<"x-overflow">> => {utf8, <<"reject-publish">>}}}),
362+
amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5},
363+
<<"x-overflow">> => {utf8, <<"reject-publish">>}}),
365364

366-
shovel_test_utils:set_param(Config, <<"test">>,
365+
shovel_test_utils:set_param(Config, ?PARAM,
367366
[{<<"src-protocol">>, <<"local">>},
368367
{<<"src-queue">>, Src},
369368
{<<"src-delete-after">>, 10},
370369
{<<"dest-protocol">>, <<"local">>},
371370
{<<"dest-predeclared">>, true},
372371
{<<"dest-queue">>, Dest}
373372
]),
374-
publish_count(Sess, Src, <<"hello">>, 10),
373+
amqp10_publish(Sess, Src, <<"hello">>, 10),
375374
await_autodelete(Config, <<"test">>),
376375
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
377376
?awaitMatch(

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
amqp10_expect_empty/2,
1919
amqp10_publish/4, amqp10_expect_one/2,
2020
amqp10_expect_count/3, amqp10_expect/3,
21-
amqp10_publish_expect/5,
21+
amqp10_publish_expect/5, amqp10_subscribe/2,
2222
await_autodelete/2]).
2323

2424
-define(PARAM, <<"test">>).
@@ -593,7 +593,7 @@ local_to_local_delete_after_with_rejections(Config) ->
593593
VHost = <<"/">>,
594594
declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5},
595595
{<<"x-overflow">>, longstr, <<"reject-publish">>}]),
596-
with_session(Config,
596+
with_amqp10_session(Config,
597597
fun (Sess) ->
598598
shovel_test_utils:set_param(Config, ?PARAM,
599599
[{<<"src-protocol">>, <<"local">>},
@@ -603,7 +603,7 @@ local_to_local_delete_after_with_rejections(Config) ->
603603
{<<"dest-predeclared">>, true},
604604
{<<"dest-queue">>, Dest}
605605
]),
606-
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
606+
amqp10_publish(Sess, Src, <<"tag1">>, 10),
607607
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
608608
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
609609
?awaitMatch(
@@ -666,7 +666,7 @@ local_to_local_stream_no_ack(Config) ->
666666
{<<"dest-queue">>, Dest},
667667
{<<"ack-mode">>, <<"no-ack">>}
668668
]),
669-
Receiver = subscribe(Sess, Dest),
669+
Receiver = amqp10_subscribe(Sess, Dest),
670670
amqp10_publish(Sess, Src, <<"tag1">>, 10),
671671
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
672672
rabbit_ct_broker_helpers:rpc(Config, 0,
@@ -728,7 +728,7 @@ local_to_local_stream_on_confirm(Config) ->
728728
{<<"dest-queue">>, Dest},
729729
{<<"ack-mode">>, <<"on-confirm">>}
730730
]),
731-
Receiver = subscribe(Sess, Dest),
731+
Receiver = amqp10_subscribe(Sess, Dest),
732732
amqp10_publish(Sess, Src, <<"tag1">>, 10),
733733
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
734734
rabbit_ct_broker_helpers:rpc(Config, 0,
@@ -790,7 +790,7 @@ local_to_local_stream_on_publish(Config) ->
790790
{<<"dest-queue">>, Dest},
791791
{<<"ack-mode">>, <<"on-publish">>}
792792
]),
793-
Receiver = subscribe(Sess, Dest),
793+
Receiver = amqp10_subscribe(Sess, Dest),
794794
amqp10_publish(Sess, Src, <<"tag1">>, 10),
795795
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
796796
rabbit_ct_broker_helpers:rpc(Config, 0,
@@ -1040,7 +1040,7 @@ local_to_local_stream_credit_flow(Config, AckMode) ->
10401040
{<<"ack-mode">>, AckMode}
10411041
]),
10421042

1043-
Receiver = subscribe(Sess, Dest),
1043+
Receiver = amqp10_subscribe(Sess, Dest),
10441044
amqp10_publish(Sess, Src, <<"tag1">>, 1000),
10451045
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1000}, _}],
10461046
rabbit_ct_broker_helpers:rpc(Config, 0,
@@ -1096,14 +1096,6 @@ local_to_local_counters(Config) ->
10961096
end).
10971097

10981098
%%----------------------------------------------------------------------------
1099-
subscribe(Session, Dest) ->
1100-
LinkName = <<"dynamic-receiver-", Dest/binary>>,
1101-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
1102-
Dest, settled,
1103-
unsettled_state),
1104-
ok = amqp10_client:flow_link_credit(Receiver, 10, 1),
1105-
Receiver.
1106-
11071099
declare_queue(Config, VHost, QName) ->
11081100
declare_queue(Config, VHost, QName, []).
11091101

deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
-import(shovel_test_utils, [set_param/3,
1818
with_amqp10_session/2,
19-
amqp10_publish_expect/5]).
19+
amqp10_publish_expect/5,
20+
amqp10_declare_queue/3]).
2021

2122
-define(PARAM, <<"test">>).
2223

@@ -48,7 +49,13 @@ groups() ->
4849

4950
tests() ->
5051
[
51-
simple
52+
simple,
53+
simple_classic_no_ack,
54+
simple_classic_on_confirm,
55+
simple_classic_on_publish,
56+
simple_quorum_no_ack,
57+
simple_quorum_on_confirm,
58+
simple_quorum_on_publish
5259
].
5360

5461
%% -------------------------------------------------------------------
@@ -189,19 +196,49 @@ end_per_testcase(Testcase, Config) ->
189196
%% Testcases.
190197
%% -------------------------------------------------------------------
191198
simple(Config) ->
192-
Name = <<"test">>,
193199
Src = ?config(srcq, Config),
194200
Dest = ?config(destq, Config),
195201
with_amqp10_session(
196202
Config,
197203
fun (Sess) ->
198-
set_param(Config, Name, ?config(shovel_args, Config)),
204+
set_param(Config, ?PARAM, ?config(shovel_args, Config)),
199205
amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
200-
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
206+
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, ?PARAM}]),
201207
?assertMatch([_|_], Status),
202208
?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status))
203209
end).
204210

211+
simple_classic_no_ack(Config) ->
212+
simple_queue_type_ack_mode(Config, <<"classic">>, <<"no-ack">>).
213+
214+
simple_classic_on_confirm(Config) ->
215+
simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-confirm">>).
216+
217+
simple_classic_on_publish(Config) ->
218+
simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-publish">>).
219+
220+
simple_quorum_no_ack(Config) ->
221+
simple_queue_type_ack_mode(Config, <<"quorum">>, <<"no-ack">>).
222+
223+
simple_quorum_on_confirm(Config) ->
224+
simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-confirm">>).
225+
226+
simple_quorum_on_publish(Config) ->
227+
simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-publish">>).
228+
229+
simple_queue_type_ack_mode(Config, Type, AckMode) ->
230+
Src = ?config(srcq, Config),
231+
Dest = ?config(destq, Config),
232+
with_amqp10_session(
233+
Config,
234+
fun (Sess) ->
235+
amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}),
236+
amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}}),
237+
ExtraArgs = [{<<"ack-mode">>, AckMode}],
238+
ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs,
239+
set_param(Config, ?PARAM, ShovelArgs),
240+
amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 10)
241+
end).
205242

206243
%%----------------------------------------------------------------------------
207244
maybe_skip_local_protocol(Config) ->

0 commit comments

Comments
 (0)