2424 update_mirrors /2 , update_mirrors /1 , validate_policy /1 ,
2525 maybe_auto_sync /1 , maybe_drop_master_after_sync /1 ,
2626 sync_batch_size /1 , log_info /3 , log_warning /3 ]).
27+ -export ([stop_all_slaves /5 ]).
2728
2829-export ([sync_queue /1 , cancel_sync_queue /1 ]).
2930
4748 [policy_validator , <<" ha-sync-batch-size" >>, ? MODULE ]}},
4849 {mfa , {rabbit_registry , register ,
4950 [policy_validator , <<" ha-promote-on-shutdown" >>, ? MODULE ]}},
51+ {mfa , {rabbit_registry , register ,
52+ [policy_validator , <<" ha-promote-on-failure" >>, ? MODULE ]}},
5053 {requires , rabbit_registry },
5154 {enables , recovery }]}).
5255
@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
8588 [] -> {error , not_found };
8689 [Q = # amqqueue { pid = QPid ,
8790 slave_pids = SPids ,
91+ sync_slave_pids = SyncSPids ,
8892 gm_pids = GMPids }] ->
8993 {DeadGM , AliveGM } = lists :partition (
9094 fun ({GM , _ }) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
104108 {QPid , SPids };
105109 _ -> promote_slave (Alive )
106110 end ,
107- Extra =
108- case {{QPid , SPids }, {QPid1 , SPids1 }} of
109- {Same , Same } ->
110- [];
111- _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
112- % % Either master hasn't changed, so
113- % % we're ok to update mnesia; or we have
114- % % become the master. If gm altered,
115- % % we have no choice but to proceed.
116- Q1 = Q # amqqueue {pid = QPid1 ,
117- slave_pids = SPids1 ,
118- gm_pids = AliveGM },
119- store_updated_slaves (Q1 ),
120- % % If we add and remove nodes at the
121- % % same time we might tell the old
122- % % master we need to sync and then
123- % % shut it down. So let's check if
124- % % the new master needs to sync.
125- maybe_auto_sync (Q1 ),
126- slaves_to_start_on_failure (Q1 , DeadGMPids );
127- _ ->
128- % % Master has changed, and we're not it.
129- % % [1].
130- Q1 = Q # amqqueue {slave_pids = Alive ,
131- gm_pids = AliveGM },
132- store_updated_slaves (Q1 ),
133- []
134- end ,
135- {ok , QPid1 , DeadPids , Extra }
111+ DoNotPromote = SyncSPids =:= [] andalso
112+ rabbit_policy :get (<<" ha-promote-on-failure" >>, Q ) =:= <<" when-synced" >>,
113+ case {{QPid , SPids }, {QPid1 , SPids1 }} of
114+ {Same , Same } ->
115+ {ok , QPid1 , DeadPids , []};
116+ _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
117+ % % We have been promoted to master
118+ % % but there are no synchronised mirrors
119+ % % hence this node is not synchronised either
120+ % % Bailing out.
121+ {error , {not_synced , SPids1 }};
122+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
123+ % % Either master hasn't changed, so
124+ % % we're ok to update mnesia; or we have
125+ % % become the master. If gm altered,
126+ % % we have no choice but to proceed.
127+ Q1 = Q # amqqueue {pid = QPid1 ,
128+ slave_pids = SPids1 ,
129+ gm_pids = AliveGM },
130+ store_updated_slaves (Q1 ),
131+ % % If we add and remove nodes at the
132+ % % same time we might tell the old
133+ % % master we need to sync and then
134+ % % shut it down. So let's check if
135+ % % the new master needs to sync.
136+ maybe_auto_sync (Q1 ),
137+ {ok , QPid1 , DeadPids , slaves_to_start_on_failure (Q1 , DeadGMPids )};
138+ _ ->
139+ % % Master has changed, and we're not it.
140+ % % [1].
141+ Q1 = Q # amqqueue {slave_pids = Alive ,
142+ gm_pids = AliveGM },
143+ store_updated_slaves (Q1 ),
144+ {ok , QPid1 , DeadPids , []}
145+ end
136146 end
137147 end ).
138148% % [1] We still update mnesia here in case the slave that is supposed
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
305315 DelNodes = RunningNodes -- SNodes , % % i.e. running with no slave
306316 (RS -- DelNodes ) ++ AddNodes .
307317
%% Asks every mirror of queue QName to delete itself and terminate
%% (for the given Reason), waits for the monitored processes to go
%% down, then clears the mirror bookkeeping in Mnesia and forgets the
%% GM group.
%%
%% Reason      - forwarded to the mirrors in the broadcast message.
%% SPids       - pids of the mirrors to stop.
%% QName       - name of the queue; key of the rabbit_queue record.
%% GM          - pid of the GM process; used for the broadcast and
%%               monitored alongside the mirrors.
%% WaitTimeout - time to wait for each individual 'DOWN' message
%%               (applied per monitored pid, one after another).
%%
%% Returns ok; crashes with badmatch if the broadcast or
%% gm:forget_group/1 does not return ok.
stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
    %% Monitor first, broadcast afterwards.
    Monitored = lists:map(fun (P) -> {P, erlang:monitor(process, P)} end,
                          [GM | SPids]),
    ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
    %% We may have become partitioned from some mirrors between the
    %% lookup and the broadcast. Those are monitored but never got the
    %% GM message, so only mirrors that are still on a running node
    %% are waited for.
    AwaitDown =
        fun ({Pid, MRef}, Pending) ->
                case rabbit_mnesia:on_running_node(Pid) of
                    false ->
                        Pending;
                    true ->
                        receive
                            {'DOWN', MRef, process, _Pid, _Info} ->
                                Pending
                        after WaitTimeout ->
                                rabbit_mirror_queue_misc:log_warning(
                                  QName, " Missing 'DOWN' message from ~p in"
                                  " node ~p~n ", [Pid, node(Pid)]),
                                %% Remember the pid that did not
                                %% confirm its shutdown in time.
                                [Pid | Pending]
                        end
                end
        end,
    PendingSlavePids = lists:foldl(AwaitDown, [], Monitored),
    %% Usually another mirror or the master notices a removed mirror
    %% and updates Mnesia. Here all of them were removed and we are no
    %% longer listening ourselves, so the record is cleaned up by hand.
    ClearMirrors =
        fun () ->
                [Q] = mnesia:read({rabbit_queue, QName}),
                rabbit_mirror_queue_misc:store_updated_slaves(
                  Q#amqqueue{gm_pids    = [],
                             slave_pids = [],
                             %% Mirrors restarted on running nodes can
                             %% use the pending pids to make sure old
                             %% incarnations are stopped.
                             slave_pids_pending_shutdown = PendingSlavePids})
        end,
    rabbit_misc:execute_mnesia_transaction(ClearMirrors),
    ok = gm:forget_group(QName).

