diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl index 1fd720e527f..c1f771cb669 100644 --- a/deps/rabbit/src/rabbit_db_cluster.erl +++ b/deps/rabbit/src/rabbit_db_cluster.erl @@ -6,13 +6,113 @@ %% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% +%% TODO +%% +%% Opérations sur le cluster : +%% +%% En commun : +%% 1. On stoppe "rabbit" sur le nœud à ajouter/retirer +%% 2. On lock le registre des feature flags +%% +%% Join : +%% * reset du nœud (cf ci-dessous) +%% * copie des feature flags (+ reset registre) +%% * Mnesia : +%% * init avec checks +%% Khepri : +%% * setup +%% * préparation +%% * join +%% * reset registre feature flags (utile ?) +%% * notify_joined_cluster +%% +%% Reset/forget member, depuis un autre nœud que celui retiré : +%% * liste des membres +%% * Mnesia : +%% * online : +%% * offline : +%% Khepri : +%% * online : +%% * offline : +%% * amqqueue:forget_all() +%% * quorum queue shrink_all +%% * stream queue delete_all_replicas +%% * stream coordinator forget_node +%% * notify_left_cluster +%% * suppression des fichiers +%% * reset registre feature flags +%% * cluster status +%% * déconnexion du nœud (?) +%% +%% En commun : +%% 1. On unlock le registre des feature flags +%% 2. On redémarre "rabbit" +%% +%% À exécuter depuis quel nœud ? +%% [ ] depuis le nœud qu’ajoute/retire +%% [x] depuis le cluster qui est modifié +%% (doutes : risqué si le nœud à ajouter fait partie d’un autre cluster avant +%% son reset) +%% +%% JOIN (depuis le nœud qu’on ajoute) +%% +%% * est-ce que le nœud fait partie du cluster cible ? +%% * sinon : +%% +%% * stoppe rabbit +%% +%% * reset (cf ci-dessous) +%% +%% * lock registre feature flags +%% * can join ? +%% * copy feature flags + reset registre +%% * (Mnesia/Khepri specific) +%% * notify_joined_cluster +%% * unlock registre feature flags +%% +%% * start rabbit +%% +%% RESET +%% +%% * stoppe rabbit +%% +%% * lock registre feature flags +%% * en cluster ? oui -> forget member locked (cf ci-dessous) +%% * suppression des fichiers +%% * reset registre feature flags +%% * cluster status +%% * unlock registre feature flags +%% * déco des anciens collègues de cluster +%% +%% * start rabbit +%% +%% FORGET MEMBER +%% +%% * stoppe rabbit sur nœud distant +%% * lock registre feature flags +%% +%% * (Mnesia/Khepri specific) +%% * amqqueue:forget_all() +%% * quorum queue shrink_all +%% * stream queue delete_all_replicas +%% * stream coordinator forget_node +%% * notify_left_cluster +%% +%% * unlock registre feature flags +%% * start rabbit sur nœud distant +%% * déco de l’ancien membre +%% +%% (on ne redémarre pas rabbit ; est-ce qu’on arrête la VM ?) + -module(rabbit_db_cluster). -include_lib("kernel/include/logger.hrl"). +-include_lib("stdlib/include/assert.hrl"). -include_lib("rabbit_common/include/logging.hrl"). -export([ensure_feature_flags_are_in_sync/2, + % add_member/1, remove_member/1, join/2, forget_member/2]). -export([change_node_type/1]). @@ -47,6 +147,102 @@ ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) -> {error, Reason} -> throw({error, {incompatible_feature_flags, Reason}}) end. +% add_member(NodeToAdd, NodeType) +% when is_atom(NodeToAdd) andalso NodeToAdd =/= node() andalso +% ?IS_NODE_TYPE(NoteType) -> +% InitialState = prepare_cluster_change(NodeToAdd), +% try +% %% XXX Run the whole code below on the node to add. +% case can_join(NodeToAdd) of +% {ok, ClusterNodes} when is_list(ClusterNodes) -> +% % XXX Si le nœud est dans un autre cluster, on se retrouve à +% % connecter deux clusters ! Attention à l’utilisation de +% % global… +% rabbit_db:reset(), % XXX Include notify_left_cluster + wipe +% +% rabbit_feature_flags:copy_feature_states_after_reset( +% RemoteNode), +% +% ?LOG_INFO( +% "DB: joining cluster using remote nodes:~n~tp", [ClusterNodes], +% #{domain => ?RMQLOG_DOMAIN_DB}), +% Ret = case rabbit_khepri:is_enabled(RemoteNode) of +% true -> join_using_khepri(ClusterNodes, NodeType); +% false -> join_using_mnesia(ClusterNodes, NodeType) +% end, +% +% case Ret of +% ok -> +% ok; +% {error, _} -> +% %% We reset feature flags states again and make sure the +% %% recorded states on disk are deleted. +% rabbit_feature_flags:reset() +% end, +% +% %% Restart RabbitMQ afterwards, if it was running before the join. +% %% Likewise for the Feature flags controller and Mnesia (if we +% %% still need it). +% case RestartRabbit of +% true -> +% rabbit:start(); +% false -> +% case RestartFFCtl of +% true -> +% ok = rabbit_sup:start_child(rabbit_ff_controller); +% false -> +% ok +% end, +% NeedMnesia = not rabbit_khepri:is_enabled(), +% case RestartMnesia andalso NeedMnesia of +% true -> rabbit_mnesia:start_mnesia(false); +% false -> ok +% end +% end, +% +% case Ret of +% ok -> +% rabbit_node_monitor:notify_joined_cluster(), +% ok; +% {error, _} = Error -> +% Error +% end; +% {ok, already_member} -> +% {ok, already_member}; +% {error, {inconsistent_cluster, _Msg}} = Error -> +% case rabbit_khepri:is_enabled() of +% true -> +% Error; +% false -> +% %% rabbit_mnesia:can_join_cluster/1 notice +% %% inconsistent_cluster, as RemoteNode thinks this node is +% %% already in the cluster. Attempt to leave the RemoteNode +% %% cluster, the discovery cluster, and simply retry the +% %% operation. +% ?LOG_INFO( +% "Mnesia: node ~tp thinks it's clustered " +% "with node ~tp, but ~tp disagrees. ~tp will ask " +% "to leave the cluster and try again.", +% [RemoteNode, node(), node(), node()]), +% ok = rabbit_mnesia:leave_then_rediscover_cluster( +% RemoteNode), +% join(RemoteNode, NodeType) +% end; +% {error, _} = Error -> +% Error +% after +% finish_cluster_change(InitialState) +% end. +% +% remove_member(NodeToRemove) +% when is_atom(NodeToRemove) andalso NodeToRemove =/= node() -> +% InitialState = prepare_cluster_change(NodeToRemove), +% try +% ok +% after +% finish_cluster_change(InitialState) +% end. + -spec can_join(RemoteNode) -> Ret when RemoteNode :: node(), Ret :: Ok | Error, @@ -253,28 +449,51 @@ join_using_khepri(_ClusterNodes, ram = NodeType) -> RemoveWhenOffline :: boolean(). %% @doc Removes `Node' from the cluster. -forget_member(Node, RemoveWhenOffline) -> - case forget_member0(Node, RemoveWhenOffline) of - ok -> - rabbit_node_monitor:notify_left_cluster(Node); - Error -> - Error - end. - -forget_member0(Node, RemoveWhenOffline) -> +forget_member(Node, RemoveWhenOffline) + when is_atom(Node) andalso Node =/= node() -> case rabbit:is_running(Node) of false -> - ?LOG_DEBUG( - "DB: removing cluster member `~ts`", [Node], - #{domain => ?RMQLOG_DOMAIN_DB}), - case rabbit_khepri:is_enabled() of - true -> forget_member_using_khepri(Node, RemoveWhenOffline); - false -> forget_member_using_mnesia(Node, RemoveWhenOffline) + {ok, InitialState} = lock_cluster_changes(Node), + try + forget_member_locked(Node, RemoveWhenOffline) + after + unlock_cluster_changes(InitialState) end; true -> {error, {failed_to_remove_node, Node, rabbit_still_running}} end. +forget_member_locked(Node, RemoveWhenOffline) + when is_atom(Node) andalso Node =/= node() -> + ?LOG_ALERT( + "DB: removing cluster member `~ts`", [Node], + #{domain => ?RMQLOG_DOMAIN_DB}), + ?assertNot(rabbit:is_running(Node)), + Ret = case rabbit_khepri:is_enabled() of + true -> forget_member_using_khepri(Node, RemoveWhenOffline); + false -> forget_member_using_mnesia(Node, RemoveWhenOffline) + end, + case Ret of + ok -> + post_forget_member_locked(Node, RemoveWhenOffline); + _ -> + ok + end, + Ret. + +post_forget_member_locked(Node, false = _RemoveWhenOffline) -> + ?LOG_DEBUG( + "DB: removing node `~s` from various Ra clusters", [Node], + #{domain => ?RMQLOG_DOMAIN_DB}), + _ = rabbit_amqqueue:forget_all(Node), + _ = rabbit_quorum_queue:shrink_all(Node), + _ = rabbit_stream_queue:delete_all_replicas(Node), + _ = rabbit_stream_coordinator:forget_node(Node), + rabbit_node_monitor:notify_left_cluster(Node), + ok; +post_forget_member_locked(_Node, true = _RemoveWhenOffline) -> + ok. + forget_member_using_mnesia(Node, RemoveWhenOffline) -> rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline). @@ -287,6 +506,104 @@ forget_member_using_khepri(_Node, true) -> forget_member_using_khepri(Node, false = _RemoveWhenOffline) -> rabbit_khepri:remove_member(Node). +lock_cluster_changes(ChangingNode) -> + RabbitWasRunning = stop_rabbit_if_running(ChangingNode), + InitialState = #{changing_node => ChangingNode, + rabbit_was_running => RabbitWasRunning}, + + %% We acquire the feature flags registry reload lock because between + %% the time we reset the registry (as part of `rabbit_db:reset/0' and + %% the states copy from the remote node, there could be a concurrent + %% reload of the registry (for instance because of peer discovery on + %% another node) with the default/empty states. + %% + %% To make this work, the lock is also acquired from + %% `rabbit_ff_registry_wrapper'. + ?LOG_DEBUG( + "DB: lock feature flags registry to avoid concurrent changes to the " + "cluster from a feature flag callback", + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit_ff_registry_factory:acquire_state_change_lock(), + {ok, InitialState}. + +stop_rabbit_if_running(ThisNode) when ThisNode =:= node() -> + RabbitWasRunning = rabbit:is_running(), + case RabbitWasRunning of + true -> + ?LOG_DEBUG( + "DB: stop \"rabbit\" on this node (~ts) before making changes " + "to the cluster", + [ThisNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok = rabbit:stop(); + false -> + ?LOG_DEBUG( + "DB: \"rabbit\" already stopped on this node (~ts), ready for " + "changes to the cluster", + [ThisNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok + end, + RabbitWasRunning; +stop_rabbit_if_running(RemoteNode) when is_atom(RemoteNode) -> + try + RabbitWasRunning = erpc:call(RemoteNode, rabbit, is_running, []), + case RabbitWasRunning of + true -> + ?LOG_ALERT( + "DB: stop \"rabbit\" on node `~ts` before making changes " + "to the cluster", + [RemoteNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok = erpc:call(RemoteNode, rabbit, stop, []); + false -> + ?LOG_ALERT( + "DB: \"rabbit\" already stopped on node `~ts`, ready for " + "changes to the cluster", + [RemoteNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok + end, + RabbitWasRunning + catch + error:{erpc, noconnection} -> + ?LOG_ALERT( + "DB: node `~ts` unreachable, considering that \"rabbit\" is " + "stopped on it, ready for changes to the cluster", + [RemoteNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + false + end. + +unlock_cluster_changes( + #{changing_node := ChangingNode, + rabbit_was_running := RabbitWasRunning}) -> + rabbit_ff_registry_factory:release_state_change_lock(), + start_rabbit_if_was_running(ChangingNode, RabbitWasRunning), + ok. + +start_rabbit_if_was_running(ChangingNode, false = _RabbitWasRunning) -> + ?LOG_ALERT( + "DB: leaving \"rabbit\" stopped on node `~ts` after changes to the " + "cluster", + [ChangingNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok; +start_rabbit_if_was_running(ThisNode, true = _RabbitWasRunning) + when ThisNode =:= node() -> + ?LOG_ALERT( + "DB: restart \"rabbit\" on this node (~ts) after changes to the " + "cluster", + [ThisNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + rabbit:start(); +start_rabbit_if_was_running(RemoteNode, true = _RabbitWasRunning) -> + ?LOG_ALERT( + "DB: restart \"rabbit\" on node `~ts` after changes to the cluster", + [RemoteNode], + #{domain => ?RMQLOG_DOMAIN_DB}), + erpc:call(RemoteNode, rabbit, start, []). + %% ------------------------------------------------------------------- %% Cluster update. %% -------------------------------------------------------------------