Skip to content

Commit ef51251

Browse files
committed
rabbit_peer_discovery: Remove the use of group leader proxy
1 parent bca2fb0 commit ef51251

File tree

1 file changed

+43
-124
lines changed

1 file changed

+43
-124
lines changed

deps/rabbit/src/rabbit_peer_discovery.erl

Lines changed: 43 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
normalize/1,
2727
append_node_prefix/1,
2828
node_prefix/0]).
29-
-export([do_query_node_props/1,
30-
group_leader_proxy/2]).
29+
-export([do_query_node_props/2]).
3130

3231
-ifdef(TEST).
3332
-export([query_node_props/1,
@@ -382,7 +381,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
382381
%% @private
383382

384383
query_node_props(Nodes) when Nodes =/= [] ->
385-
{Prefix, Suffix} = rabbit_nodes_common:parts(node()),
384+
ThisNode = node(),
385+
{Prefix, Suffix} = rabbit_nodes_common:parts(ThisNode),
386386
PeerName = peer:random_name(Prefix),
387387
%% We go through a temporary hidden node to query all other discovered
388388
%% peers properties, instead of querying them directly.
@@ -444,7 +444,7 @@ query_node_props(Nodes) when Nodes =/= [] ->
444444
[Peer],
445445
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
446446
try
447-
peer:call(Pid, ?MODULE, do_query_node_props, [Nodes], 180000)
447+
peer:call(Pid, ?MODULE, do_query_node_props, [Nodes, ThisNode], 180000)
448448
after
449449
peer:stop(Pid)
450450
end;
@@ -567,80 +567,25 @@ maybe_add_tls_arguments(VMArgs) ->
567567
end,
568568
VMArgs2.
569569

570-
do_query_node_props(Nodes) when Nodes =/= [] ->
570+
do_query_node_props(Nodes, ThisNode) when Nodes =/= [] ->
571571
%% Make sure all log messages are forwarded from this temporary hidden
572572
%% node to the upstream node, regardless of their level.
573573
_ = logger:set_primary_config(level, debug),
574574

575-
%% The group leader for all processes on this temporary hidden node is the
576-
%% calling process' group leader on the upstream node.
577-
%%
578-
%% When we use `erpc:call/4' (or the multicall equivalent) to execute code
579-
%% on one of the `Nodes', the remotely executed code will also use the
580-
%% calling process' group leader by default.
581-
%%
582-
%% We use this temporary hidden node to ensure the downstream node will
583-
%% not connected to the upstream node. Therefore, we must change the group
584-
%% leader as well, otherwise any I/O from the downstream node will send a
585-
%% message to the upstream node's group leader and thus open a connection.
586-
%% This would defeat the entire purpose of this temporary hidden node.
587-
%%
588-
%% To avoid this, we start a proxy process which we will use as a group
589-
%% leader. This process will send all messages it receives to the group
590-
%% leader on the upstream node.
591-
%%
592-
%% There is one caveat: the logger (local to the temporary hidden node)
593-
%% forwards log messages to the upstream logger (on the upstream node)
594-
%% only if the group leader of that message is a remote PID. Because we
595-
%% set a local PID, it stops forwarding log messages originating from that
596-
%% temporary hidden node. That's why we use `with_group_leader_proxy/2' to
597-
%% set the group leader to our proxy only around the use of `erpc'.
598-
%%
599-
%% That's a lot just to keep logging working while not reveal the upstream
600-
%% node to the downstream node...
601-
Parent = self(),
602-
UpstreamGroupLeader = erlang:group_leader(),
603-
ProxyGroupLeader = spawn_link(
604-
?MODULE, group_leader_proxy,
605-
[Parent, UpstreamGroupLeader]),
606-
607575
%% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
608576
%% supported version has it.
609-
MembersPerNode = with_group_leader_proxy(
610-
ProxyGroupLeader,
611-
fun() ->
612-
erpc:multicall(Nodes, rabbit_nodes, all, [])
613-
end),
614-
query_node_props1(Nodes, MembersPerNode, [], ProxyGroupLeader).
615-
616-
with_group_leader_proxy(ProxyGroupLeader, Fun) ->
617-
UpstreamGroupLeader = erlang:group_leader(),
618-
try
619-
true = erlang:group_leader(ProxyGroupLeader, self()),
620-
Fun()
621-
after
622-
true = erlang:group_leader(UpstreamGroupLeader, self())
623-
end.
624-
625-
group_leader_proxy(Parent, UpstreamGroupLeader) ->
626-
receive
627-
stop_proxy ->
628-
erlang:unlink(Parent),
629-
Parent ! proxy_stopped;
630-
Message ->
631-
UpstreamGroupLeader ! Message,
632-
group_leader_proxy(Parent, UpstreamGroupLeader)
633-
end.
577+
MembersPerNode = erpc:multicall(Nodes, rabbit_nodes, all, []),
578+
query_node_props1(Nodes, MembersPerNode, [], ThisNode).
634579

635580
query_node_props1(
636581
[Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps,
637-
ProxyGroupLeader) ->
582+
ThisNode) ->
638583
NodeAndProps = {Node, Members},
639584
NodesAndProps1 = [NodeAndProps | NodesAndProps],
640-
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ProxyGroupLeader);
585+
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ThisNode);
641586
query_node_props1(
642587
[Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps,
643-
ProxyGroupLeader) ->
588+
ThisNode) ->
644589
%% We consider that an error means the remote node is unreachable or not
645590
%% ready. Therefore, we exclude it from the list of discovered nodes as we
646591
%% won't be able to join it anyway.
@@ -649,22 +594,22 @@ query_node_props1(
649594
"Peer discovery: node '~ts' excluded from the discovered nodes",
650595
[Node, Error, Node],
651596
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
652-
query_node_props1(Nodes, MembersPerNode, NodesAndProps, ProxyGroupLeader);
653-
query_node_props1([], [], NodesAndProps, ProxyGroupLeader) ->
597+
query_node_props1(Nodes, MembersPerNode, NodesAndProps, ThisNode);
598+
query_node_props1([], [], NodesAndProps, ThisNode) ->
654599
NodesAndProps1 = lists:reverse(NodesAndProps),
655-
query_node_props2(NodesAndProps1, [], ProxyGroupLeader).
600+
query_node_props2(NodesAndProps1, [], ThisNode).
656601

657-
query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) ->
602+
query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) ->
658603
try
659604
erpc:call(
660605
Node, logger, debug,
661606
["Peer discovery: temporary hidden node '~ts' queries properties "
662607
"from node '~ts'", [node(), Node]]),
663-
StartTime = get_node_start_time(Node, microsecond, ProxyGroupLeader),
664-
IsReady = is_node_db_ready(Node, ProxyGroupLeader),
608+
StartTime = get_node_start_time(Node, microsecond),
609+
IsReady = is_node_db_ready(Node, ThisNode),
665610
NodeAndProps = {Node, Members, StartTime, IsReady},
666611
NodesAndProps1 = [NodeAndProps | NodesAndProps],
667-
query_node_props2(Rest, NodesAndProps1, ProxyGroupLeader)
612+
query_node_props2(Rest, NodesAndProps1, ThisNode)
668613
catch
669614
_:Error:_ ->
670615
%% If one of the erpc calls we use to get the start time fails,
@@ -677,27 +622,18 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) ->
677622
"Peer discovery: node '~ts' excluded from the discovered nodes",
678623
[Node, Error, Node],
679624
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
680-
query_node_props2(Rest, NodesAndProps, ProxyGroupLeader)
625+
query_node_props2(Rest, NodesAndProps, ThisNode)
681626
end;
682-
query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
627+
query_node_props2([], NodesAndProps, _ThisNode) ->
683628
NodesAndProps1 = lists:reverse(NodesAndProps),
684629
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
685-
%% Wait for the proxy group leader to flush its inbox.
686-
ProxyGroupLeader ! stop_proxy,
687-
receive
688-
proxy_stopped ->
689-
ok
690-
after 120_000 ->
691-
ok
692-
end,
693630
?assertEqual([], nodes()),
694631
?assert(length(NodesAndProps2) =< length(nodes(hidden))),
695632
NodesAndProps2.
696633

697-
-spec get_node_start_time(Node, Unit, ProxyGroupLeader) -> StartTime when
634+
-spec get_node_start_time(Node, Unit) -> StartTime when
698635
Node :: node(),
699636
Unit :: erlang:time_unit(),
700-
ProxyGroupLeader :: pid(),
701637
StartTime :: non_neg_integer().
702638
%% @doc Returns the start time of the given `Node' in `Unit'.
703639
%%
@@ -717,52 +653,35 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
717653
%%
718654
%% @private
719655

720-
get_node_start_time(Node, Unit, ProxyGroupLeader) ->
721-
with_group_leader_proxy(
722-
ProxyGroupLeader,
723-
fun() ->
724-
NativeStartTime = erpc:call(
725-
Node, erlang, system_info, [start_time]),
726-
TimeOffset = erpc:call(Node, erlang, time_offset, []),
727-
SystemStartTime = NativeStartTime + TimeOffset,
728-
StartTime = erpc:call(
729-
Node, erlang, convert_time_unit,
730-
[SystemStartTime, native, Unit]),
731-
StartTime
732-
end).
733-
734-
-spec is_node_db_ready(Node, ProxyGroupLeader) -> IsReady when
656+
get_node_start_time(Node, Unit) ->
657+
NativeStartTime = erpc:call(Node, erlang, system_info, [start_time]),
658+
TimeOffset = erpc:call(Node, erlang, time_offset, []),
659+
SystemStartTime = NativeStartTime + TimeOffset,
660+
StartTime = erpc:call(
661+
Node, erlang, convert_time_unit,
662+
[SystemStartTime, native, Unit]),
663+
StartTime.
664+
665+
-spec is_node_db_ready(Node, ThisNode) -> IsReady when
735666
Node :: node(),
736-
ProxyGroupLeader :: pid(),
667+
ThisNode :: node(),
737668
IsReady :: boolean() | undefined.
738669
%% @doc Returns if the node's DB layer is ready or not.
739670
%%
740671
%% @private
741672

742-
is_node_db_ready(Node, ProxyGroupLeader) ->
743-
%% This code is running from a temporary hidden node. We derive the real
744-
%% node interested in the properties from the group leader.
745-
UpstreamGroupLeader = erlang:group_leader(),
746-
ThisNode = node(UpstreamGroupLeader),
747-
case Node of
748-
ThisNode ->
749-
%% The current node is running peer discovery, thus way before we
750-
%% mark the DB layer as ready. Consider it ready in this case,
751-
%% otherwise if the current node is selected, it will loop forever
752-
%% waiting for itself to be ready.
753-
true;
754-
_ ->
755-
with_group_leader_proxy(
756-
ProxyGroupLeader,
757-
fun() ->
758-
try
759-
erpc:call(Node, rabbit_db, is_init_finished, [])
760-
catch
761-
_:{exception, undef,
762-
[{rabbit_db, is_init_finished, _, _} | _]} ->
763-
undefined
764-
end
765-
end)
673+
is_node_db_ready(ThisNode, ThisNode) ->
674+
%% The current node is running peer discovery, thus way before we mark the
675+
%% DB layer as ready. Consider it ready in this case, otherwise if the
676+
%% current node is selected, it will loop forever waiting for itself to be
677+
%% ready.
678+
true;
679+
is_node_db_ready(Node, _ThisNode) ->
680+
try
681+
erpc:call(Node, rabbit_db, is_init_finished, [])
682+
catch
683+
_:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} ->
684+
undefined
766685
end.
767686

768687
-spec sort_nodes_and_props(NodesAndProps) ->

0 commit comments

Comments
 (0)