diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index e3492653534f..cd647525c425 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -63,7 +63,10 @@ format_message_queue/2]). %% Internal --export([list_local/0, emit_info_local/3, deliver_reply_local/3]). +-export([list_local/0, + emit_info_local/3, + deliver_reply_local/3, + declare_fast_reply_to_local/2]). -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). @@ -313,34 +316,56 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) -> %% We want to ensure people can't use this mechanism to send a message %% to an arbitrary process and kill it! - -spec deliver_reply_local(pid(), binary(), mc:state()) -> 'ok'. - deliver_reply_local(Pid, Key, Message) -> - case pg_local:in_group(rabbit_channels, Pid) of - true -> gen_server2:cast(Pid, {deliver_reply, Key, Message}); - false -> ok + case is_local(Pid) of + true -> + gen_server2:cast(Pid, {deliver_reply, Key, Message}); + false -> + ok end. +-spec declare_fast_reply_to(binary()) -> + exists | not_found. declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> exists; declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> Nodes = nodes_with_hashes(), case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of {ok, Pid, Key} -> - Msg = {declare_fast_reply_to, Key}, - rabbit_misc:with_exit_handler( - rabbit_misc:const(not_found), - fun() -> gen_server2:call(Pid, Msg, infinity) end); + Req = {declare_fast_reply_to, Key}, + MFA = {?MODULE, declare_fast_reply_to_local, [Req]}, + try delegate:invoke(Pid, MFA) + catch error:undef -> + %% We are backward compat without introducing a feature flag. + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server2:call(Pid, Req, infinity) end); + _:_ -> + not_found + end; {error, _} -> not_found end; declare_fast_reply_to(_) -> not_found. +-spec declare_fast_reply_to_local(pid(), binary()) -> + exists | not_found. +declare_fast_reply_to_local(Pid, Request) -> + case is_local(Pid) of + true -> + gen_server2:call(Pid, Request, infinity); + false -> + not_found + end. + nodes_with_hashes() -> #{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}. +is_local(Pid) -> + pg_local:in_group(rabbit_channels, Pid). + -spec list() -> [pid()]. list() ->