355+
308356% %----------------------------------------------------------------------------
309357
310358promote_slave ([SPid | SPids ]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
478526 <<" ha-sync-batch-size" >>, KeyList , none ),
479527 PromoteOnShutdown = proplists :get_value (
480528 <<" ha-promote-on-shutdown" >>, KeyList , none ),
481- case {Mode , Params , SyncMode , SyncBatchSize , PromoteOnShutdown } of
482- {none , none , none , none , none } ->
529+ PromoteOnFailure = proplists :get_value (
530+ <<" ha-promote-on-failure" >>, KeyList , none ),
531+ case {Mode , Params , SyncMode , SyncBatchSize , PromoteOnShutdown , PromoteOnFailure } of
532+ {none , none , none , none , none , none } ->
483533 ok ;
484- {none , _ , _ , _ , _ } ->
534+ {none , _ , _ , _ , _ , _ } ->
485535 {error , " ha-mode must be specified to specify ha-params, "
486536 " ha-sync-mode or ha-promote-on-shutdown" , []};
487537 _ ->
@@ -490,7 +540,8 @@ validate_policy(KeyList) ->
490540 {Params , ha_params_validator (Mode )},
491541 {SyncMode , fun validate_sync_mode /1 },
492542 {SyncBatchSize , fun validate_sync_batch_size /1 },
493- {PromoteOnShutdown , fun validate_pos /1 }])
543+ {PromoteOnShutdown , fun validate_pos /1 },
544+ {PromoteOnFailure , fun validate_pof /1 }])
494545 end .
495546
496547ha_params_validator (Mode ) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
532583 Mode -> {error , " ha-promote-on-shutdown must be "
533584 " \" always\" or \" when-synced\" , got ~p " , [Mode ]}
534585 end .
586+
%% Validates the value supplied for the "ha-promote-on-failure"
%% policy key.
%%
%% PromoteOnFailure - the policy value; `none' is what
%%                    validate_policy/1 passes when the key is absent.
%%
%% Returns ok for <<"always">>, <<"when-synced">> and none; otherwise
%% {error, Format, Args} describing the rejected value.
validate_pof(PromoteOnFailure) ->
    case PromoteOnFailure of
        <<"always">>      -> ok;
        <<"when-synced">> -> ok;
        none              -> ok;
        Mode              -> {error, "ha-promote-on-failure must be "
                              "\"always\" or \"when-synced\", got ~p", [Mode]}
    end.
0 commit comments