Skip to content

Commit 1eff3e4

Browse files
Merge pull request #14930 from rabbitmq/mergify/bp/v4.2.x/pr-14929
Shovels: introduce `rabbit_shovel_parameters:{src,dest}_protocol/1` (backport #14929)
2 parents 686e76e + b931ba7 commit 1eff3e4

File tree

3 files changed

+247
-19
lines changed

3 files changed

+247
-19
lines changed

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
-export([validate/5, notify/5, notify_clear/4]).
1616
-export([register/0, unregister/0, parse/3]).
1717
-export([obfuscate_uris_in_definition/1]).
18-
18+
-export([src_protocol/1, dest_protocol/1, protocols/1]).
1919
-export([is_internal/1, internal_owner/1]).
2020

2121
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
@@ -143,6 +143,31 @@ validate_internal_owner(Name, Term0) ->
143143
['exchange', 'queue'])},
144144
{<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term).
145145

146+
src_protocol(Def) when is_map(Def) ->
147+
src_protocol(rabbit_data_coercion:to_proplist(Def));
148+
src_protocol(Def) when is_list(Def) ->
149+
case lists:keyfind(<<"src-protocol">>, 1, Def) of
150+
{_, SrcProtocol} ->
151+
rabbit_data_coercion:to_atom(SrcProtocol);
152+
false -> amqp091
153+
end.
154+
155+
dest_protocol(Def) when is_map(Def) ->
156+
dest_protocol(rabbit_data_coercion:to_proplist(Def));
157+
dest_protocol(Def) when is_list(Def) ->
158+
case lists:keyfind(<<"dest-protocol">>, 1, Def) of
159+
{_, DstProtocol} ->
160+
rabbit_data_coercion:to_atom(DstProtocol);
161+
false -> amqp091
162+
end.
163+
164+
protocols(Def) when is_map(Def) ->
165+
protocols(rabbit_data_coercion:to_proplist(Def));
166+
protocols(Def) ->
167+
Src = src_protocol(Def),
168+
Dst = dest_protocol(Def),
169+
{Src, Dst}.
170+
146171
%%----------------------------------------------------------------------------
147172

148173
parse({VHost, Name}, ClusterName, Def) ->
@@ -177,20 +202,5 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
177202
translate_ack_mode(<<"on-publish">>) -> on_publish;
178203
translate_ack_mode(<<"no-ack">>) -> no_ack.
179204

180-
protocols(Def) when is_map(Def) ->
181-
protocols(rabbit_data_coercion:to_proplist(Def));
182-
protocols(Def) ->
183-
Src = case lists:keyfind(<<"src-protocol">>, 1, Def) of
184-
{_, SrcProtocol} ->
185-
rabbit_data_coercion:to_atom(SrcProtocol);
186-
false -> amqp091
187-
end,
188-
Dst = case lists:keyfind(<<"dest-protocol">>, 1, Def) of
189-
{_, DstProtocol} ->
190-
rabbit_data_coercion:to_atom(DstProtocol);
191-
false -> amqp091
192-
end,
193-
{Src, Dst}.
194-
195205
list_all_protocols() ->
196206
[P || {P, _} <- rabbit_registry:lookup_all(shovel_protocol)].

deps/rabbitmq_shovel/test/unit_parsing_and_validation_SUITE.erl

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ groups() ->
2727
[
2828
{tests, [parallel], [
2929
parse_amqp091,
30-
parse_amqp10_mixed
30+
parse_amqp10_mixed,
31+
parse_local
3132
]}
3233
].
3334

@@ -53,7 +54,7 @@ end_per_testcase(_Testcase, Config) -> Config.
5354

5455

5556
%% -------------------------------------------------------------------
56-
%% Testcases.
57+
%% Test cases
5758
%% -------------------------------------------------------------------
5859

