diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index 82314c9b3ff..a4784f9d45e 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -15,7 +15,7 @@ -export([validate/5, notify/5, notify_clear/4]). -export([register/0, unregister/0, parse/3]). -export([obfuscate_uris_in_definition/1]). - +-export([src_protocol/1, dest_protocol/1, protocols/1]). -export([is_internal/1, internal_owner/1]). -import(rabbit_misc, [pget/2, pget/3, pset/3]). @@ -143,6 +143,31 @@ validate_internal_owner(Name, Term0) -> ['exchange', 'queue'])}, {<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term). +src_protocol(Def) when is_map(Def) -> + src_protocol(rabbit_data_coercion:to_proplist(Def)); +src_protocol(Def) when is_list(Def) -> + case lists:keyfind(<<"src-protocol">>, 1, Def) of + {_, SrcProtocol} -> + rabbit_data_coercion:to_atom(SrcProtocol); + false -> amqp091 + end. + +dest_protocol(Def) when is_map(Def) -> + dest_protocol(rabbit_data_coercion:to_proplist(Def)); +dest_protocol(Def) when is_list(Def) -> + case lists:keyfind(<<"dest-protocol">>, 1, Def) of + {_, DstProtocol} -> + rabbit_data_coercion:to_atom(DstProtocol); + false -> amqp091 + end. + +protocols(Def) when is_map(Def) -> + protocols(rabbit_data_coercion:to_proplist(Def)); +protocols(Def) -> + Src = src_protocol(Def), + Dst = dest_protocol(Def), + {Src, Dst}. + %%---------------------------------------------------------------------------- parse({VHost, Name}, ClusterName, Def) -> @@ -177,20 +202,5 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm; translate_ack_mode(<<"on-publish">>) -> on_publish; translate_ack_mode(<<"no-ack">>) -> no_ack. -protocols(Def) when is_map(Def) -> - protocols(rabbit_data_coercion:to_proplist(Def)); -protocols(Def) -> - Src = case lists:keyfind(<<"src-protocol">>, 1, Def) of - {_, SrcProtocol} -> - rabbit_data_coercion:to_atom(SrcProtocol); - false -> amqp091 - end, - Dst = case lists:keyfind(<<"dest-protocol">>, 1, Def) of - {_, DstProtocol} -> - rabbit_data_coercion:to_atom(DstProtocol); - false -> amqp091 - end, - {Src, Dst}. - list_all_protocols() -> [P || {P, _} <- rabbit_registry:lookup_all(shovel_protocol)]. diff --git a/deps/rabbitmq_shovel/test/unit_parsing_and_validation_SUITE.erl b/deps/rabbitmq_shovel/test/unit_parsing_and_validation_SUITE.erl index 59e6e982ecb..1afdb4d2bac 100644 --- a/deps/rabbitmq_shovel/test/unit_parsing_and_validation_SUITE.erl +++ b/deps/rabbitmq_shovel/test/unit_parsing_and_validation_SUITE.erl @@ -27,7 +27,8 @@ groups() -> [ {tests, [parallel], [ parse_amqp091, - parse_amqp10_mixed + parse_amqp10_mixed, + parse_local ]} ]. @@ -53,7 +54,7 @@ end_per_testcase(_Testcase, Config) -> Config. %% ------------------------------------------------------------------- -%% Testcases. +%% Test cases %% ------------------------------------------------------------------- parse_amqp091(_Config) -> @@ -128,3 +129,46 @@ parse_amqp10_mixed(_Config) -> add_forward_headers := true}}}, rabbit_shovel_config:parse(my_shovel, In)), ok. + +parse_local(_Config) -> + Amqp091Src = {source, [ + {protocol, local}, + {uris, ["ampq://myhost:5672/vhost"]}, + {declarations, []}, + {queue, <<"the-queue">>}, + {delete_after, never}, + {prefetch_count, 10}]}, + Amqp091Dst = {destination, [ + {protocol, local}, + {uris, ["ampq://myhost:5672"]}, + {declarations, []}, + {publish_properties, [{delivery_mode, 1}]}, + {publish_fields, []}, + {add_forward_headers, true}]}, + In = [Amqp091Src, + Amqp091Dst, + {ack_mode, on_confirm}, + {reconnect_delay, 2}], + + ?assertMatch( + {ok, #{name := my_shovel, + ack_mode := on_confirm, + reconnect_delay := 2, + shovel_type := static, + dest := #{ + module := rabbit_local_shovel, + uris := ["ampq://myhost:5672"], + exchange := none, + routing_key := none, + resource_decl := _DDecl, + add_timestamp_header := false, + add_forward_headers := true}, + source := #{ + module := rabbit_local_shovel, + uris := ["ampq://myhost:5672/vhost"], + queue := <<"the-queue">>, + consumer_args := [], + delete_after := never, + resource_decl := _SDecl}}}, + rabbit_shovel_config:parse(my_shovel, In)), + ok. \ No newline at end of file diff --git a/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl b/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl index 8cdb71fb9fb..b492b699bd9 100644 --- a/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl +++ b/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl @@ -34,7 +34,15 @@ groups() -> parse_amqp10, parse_amqp10_minimal, validate_amqp10, - validate_amqp10_with_a_map + validate_amqp10_with_a_map, + test_src_protocol_defaults, + test_src_protocol_explicit, + test_dest_protocol_defaults, + test_dest_protocol_explicit, + test_protocols_defaults, + test_protocols_explicit, + test_protocols_mixed, + test_protocols_with_maps ]} ]. @@ -389,3 +397,169 @@ validate_ok([[_|_] = L | T]) -> validate_ok([]) -> []; validate_ok(X) -> exit({not_ok, X}). + +%% ------------------------------------------------------------------- +%% Protocol detection tests +%% ------------------------------------------------------------------- + +test_src_protocol_defaults(_Config) -> + DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>}, + {<<"dest-uri">>, <<"amqp://remote">>}], + ?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefProplist)), + + DefMap = #{<<"src-uri">> => <<"amqp://localhost">>, + <<"dest-uri">> => <<"amqp://remote">>}, + ?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap)), + ok. + +test_src_protocol_explicit(_Config) -> + Def091 = [{<<"src-protocol">>, <<"amqp091">>}], + ?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(Def091)), + + Def10 = [{<<"src-protocol">>, <<"amqp10">>}], + ?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(Def10)), + + DefLocal = [{<<"src-protocol">>, <<"local">>}], + ?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefLocal)), + + DefMap091 = #{<<"src-protocol">> => <<"amqp091">>}, + ?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap091)), + + DefMap10 = #{<<"src-protocol">> => <<"amqp10">>}, + ?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(DefMap10)), + + DefMapLocal = #{<<"src-protocol">> => <<"local">>}, + ?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefMapLocal)), + ok. + +test_dest_protocol_defaults(_Config) -> + DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>}, + {<<"dest-uri">>, <<"amqp://remote">>}], + ?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefProplist)), + + DefMap = #{<<"src-uri">> => <<"amqp://localhost">>, + <<"dest-uri">> => <<"amqp://remote">>}, + ?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap)), + ok. + +test_dest_protocol_explicit(_Config) -> + Def091 = [{<<"dest-protocol">>, <<"amqp091">>}], + ?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(Def091)), + + Def10 = [{<<"dest-protocol">>, <<"amqp10">>}], + ?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(Def10)), + + DefLocal = [{<<"dest-protocol">>, <<"local">>}], + ?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefLocal)), + + DefMap091 = #{<<"dest-protocol">> => <<"amqp091">>}, + ?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap091)), + + DefMap10 = #{<<"dest-protocol">> => <<"amqp10">>}, + ?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(DefMap10)), + + DefMapLocal = #{<<"dest-protocol">> => <<"local">>}, + ?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefMapLocal)), + ok. + +test_protocols_defaults(_Config) -> + DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>}, + {<<"dest-uri">>, <<"amqp://remote">>}], + ?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefProplist)), + + DefMap = #{<<"src-uri">> => <<"amqp://localhost">>, + <<"dest-uri">> => <<"amqp://remote">>}, + ?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap)), + ok. + +test_protocols_explicit(_Config) -> + Def091 = [{<<"src-protocol">>, <<"amqp091">>}, + {<<"dest-protocol">>, <<"amqp091">>}], + ?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(Def091)), + + Def10 = [{<<"src-protocol">>, <<"amqp10">>}, + {<<"dest-protocol">>, <<"amqp10">>}], + ?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(Def10)), + + DefLocal = [{<<"src-protocol">>, <<"local">>}, + {<<"dest-protocol">>, <<"local">>}], + ?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefLocal)), + + DefMap091 = #{<<"src-protocol">> => <<"amqp091">>, + <<"dest-protocol">> => <<"amqp091">>}, + ?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap091)), + + DefMap10 = #{<<"src-protocol">> => <<"amqp10">>, + <<"dest-protocol">> => <<"amqp10">>}, + ?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(DefMap10)), + + DefMapLocal = #{<<"src-protocol">> => <<"local">>, + <<"dest-protocol">> => <<"local">>}, + ?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMapLocal)), + ok. + +test_protocols_mixed(_Config) -> + Def091to10 = [{<<"src-protocol">>, <<"amqp091">>}, + {<<"dest-protocol">>, <<"amqp10">>}], + ?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(Def091to10)), + + Def10to091 = [{<<"src-protocol">>, <<"amqp10">>}, + {<<"dest-protocol">>, <<"amqp091">>}], + ?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(Def10to091)), + + DefLocalTo091 = [{<<"src-protocol">>, <<"local">>}, + {<<"dest-protocol">>, <<"amqp091">>}], + ?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefLocalTo091)), + + Def091ToLocal = [{<<"src-protocol">>, <<"amqp091">>}, + {<<"dest-protocol">>, <<"local">>}], + ?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(Def091ToLocal)), + + Def10ToLocal = [{<<"src-protocol">>, <<"amqp10">>}, + {<<"dest-protocol">>, <<"local">>}], + ?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(Def10ToLocal)), + + DefLocalTo10 = [{<<"src-protocol">>, <<"local">>}, + {<<"dest-protocol">>, <<"amqp10">>}], + ?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefLocalTo10)), + + DefMap091to10 = #{<<"src-protocol">> => <<"amqp091">>, + <<"dest-protocol">> => <<"amqp10">>}, + ?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap091to10)), + + DefMap10to091 = #{<<"src-protocol">> => <<"amqp10">>, + <<"dest-protocol">> => <<"amqp091">>}, + ?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap10to091)), + + DefMapLocalTo091 = #{<<"src-protocol">> => <<"local">>, + <<"dest-protocol">> => <<"amqp091">>}, + ?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefMapLocalTo091)), + + DefMap091ToLocal = #{<<"src-protocol">> => <<"amqp091">>, + <<"dest-protocol">> => <<"local">>}, + ?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap091ToLocal)), + + DefMap10ToLocal = #{<<"src-protocol">> => <<"amqp10">>, + <<"dest-protocol">> => <<"local">>}, + ?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(DefMap10ToLocal)), + + DefMapLocalTo10 = #{<<"src-protocol">> => <<"local">>, + <<"dest-protocol">> => <<"amqp10">>}, + ?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefMapLocalTo10)), + ok. + +test_protocols_with_maps(_Config) -> + DefMap1 = #{<<"src-protocol">> => <<"amqp091">>, + <<"dest-protocol">> => <<"amqp10">>}, + ?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap1)), + + DefMap2 = #{<<"src-protocol">> => <<"local">>, + <<"dest-protocol">> => <<"local">>}, + ?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMap2)), + + DefMap3 = #{<<"src-protocol">> => <<"amqp10">>}, + ?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap3)), + + DefMap4 = #{<<"dest-protocol">> => <<"local">>}, + ?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap4)), + ok.