From 9851f9b49b95fabcbff47b1f2d5ebbf41d8fedf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 21 Nov 2024 15:05:35 +0100 Subject: [PATCH 1/3] rabbit_peer_discovery: Remove the use of group leader proxy [Why] This was the first solution put in place to prevent that the temporary hidden node connects to the node that started it to write any printed messages. Because of this, the nodes that the temporary hidden node queried found out about the parent node and they opened an Erlang distribution connection to it. This polluted the known nodes list. However later, the temporary hidden node was started with the `standard_io` connection option. This prevented the temporary hidden node from knowing about the node that started it, solving the problem in a cleaner way. [How] This commit garbage-collects that piece of code that is now useless. It makes the query code way simpler to understand. (cherry picked from commit 62f22a7655d09e8dbcd42a8582f7ae578df4f2af) --- deps/rabbit/src/rabbit_peer_discovery.erl | 167 ++++++---------------- 1 file changed, 43 insertions(+), 124 deletions(-) diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index a55e9a5e38e3..a64e3ef9ca5c 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -26,8 +26,7 @@ normalize/1, append_node_prefix/1, node_prefix/0]). --export([do_query_node_props/1, - group_leader_proxy/2]). +-export([do_query_node_props/2]). -ifdef(TEST). -export([query_node_props/1, @@ -378,7 +377,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType) %% @private query_node_props(Nodes) when Nodes =/= [] -> - {Prefix, Suffix} = rabbit_nodes_common:parts(node()), + ThisNode = node(), + {Prefix, Suffix} = rabbit_nodes_common:parts(ThisNode), PeerName = peer:random_name(Prefix), %% We go through a temporary hidden node to query all other discovered %% peers properties, instead of querying them directly. @@ -440,7 +440,7 @@ query_node_props(Nodes) when Nodes =/= [] -> [Peer], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), try - peer:call(Pid, ?MODULE, do_query_node_props, [Nodes], 180000) + peer:call(Pid, ?MODULE, do_query_node_props, [Nodes, ThisNode], 180000) after peer:stop(Pid) end; @@ -563,80 +563,25 @@ maybe_add_tls_arguments(VMArgs) -> end, VMArgs2. -do_query_node_props(Nodes) when Nodes =/= [] -> +do_query_node_props(Nodes, ThisNode) when Nodes =/= [] -> %% Make sure all log messages are forwarded from this temporary hidden %% node to the upstream node, regardless of their level. _ = logger:set_primary_config(level, debug), - %% The group leader for all processes on this temporary hidden node is the - %% calling process' group leader on the upstream node. - %% - %% When we use `erpc:call/4' (or the multicall equivalent) to execute code - %% on one of the `Nodes', the remotely executed code will also use the - %% calling process' group leader by default. - %% - %% We use this temporary hidden node to ensure the downstream node will - %% not connected to the upstream node. Therefore, we must change the group - %% leader as well, otherwise any I/O from the downstream node will send a - %% message to the upstream node's group leader and thus open a connection. - %% This would defeat the entire purpose of this temporary hidden node. - %% - %% To avoid this, we start a proxy process which we will use as a group - %% leader. This process will send all messages it receives to the group - %% leader on the upstream node. - %% - %% There is one caveat: the logger (local to the temporary hidden node) - %% forwards log messages to the upstream logger (on the upstream node) - %% only if the group leader of that message is a remote PID. Because we - %% set a local PID, it stops forwarding log messages originating from that - %% temporary hidden node. That's why we use `with_group_leader_proxy/2' to - %% set the group leader to our proxy only around the use of `erpc'. - %% - %% That's a lot just to keep logging working while not reveal the upstream - %% node to the downstream node... - Parent = self(), - UpstreamGroupLeader = erlang:group_leader(), - ProxyGroupLeader = spawn_link( - ?MODULE, group_leader_proxy, - [Parent, UpstreamGroupLeader]), - %% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest %% supported version has it. - MembersPerNode = with_group_leader_proxy( - ProxyGroupLeader, - fun() -> - erpc:multicall(Nodes, rabbit_nodes, all, []) - end), - query_node_props1(Nodes, MembersPerNode, [], ProxyGroupLeader). - -with_group_leader_proxy(ProxyGroupLeader, Fun) -> - UpstreamGroupLeader = erlang:group_leader(), - try - true = erlang:group_leader(ProxyGroupLeader, self()), - Fun() - after - true = erlang:group_leader(UpstreamGroupLeader, self()) - end. - -group_leader_proxy(Parent, UpstreamGroupLeader) -> - receive - stop_proxy -> - erlang:unlink(Parent), - Parent ! proxy_stopped; - Message -> - UpstreamGroupLeader ! Message, - group_leader_proxy(Parent, UpstreamGroupLeader) - end. + MembersPerNode = erpc:multicall(Nodes, rabbit_nodes, all, []), + query_node_props1(Nodes, MembersPerNode, [], ThisNode). query_node_props1( [Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps, - ProxyGroupLeader) -> + ThisNode) -> NodeAndProps = {Node, Members}, NodesAndProps1 = [NodeAndProps | NodesAndProps], - query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ProxyGroupLeader); + query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ThisNode); query_node_props1( [Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps, - ProxyGroupLeader) -> + ThisNode) -> %% We consider that an error means the remote node is unreachable or not %% ready. Therefore, we exclude it from the list of discovered nodes as we %% won't be able to join it anyway. @@ -645,22 +590,22 @@ query_node_props1( "Peer discovery: node '~ts' excluded from the discovered nodes", [Node, Error, Node], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - query_node_props1(Nodes, MembersPerNode, NodesAndProps, ProxyGroupLeader); -query_node_props1([], [], NodesAndProps, ProxyGroupLeader) -> + query_node_props1(Nodes, MembersPerNode, NodesAndProps, ThisNode); +query_node_props1([], [], NodesAndProps, ThisNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), - query_node_props2(NodesAndProps1, [], ProxyGroupLeader). + query_node_props2(NodesAndProps1, [], ThisNode). -query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) -> +query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) -> try erpc:call( Node, logger, debug, ["Peer discovery: temporary hidden node '~ts' queries properties " "from node '~ts'", [node(), Node]]), - StartTime = get_node_start_time(Node, microsecond, ProxyGroupLeader), - IsReady = is_node_db_ready(Node, ProxyGroupLeader), + StartTime = get_node_start_time(Node, microsecond), + IsReady = is_node_db_ready(Node, ThisNode), NodeAndProps = {Node, Members, StartTime, IsReady}, NodesAndProps1 = [NodeAndProps | NodesAndProps], - query_node_props2(Rest, NodesAndProps1, ProxyGroupLeader) + query_node_props2(Rest, NodesAndProps1, ThisNode) catch _:Error:_ -> %% If one of the erpc calls we use to get the start time fails, @@ -673,27 +618,18 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) -> "Peer discovery: node '~ts' excluded from the discovered nodes", [Node, Error, Node], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - query_node_props2(Rest, NodesAndProps, ProxyGroupLeader) + query_node_props2(Rest, NodesAndProps, ThisNode) end; -query_node_props2([], NodesAndProps, ProxyGroupLeader) -> +query_node_props2([], NodesAndProps, _ThisNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), NodesAndProps2 = sort_nodes_and_props(NodesAndProps1), - %% Wait for the proxy group leader to flush its inbox. - ProxyGroupLeader ! stop_proxy, - receive - proxy_stopped -> - ok - after 120_000 -> - ok - end, ?assertEqual([], nodes()), ?assert(length(NodesAndProps2) =< length(nodes(hidden))), NodesAndProps2. --spec get_node_start_time(Node, Unit, ProxyGroupLeader) -> StartTime when +-spec get_node_start_time(Node, Unit) -> StartTime when Node :: node(), Unit :: erlang:time_unit(), - ProxyGroupLeader :: pid(), StartTime :: non_neg_integer(). %% @doc Returns the start time of the given `Node' in `Unit'. %% @@ -713,52 +649,35 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) -> %% %% @private -get_node_start_time(Node, Unit, ProxyGroupLeader) -> - with_group_leader_proxy( - ProxyGroupLeader, - fun() -> - NativeStartTime = erpc:call( - Node, erlang, system_info, [start_time]), - TimeOffset = erpc:call(Node, erlang, time_offset, []), - SystemStartTime = NativeStartTime + TimeOffset, - StartTime = erpc:call( - Node, erlang, convert_time_unit, - [SystemStartTime, native, Unit]), - StartTime - end). - --spec is_node_db_ready(Node, ProxyGroupLeader) -> IsReady when +get_node_start_time(Node, Unit) -> + NativeStartTime = erpc:call(Node, erlang, system_info, [start_time]), + TimeOffset = erpc:call(Node, erlang, time_offset, []), + SystemStartTime = NativeStartTime + TimeOffset, + StartTime = erpc:call( + Node, erlang, convert_time_unit, + [SystemStartTime, native, Unit]), + StartTime. + +-spec is_node_db_ready(Node, ThisNode) -> IsReady when Node :: node(), - ProxyGroupLeader :: pid(), + ThisNode :: node(), IsReady :: boolean() | undefined. %% @doc Returns if the node's DB layer is ready or not. %% %% @private -is_node_db_ready(Node, ProxyGroupLeader) -> - %% This code is running from a temporary hidden node. We derive the real - %% node interested in the properties from the group leader. - UpstreamGroupLeader = erlang:group_leader(), - ThisNode = node(UpstreamGroupLeader), - case Node of - ThisNode -> - %% The current node is running peer discovery, thus way before we - %% mark the DB layer as ready. Consider it ready in this case, - %% otherwise if the current node is selected, it will loop forever - %% waiting for itself to be ready. - true; - _ -> - with_group_leader_proxy( - ProxyGroupLeader, - fun() -> - try - erpc:call(Node, rabbit_db, is_init_finished, []) - catch - _:{exception, undef, - [{rabbit_db, is_init_finished, _, _} | _]} -> - undefined - end - end) +is_node_db_ready(ThisNode, ThisNode) -> + %% The current node is running peer discovery, thus way before we mark the + %% DB layer as ready. Consider it ready in this case, otherwise if the + %% current node is selected, it will loop forever waiting for itself to be + %% ready. + true; +is_node_db_ready(Node, _ThisNode) -> + try + erpc:call(Node, rabbit_db, is_init_finished, []) + catch + _:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} -> + undefined end. -spec sort_nodes_and_props(NodesAndProps) -> From 869c2e32caa674c9323a620b629a81765da778fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 21 Nov 2024 18:14:25 +0100 Subject: [PATCH 2/3] rabbit_peer_discovery: Fix non-tail-recursive `query_node_props2()` [Why] This impacts what is reported by the catch because it caught exceptions emitted by code supposedly called later. An example is the assert in `query_node_props2/3` last clause. (cherry picked from commit 4d4985f254fb48d3ecf77c5907d4148ea09b4d6b) --- deps/rabbit/src/rabbit_peer_discovery.erl | 52 ++++++++++++----------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index a64e3ef9ca5c..78636bca82b9 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -596,30 +596,34 @@ query_node_props1([], [], NodesAndProps, ThisNode) -> query_node_props2(NodesAndProps1, [], ThisNode). query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) -> - try - erpc:call( - Node, logger, debug, - ["Peer discovery: temporary hidden node '~ts' queries properties " - "from node '~ts'", [node(), Node]]), - StartTime = get_node_start_time(Node, microsecond), - IsReady = is_node_db_ready(Node, ThisNode), - NodeAndProps = {Node, Members, StartTime, IsReady}, - NodesAndProps1 = [NodeAndProps | NodesAndProps], - query_node_props2(Rest, NodesAndProps1, ThisNode) - catch - _:Error:_ -> - %% If one of the erpc calls we use to get the start time fails, - %% there is something wrong with the remote node because it - %% doesn't depend on RabbitMQ. We exclude it from the discovered - %% nodes. - ?LOG_DEBUG( - "Peer discovery: failed to query start time of node '~ts': " - "~0tp~n" - "Peer discovery: node '~ts' excluded from the discovered nodes", - [Node, Error, Node], - #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - query_node_props2(Rest, NodesAndProps, ThisNode) - end; + NodesAndProps2 = try + erpc:call( + Node, logger, debug, + ["Peer discovery: temporary hidden node '~ts' " + "queries properties from node '~ts'", + [node(), Node]]), + StartTime = get_node_start_time(Node, microsecond), + IsReady = is_node_db_ready(Node, ThisNode), + NodeAndProps = {Node, Members, StartTime, IsReady}, + NodesAndProps1 = [NodeAndProps | NodesAndProps], + NodesAndProps1 + catch + _:Error:_ -> + %% If one of the erpc calls we use to get the + %% start time fails, there is something wrong with + %% the remote node because it doesn't depend on + %% RabbitMQ. We exclude it from the discovered + %% nodes. + ?LOG_DEBUG( + "Peer discovery: failed to query start time " + "+ DB readyness of node '~ts': ~0tp~n" + "Peer discovery: node '~ts' excluded from the " + "discovered nodes", + [Node, Error, Node], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + NodesAndProps + end, + query_node_props2(Rest, NodesAndProps2, ThisNode); query_node_props2([], NodesAndProps, _ThisNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), NodesAndProps2 = sort_nodes_and_props(NodesAndProps1), From 37566feb4bfc79525374ed7861f92a2c65f754dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 21 Nov 2024 15:50:19 +0100 Subject: [PATCH 3/3] rabbit_peer_discovery: Retry RPC calls [Why] In CI, we observe some timeouts in the Erlang distribution connections between the temporary hidden node and the nodes it queries. This affects peer discovery obviously. [How] We introduce some query retries to reduce the risk of an incomplete query. While here, we move the sorting of queried nodes from the `query_node_props2/3` last clause (executed in the temporary hidden node) to the function setting the temporary hidden node and asking for these queries. This way the debug messages from that sorting are logged by RabbitMQ out of the box. (cherry picked from commit f6314d06b34b76450c647a3161a13e161b90649d) --- deps/rabbit/src/rabbit_peer_discovery.erl | 109 +++++++++++++++------- 1 file changed, 73 insertions(+), 36 deletions(-) diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index 78636bca82b9..3f283c71c9ca 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] -> [Peer], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), try - peer:call(Pid, ?MODULE, do_query_node_props, [Nodes, ThisNode], 180000) + NodesAndProps1 = peer:call( + Pid, + ?MODULE, do_query_node_props, + [Nodes, ThisNode], 180000), + NodesAndProps2 = sort_nodes_and_props(NodesAndProps1), + NodesAndProps2 after peer:stop(Pid) end; @@ -563,25 +568,31 @@ maybe_add_tls_arguments(VMArgs) -> end, VMArgs2. -do_query_node_props(Nodes, ThisNode) when Nodes =/= [] -> +do_query_node_props(Nodes, FromNode) when Nodes =/= [] -> %% Make sure all log messages are forwarded from this temporary hidden %% node to the upstream node, regardless of their level. _ = logger:set_primary_config(level, debug), %% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest %% supported version has it. - MembersPerNode = erpc:multicall(Nodes, rabbit_nodes, all, []), - query_node_props1(Nodes, MembersPerNode, [], ThisNode). + MembersPerNode = [try + {ok, + erpc_call(Node, rabbit_nodes, all, [], FromNode)} + catch + Class:Reason -> + {Class, Reason} + end || Node <- Nodes], + query_node_props1(Nodes, MembersPerNode, [], FromNode). query_node_props1( [Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps, - ThisNode) -> + FromNode) -> NodeAndProps = {Node, Members}, NodesAndProps1 = [NodeAndProps | NodesAndProps], - query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ThisNode); + query_node_props1(Nodes, MembersPerNode, NodesAndProps1, FromNode); query_node_props1( - [Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps, - ThisNode) -> + [Node | Nodes], [{_, _} = Error | MembersPerNode], NodesAndProps, + FromNode) -> %% We consider that an error means the remote node is unreachable or not %% ready. Therefore, we exclude it from the list of discovered nodes as we %% won't be able to join it anyway. @@ -590,20 +601,21 @@ query_node_props1( "Peer discovery: node '~ts' excluded from the discovered nodes", [Node, Error, Node], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), - query_node_props1(Nodes, MembersPerNode, NodesAndProps, ThisNode); -query_node_props1([], [], NodesAndProps, ThisNode) -> + query_node_props1(Nodes, MembersPerNode, NodesAndProps, FromNode); +query_node_props1([], [], NodesAndProps, FromNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), - query_node_props2(NodesAndProps1, [], ThisNode). + query_node_props2(NodesAndProps1, [], FromNode). -query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) -> +query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) -> NodesAndProps2 = try - erpc:call( + erpc_call( Node, logger, debug, ["Peer discovery: temporary hidden node '~ts' " "queries properties from node '~ts'", - [node(), Node]]), - StartTime = get_node_start_time(Node, microsecond), - IsReady = is_node_db_ready(Node, ThisNode), + [node(), Node]], FromNode), + StartTime = get_node_start_time( + Node, microsecond, FromNode), + IsReady = is_node_db_ready(Node, FromNode), NodeAndProps = {Node, Members, StartTime, IsReady}, NodesAndProps1 = [NodeAndProps | NodesAndProps], NodesAndProps1 @@ -623,17 +635,17 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) -> #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), NodesAndProps end, - query_node_props2(Rest, NodesAndProps2, ThisNode); -query_node_props2([], NodesAndProps, _ThisNode) -> + query_node_props2(Rest, NodesAndProps2, FromNode); +query_node_props2([], NodesAndProps, _FromNode) -> NodesAndProps1 = lists:reverse(NodesAndProps), - NodesAndProps2 = sort_nodes_and_props(NodesAndProps1), ?assertEqual([], nodes()), - ?assert(length(NodesAndProps2) =< length(nodes(hidden))), - NodesAndProps2. + ?assert(length(NodesAndProps1) =< length(nodes(hidden))), + NodesAndProps1. --spec get_node_start_time(Node, Unit) -> StartTime when +-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when Node :: node(), Unit :: erlang:time_unit(), + FromNode :: node(), StartTime :: non_neg_integer(). %% @doc Returns the start time of the given `Node' in `Unit'. %% @@ -653,37 +665,62 @@ query_node_props2([], NodesAndProps, _ThisNode) -> %% %% @private -get_node_start_time(Node, Unit) -> - NativeStartTime = erpc:call(Node, erlang, system_info, [start_time]), - TimeOffset = erpc:call(Node, erlang, time_offset, []), +get_node_start_time(Node, Unit, FromNode) -> + NativeStartTime = erpc_call( + Node, erlang, system_info, [start_time], FromNode), + TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode), SystemStartTime = NativeStartTime + TimeOffset, - StartTime = erpc:call( + StartTime = erpc_call( Node, erlang, convert_time_unit, - [SystemStartTime, native, Unit]), + [SystemStartTime, native, Unit], FromNode), StartTime. --spec is_node_db_ready(Node, ThisNode) -> IsReady when +-spec is_node_db_ready(Node, FromNode) -> IsReady when Node :: node(), - ThisNode :: node(), + FromNode :: node(), IsReady :: boolean() | undefined. %% @doc Returns if the node's DB layer is ready or not. %% %% @private -is_node_db_ready(ThisNode, ThisNode) -> - %% The current node is running peer discovery, thus way before we mark the - %% DB layer as ready. Consider it ready in this case, otherwise if the - %% current node is selected, it will loop forever waiting for itself to be - %% ready. +is_node_db_ready(FromNode, FromNode) -> + %% The function is called for rhe current node running peer discovery, thus + %% way before we mark the DB layer as ready. Consider it ready in this + %% case, otherwise if the current node is selected, it will loop forever + %% waiting for itself to be ready. true; -is_node_db_ready(Node, _ThisNode) -> +is_node_db_ready(Node, FromNode) -> try - erpc:call(Node, rabbit_db, is_init_finished, []) + erpc_call(Node, rabbit_db, is_init_finished, [], FromNode) catch _:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} -> undefined end. +erpc_call(Node, Mod, Fun, Args, FromNode) -> + erpc_call(Node, Mod, Fun, Args, FromNode, 10000). + +erpc_call(Node, Mod, Fun, Args, FromNode, Timeout) when Timeout >= 0 -> + try + erpc:call(Node, Mod, Fun, Args) + catch + error:{erpc, _} = Reason:Stacktrace -> + Peer = node(), + _ = catch erpc:call( + FromNode, + logger, debug, + ["Peer discovery: temporary hidden node '~ts' " + "failed to connect to '~ts': ~0p", + [Peer, Node, Reason]]), + Sleep = 1000, + timer:sleep(Sleep), + NewTimeout = Timeout - Sleep, + case NewTimeout >= 0 of + true -> erpc_call(Node, Mod, Fun, Args, FromNode, NewTimeout); + false -> erlang:raise(error, Reason, Stacktrace) + end + end. + -spec sort_nodes_and_props(NodesAndProps) -> SortedNodesAndProps when NodesAndProps :: [node_and_props()],