Skip to content

Commit 97c844f

Browse files
ansdmergify[bot]
authored andcommitted
Speed up Direct Reply-To
Speed up Direct Reply-To in AMQP 0.9.1 because prior to this PR sending each RPC reply was bottlenecked by the slowest network link between the RabbitMQ node the responder connected to and all other RabbitMQ nodes. This regression was introduced in RabbitMQ 3.12.0 via d656371 (cherry picked from commit 9f2ff6c) (cherry picked from commit 376d118)
1 parent 4e575be commit 97c844f

File tree

3 files changed

+7
-8
lines changed

3 files changed

+7
-8
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ send_command(Pid, Msg) ->
302302

303303
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
304304
deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
305-
Nodes = rabbit_nodes:all_running_with_hashes(),
305+
Nodes = nodes_with_hashes(),
306306
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
307307
{ok, Pid, Key} ->
308308
delegate:invoke_no_result(
@@ -325,7 +325,7 @@ deliver_reply_local(Pid, Key, Message) ->
325325
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
326326
exists;
327327
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
328-
Nodes = rabbit_nodes:all_running_with_hashes(),
328+
Nodes = nodes_with_hashes(),
329329
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
330330
{ok, Pid, Key} ->
331331
Msg = {declare_fast_reply_to, Key},
@@ -338,6 +338,9 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
338338
declare_fast_reply_to(_) ->
339339
not_found.
340340

341+
nodes_with_hashes() ->
342+
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.
343+
341344
-spec list() -> [pid()].
342345

343346
list() ->

deps/rabbit/src/rabbit_direct_reply_to.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ compute_key_and_suffix(Pid) ->
2020
PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid),
2121
%% Note: we hash the entire node name. This is sufficient for our needs of shortening node name
2222
%% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member
23-
%% in rabbit_nodes:all_running_with_hashes/0.
23+
%% in rabbit_channel:nodes_with_hashes/0.
2424
%%
2525
%% We also use a synthetic node prefix because the hash alone will be sufficient to
2626
NodeHash = erlang:phash2(Node),

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
await_running_count/2, is_single_node_cluster/0,
2929
boot/0]).
3030
-export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]).
31-
-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
31+
-export([all/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
3232
if_reached_target_cluster_size/2]).
3333
-export([lock_id/1, lock_retries/0]).
3434
-export([me_in_nodes/1, nodes_incl_me/1, nodes_excl_me/1]).
@@ -614,10 +614,6 @@ await_running_count_with_retries(TargetCount, Retries) ->
614614
await_running_count_with_retries(TargetCount, Retries - 1)
615615
end.
616616

617-
-spec all_running_with_hashes() -> #{non_neg_integer() => node()}.
618-
all_running_with_hashes() ->
619-
maps:from_list([{erlang:phash2(Node), Node} || Node <- list_running()]).
620-
621617
-spec target_cluster_size_hint() -> non_neg_integer().
622618
target_cluster_size_hint() ->
623619
cluster_formation_key_or_default(target_cluster_size_hint, ?DEFAULT_TARGET_CLUSTER_SIZE).

0 commit comments

Comments
 (0)