Skip to content

Commit 9f2ff6c

Browse files
committed
Speed up Direct Reply-To
Speed up Direct Reply-To in AMQP 0.9.1 because prior to this PR sending each RPC reply was bottlenecked by the slowest network link between the RabbitMQ node the responder connected to and all other RabbitMQ nodes. This regression was introduced in RabbitMQ 3.12.0 via d656371
1 parent 67ac485 commit 9f2ff6c

File tree

3 files changed

+7
-8
lines changed

3 files changed

+7
-8
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ send_command(Pid, Msg) ->
300300

301301
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
302302
deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
303-
Nodes = rabbit_nodes:all_running_with_hashes(),
303+
Nodes = nodes_with_hashes(),
304304
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
305305
{ok, Pid, Key} ->
306306
delegate:invoke_no_result(
@@ -323,7 +323,7 @@ deliver_reply_local(Pid, Key, Message) ->
323323
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
324324
exists;
325325
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
326-
Nodes = rabbit_nodes:all_running_with_hashes(),
326+
Nodes = nodes_with_hashes(),
327327
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
328328
{ok, Pid, Key} ->
329329
Msg = {declare_fast_reply_to, Key},
@@ -336,6 +336,9 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
336336
declare_fast_reply_to(_) ->
337337
not_found.
338338

339+
nodes_with_hashes() ->
340+
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.
341+
339342
-spec list() -> [pid()].
340343

341344
list() ->

deps/rabbit/src/rabbit_direct_reply_to.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ compute_key_and_suffix(Pid) ->
2020
PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid),
2121
%% Note: we hash the entire node name. This is sufficient for our needs of shortening node name
2222
%% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member
23-
%% in rabbit_nodes:all_running_with_hashes/0.
23+
%% in rabbit_channel:nodes_with_hashes/0.
2424
%%
2525
%% We also use a synthetic node prefix because the hash alone will be sufficient to
2626
NodeHash = erlang:phash2(Node),

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
await_running_count/2, is_single_node_cluster/0,
2929
boot/0]).
3030
-export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]).
31-
-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
31+
-export([all/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
3232
if_reached_target_cluster_size/2]).
3333
-export([lock_id/1, lock_retries/0]).
3434
-export([me_in_nodes/1, nodes_incl_me/1, nodes_excl_me/1]).
@@ -614,10 +614,6 @@ await_running_count_with_retries(TargetCount, Retries) ->
614614
await_running_count_with_retries(TargetCount, Retries - 1)
615615
end.
616616

617-
-spec all_running_with_hashes() -> #{non_neg_integer() => node()}.
618-
all_running_with_hashes() ->
619-
maps:from_list([{erlang:phash2(Node), Node} || Node <- list_running()]).
620-
621617
-spec target_cluster_size_hint() -> non_neg_integer().
622618
target_cluster_size_hint() ->
623619
cluster_formation_key_or_default(target_cluster_size_hint, ?DEFAULT_TARGET_CLUSTER_SIZE).

0 commit comments

Comments
 (0)