Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 294 additions & 2 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

-export([
parse/2,
parse_source/1,
parse_dest/4,
validate_src/1,
validate_dest/1,
validate_src_funs/2,
validate_dest_funs/2,
source_uri/1,
dest_uri/1,
source_protocol/1,
Expand All @@ -42,11 +48,39 @@
%% from and can break with the next upgrade. It should not be used by
%% another one that the one who created it or survive a node restart.
%% Thus, function references have been replace by the following MFA.
-export([decl_fun/3, check_fun/3, publish_fun/4, props_fun_timestamp_header/4,
props_fun_forward_header/5]).
-export([decl_fun/3,
check_fun/3,
publish_fun/4,
props_fun_timestamp_header/4,
props_fun_forward_header/5,
dest_decl/4,
dest_check/4,
src_decl_exchange/4,
src_decl_queue/4,
src_check_queue/4,
fields_fun/5,
props_fun/9
]).

-import(rabbit_misc, [pget/2, pget/3]).
-import(rabbit_shovel_util, [
pget2count/3,
deobfuscated_uris/2,
validate_uri_fun/1
]).

-define(APP, rabbitmq_shovel).
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).

-rabbit_boot_step(
{rabbit_amqp091_shovel_protocol,
[{description, "AMQP091 shovel protocol"},
{mfa, {rabbit_registry, register,
[shovel_protocol, <<"amqp091">>, ?MODULE]}},
{cleanup, {rabbit_registry, unregister,
[shovel_protocol, <<"amqp091">>]}},
{requires, rabbit_registry}]}).

parse(_Name, {source, Source}) ->
Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1,
proplists:get_value(prefetch_count, Source,
Expand Down Expand Up @@ -79,6 +113,147 @@ parse(Name, {destination, Dest}) ->
add_forward_headers => AFH,
add_timestamp_header => ATH}.

parse_source(Def) ->
SrcURIs = deobfuscated_uris(<<"src-uri">>, Def),
SrcX = pget(<<"src-exchange">>,Def, none),
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQ = pget(<<"src-queue">>, Def, none),
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])),
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared),
{SrcDeclFun, Queue, DestHeaders} =
case SrcQ of
none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>,
[{<<"src-exchange">>, SrcX},
{<<"src-exchange-key">>, SrcXKey}]};
_ -> case Predeclared of
false ->
{{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]},
SrcQ, [{<<"src-queue">>, SrcQ}]};
true ->
{{?MODULE, src_check_queue, [SrcQ, SrcQArgs]},
SrcQ, [{<<"src-queue">>, SrcQ}]}
end
end,
DeleteAfter = pget(<<"src-delete-after">>, Def,
pget(<<"delete-after">>, Def, <<"never">>)),
PrefetchCount = pget(<<"src-prefetch-count">>, Def,
pget(<<"prefetch-count">>, Def, 1000)),
%% Details are only used for status report in rabbitmqctl, as vhost is not
%% available to query the runtime parameters.
Details = maps:from_list([{K, V} || {K, V} <- [{source_exchange, SrcX},
{source_exchange_key, SrcXKey}],
V =/= none]),
{maps:merge(#{module => rabbit_amqp091_shovel,
uris => SrcURIs,
resource_decl => SrcDeclFun,
queue => Queue,
delete_after => opt_b2a(DeleteAfter),
prefetch_count => PrefetchCount,
consumer_args => SrcCArgs
}, Details), DestHeaders}.

parse_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
DestURIs = deobfuscated_uris(<<"dest-uri">>, Def),
DestX = pget(<<"dest-exchange">>, Def, none),
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared),
DestDeclFun = case Predeclared of
true -> {?MODULE, dest_check, [DestQ, DestQArgs]};
false -> {?MODULE, dest_decl, [DestQ, DestQArgs]}
end,

{X, Key} = case DestQ of
none -> {DestX, DestXKey};
_ -> {<<>>, DestQ}
end,
Table2 = [{K, V} || {K, V} <- [{<<"dest-exchange">>, DestX},
{<<"dest-exchange-key">>, DestXKey},
{<<"dest-queue">>, DestQ}],
V =/= none],
AddHeadersLegacy = pget(<<"add-forward-headers">>, Def, false),
AddHeaders = pget(<<"dest-add-forward-headers">>, Def, AddHeadersLegacy),
Table0 = [{<<"shovelled-by">>, ClusterName},
{<<"shovel-type">>, <<"dynamic">>},
{<<"shovel-name">>, Name},
{<<"shovel-vhost">>, VHost}],
SetProps = lookup_indices(pget(<<"dest-publish-properties">>, Def,
pget(<<"publish-properties">>, Def, [])),
record_info(fields, 'P_basic')),
AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false),
AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def,
AddTimestampHeaderLegacy),
%% Details are only used for status report in rabbitmqctl, as vhost is not
%% available to query the runtime parameters.
Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX},
{dest_exchange_key, DestXKey},
{dest_queue, DestQ}],
V =/= none]),
maps:merge(#{module => rabbit_amqp091_shovel,
uris => DestURIs,
resource_decl => DestDeclFun,
fields_fun => {?MODULE, fields_fun, [X, Key]},
props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps,
AddHeaders, SourceHeaders,
AddTimestampHeader]}
}, Details).