5960
parse_amqp091(_Config) ->
@@ -128,3 +129,46 @@ parse_amqp10_mixed(_Config) ->
128129
add_forward_headers := true}}},
129130
rabbit_shovel_config:parse(my_shovel, In)),
130131
ok.
132+
133+
parse_local(_Config) ->
134+
Amqp091Src = {source, [
135+
{protocol, local},
136+
{uris, ["ampq://myhost:5672/vhost"]},
137+
{declarations, []},
138+
{queue, <<"the-queue">>},
139+
{delete_after, never},
140+
{prefetch_count, 10}]},
141+
Amqp091Dst = {destination, [
142+
{protocol, local},
143+
{uris, ["ampq://myhost:5672"]},
144+
{declarations, []},
145+
{publish_properties, [{delivery_mode, 1}]},
146+
{publish_fields, []},
147+
{add_forward_headers, true}]},
148+
In = [Amqp091Src,
149+
Amqp091Dst,
150+
{ack_mode, on_confirm},
151+
{reconnect_delay, 2}],
152+
153+
?assertMatch(
154+
{ok, #{name := my_shovel,
155+
ack_mode := on_confirm,
156+
reconnect_delay := 2,
157+
shovel_type := static,
158+
dest := #{
159+
module := rabbit_local_shovel,
160+
uris := ["ampq://myhost:5672"],
161+
exchange := none,
162+
routing_key := none,
163+
resource_decl := _DDecl,
164+
add_timestamp_header := false,
165+
add_forward_headers := true},
166+
source := #{
167+
module := rabbit_local_shovel,
168+
uris := ["ampq://myhost:5672/vhost"],
169+
queue := <<"the-queue">>,
170+
consumer_args := [],
171+
delete_after := never,
172+
resource_decl := _SDecl}}},
173+
rabbit_shovel_config:parse(my_shovel, In)),
174+
ok.

deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ groups() ->
3434
parse_amqp10,
3535
parse_amqp10_minimal,
3636
validate_amqp10,
37-
validate_amqp10_with_a_map
37+
validate_amqp10_with_a_map,
38+
test_src_protocol_defaults,
39+
test_src_protocol_explicit,
40+
test_dest_protocol_defaults,
41+
test_dest_protocol_explicit,
42+
test_protocols_defaults,
43+
test_protocols_explicit,
44+
test_protocols_mixed,
45+
test_protocols_with_maps
3846
]}
3947
].
4048

