Skip to content

Commit bdb1c1c

Browse files
Ayanda-D authored and mergify[bot] committed
Shut down peer QQ FSMs on connected nodes on force-shrink execution for cluster-wide consistency, ensuring only the leader is active/running
(cherry picked from commit b675ce2) (cherry picked from commit d9de6d9)
1 parent 6fca3b7 commit bdb1c1c

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1303,6 +1303,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
13031303
_ = rabbit_amqqueue:update(QName, Fun),
13041304
case ra:force_delete_server(?RA_SYSTEM, ServerId) of
13051305
ok ->
1306+
rabbit_log:info("Deleted a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
13061307
ok;
13071308
{error, {badrpc, nodedown}} ->
13081309
ok;
@@ -1893,13 +1894,15 @@ force_shrink_member_to_current_member(VHost, Name) ->
18931894
case rabbit_amqqueue:lookup(QName) of
18941895
{ok, Q} when ?is_amqqueue(Q) ->
18951896
{RaName, _} = amqqueue:get_pid(Q),
1897+
OtherNodes = lists:delete(Node, get_nodes(Q)),
18961898
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
18971899
Fun = fun (Q0) ->
18981900
TS0 = amqqueue:get_type_state(Q0),
18991901
TS = TS0#{nodes => [Node]},
19001902
amqqueue:set_type_state(Q, TS)
19011903
end,
19021904
_ = rabbit_amqqueue:update(QName, Fun),
1905+
_ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes],
19031906
rabbit_log:warning("Disaster recovery procedure: shrinking finished");
19041907
_ ->
19051908
rabbit_log:warning("Disaster recovery procedure: shrinking failed, queue ~p not found at vhost ~p", [Name, VHost]),
@@ -1912,14 +1915,16 @@ force_all_queues_shrink_member_to_current_member() ->
19121915
_ = [begin
19131916
QName = amqqueue:get_name(Q),
19141917
{RaName, _} = amqqueue:get_pid(Q),
1918+
OtherNodes = lists:delete(Node, get_nodes(Q)),
19151919
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
19161920
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
19171921
Fun = fun (QQ) ->
19181922
TS0 = amqqueue:get_type_state(QQ),
19191923
TS = TS0#{nodes => [Node]},
19201924
amqqueue:set_type_state(QQ, TS)
19211925
end,
1922-
_ = rabbit_amqqueue:update(QName, Fun)
1926+
_ = rabbit_amqqueue:update(QName, Fun),
1927+
_ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes]
19231928
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
19241929
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
19251930
ok.

0 commit comments

Comments
 (0)