validate_src(Def) ->
[case pget2count(<<"src-exchange">>, <<"src-queue">>, Def) of
zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []};
one -> ok;
both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []}
end,
case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of
{N, <<"no-ack">>} when is_integer(N) ->
{error, "Cannot specify 'no-ack' and numerical 'delete-after'", []};
_ ->
ok
end].

validate_dest(Def) ->
[case pget2count(<<"dest-exchange">>, <<"dest-queue">>, Def) of
zero -> ok;
one -> ok;
both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []}
end].

validate_src_funs(_Def, User) ->
[
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional},
{<<"src-consumer-args">>, fun rabbit_shovel_util:validate_consumer_args/2, optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
%% a deprecated pre-3.7 setting
{<<"delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional},
%% currently used multi-protocol friend name, introduced in 3.7
{<<"src-delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional},
{<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
].

validate_dest_funs(_Def, User) ->
[{<<"dest-uri">>, validate_uri_fun(User), mandatory},
{<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional},
{<<"dest-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional},
{<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"publish-properties">>, fun validate_properties/2, optional},
{<<"dest-publish-properties">>, fun validate_properties/2, optional},
{<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
].

connect_source(Conf = #{name := Name,
source := #{uris := Uris} = Src}) ->
{Conn, Chan, Uri} = make_conn_and_chan(Uris, Name),
Expand Down Expand Up @@ -572,3 +747,120 @@ parse_binary(Binary) when is_binary(Binary) ->
Binary;
parse_binary(NotABinary) ->
fail({require_binary, NotABinary}).

lookup_indices(KVs0, L) ->
KVs = rabbit_data_coercion:to_proplist(KVs0),
[{1 + list_find(list_to_atom(binary_to_list(K)), L), V} || {K, V} <- KVs].

opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B));
opt_b2a(N) -> N.

list_find(K, L) -> list_find(K, L, 1).

list_find(K, [K|_], N) -> N;
list_find(K, [], _N) -> exit({not_found, K});
list_find(K, [_|L], N) -> list_find(K, L, N + 1).

dest_decl(DestQ, DestQArgs, Conn, _Ch) ->
case DestQ of
none -> ok;
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end.

dest_check(DestQ, DestQArgs, Conn, _Ch) ->
case DestQ of
none -> ok;
_ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end.

src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) ->
Ms = [#'queue.declare'{exclusive = true},
#'queue.bind'{routing_key = SrcXKey,
exchange = SrcX}],
[amqp_channel:call(Ch, M) || M <- Ms].

src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).

src_check_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
check_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).

ensure_queue(Conn, Queue, XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true})
catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
{ok, Ch2} = amqp_connection:open_channel(Conn),
amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
durable = true,
arguments = XArgs}),
catch amqp_channel:close(Ch2)

after
catch amqp_channel:close(Ch)
end.

check_queue(Conn, Queue, _XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true})
after
catch amqp_channel:close(Ch)
end.

fields_fun(X, Key, _SrcURI, _DestURI, P0) ->
P1 = case X of
none -> P0;
_ -> P0#'basic.publish'{exchange = X}
end,
case Key of
none -> P1;
_ -> P1#'basic.publish'{routing_key = Key}
end.

props_fun(Table0, Table2, SetProps, AddHeaders, SourceHeaders, AddTimestampHeader,
SrcURI, DestURI, P0) ->
P = set_properties(P0, SetProps),
P1 = case AddHeaders of
true -> rabbit_shovel_util:update_headers(
Table0, SourceHeaders ++ Table2,
SrcURI, DestURI, P);
false -> P
end,
case AddTimestampHeader of
true -> rabbit_shovel_util:add_timestamp_header(P1);
false -> P1
end.

set_properties(Props, []) ->
Props;
set_properties(Props, [{Ix, V} | Rest]) ->
set_properties(setelement(Ix, Props, V), Rest).

%% TODO headers?
validate_properties(Name, Term0) ->
Term = case Term0 of
T when is_map(T) ->
rabbit_data_coercion:to_proplist(Term0);
T when is_list(T) ->
rabbit_data_coercion:to_proplist(Term0);
Other -> Other
end,
Str = fun rabbit_parameter_validation:binary/2,
Num = fun rabbit_parameter_validation:number/2,
rabbit_parameter_validation:proplist(
Name, [{<<"content_type">>, Str, optional},
{<<"content_encoding">>, Str, optional},
{<<"delivery_mode">>, Num, optional},
{<<"priority">>, Num, optional},
{<<"correlation_id">>, Str, optional},
{<<"reply_to">>, Str, optional},
{<<"expiration">>, Str, optional},
{<<"message_id">>, Str, optional},
{<<"timestamp">>, Num, optional},
{<<"type">>, Str, optional},
{<<"user_id">>, Str, optional},
{<<"app_id">>, Str, optional},
{<<"cluster_id">>, Str, optional}], Term).
Loading
Loading