66% % refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
77% %
88
9+ % % TODO
10+ % %
11+ % % Opérations sur le cluster :
12+ % %
13+ % % En commun :
14+ % % 1. On stoppe "rabbit" sur le nœud à ajouter/retirer
15+ % % 2. On lock le registre des feature flags
16+ % %
17+ % % Join :
18+ % % * reset du nœud (cf ci-dessous)
19+ % % * copie des feature flags (+ reset registre)
20+ % % * Mnesia :
21+ % % * init avec checks
22+ % % Khepri :
23+ % % * setup
24+ % % * préparation
25+ % % * join
26+ % % * reset registre feature flags (utile ?)
27+ % % * notify_joined_cluster
28+ % %
29+ % % Reset/forget member, depuis un autre nœud que celui retiré :
30+ % % * liste des membres
31+ % % * Mnesia :
32+ % % * online :
33+ % % * offline :
34+ % % Khepri :
35+ % % * online :
36+ % % * offline :
37+ % % * amqqueue:forget_all()
38+ % % * quorum queue shrink_all
39+ % % * stream queue delete_all_replicas
40+ % % * stream coordinator forget_node
41+ % % * notify_left_cluster
42+ % % * suppression des fichiers
43+ % % * reset registre feature flags
44+ % % * cluster status
45+ % % * déconnexion du nœud (?)
46+ % %
47+ % % En commun :
48+ % % 1. On unlock le registre des feature flags
49+ % % 2. On redémarre "rabbit"
50+ % %
51+ % % À exécuter depuis quel nœud ?
52+ % % [ ] depuis le nœud qu’ajoute/retire
53+ % % [x] depuis le cluster qui est modifié
54+ % % (doutes : risqué si le nœud à ajouter fait partie d’un autre cluster avant
55+ % % son reset)
56+ % %
57+ % % JOIN (depuis le nœud qu’on ajoute)
58+ % %
59+ % % * est-ce que le nœud fait partie du cluster cible ?
60+ % % * sinon :
61+ % %
62+ % % * stoppe rabbit
63+ % %
64+ % % * reset (cf ci-dessous)
65+ % %
66+ % % * lock registre feature flags
67+ % % * can join ?
68+ % % * copy feature flags + reset registre
69+ % % * (Mnesia/Khepri specific)
70+ % % * notify_joined_cluster
71+ % % * unlock registre feature flags
72+ % %
73+ % % * start rabbit
74+ % %
75+ % % RESET
76+ % %
77+ % % * stoppe rabbit
78+ % %
79+ % % * lock registre feature flags
80+ % % * en cluster ? oui -> forget member locked (cf ci-dessous)
81+ % % * suppression des fichiers
82+ % % * reset registre feature flags
83+ % % * cluster status
84+ % % * unlock registre feature flags
85+ % % * déco des anciens collègues de cluster
86+ % %
87+ % % * start rabbit
88+ % %
89+ % % FORGET MEMBER
90+ % %
91+ % % * stoppe rabbit sur nœud distant
92+ % % * lock registre feature flags
93+ % %
94+ % % * (Mnesia/Khepri specific)
95+ % % * amqqueue:forget_all()
96+ % % * quorum queue shrink_all
97+ % % * stream queue delete_all_replicas
98+ % % * stream coordinator forget_node
99+ % % * notify_left_cluster
100+ % %
101+ % % * unlock registre feature flags
102+ % % * start rabbit sur nœud distant
103+ % % * déco de l’ancien membre
104+ % %
105+ % % (on ne redémarre pas rabbit ; est-ce qu’on arrête la VM ?)
106+
9107-module (rabbit_db_cluster ).
10108
11109-include_lib (" kernel/include/logger.hrl" ).
110+ -include_lib (" stdlib/include/assert.hrl" ).
12111
13112-include_lib (" rabbit_common/include/logging.hrl" ).
14113
@@ -253,28 +352,48 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
253352 RemoveWhenOffline :: boolean ().
254353% % @doc Removes `Node' from the cluster.
255354
256- forget_member (Node , RemoveWhenOffline ) ->
257- case forget_member0 (Node , RemoveWhenOffline ) of
258- ok ->
259- rabbit_node_monitor :notify_left_cluster (Node );
260- Error ->
261- Error
262- end .
263-
264- forget_member0 (Node , RemoveWhenOffline ) ->
355+ forget_member (Node , RemoveWhenOffline ) when is_atom (Node ) ->
265356 case rabbit :is_running (Node ) of
266357 false ->
267- ? LOG_DEBUG (
268- " DB: removing cluster member `~ts `" , [Node ],
269- #{domain => ? RMQLOG_DOMAIN_DB }),
270- case rabbit_khepri :is_enabled () of
271- true -> forget_member_using_khepri (Node , RemoveWhenOffline );
272- false -> forget_member_using_mnesia (Node , RemoveWhenOffline )
358+ {ok , InitialState } = lock_cluster_changes (Node ),
359+ try
360+ forget_member_locked (Node , RemoveWhenOffline )
361+ after
362+ unlock_cluster_changes (InitialState )
273363 end ;
274364 true ->
275365 {error , {failed_to_remove_node , Node , rabbit_still_running }}
276366 end .
277367
368+ forget_member_locked (Node , RemoveWhenOffline )
369+ when is_atom (Node ) andalso Node =/= node () ->
370+ ? LOG_DEBUG (
371+ " DB: removing cluster member `~ts `" , [Node ],
372+ #{domain => ? RMQLOG_DOMAIN_DB }),
373+ ? assertNot (rabbit :is_running (Node )),
374+ Ret = case rabbit_khepri :is_enabled () of
375+ true -> forget_member_using_khepri (Node , RemoveWhenOffline );
376+ false -> forget_member_using_mnesia (Node , RemoveWhenOffline )
377+ end ,
378+ case Ret of
379+ ok -> post_forget_member_locked (Node , RemoveWhenOffline );
380+ _ -> ok
381+ end ,
382+ Ret .
383+
384+ post_forget_member_locked (Node , false = _RemoveWhenOffline ) ->
385+ ? LOG_DEBUG (
386+ " DB: removing node `~s ` from various Ra clusters" , [Node ],
387+ #{domain => ? RMQLOG_DOMAIN_DB }),
388+ _ = rabbit_amqqueue :forget_all (Node ),
389+ _ = rabbit_quorum_queue :shrink_all (Node ),
390+ _ = rabbit_stream_queue :delete_all_replicas (Node ),
391+ _ = rabbit_stream_coordinator :forget_node (Node ),
392+ rabbit_node_monitor :notify_left_cluster (Node ),
393+ ok ;
394+ post_forget_member_locked (_Node , true = _RemoveWhenOffline ) ->
395+ ok .
396+
278397forget_member_using_mnesia (Node , RemoveWhenOffline ) ->
279398 rabbit_mnesia :forget_cluster_node (Node , RemoveWhenOffline ).
280399
@@ -287,6 +406,104 @@ forget_member_using_khepri(_Node, true) ->
287406forget_member_using_khepri (Node , false = _RemoveWhenOffline ) ->
288407 rabbit_khepri :remove_member (Node ).
289408
409+ lock_cluster_changes (ChangingNode ) ->
410+ RabbitWasRunning = stop_rabbit_if_running (ChangingNode ),
411+ InitialState = #{changing_node => ChangingNode ,
412+ rabbit_was_running => RabbitWasRunning },
413+
414+ % % We acquire the feature flags registry reload lock because between
415+ % % the time we reset the registry (as part of `rabbit_db:reset/0' and
416+ % % the states copy from the remote node, there could be a concurrent
417+ % % reload of the registry (for instance because of peer discovery on
418+ % % another node) with the default/empty states.
419+ % %
420+ % % To make this work, the lock is also acquired from
421+ % % `rabbit_ff_registry_wrapper'.
422+ ? LOG_DEBUG (
423+ " DB: lock feature flags registry to avoid concurrent changes to the "
424+ " cluster from a feature flag callback" ,
425+ #{domain => ? RMQLOG_DOMAIN_DB }),
426+ rabbit_ff_registry_factory :acquire_state_change_lock (),
427+ {ok , InitialState }.
428+
429+ stop_rabbit_if_running (ThisNode ) when ThisNode =:= node () ->
430+ RabbitWasRunning = rabbit :is_running (),
431+ case RabbitWasRunning of
432+ true ->
433+ ? LOG_DEBUG (
434+ " DB: stop \" rabbit\" on this node (~ts ) before making changes "
435+ " to the cluster" ,
436+ [ThisNode ],
437+ #{domain => ? RMQLOG_DOMAIN_DB }),
438+ ok = rabbit :stop ();
439+ false ->
440+ ? LOG_DEBUG (
441+ " DB: \" rabbit\" already stopped on this node (~ts ), ready for "
442+ " changes to the cluster" ,
443+ [ThisNode ],
444+ #{domain => ? RMQLOG_DOMAIN_DB }),
445+ ok
446+ end ,
447+ RabbitWasRunning ;
448+ stop_rabbit_if_running (RemoteNode ) when is_atom (RemoteNode ) ->
449+ try
450+ RabbitWasRunning = erpc :call (RemoteNode , rabbit , is_running , []),
451+ case RabbitWasRunning of
452+ true ->
453+ ? LOG_DEBUG (
454+ " DB: stop \" rabbit\" on node `~ts ` before making changes "
455+ " to the cluster" ,
456+ [RemoteNode ],
457+ #{domain => ? RMQLOG_DOMAIN_DB }),
458+ ok = erpc :call (RemoteNode , rabbit , stop , []);
459+ false ->
460+ ? LOG_DEBUG (
461+ " DB: \" rabbit\" already stopped on node `~ts `, ready for "
462+ " changes to the cluster" ,
463+ [RemoteNode ],
464+ #{domain => ? RMQLOG_DOMAIN_DB }),
465+ ok
466+ end ,
467+ RabbitWasRunning
468+ catch
469+ error :{erpc , noconnection } ->
470+ ? LOG_DEBUG (
471+ " DB: node `~ts ` unreachable, considering that \" rabbit\" is "
472+ " stopped on it, ready for changes to the cluster" ,
473+ [RemoteNode ],
474+ #{domain => ? RMQLOG_DOMAIN_DB }),
475+ false
476+ end .
477+
478+ unlock_cluster_changes (
479+ #{changing_node := ChangingNode ,
480+ rabbit_was_running := RabbitWasRunning }) ->
481+ rabbit_ff_registry_factory :release_state_change_lock (),
482+ start_rabbit_if_was_running (ChangingNode , RabbitWasRunning ),
483+ ok .
484+
485+ start_rabbit_if_was_running (ChangingNode , false = _RabbitWasRunning ) ->
486+ ? LOG_DEBUG (
487+ " DB: leaving \" rabbit\" stopped on node `~ts ` after changes to the "
488+ " cluster" ,
489+ [ChangingNode ],
490+ #{domain => ? RMQLOG_DOMAIN_DB }),
491+ ok ;
492+ start_rabbit_if_was_running (ThisNode , true = _RabbitWasRunning )
493+ when ThisNode =:= node () ->
494+ ? LOG_DEBUG (
495+ " DB: restart \" rabbit\" on this node (~ts ) after changes to the "
496+ " cluster" ,
497+ [ThisNode ],
498+ #{domain => ? RMQLOG_DOMAIN_DB }),
499+ rabbit :start ();
500+ start_rabbit_if_was_running (RemoteNode , true = _RabbitWasRunning ) ->
501+ ? LOG_DEBUG (
502+ " DB: restart \" rabbit\" on node `~ts ` after changes to the cluster" ,
503+ [RemoteNode ],
504+ #{domain => ? RMQLOG_DOMAIN_DB }),
505+ erpc :call (RemoteNode , rabbit , start , []).
506+
290507% % -------------------------------------------------------------------
291508% % Cluster update.
292509% % -------------------------------------------------------------------
0 commit comments