Skip to content

Commit 1997e95

Browse files
Reconcile QQ node dead during delete and redeclare
Co-authored-by: Péter Gömöri <[email protected]>
1 parent a09383d commit 1997e95

File tree

2 files changed

+219
-23
lines changed

2 files changed

+219
-23
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,12 @@ start_cluster(Q) ->
270270
{LeaderNode, FollowerNodes} =
271271
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
272272
LeaderId = {RaName, LeaderNode},
273+
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
274+
|| Node <- [LeaderNode | FollowerNodes]]),
273275
NewQ0 = amqqueue:set_pid(Q, LeaderId),
274276
NewQ1 = amqqueue:set_type_state(NewQ0,
275-
#{nodes => [LeaderNode | FollowerNodes]}),
277+
#{nodes => [LeaderNode | FollowerNodes],
278+
uids => UIDs}),
276279

277280
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
278281
rabbit_fifo, version, [],
@@ -790,6 +793,24 @@ recover(_Vhost, Queues) ->
790793
ServerId = {Name, node()},
791794
QName = amqqueue:get_name(Q0),
792795
MutConf = make_mutable_config(Q0),
796+
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
797+
QTypeState0 = amqqueue:get_type_state(Q0),
798+
RaUIds = maps:get(uids, QTypeState0, undefined),
799+
QTypeState = case RaUIds of
800+
undefined ->
801+
%% Queue is not aware of node to uid mapping, do nothing
802+
QTypeState0;
803+
#{node() := RaUId} ->
804+
%% Queue is aware and uid for current node is correct, do nothing
805+
QTypeState0;
806+
_ ->
807+
%% Queue is aware but either current node has no UId or it
808+
%% does not match the one returned by ra_directory, regen uid
809+
maybe_delete_data_dir(RaUId),
810+
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
811+
QTypeState0#{uids := RaUIds#{node() => NewRaUId}}
812+
end,
813+
Q = amqqueue:set_type_state(Q0, QTypeState),
793814
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
794815
ok ->
795816
% queue was restarted, good
@@ -802,7 +823,7 @@ recover(_Vhost, Queues) ->
802823
[rabbit_misc:rs(QName), Err1]),
803824
% queue was never started on this node
804825
% so needs to be started from scratch.
805-
case start_server(make_ra_conf(Q0, ServerId)) of
826+
case start_server(make_ra_conf(Q, ServerId)) of
806827
ok -> ok;
807828
Err2 ->
808829
?LOG_WARNING("recover: quorum queue ~w could not"
@@ -824,8 +845,7 @@ recover(_Vhost, Queues) ->
824845
%% present in the rabbit_queue table and not just in
825846
%% rabbit_durable_queue
826847
%% So many code paths are dependent on this.
827-
ok = rabbit_db_queue:set_dirty(Q0),
828-
Q = Q0,
848+
ok = rabbit_db_queue:set_dirty(Q),
829849
case Res of
830850
ok ->
831851
{[Q | R0], F0};
@@ -1208,12 +1228,17 @@ cleanup_data_dir() ->
12081228
maybe_delete_data_dir(UId) ->
12091229
_ = ra_directory:unregister_name(?RA_SYSTEM, UId),
12101230
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
1211-
{ok, Config} = ra_log:read_config(Dir),
1212-
case maps:get(machine, Config) of
1213-
{module, rabbit_fifo, _} ->
1214-
ra_lib:recursive_delete(Dir);
1215-
_ ->
1216-
ok
1231+
case filelib:is_dir(Dir) of
1232+
false ->
1233+
ok;
1234+
true ->
1235+
{ok, Config} = ra_log:read_config(Dir),
1236+
case maps:get(machine, Config) of
1237+
{module, rabbit_fifo, _} ->
1238+
ra_lib:recursive_delete(Dir);
1239+
_ ->
1240+
ok
1241+
end
12171242
end.
12181243

12191244
policy_changed(Q) ->
@@ -1378,16 +1403,30 @@ add_member(Q, Node, Membership) ->
13781403
do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).
13791404

13801405

