
Commit e37705b

Introduce a new policy: ha-promote-on-failure (always by default)
This new policy controls whether unsynchronised slaves should be promoted after a master crash. If set to `when-synced`, unsynchronised slaves are not promoted: the queue keeps its state but becomes unavailable until the master node returns. This change is intended to make cluster shutdown safer, because queues can fail or be killed during shutdown. A queue left without a master is still visible in the management UI and can be deleted and redeclared, but it will not automatically lose messages. Trying to declare or passively declare such a queue results in a timeout error, the same behaviour as when the master is gracefully stopped with ha-promote-on-shutdown: when-synced. [#156811690]
1 parent 9161218 commit e37705b
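
As a rough illustration (not part of this commit), the promotion decision described above can be sketched in Erlang. The function name should_promote_on_failure/1 is hypothetical; the rabbit_policy:get/2 call and the sync_slave_pids record field are taken from the diff below.

    %% Sketch only: with <<"when-synced">> and no synchronised mirrors,
    %% remove_from_queue/3 (see rabbit_mirror_queue_misc.erl below) returns
    %% {error, {not_synced, SPids}} instead of promoting; with <<"always">>
    %% (the default) promotion always happens.
    should_promote_on_failure(Q = #amqqueue{sync_slave_pids = SyncSPids}) ->
        case rabbit_policy:get(<<"ha-promote-on-failure">>, Q) of
            <<"when-synced">> -> SyncSPids =/= [];
            _                 -> true
        end.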

File tree: 4 files changed, 110 additions (+) and 75 deletions (−)

src/rabbit_mirror_queue_coordinator.erl

Lines changed: 6 additions & 1 deletion
@@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids},
                                     DeadPids),
             {stop, shutdown, State};
         {error, not_found} ->
-            {stop, normal, State}
+            {stop, normal, State};
+        {error, {not_synced, _}} ->
+            rabbit_log:error("Mirror queue ~p in unexpected state after gm_deaths."
+                             " Promoted to master but already a master.",
+                             [QueueName]),
+            error(unexpected_mirrored_state)
     end;
 
 handle_cast(request_depth, State = #state { depth_fun = DepthFun,

src/rabbit_mirror_queue_master.erl

Lines changed: 1 addition & 39 deletions
@@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
 
 stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
     {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
-    PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
-    ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
-    %% It's possible that we could be partitioned from some slaves
-    %% between the lookup and the broadcast, in which case we could
-    %% monitor them but they would not have received the GM
-    %% message. So only wait for slaves which are still
-    %% not-partitioned.
-    PendingSlavePids =
-        lists:foldl(
-          fun({Pid, MRef}, Acc) ->
-                  case rabbit_mnesia:on_running_node(Pid) of
-                      true ->
-                          receive
-                              {'DOWN', MRef, process, _Pid, _Info} ->
-                                  Acc
-                          after WT ->
-                                  rabbit_mirror_queue_misc:log_warning(
-                                    QName, "Missing 'DOWN' message from ~p in"
-                                    " node ~p~n", [Pid, node(Pid)]),
-                                  [Pid | Acc]
-                          end;
-                      false ->
-                          Acc
-                  end
-          end, [], PidsMRefs),
-    %% Normally when we remove a slave another slave or master will
-    %% notice and update Mnesia. But we just removed them all, and
-    %% have stopped listening ourselves. So manually clean up.
-    rabbit_misc:execute_mnesia_transaction(
-      fun () ->
-              [Q] = mnesia:read({rabbit_queue, QName}),
-              rabbit_mirror_queue_misc:store_updated_slaves(
-                Q #amqqueue { gm_pids = [], slave_pids = [],
-                              %% Restarted slaves on running nodes can
-                              %% ensure old incarnations are stopped using
-                              %% the pending slave pids.
-                              slave_pids_pending_shutdown = PendingSlavePids})
-      end),
-    ok = gm:forget_group(QName).
+    rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).
 
 purge(State = #state { gm = GM,
                        backing_queue = BQ,

src/rabbit_mirror_queue_misc.erl

Lines changed: 93 additions & 33 deletions
@@ -24,6 +24,7 @@
          update_mirrors/2, update_mirrors/1, validate_policy/1,
          maybe_auto_sync/1, maybe_drop_master_after_sync/1,
          sync_batch_size/1, log_info/3, log_warning/3]).
+-export([stop_all_slaves/5]).
 
 -export([sync_queue/1, cancel_sync_queue/1]).
 
@@ -47,6 +48,8 @@
                     [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
                    {mfa, {rabbit_registry, register,
                           [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+                   {mfa, {rabbit_registry, register,
+                          [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
                    {requires, rabbit_registry},
                    {enables, recovery}]}).
 
@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
              [] -> {error, not_found};
              [Q = #amqqueue { pid             = QPid,
                               slave_pids      = SPids,
+                              sync_slave_pids = SyncSPids,
                               gm_pids         = GMPids }] ->
                  {DeadGM, AliveGM} = lists:partition(
                                        fun ({GM, _}) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
                              {QPid, SPids};
                          _ -> promote_slave(Alive)
                      end,
-                 Extra =
-                     case {{QPid, SPids}, {QPid1, SPids1}} of
-                         {Same, Same} ->
-                             [];
-                         _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
-                             %% Either master hasn't changed, so
-                             %% we're ok to update mnesia; or we have
-                             %% become the master. If gm altered,
-                             %% we have no choice but to proceed.
-                             Q1 = Q#amqqueue{pid        = QPid1,
-                                             slave_pids = SPids1,
-                                             gm_pids    = AliveGM},
-                             store_updated_slaves(Q1),
-                             %% If we add and remove nodes at the
-                             %% same time we might tell the old
-                             %% master we need to sync and then
-                             %% shut it down. So let's check if
-                             %% the new master needs to sync.
-                             maybe_auto_sync(Q1),
-                             slaves_to_start_on_failure(Q1, DeadGMPids);
-                         _ ->
-                             %% Master has changed, and we're not it.
-                             %% [1].
-                             Q1 = Q#amqqueue{slave_pids = Alive,
-                                             gm_pids    = AliveGM},
-                             store_updated_slaves(Q1),
-                             []
-                     end,
-                 {ok, QPid1, DeadPids, Extra}
+                 DoNotPromote = SyncSPids =:= [] andalso
+                     rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
+                 case {{QPid, SPids}, {QPid1, SPids1}} of
+                     {Same, Same} ->
+                         {ok, QPid1, DeadPids, []};
+                     _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
+                         %% We have been promoted to master
+                         %% but there are no synchronised mirrors
+                         %% hence this node is not synchronised either
+                         %% Bailing out.
+                         {error, {not_synced, SPids1}};
+                     _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
+                         %% Either master hasn't changed, so
+                         %% we're ok to update mnesia; or we have
+                         %% become the master. If gm altered,
+                         %% we have no choice but to proceed.
+                         Q1 = Q#amqqueue{pid        = QPid1,
+                                         slave_pids = SPids1,
+                                         gm_pids    = AliveGM},
+                         store_updated_slaves(Q1),
+                         %% If we add and remove nodes at the
+                         %% same time we might tell the old
+                         %% master we need to sync and then
+                         %% shut it down. So let's check if
+                         %% the new master needs to sync.
+                         maybe_auto_sync(Q1),
+                         {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
+                     _ ->
+                         %% Master has changed, and we're not it.
+                         %% [1].
+                         Q1 = Q#amqqueue{slave_pids = Alive,
+                                         gm_pids    = AliveGM},
+                         store_updated_slaves(Q1),
+                         {ok, QPid1, DeadPids, []}
+                 end
          end
      end).
 %% [1] We still update mnesia here in case the slave that is supposed
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
     DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
     (RS -- DelNodes) ++ AddNodes.
 
+stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
+    PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
+    ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+    %% It's possible that we could be partitioned from some slaves
+    %% between the lookup and the broadcast, in which case we could
+    %% monitor them but they would not have received the GM
+    %% message. So only wait for slaves which are still
+    %% not-partitioned.
+    PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) ->
+        case rabbit_mnesia:on_running_node(Pid) of
+            true ->
+                receive
+                    {'DOWN', MRef, process, _Pid, _Info} ->
+                        Acc
+                after WaitTimeout ->
+                        rabbit_mirror_queue_misc:log_warning(
+                          QName, "Missing 'DOWN' message from ~p in"
+                          " node ~p~n", [Pid, node(Pid)]),
+                        [Pid | Acc]
+                end;
+            false ->
+                Acc
+        end
+    end, [], PidsMRefs),
+    %% Normally when we remove a slave another slave or master will
+    %% notice and update Mnesia. But we just removed them all, and
+    %% have stopped listening ourselves. So manually clean up.
+    rabbit_misc:execute_mnesia_transaction(fun () ->
+        [Q] = mnesia:read({rabbit_queue, QName}),
+        rabbit_mirror_queue_misc:store_updated_slaves(
+          Q #amqqueue { gm_pids = [], slave_pids = [],
+                        %% Restarted slaves on running nodes can
+                        %% ensure old incarnations are stopped using
+                        %% the pending slave pids.
+                        slave_pids_pending_shutdown = PendingSlavePids})
+    end),
+    ok = gm:forget_group(QName).
+
 %%----------------------------------------------------------------------------
 
 promote_slave([SPid | SPids]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
                           <<"ha-sync-batch-size">>, KeyList, none),
     PromoteOnShutdown = proplists:get_value(
                           <<"ha-promote-on-shutdown">>, KeyList, none),
-    case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
-        {none, none, none, none, none} ->
+    PromoteOnFailure = proplists:get_value(
+                         <<"ha-promote-on-failure">>, KeyList, none),
+    case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
+        {none, none, none, none, none, none} ->
             ok;
-        {none, _, _, _, _} ->
+        {none, _, _, _, _, _} ->
             {error, "ha-mode must be specified to specify ha-params, "
              "ha-sync-mode or ha-promote-on-shutdown", []};
         _ ->
@@ -490,7 +540,8 @@ validate_policy(KeyList) ->
                          {Params, ha_params_validator(Mode)},
                          {SyncMode, fun validate_sync_mode/1},
                          {SyncBatchSize, fun validate_sync_batch_size/1},
-                         {PromoteOnShutdown, fun validate_pos/1}])
+                         {PromoteOnShutdown, fun validate_pos/1},
+                         {PromoteOnFailure, fun validate_pof/1}])
     end.
 
 ha_params_validator(Mode) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
         Mode -> {error, "ha-promote-on-shutdown must be "
                  "\"always\" or \"when-synced\", got ~p", [Mode]}
     end.
+
+validate_pof(PromoteOnShutdown) ->
+    case PromoteOnShutdown of
+        <<"always">>      -> ok;
+        <<"when-synced">> -> ok;
+        none              -> ok;
+        Mode -> {error, "ha-promote-on-failure must be "
+                 "\"always\" or \"when-synced\", got ~p", [Mode]}
+    end.
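
To show what the extended validator is expected to accept and reject, here is a small sketch (hypothetical calls, not part of the commit); the second call fails in the {none, _, _, _, _, _} clause above because ha-mode is missing:

    %% ha-promote-on-failure together with ha-mode validates cleanly.
    ok = rabbit_mirror_queue_misc:validate_policy(
           [{<<"ha-mode">>, <<"all">>},
            {<<"ha-promote-on-failure">>, <<"when-synced">>}]).

    %% Without ha-mode, or with a value other than <<"always">> /
    %% <<"when-synced">>, validation returns an {error, Format, Args} tuple.
    {error, _Format, _Args} = rabbit_mirror_queue_misc:validate_policy(
                                [{<<"ha-promote-on-failure">>, <<"sometimes">>}]).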

src/rabbit_mirror_queue_slave.erl

Lines changed: 10 additions & 2 deletions
@@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
     end;
 
 handle_call({gm_deaths, DeadGMPids}, From,
-            State = #state { gm = GM, q = Q = #amqqueue {
-                               name = QName, pid = MPid }}) ->
+            State = #state{ gm = GM,
+                            q = Q = #amqqueue{ name = QName, pid = MPid },
+                            backing_queue = BQ,
+                            backing_queue_state = BQS}) ->
     Self = self(),
     case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
         {error, not_found} ->
             gen_server2:reply(From, ok),
             {stop, normal, State};
+        {error, {not_synced, SPids}} ->
+            WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
+            rabbit_mirror_queue_misc:stop_all_slaves(
+              {error, not_synced}, SPids, QName, GM, WaitTimeout),
+            BQ:delete_and_terminate({error, not_synced}, BQS),
+            {stop, normal, State#state{backing_queue_state = undefined}};
         {ok, Pid, DeadPids, ExtraNodes} ->
             rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
                                                    DeadPids),
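
One operational note on the new branch above: the wait it uses is read from the rabbit application environment via rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000), i.e. 15 seconds per slave by default. If that ever needs tuning, it could be set through the node's Erlang application config, roughly as sketched here (placing it in advanced.config is an assumption; the value is illustrative):

    %% advanced.config sketch (assumption): standard Erlang application
    %% environment terms for the rabbit application.
    [
      {rabbit, [
        {slave_wait_timeout, 15000}  %% ms to wait for each slave's 'DOWN' message
      ]}
    ].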
