Skip to content

Commit 1f6341c

Browse files
committed
quorum_queue_SUITE: Use Khepri fence before checking number of replicas
[Why] When `wait_for_messages_ready/3` returns, we are sure that the replicas are in the expected state. However, the `#amqqueue{}` record is updated in Khepri, we don't know when all Khepri store members will be up-to-date. It can happen that `Server0` is not up-to-date when we query that record to get the list of replicass, leading to a test failure. [How] First, the check is moved to its own function is `queue_utils`. Then, if Khepri is being used, we use a Khepri fence to ensure previous operations were applied on the given server. This way, we get a consistent view of the `#amqqueue{}` record and thus the list of replicas.
1 parent 46dbb1f commit 1f6341c

File tree

2 files changed

+39
-36
lines changed

2 files changed

+39
-36
lines changed

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,27 +1306,20 @@ force_shrink_member_to_current_member(Config) ->
13061306
RaName = ra_name(QQ),
13071307
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
13081308
wait_for_messages_ready([Server0], RaName, 3),
1309-
1310-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
1311-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1312-
?assertEqual(3, length(Nodes0)),
1309+
queue_utils:assert_number_of_replicas(
1310+
Config, Server0, <<"/">>, QQ, 3),
13131311

13141312
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
13151313
force_shrink_member_to_current_member, [<<"/">>, QQ]),
13161314

13171315
wait_for_messages_ready([Server0], RaName, 3),
1318-
1319-
{ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
1320-
#{nodes := Nodes1} = amqqueue:get_type_state(Q1),
1321-
?assertEqual(1, length(Nodes1)),
1316+
queue_utils:assert_number_of_replicas(
1317+
Config, Server0, <<"/">>, QQ, 1),
13221318

13231319
%% grow queues back to all nodes
13241320
[rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]],
1325-
1326-
wait_for_messages_ready([Server0], RaName, 3),
1327-
{ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
1328-
#{nodes := Nodes2} = amqqueue:get_type_state(Q2),
1329-
?assertEqual(3, length(Nodes2))
1321+
queue_utils:assert_number_of_replicas(
1322+
Config, Server0, <<"/">>, QQ, 3)
13301323
end.
13311324

13321325
force_all_queues_shrink_member_to_current_member(Config) ->
@@ -1351,9 +1344,8 @@ force_all_queues_shrink_member_to_current_member(Config) ->
13511344
RaName = ra_name(Q),
13521345
rabbit_ct_client_helpers:publish(Ch, Q, 3),
13531346
wait_for_messages_ready([Server0], RaName, 3),
1354-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1355-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1356-
?assertEqual(3, length(Nodes0))
1347+
queue_utils:assert_number_of_replicas(
1348+
Config, Server0, <<"/">>, Q, 3)
13571349
end || Q <- QQs],
13581350

13591351
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
@@ -1362,9 +1354,8 @@ force_all_queues_shrink_member_to_current_member(Config) ->
13621354
[begin
13631355
RaName = ra_name(Q),
13641356
wait_for_messages_ready([Server0], RaName, 3),
1365-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1366-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1367-
?assertEqual(1, length(Nodes0))
1357+
queue_utils:assert_number_of_replicas(
1358+
Config, Server0, <<"/">>, Q, 1)
13681359
end || Q <- QQs],
13691360

13701361
%% grow queues back to all nodes
@@ -1373,9 +1364,8 @@ force_all_queues_shrink_member_to_current_member(Config) ->
13731364
[begin
13741365
RaName = ra_name(Q),
13751366
wait_for_messages_ready([Server0], RaName, 3),
1376-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
1377-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1378-
?assertEqual(3, length(Nodes0))
1367+
queue_utils:assert_number_of_replicas(
1368+
Config, Server0, <<"/">>, Q, 3)
13791369
end || Q <- QQs]
13801370
end.
13811371

@@ -1417,9 +1407,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
14171407
QQRes = rabbit_misc:r(VHost, queue, Q),
14181408
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
14191409
wait_for_messages_ready([Server0], RaName, 3),
1420-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
1421-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1422-
?assertEqual(3, length(Nodes0))
1410+
queue_utils:assert_number_of_replicas(
1411+
Config, Server0, VHost, Q, 3)
14231412
end || Q <- QQs, VHost <- VHosts],
14241413

14251414
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
@@ -1429,11 +1418,13 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
14291418
QQRes = rabbit_misc:r(VHost, queue, Q),
14301419
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
14311420
wait_for_messages_ready([Server0], RaName, 3),
1432-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
1433-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
14341421
case VHost of
1435-
VHost1 -> ?assertEqual(3, length(Nodes0));
1436-
VHost2 -> ?assertEqual(1, length(Nodes0))
1422+
VHost1 ->
1423+
queue_utils:assert_number_of_replicas(
1424+
Config, Server0, VHost, Q, 3);
1425+
VHost2 ->
1426+
queue_utils:assert_number_of_replicas(
1427+
Config, Server0, VHost, Q, 1)
14371428
end
14381429
end || Q <- QQs, VHost <- VHosts],
14391430

@@ -1444,9 +1435,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
14441435
QQRes = rabbit_misc:r(VHost, queue, Q),
14451436
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
14461437
wait_for_messages_ready([Server0], RaName, 3),
1447-
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
1448-
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
1449-
?assertEqual(3, length(Nodes0))
1438+
queue_utils:assert_number_of_replicas(
1439+
Config, Server0, VHost, Q, 3)
14501440
end || Q <- QQs, VHost <- VHosts]
14511441
end.
14521442

@@ -2946,9 +2936,8 @@ delete_member_member_already_deleted(Config) ->
29462936
?assertEqual(ok,
29472937
rpc:call(Server, rabbit_quorum_queue, delete_member,
29482938
[<<"/">>, QQ, Server2])),
2949-
{ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
2950-
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2951-
?assertEqual(1, length(Nodes)),
2939+
queue_utils:assert_number_of_replicas(
2940+
Config, Server, <<"/">>, QQ, 1),
29522941
ok.
29532942

29542943
delete_member_during_node_down(Config) ->

deps/rabbitmq_ct_helpers/src/queue_utils.erl

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
ra_name/1,
1616
ra_machines_use_same_version/3,
1717
wait_for_local_stream_member/4,
18-
has_local_stream_member_rpc/1
18+
has_local_stream_member_rpc/1,
19+
assert_number_of_replicas/5
1920
]).
2021

2122
-define(WFM_SLEEP, 256).
@@ -191,3 +192,16 @@ has_local_stream_member_rpc(QName) ->
191192
{error, _} ->
192193
false
193194
end.
195+
196+
assert_number_of_replicas(Config, Server, VHost, QQ, Count) ->
197+
_ = case rabbit_ct_broker_helpers:configured_metadata_store(Config) of
198+
khepri ->
199+
rabbit_ct_broker_helpers:rpc(
200+
Config, Server, rabbit_khepri, fence, [30000]);
201+
mnesia ->
202+
ok
203+
end,
204+
{ok, Q} = rabbit_ct_broker_helpers:rpc(
205+
Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]),
206+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
207+
?assertEqual(Count, length(Nodes)).

0 commit comments

Comments
 (0)