1381-
do_add_member(Q, Node, Membership, Timeout)
1382-
when ?is_amqqueue(Q) andalso
1383-
?amqqueue_is_quorum(Q) andalso
1406+
do_add_member(Q0, Node, Membership, Timeout)
1407+
when ?is_amqqueue(Q0) andalso
1408+
?amqqueue_is_quorum(Q0) andalso
13841409
is_atom(Node) ->
1385-
{RaName, _} = amqqueue:get_pid(Q),
1386-
QName = amqqueue:get_name(Q),
1410+
{RaName, _} = amqqueue:get_pid(Q0),
1411+
QName = amqqueue:get_name(Q0),
13871412
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13881413
ServerId = {RaName, Node},
1389-
Members = members(Q),
1390-
1414+
Members = members(Q0),
1415+
QTypeState0 = amqqueue:get_type_state(Q0),
1416+
RaUIds = maps:get(uids, QTypeState0, undefined),
1417+
QTypeState = case RaUIds of
1418+
undefined ->
1419+
%% Queue is not aware of node to uid mapping, do nothing
1420+
QTypeState0;
1421+
#{Node := _} ->
1422+
%% Queue is aware and uid for targeted node exists, do nothing
1423+
QTypeState0;
1424+
_ ->
1425+
%% Queue is aware but the targeted node has no UId, generate a new uid
1426+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1427+
QTypeState0#{uids := RaUIds#{Node => NewRaUId}}
1428+
end,
1429+
Q = amqqueue:set_type_state(Q0, QTypeState),
13911430
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
13921431
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13931432
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1477,7 +1516,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
14771516
Fun = fun(Q1) ->
14781517
update_type_state(
14791518
Q1,
1480-
fun(#{nodes := Nodes} = Ts) ->
1519+
fun(#{nodes := Nodes,
1520+
uids := UIds} = Ts) ->
1521+
Ts#{nodes => lists:delete(Node, Nodes),
1522+
uids => maps:remove(Node, UIds)};
1523+
(#{nodes := Nodes} = Ts) ->
14811524
Ts#{nodes => lists:delete(Node, Nodes)}
14821525
end)
14831526
end,
@@ -1999,7 +2042,15 @@ make_ra_conf(Q, ServerId, TickTimeout,
19992042
#resource{name = QNameBin} = QName,
20002043
RaMachine = ra_machine(Q),
20012044
[{ClusterName, _} | _] = Members = members(Q),
2002-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
2045+
{_, Node} = ServerId,
2046+
UId = case amqqueue:get_type_state(Q) of
2047+
#{uids := #{Node := Id}} ->
2048+
Id;
2049+
_ ->
2050+
%% Queue was declared on an older version of RabbitMQ
2051+
%% and does not have the node to uid mappings
2052+
ra:new_uid(ra_lib:to_binary(ClusterName))
2053+
end,
20032054
FName = rabbit_misc:rs(QName),
20042055
Formatter = {?MODULE, format_ra_event, [QName]},
20052056
LogCfg = #{uid => UId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 149 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ groups() ->
105105
force_checkpoint,
106106
policy_repair,
107107
gh_12635,
108-
replica_states
108+
replica_states,
109+
restart_after_queue_reincarnation,
110+
no_messages_after_queue_reincarnation
109111
]
110112
++ all_tests()},
111113
{cluster_size_5, [], [start_queue,
@@ -2795,15 +2797,21 @@ add_member_wrong_type(Config) ->
27952797
[<<"/">>, SQ, Server, voter, 5000])).
27962798

27972799
add_member_already_a_member(Config) ->
2798-
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2800+
[Server, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
27992801
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
28002802
QQ = ?config(queue_name, Config),
28012803
?assertEqual({'queue.declare_ok', QQ, 0, 0},
28022804
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2805+
R1 = rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}]),
28032806
%% idempotent by design
28042807
?assertEqual(ok,
28052808
rpc:call(Server, rabbit_quorum_queue, add_member,
2806-
[<<"/">>, QQ, Server, voter, 5000])).
2809+
[<<"/">>, QQ, Server, voter, 5000])),
2810+
?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])),
2811+
?assertEqual(ok,
2812+
rpc:call(Server, rabbit_quorum_queue, add_member,
2813+
[<<"/">>, QQ, Server2, voter, 5000])),
2814+
?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])).
28072815

28082816
add_member_not_found(Config) ->
28092817
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -4920,6 +4928,140 @@ replica_states(Config) ->
49204928
end
49214929
end, Result2).
49224930

