|
63 | 63 | format_message_queue/2]). |
64 | 64 |
|
65 | 65 | %% Internal |
66 | | --export([list_local/0, emit_info_local/3, deliver_reply_local/3]). |
| 66 | +-export([list_local/0, |
| 67 | + emit_info_local/3, |
| 68 | + deliver_reply_local/3, |
| 69 | + declare_fast_reply_to_local/2]). |
67 | 70 | -export([get_vhost/1, get_user/1]). |
68 | 71 | %% For testing |
69 | 72 | -export([build_topic_variable_map/3]). |
@@ -313,34 +316,56 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) -> |
313 | 316 |
|
314 | 317 | %% We want to ensure people can't use this mechanism to send a message |
315 | 318 | %% to an arbitrary process and kill it! |
316 | | - |
317 | 319 | -spec deliver_reply_local(pid(), binary(), mc:state()) -> 'ok'. |
318 | | - |
319 | 320 | deliver_reply_local(Pid, Key, Message) -> |
320 | | - case pg_local:in_group(rabbit_channels, Pid) of |
321 | | - true -> gen_server2:cast(Pid, {deliver_reply, Key, Message}); |
322 | | - false -> ok |
| 321 | + case is_local(Pid) of |
| 322 | + true -> |
| 323 | + gen_server2:cast(Pid, {deliver_reply, Key, Message}); |
| 324 | + false -> |
| 325 | + ok |
323 | 326 | end. |
324 | 327 |
|
| 328 | +-spec declare_fast_reply_to(binary()) -> |
| 329 | + exists | not_found. |
325 | 330 | declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> |
326 | 331 | exists; |
327 | 332 | declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> |
328 | 333 | Nodes = nodes_with_hashes(), |
329 | 334 | case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of |
330 | 335 | {ok, Pid, Key} -> |
331 | | - Msg = {declare_fast_reply_to, Key}, |
332 | | - rabbit_misc:with_exit_handler( |
333 | | - rabbit_misc:const(not_found), |
334 | | - fun() -> gen_server2:call(Pid, Msg, infinity) end); |
| 336 | + Req = {declare_fast_reply_to, Key}, |
| 337 | + MFA = {?MODULE, declare_fast_reply_to_local, [Req]}, |
| 338 | + try delegate:invoke(Pid, MFA) |
| 339 | + catch error:undef -> |
| 340 | + %% We are backward compat without introducing a feature flag. |
| 341 | + rabbit_misc:with_exit_handler( |
| 342 | + rabbit_misc:const(not_found), |
| 343 | + fun() -> gen_server2:call(Pid, Req, infinity) end); |
| 344 | + _:_ -> |
| 345 | + not_found |
| 346 | + end; |
335 | 347 | {error, _} -> |
336 | 348 | not_found |
337 | 349 | end; |
338 | 350 | declare_fast_reply_to(_) -> |
339 | 351 | not_found. |
340 | 352 |
|
| 353 | +-spec declare_fast_reply_to_local(pid(), binary()) -> |
| 354 | + exists | not_found. |
| 355 | +declare_fast_reply_to_local(Pid, Request) -> |
| 356 | + case is_local(Pid) of |
| 357 | + true -> |
| 358 | + gen_server2:call(Pid, Request, infinity); |
| 359 | + false -> |
| 360 | + not_found |
| 361 | + end. |
| 362 | + |
341 | 363 | nodes_with_hashes() -> |
342 | 364 | #{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}. |
343 | 365 |
|
| 366 | +is_local(Pid) -> |
| 367 | + pg_local:in_group(rabbit_channels, Pid). |
| 368 | + |
344 | 369 | -spec list() -> [pid()]. |
345 | 370 |
|
346 | 371 | list() -> |
|
0 commit comments