Skip to content

Commit e9cbf3b

Browse files
committed
WIP
1 parent 74a68ff commit e9cbf3b

File tree

1 file changed

+279
-17
lines changed

1 file changed

+279
-17
lines changed

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 279 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,104 @@
66
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
77
%%
88

9+
%% TODO
10+
%%
11+
%% Opérations sur le cluster :
12+
%%
13+
%% En commun :
14+
%% 1. On stoppe "rabbit" sur le nœud à ajouter/retirer
15+
%% 2. On lock le registre des feature flags
16+
%%
17+
%% Join :
18+
%% * reset du nœud (cf ci-dessous)
19+
%% * copie des feature flags (+ reset registre)
20+
%% * Mnesia :
21+
%% * init avec checks
22+
%% Khepri :
23+
%% * setup
24+
%% * préparation
25+
%% * join
26+
%% * reset registre feature flags (utile ?)
27+
%% * notify_joined_cluster
28+
%%
29+
%% Reset/forget member, depuis un autre nœud que celui retiré :
30+
%% * liste des membres
31+
%% * Mnesia :
32+
%% * online :
33+
%% * offline :
34+
%% Khepri :
35+
%% * online :
36+
%% * offline :
37+
%% * amqqueue:forget_all()
38+
%% * quorum queue shrink_all
39+
%% * stream queue delete_all_replicas
40+
%% * stream coordinator forget_node
41+
%% * notify_left_cluster
42+
%% * suppression des fichiers
43+
%% * reset registre feature flags
44+
%% * cluster status
45+
%% * déconnexion du nœud (?)
46+
%%
47+
%% En commun :
48+
%% 1. On unlock le registre des feature flags
49+
%% 2. On redémarre "rabbit"
50+
%%
51+
%% À exécuter depuis quel nœud ?
52+
%% [ ] depuis le nœud qu’ajoute/retire
53+
%% [x] depuis le cluster qui est modifié
54+
%% (doutes : risqué si le nœud à ajouter fait partie d’un autre cluster avant
55+
%% son reset)
56+
%%
57+
%% JOIN (depuis le nœud qu’on ajoute)
58+
%%
59+
%% * est-ce que le nœud fait partie du cluster cible ?
60+
%% * sinon :
61+
%%
62+
%% * stoppe rabbit
63+
%%
64+
%% * reset (cf ci-dessous)
65+
%%
66+
%% * lock registre feature flags
67+
%% * can join ?
68+
%% * copy feature flags + reset registre
69+
%% * (Mnesia/Khepri specific)
70+
%% * notify_joined_cluster
71+
%% * unlock registre feature flags
72+
%%
73+
%% * start rabbit
74+
%%
75+
%% RESET
76+
%%
77+
%% * stoppe rabbit
78+
%%
79+
%% * lock registre feature flags
80+
%% * en cluster ? oui -> forget member locked (cf ci-dessous)
81+
%% * suppression des fichiers
82+
%% * reset registre feature flags
83+
%% * cluster status
84+
%% * unlock registre feature flags
85+
%% * déco des anciens collègues de cluster
86+
%%
87+
%% * start rabbit
88+
%%
89+
%% FORGET MEMBER
90+
%%
91+
%% * stoppe rabbit sur nœud distant
92+
%% * lock registre feature flags
93+
%%
94+
%% * (Mnesia/Khepri specific)
95+
%% * amqqueue:forget_all()
96+
%% * quorum queue shrink_all
97+
%% * stream queue delete_all_replicas
98+
%% * stream coordinator forget_node
99+
%% * notify_left_cluster
100+
%%
101+
%% * unlock registre feature flags
102+
%% * start rabbit sur nœud distant
103+
%% * déco de l’ancien membre
104+
%%
105+
%% (on ne redémarre pas rabbit ; est-ce qu’on arrête la VM ?)
106+
9107
-module(rabbit_db_cluster).
10108

