Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-export([validate/5, notify/5, notify_clear/4]).
-export([register/0, unregister/0, parse/3]).
-export([obfuscate_uris_in_definition/1]).

-export([src_protocol/1, dest_protocol/1, protocols/1]).
-export([is_internal/1, internal_owner/1]).

-import(rabbit_misc, [pget/2, pget/3, pset/3]).
Expand Down Expand Up @@ -143,6 +143,31 @@ validate_internal_owner(Name, Term0) ->
['exchange', 'queue'])},
{<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term).

src_protocol(Def) when is_map(Def) ->
src_protocol(rabbit_data_coercion:to_proplist(Def));
src_protocol(Def) when is_list(Def) ->
case lists:keyfind(<<"src-protocol">>, 1, Def) of
{_, SrcProtocol} ->
rabbit_data_coercion:to_atom(SrcProtocol);
false -> amqp091
end.

dest_protocol(Def) when is_map(Def) ->
dest_protocol(rabbit_data_coercion:to_proplist(Def));
dest_protocol(Def) when is_list(Def) ->
case lists:keyfind(<<"dest-protocol">>, 1, Def) of
{_, DstProtocol} ->
rabbit_data_coercion:to_atom(DstProtocol);
false -> amqp091
end.

protocols(Def) when is_map(Def) ->
protocols(rabbit_data_coercion:to_proplist(Def));
protocols(Def) ->
Src = src_protocol(Def),
Dst = dest_protocol(Def),
{Src, Dst}.

%%----------------------------------------------------------------------------

parse({VHost, Name}, ClusterName, Def) ->
Expand Down Expand Up @@ -177,20 +202,5 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
translate_ack_mode(<<"on-publish">>) -> on_publish;
translate_ack_mode(<<"no-ack">>) -> no_ack.

protocols(Def) when is_map(Def) ->
protocols(rabbit_data_coercion:to_proplist(Def));
protocols(Def) ->
Src = case lists:keyfind(<<"src-protocol">>, 1, Def) of
{_, SrcProtocol} ->
rabbit_data_coercion:to_atom(SrcProtocol);
false -> amqp091
end,
Dst = case lists:keyfind(<<"dest-protocol">>, 1, Def) of
{_, DstProtocol} ->
rabbit_data_coercion:to_atom(DstProtocol);
false -> amqp091
end,
{Src, Dst}.

list_all_protocols() ->
[P || {P, _} <- rabbit_registry:lookup_all(shovel_protocol)].
48 changes: 46 additions & 2 deletions deps/rabbitmq_shovel/test/unit_parsing_and_validation_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ groups() ->
[
{tests, [parallel], [
parse_amqp091,
parse_amqp10_mixed
parse_amqp10_mixed,
parse_local
]}
].

Expand All @@ -53,7 +54,7 @@ end_per_testcase(_Testcase, Config) -> Config.


%% -------------------------------------------------------------------
%% Testcases.
%% Test cases
%% -------------------------------------------------------------------

parse_amqp091(_Config) ->
Expand Down Expand Up @@ -128,3 +129,46 @@ parse_amqp10_mixed(_Config) ->
add_forward_headers := true}}},
rabbit_shovel_config:parse(my_shovel, In)),
ok.

parse_local(_Config) ->
Amqp091Src = {source, [
{protocol, local},
{uris, ["ampq://myhost:5672/vhost"]},
{declarations, []},
{queue, <<"the-queue">>},
{delete_after, never},
{prefetch_count, 10}]},
Amqp091Dst = {destination, [
{protocol, local},
{uris, ["ampq://myhost:5672"]},
{declarations, []},
{publish_properties, [{delivery_mode, 1}]},
{publish_fields, []},
{add_forward_headers, true}]},
In = [Amqp091Src,
Amqp091Dst,
{ack_mode, on_confirm},
{reconnect_delay, 2}],

?assertMatch(
{ok, #{name := my_shovel,
ack_mode := on_confirm,
reconnect_delay := 2,
shovel_type := static,
dest := #{
module := rabbit_local_shovel,
uris := ["ampq://myhost:5672"],
exchange := none,
routing_key := none,
resource_decl := _DDecl,
add_timestamp_header := false,
add_forward_headers := true},
source := #{
module := rabbit_local_shovel,
uris := ["ampq://myhost:5672/vhost"],
queue := <<"the-queue">>,
consumer_args := [],
delete_after := never,
resource_decl := _SDecl}}},
rabbit_shovel_config:parse(my_shovel, In)),
ok.
176 changes: 175 additions & 1 deletion deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ groups() ->
parse_amqp10,
parse_amqp10_minimal,
validate_amqp10,
validate_amqp10_with_a_map
validate_amqp10_with_a_map,
test_src_protocol_defaults,
test_src_protocol_explicit,
test_dest_protocol_defaults,
test_dest_protocol_explicit,
test_protocols_defaults,
test_protocols_explicit,
test_protocols_mixed,
test_protocols_with_maps
]}
].