@@ -389,3 +397,169 @@ validate_ok([[_|_] = L | T]) ->
389397
validate_ok([]) -> [];
390398
validate_ok(X) ->
391399
exit({not_ok, X}).
400+
401+
%% -------------------------------------------------------------------
402+
%% Protocol detection tests
403+
%% -------------------------------------------------------------------
404+
405+
test_src_protocol_defaults(_Config) ->
406+
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
407+
{<<"dest-uri">>, <<"amqp://remote">>}],
408+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefProplist)),
409+
410+
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
411+
<<"dest-uri">> => <<"amqp://remote">>},
412+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap)),
413+
ok.
414+
415+
test_src_protocol_explicit(_Config) ->
416+
Def091 = [{<<"src-protocol">>, <<"amqp091">>}],
417+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(Def091)),
418+
419+
Def10 = [{<<"src-protocol">>, <<"amqp10">>}],
420+
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(Def10)),
421+
422+
DefLocal = [{<<"src-protocol">>, <<"local">>}],
423+
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefLocal)),
424+
425+
DefMap091 = #{<<"src-protocol">> => <<"amqp091">>},
426+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap091)),
427+
428+
DefMap10 = #{<<"src-protocol">> => <<"amqp10">>},
429+
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(DefMap10)),
430+
431+
DefMapLocal = #{<<"src-protocol">> => <<"local">>},
432+
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefMapLocal)),
433+
ok.
434+
435+
test_dest_protocol_defaults(_Config) ->
436+
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
437+
{<<"dest-uri">>, <<"amqp://remote">>}],
438+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefProplist)),
439+
440+
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
441+
<<"dest-uri">> => <<"amqp://remote">>},
442+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap)),
443+
ok.
444+
445+
test_dest_protocol_explicit(_Config) ->
446+
Def091 = [{<<"dest-protocol">>, <<"amqp091">>}],
447+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(Def091)),
448+
449+
Def10 = [{<<"dest-protocol">>, <<"amqp10">>}],
450+
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(Def10)),
451+
452+
DefLocal = [{<<"dest-protocol">>, <<"local">>}],
453+
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefLocal)),
454+
455+
DefMap091 = #{<<"dest-protocol">> => <<"amqp091">>},
456+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap091)),
457+
458+
DefMap10 = #{<<"dest-protocol">> => <<"amqp10">>},
459+
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(DefMap10)),
460+
461+
DefMapLocal = #{<<"dest-protocol">> => <<"local">>},
462+
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefMapLocal)),
463+
ok.
464+
465+
test_protocols_defaults(_Config) ->
466+
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
467+
{<<"dest-uri">>, <<"amqp://remote">>}],
468+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefProplist)),
469+
470+
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
471+
<<"dest-uri">> => <<"amqp://remote">>},
472+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap)),
473+
ok.
474+
475+
test_protocols_explicit(_Config) ->
476+
Def091 = [{<<"src-protocol">>, <<"amqp091">>},
477+
{<<"dest-protocol">>, <<"amqp091">>}],
478+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(Def091)),
479+
480+
Def10 = [{<<"src-protocol">>, <<"amqp10">>},
481+
{<<"dest-protocol">>, <<"amqp10">>}],
482+
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(Def10)),
483+
484+
DefLocal = [{<<"src-protocol">>, <<"local">>},
485+
{<<"dest-protocol">>, <<"local">>}],
486+
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefLocal)),
487+
488+
DefMap091 = #{<<"src-protocol">> => <<"amqp091">>,
489+
<<"dest-protocol">> => <<"amqp091">>},
490+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap091)),
491+
492+
DefMap10 = #{<<"src-protocol">> => <<"amqp10">>,
493+
<<"dest-protocol">> => <<"amqp10">>},
494+
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(DefMap10)),
495+
496+
DefMapLocal = #{<<"src-protocol">> => <<"local">>,
497+
<<"dest-protocol">> => <<"local">>},
498+
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMapLocal)),
499+
ok.
500+
501+
test_protocols_mixed(_Config) ->
502+
Def091to10 = [{<<"src-protocol">>, <<"amqp091">>},
503+
{<<"dest-protocol">>, <<"amqp10">>}],
504+
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(Def091to10)),
505+
506+
Def10to091 = [{<<"src-protocol">>, <<"amqp10">>},
507+
{<<"dest-protocol">>, <<"amqp091">>}],
508+
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(Def10to091)),
509+
510+
DefLocalTo091 = [{<<"src-protocol">>, <<"local">>},
511+
{<<"dest-protocol">>, <<"amqp091">>}],
512+
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefLocalTo091)),
513+
514+
Def091ToLocal = [{<<"src-protocol">>, <<"amqp091">>},
515+
{<<"dest-protocol">>, <<"local">>}],
516+
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(Def091ToLocal)),
517+
518+
Def10ToLocal = [{<<"src-protocol">>, <<"amqp10">>},
519+
{<<"dest-protocol">>, <<"local">>}],
520+
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(Def10ToLocal)),
521+
522+
DefLocalTo10 = [{<<"src-protocol">>, <<"local">>},
523+
{<<"dest-protocol">>, <<"amqp10">>}],
524+
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefLocalTo10)),
525+
526+
DefMap091to10 = #{<<"src-protocol">> => <<"amqp091">>,
527+
<<"dest-protocol">> => <<"amqp10">>},
528+
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap091to10)),
529+
530+
DefMap10to091 = #{<<"src-protocol">> => <<"amqp10">>,
531+
<<"dest-protocol">> => <<"amqp091">>},
532+
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap10to091)),
533+
534+
DefMapLocalTo091 = #{<<"src-protocol">> => <<"local">>,
535+
<<"dest-protocol">> => <<"amqp091">>},
536+
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefMapLocalTo091)),
537+
538+
DefMap091ToLocal = #{<<"src-protocol">> => <<"amqp091">>,
539+
<<"dest-protocol">> => <<"local">>},
540+
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap091ToLocal)),
541+
542+
DefMap10ToLocal = #{<<"src-protocol">> => <<"amqp10">>,
543+
<<"dest-protocol">> => <<"local">>},
544+
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(DefMap10ToLocal)),
545+
546+
DefMapLocalTo10 = #{<<"src-protocol">> => <<"local">>,
547+
<<"dest-protocol">> => <<"amqp10">>},
548+
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefMapLocalTo10)),
549+
ok.
550+
551+
test_protocols_with_maps(_Config) ->
552+
DefMap1 = #{<<"src-protocol">> => <<"amqp091">>,
553+
<<"dest-protocol">> => <<"amqp10">>},
554+
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap1)),
555+
556+
DefMap2 = #{<<"src-protocol">> => <<"local">>,
557+
<<"dest-protocol">> => <<"local">>},
558+
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMap2)),
559+
560+
DefMap3 = #{<<"src-protocol">> => <<"amqp10">>},
561+
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap3)),
562+
563+
DefMap4 = #{<<"dest-protocol">> => <<"local">>},
564+
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap4)),
565+
ok.

0 commit comments

Comments
 (0)