11109
-include_lib("kernel/include/logger.hrl").
@@ -14,6 +112,7 @@
14112
-include_lib("rabbit_common/include/logging.hrl").
15113

16114
-export([ensure_feature_flags_are_in_sync/2,
115+
% add_member/1, remove_member/1,
17116
join/2,
18117
forget_member/2]).
19118
-export([change_node_type/1]).
@@ -48,6 +147,102 @@ ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
48147
{error, Reason} -> throw({error, {incompatible_feature_flags, Reason}})
49148
end.
50149

150+
% add_member(NodeToAdd, NodeType)
151+
% when is_atom(NodeToAdd) andalso NodeToAdd =/= node() andalso
152+
% ?IS_NODE_TYPE(NoteType) ->
153+
% InitialState = prepare_cluster_change(NodeToAdd),
154+
% try
155+
% %% XXX Run the whole code below on the node to add.
156+
% case can_join(NodeToAdd) of
157+
% {ok, ClusterNodes} when is_list(ClusterNodes) ->
158+
% % XXX Si le nœud est dans un autre cluster, on se retrouve à
159+
% % connecter deux clusters ! Attention à l’utilisation de
160+
% % global…
161+
% rabbit_db:reset(), % XXX Include notify_left_cluster + wipe
162+
%
163+
% rabbit_feature_flags:copy_feature_states_after_reset(
164+
% RemoteNode),
165+
%
166+
% ?LOG_INFO(
167+
% "DB: joining cluster using remote nodes:~n~tp", [ClusterNodes],
168+
% #{domain => ?RMQLOG_DOMAIN_DB}),
169+
% Ret = case rabbit_khepri:is_enabled(RemoteNode) of
170+
% true -> join_using_khepri(ClusterNodes, NodeType);
171+
% false -> join_using_mnesia(ClusterNodes, NodeType)
172+
% end,
173+
%
174+
% case Ret of
175+
% ok ->
176+
% ok;
177+
% {error, _} ->
178+
% %% We reset feature flags states again and make sure the
179+
% %% recorded states on disk are deleted.
180+
% rabbit_feature_flags:reset()
181+
% end,
182+
%
183+
% %% Restart RabbitMQ afterwards, if it was running before the join.
184+
% %% Likewise for the Feature flags controller and Mnesia (if we
185+
% %% still need it).
186+
% case RestartRabbit of
187+
% true ->
188+
% rabbit:start();
189+
% false ->
190+
% case RestartFFCtl of
191+
% true ->
192+
% ok = rabbit_sup:start_child(rabbit_ff_controller);
193+
% false ->
194+
% ok
195+
% end,
196+
% NeedMnesia = not rabbit_khepri:is_enabled(),
197+
% case RestartMnesia andalso NeedMnesia of
198+
% true -> rabbit_mnesia:start_mnesia(false);
199+
% false -> ok
200+
% end
201+
% end,
202+
%
203+
% case Ret of
204+
% ok ->
205+
% rabbit_node_monitor:notify_joined_cluster(),
206+
% ok;
207+
% {error, _} = Error ->
208+
% Error
209+
% end;
210+
% {ok, already_member} ->
211+
% {ok, already_member};
212+
% {error, {inconsistent_cluster, _Msg}} = Error ->
213+
% case rabbit_khepri:is_enabled() of
214+
% true ->
215+
% Error;
216+
% false ->
217+
% %% rabbit_mnesia:can_join_cluster/1 notice
218+
% %% inconsistent_cluster, as RemoteNode thinks this node is
219+
% %% already in the cluster. Attempt to leave the RemoteNode
220+
% %% cluster, the discovery cluster, and simply retry the
221+
% %% operation.
222+
% ?LOG_INFO(
223+
% "Mnesia: node ~tp thinks it's clustered "
224+
% "with node ~tp, but ~tp disagrees. ~tp will ask "
225+
% "to leave the cluster and try again.",
226+
% [RemoteNode, node(), node(), node()]),
227+
% ok = rabbit_mnesia:leave_then_rediscover_cluster(
228+
% RemoteNode),
229+
% join(RemoteNode, NodeType)
230+
% end;
231+
% {error, _} = Error ->
232+
% Error
233+
% after
234+
% finish_cluster_change(InitialState)
235+
% end.
236+
%
237+
% remove_member(NodeToRemove)
238+
% when is_atom(NodeToRemove) andalso NodeToRemove =/= node() ->
239+
% InitialState = prepare_cluster_change(NodeToRemove),
240+
% try
241+
% ok
242+
% after
243+
% finish_cluster_change(InitialState)
244+
% end.
245+
51246
-spec can_join(RemoteNode) -> Ret when
52247
RemoteNode :: node(),
53248
Ret :: Ok | Error,
@@ -256,15 +451,21 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
256451

257452
forget_member(Node, RemoveWhenOffline)
258453
when is_atom(Node) andalso Node =/= node() ->
259-
{ok, InitialState} = lock_cluster_changes(Node),
260-
try
261-
forget_member_locked(Node, RemoveWhenOffline)
262-
after
263-
unlock_cluster_changes(InitialState)
454+
case rabbit:is_running(Node) of
455+
false ->
456+
{ok, InitialState} = lock_cluster_changes(Node),
457+
try
458+
forget_member_locked(Node, RemoveWhenOffline)
459+
after
460+
unlock_cluster_changes(InitialState)
461+
end;
462+
true ->
463+
{error, {failed_to_remove_node, Node, rabbit_still_running}}
264464
end.
265465

266-
forget_member_locked(Node, RemoveWhenOffline) ->
267-
?LOG_DEBUG(
466+
forget_member_locked(Node, RemoveWhenOffline)
467+
when is_atom(Node) andalso Node =/= node() ->
468+
?LOG_ALERT(
268469
"DB: removing cluster member `~ts`", [Node],
269470
#{domain => ?RMQLOG_DOMAIN_DB}),
270471
?assertNot(rabbit:is_running(Node)),
@@ -273,11 +474,20 @@ forget_member_locked(Node, RemoveWhenOffline) ->
273474
false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
274475
end,
275476

477+
post_forget_member_locked(Node, RemoveWhenOffline).
478+
479+
post_forget_member_locked(Node, false = _RemoveWhenOffline) ->
480+
?LOG_DEBUG(
481+
"DB: removing node `~s` from various Ra clusters", [Node],
482+
#{domain => ?RMQLOG_DOMAIN_DB}),
276483
rabbit_amqqueue:forget_all(Node),
277484
rabbit_quorum_queue:shrink_all(Node),
278485
rabbit_stream_queue:delete_all_replicas(Node),
279486
rabbit_stream_coordinator:forget_node(Node),
280-
rabbit_node_monitor:notify_left_cluster(Node).
487+
rabbit_node_monitor:notify_left_cluster(Node),
488+
ok;
489+
post_forget_member_locked(_Node, true = _RemoveWhenOffline) ->
490+
ok.
281491

282492
forget_member_using_mnesia(Node, RemoveWhenOffline) ->
283493
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).
@@ -304,23 +514,61 @@ lock_cluster_changes(ChangingNode) ->
304514
%%
305515
%% To make this work, the lock is also acquired from
306516
%% `rabbit_ff_registry_wrapper'.
517+
?LOG_DEBUG(
518+
"DB: lock feature flags registry to avoid concurrent changes to the "
519+
"cluster from a feature flag callback",
520+
#{domain => ?RMQLOG_DOMAIN_DB}),
307521
rabbit_ff_registry_factory:acquire_state_change_lock(),
308522
{ok, InitialState}.
309523

310524
stop_rabbit_if_running(ThisNode) when ThisNode =:= node() ->
311525
RabbitWasRunning = rabbit:is_running(),
312526
case RabbitWasRunning of
313-
true -> ok = rabbit:stop();
314-
false -> ok
527+
true ->
528+
?LOG_DEBUG(
529+
"DB: stop \"rabbit\" on this node (~ts) before making changes "
530+
"to the cluster",
531+
[ThisNode],
532+
#{domain => ?RMQLOG_DOMAIN_DB}),
533+
ok = rabbit:stop();
534+
false ->
535+
?LOG_DEBUG(
536+
"DB: \"rabbit\" already stopped on this node (~ts), ready for "
537+
"changes to the cluster",
538+
[ThisNode],
539+
#{domain => ?RMQLOG_DOMAIN_DB}),
540+
ok
315541
end,
316542
RabbitWasRunning;
317543
stop_rabbit_if_running(RemoteNode) when is_atom(RemoteNode) ->
318-
RabbitWasRunning = erpc:call(RemoteNode, rabbit, is_running, []),
319-
case RabbitWasRunning of
320-
true -> ok = erpc:call(RemoteNode, rabbit, stop, []);
321-
false -> ok
322-
end,
323-
RabbitWasRunning.
544+
try
545+
RabbitWasRunning = erpc:call(RemoteNode, rabbit, is_running, []),
546+
case RabbitWasRunning of
547+
true ->
548+
?LOG_ALERT(
549+
"DB: stop \"rabbit\" on node `~ts` before making changes "
550+
"to the cluster",
551+
[RemoteNode],
552+
#{domain => ?RMQLOG_DOMAIN_DB}),
553+
ok = erpc:call(RemoteNode, rabbit, stop, []);
554+
false ->
555+
?LOG_ALERT(
556+
"DB: \"rabbit\" already stopped on node `~ts`, ready for "
557+
"changes to the cluster",
558+
[RemoteNode],
559+
#{domain => ?RMQLOG_DOMAIN_DB}),
560+
ok
561+
end,
562+
RabbitWasRunning
563+
catch
564+
error:{erpc, noconnection} ->
565+
?LOG_ALERT(
566+
"DB: node `~ts` unreachable, considering that \"rabbit\" is "
567+
"stopped on it, ready for changes to the cluster",
568+
[RemoteNode],
569+
#{domain => ?RMQLOG_DOMAIN_DB}),
570+
false
571+
end.
324572

325573
unlock_cluster_changes(
326574
#{changing_node := ChangingNode,
@@ -329,12 +577,26 @@ unlock_cluster_changes(
329577
start_rabbit_if_was_running(ChangingNode, RabbitWasRunning),
330578
ok.
331579

332-
start_rabbit_if_was_running(_ChangingNode, false = _RabbitWasRunning) ->
580+
start_rabbit_if_was_running(ChangingNode, false = _RabbitWasRunning) ->
581+
?LOG_ALERT(
582+
"DB: leaving \"rabbit\" stopped on node `~ts` after changes to the "
583+
"cluster",
584+
[ChangingNode],
585+
#{domain => ?RMQLOG_DOMAIN_DB}),
333586
ok;
334587
start_rabbit_if_was_running(ThisNode, true = _RabbitWasRunning)
335588
when ThisNode =:= node() ->
589+
?LOG_ALERT(
590+
"DB: restart \"rabbit\" on this node (~ts) after changes to the "
591+
"cluster",
592+
[ThisNode],
593+
#{domain => ?RMQLOG_DOMAIN_DB}),
336594
rabbit:start();
337595
start_rabbit_if_was_running(RemoteNode, true = _RabbitWasRunning) ->
596+
?LOG_ALERT(
597+
"DB: restart \"rabbit\" on node `~ts` after changes to the cluster",
598+
[RemoteNode],
599+
#{domain => ?RMQLOG_DOMAIN_DB}),
338600
erpc:call(RemoteNode, rabbit, start, []).
339601

340602
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)