4931+
% Testcase motivated by: https://github.com/rabbitmq/rabbitmq-server/discussions/13131
4932+
restart_after_queue_reincarnation(Config) ->
4933+
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4934+
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
4935+
QName = <<"QQ">>,
4936+
4937+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4938+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4939+
4940+
[Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4941+
VHost = amqqueue:get_vhost(Q),
4942+
4943+
MessagesPublished = 1000,
4944+
publish_many(Ch, QName, MessagesPublished),
4945+
4946+
%% Trigger a snapshot by purging the queue.
4947+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, purge, [Q]),
4948+
4949+
%% Stop S3
4950+
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3),
4951+
?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)),
4952+
4953+
%% Delete and re-declare queue with the same name.
4954+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]),
4955+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4956+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4957+
4958+
% Now S3 should have the old queue state, and S1 and S2 a new one.
4959+
St1 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]),
4960+
Status0 = [{proplists:get_value(<<"Node Name">>, S), S} || S <- St1],
4961+
S3_Status1 = proplists:get_value(S3, Status0),
4962+
Others_Status1 = [V || {_K, V} <- proplists:delete(S3, Status0)],
4963+
4964+
S3_LastLogIndex = proplists:get_value(<<"Last Log Index">>, S3_Status1),
4965+
S3_LastWritten = proplists:get_value(<<"Last Written">>, S3_Status1),
4966+
S3_LastApplied = proplists:get_value(<<"Last Applied">>, S3_Status1),
4967+
S3_CommitIndex = proplists:get_value(<<"Commit Index">>, S3_Status1),
4968+
S3_Term = proplists:get_value(<<"Term">>, S3_Status1),
4969+
4970+
?assertEqual(noproc, proplists:get_value(<<"Raft State">>, S3_Status1)),
4971+
?assertEqual(unknown, proplists:get_value(<<"Membership">>, S3_Status1)),
4972+
[begin
4973+
?assert(S3_LastLogIndex > proplists:get_value(<<"Last Log Index">>, O)),
4974+
?assert(S3_LastWritten > proplists:get_value(<<"Last Written">>, O)),
4975+
?assert(S3_LastApplied > proplists:get_value(<<"Last Applied">>, O)),
4976+
?assert(S3_CommitIndex > proplists:get_value(<<"Commit Index">>, O)),
4977+
?assertEqual(S3_Term, proplists:get_value(<<"Term">>, O))
4978+
end || O <- Others_Status1],
4979+
4980+
%% Bumping term in online nodes
4981+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]),
4982+
4983+
%% Restart S3
4984+
?assertEqual(ok, rabbit_control_helper:command(start_app, S3)),
4985+
4986+
timer:sleep(1000),
4987+
4988+
%% Now all three nodes should have the new state.
4989+
Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]),
4990+
% They are either leader or follower.
4991+
?assert(
4992+
lists:all(
4993+
fun(NodeStatus) ->
4994+
NodeRaftState = proplists:get_value(<<"Raft State">>, NodeStatus),
4995+
lists:member(NodeRaftState, [leader, follower])
4996+
end, Status2)),
4997+
% Remove "Node Name" and "Raft State" from the status.
4998+
Status3 = [NE1, NE2, NE3]= [
4999+
begin
5000+
R = proplists:delete(<<"Node Name">>, NodeEntry),
5001+
proplists:delete(<<"Raft State">>, R)
5002+
end || NodeEntry <- Status2],
5003+
% Check that all other properties have the same value on all nodes.
5004+
ct:pal("Status3: ~tp", [Status3]),
5005+
[
5006+
begin
5007+
?assertEqual(V, proplists:get_value(K, NE2)),
5008+
?assertEqual(V, proplists:get_value(K, NE3))
5009+
end || {K, V} <- NE1
5010+
].
5011+
5012+
% Testcase motivated by: https://github.com/rabbitmq/rabbitmq-server/issues/12366
5013+
no_messages_after_queue_reincarnation(Config) ->
5014+
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
5015+
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
5016+
QName = <<"QQ">>,
5017+
5018+
?assertEqual({'queue.declare_ok', QName, 0, 0},
5019+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
5020+
5021+
[Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
5022+
5023+
publish(Ch, QName, <<"msg1">>),
5024+
publish(Ch, QName, <<"msg2">>),
5025+
5026+
%% Stop S3
5027+
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3),
5028+
?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)),
5029+
5030+
qos(Ch, 1, false),
5031+
subscribe(Ch, QName, false, <<"tag0">>, [], 500),
5032+
DeliveryTag = receive
5033+
{#'basic.deliver'{delivery_tag = DT}, #amqp_msg{}} ->
5034+
receive
5035+
{#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} ->
5036+
ct:fail("did not expect the second one")
5037+
after 500 ->
5038+
DT
5039+
end
5040+
after 500 ->
5041+
ct:fail("Expected some delivery, but got none")
5042+
end,
5043+
5044+
%% Delete and re-declare queue with the same name.
5045+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]),
5046+
?assertEqual({'queue.declare_ok', QName, 0, 0},
5047+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
5048+
5049+
%% Bumping term in online nodes
5050+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]),
5051+
5052+
%% Restart S3
5053+
?assertEqual(ok, rabbit_control_helper:command(start_app, S3)),
5054+
5055+
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
5056+
multiple = false}),
5057+
%% No message should be delivered after reincarnation
5058+
receive
5059+
{#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} ->
5060+
ct:fail("Expected no deliveries, but got one")
5061+
after 500 ->
5062+
ok
5063+
end.
5064+
49235065
%%----------------------------------------------------------------------------
49245066

49255067
same_elements(L1, L2)
@@ -4989,7 +5131,10 @@ consume_empty(Ch, Queue, NoAck) ->
49895131
subscribe(Ch, Queue, NoAck) ->
49905132
subscribe(Ch, Queue, NoAck, <<"ctag">>, []).
49915133

5134+
49925135
subscribe(Ch, Queue, NoAck, Tag, Args) ->
5136+
subscribe(Ch, Queue, NoAck, Tag, Args, ?TIMEOUT).
5137+
subscribe(Ch, Queue, NoAck, Tag, Args, Timeout) ->
49935138
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
49945139
no_ack = NoAck,
49955140
arguments = Args,
@@ -4998,7 +5143,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) ->
49985143
receive
49995144
#'basic.consume_ok'{consumer_tag = Tag} ->
50005145
ok
5001-
after ?TIMEOUT ->
5146+
after Timeout ->
50025147
flush(100),
50035148
exit(subscribe_timeout)
50045149
end.

0 commit comments

Comments
 (0)