Expand Down Expand Up @@ -389,3 +397,169 @@ validate_ok([[_|_] = L | T]) ->
validate_ok([]) -> [];
validate_ok(X) ->
exit({not_ok, X}).

%% -------------------------------------------------------------------
%% Protocol detection tests
%% -------------------------------------------------------------------

test_src_protocol_defaults(_Config) ->
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
{<<"dest-uri">>, <<"amqp://remote">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefProplist)),

DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
<<"dest-uri">> => <<"amqp://remote">>},
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap)),
ok.

test_src_protocol_explicit(_Config) ->
Def091 = [{<<"src-protocol">>, <<"amqp091">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(Def091)),

Def10 = [{<<"src-protocol">>, <<"amqp10">>}],
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(Def10)),

DefLocal = [{<<"src-protocol">>, <<"local">>}],
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefLocal)),

DefMap091 = #{<<"src-protocol">> => <<"amqp091">>},
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap091)),

DefMap10 = #{<<"src-protocol">> => <<"amqp10">>},
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(DefMap10)),

DefMapLocal = #{<<"src-protocol">> => <<"local">>},
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefMapLocal)),
ok.

test_dest_protocol_defaults(_Config) ->
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
{<<"dest-uri">>, <<"amqp://remote">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefProplist)),

DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
<<"dest-uri">> => <<"amqp://remote">>},
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap)),
ok.

test_dest_protocol_explicit(_Config) ->
Def091 = [{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(Def091)),

Def10 = [{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(Def10)),

DefLocal = [{<<"dest-protocol">>, <<"local">>}],
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefLocal)),

DefMap091 = #{<<"dest-protocol">> => <<"amqp091">>},
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap091)),

DefMap10 = #{<<"dest-protocol">> => <<"amqp10">>},
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(DefMap10)),

DefMapLocal = #{<<"dest-protocol">> => <<"local">>},
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefMapLocal)),
ok.

test_protocols_defaults(_Config) ->
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
{<<"dest-uri">>, <<"amqp://remote">>}],
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefProplist)),

DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
<<"dest-uri">> => <<"amqp://remote">>},
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap)),
ok.

test_protocols_explicit(_Config) ->
Def091 = [{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(Def091)),

Def10 = [{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(Def10)),

DefLocal = [{<<"src-protocol">>, <<"local">>},
{<<"dest-protocol">>, <<"local">>}],
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefLocal)),

DefMap091 = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"amqp091">>},
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap091)),

DefMap10 = #{<<"src-protocol">> => <<"amqp10">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(DefMap10)),

DefMapLocal = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMapLocal)),
ok.

test_protocols_mixed(_Config) ->
Def091to10 = [{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(Def091to10)),

Def10to091 = [{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(Def10to091)),

DefLocalTo091 = [{<<"src-protocol">>, <<"local">>},
{<<"dest-protocol">>, <<"amqp091">>}],
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefLocalTo091)),

Def091ToLocal = [{<<"src-protocol">>, <<"amqp091">>},
{<<"dest-protocol">>, <<"local">>}],
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(Def091ToLocal)),

Def10ToLocal = [{<<"src-protocol">>, <<"amqp10">>},
{<<"dest-protocol">>, <<"local">>}],
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(Def10ToLocal)),

DefLocalTo10 = [{<<"src-protocol">>, <<"local">>},
{<<"dest-protocol">>, <<"amqp10">>}],
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefLocalTo10)),

DefMap091to10 = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap091to10)),

DefMap10to091 = #{<<"src-protocol">> => <<"amqp10">>,
<<"dest-protocol">> => <<"amqp091">>},
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap10to091)),

DefMapLocalTo091 = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"amqp091">>},
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefMapLocalTo091)),

DefMap091ToLocal = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap091ToLocal)),

DefMap10ToLocal = #{<<"src-protocol">> => <<"amqp10">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(DefMap10ToLocal)),

DefMapLocalTo10 = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefMapLocalTo10)),
ok.

test_protocols_with_maps(_Config) ->
DefMap1 = #{<<"src-protocol">> => <<"amqp091">>,
<<"dest-protocol">> => <<"amqp10">>},
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap1)),

DefMap2 = #{<<"src-protocol">> => <<"local">>,
<<"dest-protocol">> => <<"local">>},
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMap2)),

DefMap3 = #{<<"src-protocol">> => <<"amqp10">>},
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap3)),

DefMap4 = #{<<"dest-protocol">> => <<"local">>},
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap4)),
ok.
Loading