Skip to content

Commit 0c2b6a1

Browse files
committed
Force checkpoint in all members
1 parent 7d3292c commit 0c2b6a1

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2119,15 +2119,17 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
21192119
ok.
21202120

21212121
force_checkpoint_on_queue(QName) ->
2122-
Node = node(),
21232122
QNameFmt = rabbit_misc:rs(QName),
21242123
case rabbit_db_queue:get_durable(QName) of
21252124
{ok, Q} when ?amqqueue_is_classic(Q) ->
21262125
{error, classic_queue_not_supported};
21272126
{ok, Q} when ?amqqueue_is_quorum(Q) ->
21282127
{RaName, _} = amqqueue:get_pid(Q),
21292128
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
2130-
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT);
2129+
Nodes = amqqueue:get_nodes(Q),
2130+
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
2131+
|| Node <- Nodes],
2132+
ok;
21312133
{ok, _Q} ->
21322134
{error, not_quorum_queue};
21332135
{error, _} = E ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1343,7 +1343,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
13431343
end || Q <- QQs, VHost <- VHosts].
13441344

13451345
force_checkpoint_on_queue(Config) ->
1346-
[Server0, _Server1, _Server2] =
1346+
[Server0, Server1, Server2] =
13471347
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
13481348
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
13491349
QQ = ?config(queue_name, Config),
@@ -1364,6 +1364,18 @@ force_checkpoint_on_queue(Config) ->
13641364
#{log := #{latest_checkpoint_index := LCI}} = State,
13651365
LCI =:= undefined
13661366
end),
1367+
rabbit_ct_helpers:await_condition(
1368+
fun() ->
1369+
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
1370+
#{log := #{latest_checkpoint_index := LCI}} = State,
1371+
LCI =:= undefined
1372+
end),
1373+
rabbit_ct_helpers:await_condition(
1374+
fun() ->
1375+
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
1376+
#{log := #{latest_checkpoint_index := LCI}} = State,
1377+
LCI =:= undefined
1378+
end),
13671379

13681380
{ok, State0, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
13691381
ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),
@@ -1380,6 +1392,20 @@ force_checkpoint_on_queue(Config) ->
13801392
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
13811393
#{log := #{latest_checkpoint_index := LCI}} = State,
13821394
(LCI =/= undefined) andalso (LCI >= N)
1395+
end),
1396+
rabbit_ct_helpers:await_condition(
1397+
fun() ->
1398+
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
1399+
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1400+
#{log := #{latest_checkpoint_index := LCI}} = State,
1401+
(LCI =/= undefined) andalso (LCI >= N)
1402+
end),
1403+
rabbit_ct_helpers:await_condition(
1404+
fun() ->
1405+
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
1406+
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
1407+
#{log := #{latest_checkpoint_index := LCI}} = State,
1408+
(LCI =/= undefined) andalso (LCI >= N)
13831409
end).
13841410

13851411
force_checkpoint(Config) ->

0 commit comments

Comments
 (0)