Skip to content

Commit 171c1b5

Browse files
committed
Use delegate for Direct Reply-To
For Direct Reply-To in AMQP 0.9.1, for `queue.declare` use the same pattern as for message delivery.
1 parent f2313b6 commit 171c1b5

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@
6363
format_message_queue/2]).
6464

6565
%% Internal
66-
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
66+
-export([list_local/0,
67+
emit_info_local/3,
68+
deliver_reply_local/3,
69+
declare_fast_reply_to_local/2]).
6770
-export([get_vhost/1, get_user/1]).
6871
%% For testing
6972
-export([build_topic_variable_map/3]).
@@ -313,34 +316,56 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
313316

314317
%% We want to ensure people can't use this mechanism to send a message
315318
%% to an arbitrary process and kill it!
316-
317319
-spec deliver_reply_local(pid(), binary(), mc:state()) -> 'ok'.
318-
319320
deliver_reply_local(Pid, Key, Message) ->
320-
case pg_local:in_group(rabbit_channels, Pid) of
321-
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
322-
false -> ok
321+
case is_local(Pid) of
322+
true ->
323+
gen_server2:cast(Pid, {deliver_reply, Key, Message});
324+
false ->
325+
ok
323326
end.
324327

328+
-spec declare_fast_reply_to(binary()) ->
329+
exists | not_found.
325330
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
326331
exists;
327332
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
328333
Nodes = nodes_with_hashes(),
329334
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
330335
{ok, Pid, Key} ->
331-
Msg = {declare_fast_reply_to, Key},
332-
rabbit_misc:with_exit_handler(
333-
rabbit_misc:const(not_found),
334-
fun() -> gen_server2:call(Pid, Msg, infinity) end);
336+
Req = {declare_fast_reply_to, Key},
337+
MFA = {?MODULE, declare_fast_reply_to_local, [Req]},
338+
try delegate:invoke(Pid, MFA)
339+
catch error:undef ->
340+
%% We are backward compat without introducing a feature flag.
341+
rabbit_misc:with_exit_handler(
342+
rabbit_misc:const(not_found),
343+
fun() -> gen_server2:call(Pid, Req, infinity) end);
344+
_:_ ->
345+
not_found
346+
end;
335347
{error, _} ->
336348
not_found
337349
end;
338350
declare_fast_reply_to(_) ->
339351
not_found.
340352

353+
-spec declare_fast_reply_to_local(pid(), binary()) ->
354+
exists | not_found.
355+
declare_fast_reply_to_local(Pid, Request) ->
356+
case is_local(Pid) of
357+
true ->
358+
gen_server2:call(Pid, Request, infinity);
359+
false ->
360+
not_found
361+
end.
362+
341363
nodes_with_hashes() ->
342364
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.
343365

366+
is_local(Pid) ->
367+
pg_local:in_group(rabbit_channels, Pid).
368+
344369
-spec list() -> [pid()].
345370

346371
list() ->

0 commit comments

Comments
 (0)