diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index a55e9a5e38e3..3f283c71c9ca 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -26,8 +26,7 @@ normalize/1, append_node_prefix/1, node_prefix/0]). --export([do_query_node_props/1, - group_leader_proxy/2]). +-export([do_query_node_props/2]). -ifdef(TEST). -export([query_node_props/1, @@ -378,7 +377,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType) %% @private query_node_props(Nodes) when Nodes =/= [] -> - {Prefix, Suffix} = rabbit_nodes_common:parts(node()), + ThisNode = node(), + {Prefix, Suffix} = rabbit_nodes_common:parts(ThisNode), PeerName = peer:random_name(Prefix), %% We go through a temporary hidden node to query all other discovered %% peers properties, instead of querying them directly. @@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] -> [Peer], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), try - peer:call(Pid, ?MODULE, do_query_node_props, [Nodes], 180000) + NodesAndProps1 = peer:call( + Pid, + ?MODULE, do_query_node_props, + [Nodes, ThisNode], 180000), + NodesAndProps2 = sort_nodes_and_props(NodesAndProps1), + NodesAndProps2 after peer:stop(Pid) end; @@ -563,80 +568,31 @@ maybe_add_tls_arguments(VMArgs) -> end, VMArgs2. -do_query_node_props(Nodes) when Nodes =/= [] -> +do_query_node_props(Nodes, FromNode) when Nodes =/= [] -> %% Make sure all log messages are forwarded from this temporary hidden %% node to the upstream node, regardless of their level. _ = logger:set_primary_config(level, debug), - %% The group leader for all processes on this temporary hidden node is the - %% calling process' group leader on the upstream node. - %% - %% When we use `erpc:call/4' (or the multicall equivalent) to execute code - %% on one of the `Nodes', the remotely executed code will also use the - %% calling process' group leader by default. 
- %% - %% We use this temporary hidden node to ensure the downstream node will - %% not connected to the upstream node. Therefore, we must change the group - %% leader as well, otherwise any I/O from the downstream node will send a - %% message to the upstream node's group leader and thus open a connection. - %% This would defeat the entire purpose of this temporary hidden node. - %% - %% To avoid this, we start a proxy process which we will use as a group - %% leader. This process will send all messages it receives to the group - %% leader on the upstream node. - %% - %% There is one caveat: the logger (local to the temporary hidden node) - %% forwards log messages to the upstream logger (on the upstream node) - %% only if the group leader of that message is a remote PID. Because we - %% set a local PID, it stops forwarding log messages originating from that - %% temporary hidden node. That's why we use `with_group_leader_proxy/2' to - %% set the group leader to our proxy only around the use of `erpc'. - %% - %% That's a lot just to keep logging working while not reveal the upstream - %% node to the downstream node... - Parent = self(), - UpstreamGroupLeader = erlang:group_leader(), - ProxyGroupLeader = spawn_link( - ?MODULE, group_leader_proxy, - [Parent, UpstreamGroupLeader]), - %% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest %% supported version has it. - MembersPerNode = with_group_leader_proxy( - ProxyGroupLeader, - fun() -> - erpc:multicall(Nodes, rabbit_nodes, all, []) - end), - query_node_props1(Nodes, MembersPerNode, [], ProxyGroupLeader). - -with_group_leader_proxy(ProxyGroupLeader, Fun) -> - UpstreamGroupLeader = erlang:group_leader(), - try - true = erlang:group_leader(ProxyGroupLeader, self()), - Fun() - after - true = erlang:group_leader(UpstreamGroupLeader, self()) - end. - -group_leader_proxy(Parent, UpstreamGroupLeader) -> - receive - stop_proxy -> - erlang:unlink(Parent), - Parent ! 
proxy_stopped; - Message -> - UpstreamGroupLeader ! Message, - group_leader_proxy(Parent, UpstreamGroupLeader) - end. + MembersPerNode = [try + {ok, + erpc_call(Node, rabbit_nodes, all, [], FromNode)} + catch + Class:Reason -> + {Class, Reason} + end || Node <- Nodes], + query_node_props1(Nodes, MembersPerNode, [], FromNode). query_node_props1( [Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps, - ProxyGroupLeader) -> + FromNode) -> NodeAndProps = {Node, Members}, NodesAndProps1 = [NodeAndProps | NodesAndProps], - query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ProxyGroupLeader); + query_node_props1(Nodes, MembersPerNode, NodesAndProps1, FromNode); query_node_props1( - [Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps, - ProxyGroupLeader) -> + [Node | Nodes], [{_, _} = Error | MembersPerNode], NodesAndProps, + FromNode) -> %% We consider that an error means the remote node is unreachable or not %% ready. Therefore, we exclude it from the list of discovered nodes as we %% won't be able to join it anyway. @@ -645,55 +601,51 @@ query_node_props1( "Peer discovery: node '~ts' excluded from the discovered nodes", [Node, Error, Node], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - query_node_props1(Nodes, MembersPerNode, NodesAndProps, ProxyGroupLeader); -query_node_props1([], [], NodesAndProps, ProxyGroupLeader) -> + query_node_props1(Nodes, MembersPerNode, NodesAndProps, FromNode); +query_node_props1([], [], NodesAndProps, FromNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), - query_node_props2(NodesAndProps1, [], ProxyGroupLeader). 
- -query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) -> - try - erpc:call( - Node, logger, debug, - ["Peer discovery: temporary hidden node '~ts' queries properties " - "from node '~ts'", [node(), Node]]), - StartTime = get_node_start_time(Node, microsecond, ProxyGroupLeader), - IsReady = is_node_db_ready(Node, ProxyGroupLeader), - NodeAndProps = {Node, Members, StartTime, IsReady}, - NodesAndProps1 = [NodeAndProps | NodesAndProps], - query_node_props2(Rest, NodesAndProps1, ProxyGroupLeader) - catch - _:Error:_ -> - %% If one of the erpc calls we use to get the start time fails, - %% there is something wrong with the remote node because it - %% doesn't depend on RabbitMQ. We exclude it from the discovered - %% nodes. - ?LOG_DEBUG( - "Peer discovery: failed to query start time of node '~ts': " - "~0tp~n" - "Peer discovery: node '~ts' excluded from the discovered nodes", - [Node, Error, Node], - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - query_node_props2(Rest, NodesAndProps, ProxyGroupLeader) - end; -query_node_props2([], NodesAndProps, ProxyGroupLeader) -> + query_node_props2(NodesAndProps1, [], FromNode). + +query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) -> + NodesAndProps2 = try + erpc_call( + Node, logger, debug, + ["Peer discovery: temporary hidden node '~ts' " + "queries properties from node '~ts'", + [node(), Node]], FromNode), + StartTime = get_node_start_time( + Node, microsecond, FromNode), + IsReady = is_node_db_ready(Node, FromNode), + NodeAndProps = {Node, Members, StartTime, IsReady}, + NodesAndProps1 = [NodeAndProps | NodesAndProps], + NodesAndProps1 + catch + _:Error:_ -> + %% If one of the erpc calls we use to get the + %% start time fails, there is something wrong with + %% the remote node because it doesn't depend on + %% RabbitMQ. We exclude it from the discovered + %% nodes. 
+ ?LOG_DEBUG( + "Peer discovery: failed to query start time " + "+ DB readyness of node '~ts': ~0tp~n" + "Peer discovery: node '~ts' excluded from the " + "discovered nodes", + [Node, Error, Node], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + NodesAndProps + end, + query_node_props2(Rest, NodesAndProps2, FromNode); +query_node_props2([], NodesAndProps, _FromNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), - NodesAndProps2 = sort_nodes_and_props(NodesAndProps1), - %% Wait for the proxy group leader to flush its inbox. - ProxyGroupLeader ! stop_proxy, - receive - proxy_stopped -> - ok - after 120_000 -> - ok - end, ?assertEqual([], nodes()), - ?assert(length(NodesAndProps2) =< length(nodes(hidden))), - NodesAndProps2. + ?assert(length(NodesAndProps1) =< length(nodes(hidden))), + NodesAndProps1. --spec get_node_start_time(Node, Unit, ProxyGroupLeader) -> StartTime when +-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when Node :: node(), Unit :: erlang:time_unit(), - ProxyGroupLeader :: pid(), + FromNode :: node(), StartTime :: non_neg_integer(). %% @doc Returns the start time of the given `Node' in `Unit'. %% @@ -713,52 +665,60 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) -> %% %% @private -get_node_start_time(Node, Unit, ProxyGroupLeader) -> - with_group_leader_proxy( - ProxyGroupLeader, - fun() -> - NativeStartTime = erpc:call( - Node, erlang, system_info, [start_time]), - TimeOffset = erpc:call(Node, erlang, time_offset, []), - SystemStartTime = NativeStartTime + TimeOffset, - StartTime = erpc:call( - Node, erlang, convert_time_unit, - [SystemStartTime, native, Unit]), - StartTime - end). 
- --spec is_node_db_ready(Node, ProxyGroupLeader) -> IsReady when +get_node_start_time(Node, Unit, FromNode) -> + NativeStartTime = erpc_call( + Node, erlang, system_info, [start_time], FromNode), + TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode), + SystemStartTime = NativeStartTime + TimeOffset, + StartTime = erpc_call( + Node, erlang, convert_time_unit, + [SystemStartTime, native, Unit], FromNode), + StartTime. + +-spec is_node_db_ready(Node, FromNode) -> IsReady when Node :: node(), - ProxyGroupLeader :: pid(), + FromNode :: node(), IsReady :: boolean() | undefined. %% @doc Returns if the node's DB layer is ready or not. %% %% @private -is_node_db_ready(Node, ProxyGroupLeader) -> - %% This code is running from a temporary hidden node. We derive the real - %% node interested in the properties from the group leader. - UpstreamGroupLeader = erlang:group_leader(), - ThisNode = node(UpstreamGroupLeader), - case Node of - ThisNode -> - %% The current node is running peer discovery, thus way before we - %% mark the DB layer as ready. Consider it ready in this case, - %% otherwise if the current node is selected, it will loop forever - %% waiting for itself to be ready. - true; - _ -> - with_group_leader_proxy( - ProxyGroupLeader, - fun() -> - try - erpc:call(Node, rabbit_db, is_init_finished, []) - catch - _:{exception, undef, - [{rabbit_db, is_init_finished, _, _} | _]} -> - undefined - end - end) +is_node_db_ready(FromNode, FromNode) -> + %% The function is called for the current node running peer discovery, thus + %% way before we mark the DB layer as ready. Consider it ready in this + %% case, otherwise if the current node is selected, it will loop forever + %% waiting for itself to be ready. + true; +is_node_db_ready(Node, FromNode) -> + try + erpc_call(Node, rabbit_db, is_init_finished, [], FromNode) + catch + _:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} -> + undefined + end. 
+ +erpc_call(Node, Mod, Fun, Args, FromNode) -> + erpc_call(Node, Mod, Fun, Args, FromNode, 10000). + +erpc_call(Node, Mod, Fun, Args, FromNode, Timeout) when Timeout >= 0 -> + try + erpc:call(Node, Mod, Fun, Args) + catch + error:{erpc, _} = Reason:Stacktrace -> + Peer = node(), + _ = catch erpc:call( + FromNode, + logger, debug, + ["Peer discovery: temporary hidden node '~ts' " + "failed to connect to '~ts': ~0p", + [Peer, Node, Reason]]), + Sleep = 1000, + timer:sleep(Sleep), + NewTimeout = Timeout - Sleep, + case NewTimeout >= 0 of + true -> erpc_call(Node, Mod, Fun, Args, FromNode, NewTimeout); + false -> erlang:raise(error, Reason, Stacktrace) + end end. -spec sort_nodes_and_props(NodesAndProps) ->