Skip to content

Commit 79aa0bb

Browse files
dcorbachomergify[bot]
authored andcommitted
Force checkpoint in all members
(cherry picked from commit 0c2b6a1)
1 parent 901343d commit 79aa0bb

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,15 +2118,17 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
21182118
ok.
21192119

21202120
force_checkpoint_on_queue(QName) ->
2121-
Node = node(),
21222121
QNameFmt = rabbit_misc:rs(QName),
21232122
case rabbit_db_queue:get_durable(QName) of
21242123
{ok, Q} when ?amqqueue_is_classic(Q) ->
21252124
{error, classic_queue_not_supported};
21262125
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21272126
{RaName, _} = amqqueue:get_pid(Q),
21282127
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2129-
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT);
2128+
Nodes = amqqueue:get_nodes(Q),
2129+
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
2130+
|| Node <- Nodes],
2131+
ok;
21302132
{ok, _Q} ->
21312133
{error, not_quorum_queue};
21322134
{error, _} = E ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1337,7 +1337,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
13371337
end || Q <- QQs, VHost <- VHosts].
13381338

13391339
force_checkpoint_on_queue(Config) ->
1340-
[Server0, _Server1, _Server2] =
1340+
[Server0, Server1, Server2] =
13411341
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
13421342
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
13431343
QQ = ?config(queue_name, Config),
@@ -1358,6 +1358,18 @@ force_checkpoint_on_queue(Config) ->
13581358
#{log := #{latest_checkpoint_index := LCI}} = State,
13591359
LCI =:= undefined
13601360
end),
1361+
rabbit_ct_helpers:await_condition(
1362+
fun() ->
1363+
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
1364+
#{log := #{latest_checkpoint_index := LCI}} = State,
1365+
LCI =:= undefined
1366+
end),
1367+
rabbit_ct_helpers:await_condition(
1368+
fun() ->
1369+
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
1370+
#{log := #{latest_checkpoint_index := LCI}} = State,
1371+
LCI =:= undefined
1372+
end),
13611373

13621374
{ok, State0, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
13631375
ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),
@@ -1374,6 +1386,20 @@ force_checkpoint_on_queue(Config) ->
13741386
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
13751387
#{log := #{latest_checkpoint_index := LCI}} = State,
13761388
(LCI =/= undefined) andalso (LCI >= N)
1389+
end),
1390+
rabbit_ct_helpers:await_condition(
1391+
fun() ->
1392+
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
1393+
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1394+
#{log := #{latest_checkpoint_index := LCI}} = State,
1395+
(LCI =/= undefined) andalso (LCI >= N)
1396+
end),
1397+
rabbit_ct_helpers:await_condition(
1398+
fun() ->
1399+
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
1400+
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1401+
#{log := #{latest_checkpoint_index := LCI}} = State,
1402+
(LCI =/= undefined) andalso (LCI >= N)
13771403
end).
13781404

13791405
force_checkpoint(Config) ->

0 commit comments

Comments
 (0)