Skip to content

Commit 782fccf

Browse files
Reconcile QQ node dead during delete and redeclare
Co-authored-by: Péter Gömöri <[email protected]>
1 parent b6dcca0 commit 782fccf

File tree

2 files changed

+219
-23
lines changed

2 files changed

+219
-23
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,12 @@ start_cluster(Q) ->
276276
{LeaderNode, FollowerNodes} =
277277
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
278278
LeaderId = {RaName, LeaderNode},
279+
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
280+
|| Node <- [LeaderNode | FollowerNodes]]),
279281
NewQ0 = amqqueue:set_pid(Q, LeaderId),
280282
NewQ1 = amqqueue:set_type_state(NewQ0,
281-
#{nodes => [LeaderNode | FollowerNodes]}),
283+
#{nodes => [LeaderNode | FollowerNodes],
284+
uids => UIDs}),
282285

283286
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
284287
rabbit_fifo, version, [],
@@ -792,6 +795,24 @@ recover(_Vhost, Queues) ->
792795
ServerId = {Name, node()},
793796
QName = amqqueue:get_name(Q0),
794797
MutConf = make_mutable_config(Q0),
798+
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
799+
QTypeState0 = amqqueue:get_type_state(Q0),
800+
RaUIds = maps:get(uids, QTypeState0, undefined),
801+
QTypeState = case RaUIds of
802+
undefined ->
803+
%% Queue is not aware of node to uid mapping, do nothing
804+
QTypeState0;
805+
#{node() := RaUId} ->
806+
%% Queue is aware and uid for current node is correct, do nothing
807+
QTypeState0;
808+
_ ->
809+
%% Queue is aware but either current node has no UId or it
810+
%% does not match the one returned by ra_directory, regen uid
811+
maybe_delete_data_dir(RaUId),
812+
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
813+
QTypeState0#{uids := RaUIds#{node() => NewRaUId}}
814+
end,
815+
Q = amqqueue:set_type_state(Q0, QTypeState),
795816
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
796817
ok ->
797818
% queue was restarted, good
@@ -804,7 +825,7 @@ recover(_Vhost, Queues) ->
804825
[rabbit_misc:rs(QName), Err1]),
805826
% queue was never started on this node
806827
% so needs to be started from scratch.
807-
case start_server(make_ra_conf(Q0, ServerId)) of
828+
case start_server(make_ra_conf(Q, ServerId)) of
808829
ok -> ok;
809830
Err2 ->
810831
?LOG_WARNING("recover: quorum queue ~w could not"
@@ -826,8 +847,7 @@ recover(_Vhost, Queues) ->
826847
%% present in the rabbit_queue table and not just in
827848
%% rabbit_durable_queue
828849
%% So many code paths are dependent on this.
829-
ok = rabbit_db_queue:set_dirty(Q0),
830-
Q = Q0,
850+
ok = rabbit_db_queue:set_dirty(Q),
831851
case Res of
832852
ok ->
833853
{[Q | R0], F0};
@@ -1208,12 +1228,17 @@ cleanup_data_dir() ->
12081228
maybe_delete_data_dir(UId) ->
12091229
_ = ra_directory:unregister_name(?RA_SYSTEM, UId),
12101230
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
1211-
{ok, Config} = ra_log:read_config(Dir),
1212-
case maps:get(machine, Config) of
1213-
{module, rabbit_fifo, _} ->
1214-
ra_lib:recursive_delete(Dir);
1215-
_ ->
1216-
ok
1231+
case filelib:is_dir(Dir) of
1232+
false ->
1233+
ok;
1234+
true ->
1235+
{ok, Config} = ra_log:read_config(Dir),
1236+
case maps:get(machine, Config) of
1237+
{module, rabbit_fifo, _} ->
1238+
ra_lib:recursive_delete(Dir);
1239+
_ ->
1240+
ok
1241+
end
12171242
end.
12181243

12191244
policy_changed(Q) ->
@@ -1379,16 +1404,30 @@ add_member(Q, Node, Membership) ->
13791404
do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).
13801405

13811406

1382-
do_add_member(Q, Node, Membership, Timeout)
1383-
when ?is_amqqueue(Q) andalso
1384-
?amqqueue_is_quorum(Q) andalso
1407+
do_add_member(Q0, Node, Membership, Timeout)
1408+
when ?is_amqqueue(Q0) andalso
1409+
?amqqueue_is_quorum(Q0) andalso
13851410
is_atom(Node) ->
1386-
{RaName, _} = amqqueue:get_pid(Q),
1387-
QName = amqqueue:get_name(Q),
1411+
{RaName, _} = amqqueue:get_pid(Q0),
1412+
QName = amqqueue:get_name(Q0),
13881413
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13891414
ServerId = {RaName, Node},
1390-
Members = members(Q),
1391-
1415+
Members = members(Q0),
1416+
QTypeState0 = amqqueue:get_type_state(Q0),
1417+
RaUIds = maps:get(uids, QTypeState0, undefined),
1418+
QTypeState = case RaUIds of
1419+
undefined ->
1420+
%% Queue is not aware of node to uid mapping, do nothing
1421+
QTypeState0;
1422+
#{Node := _} ->
1423+
%% Queue is aware and uid for targeted node exists, do nothing
1424+
QTypeState0;
1425+
_ ->
1426+
%% Queue is aware but current node has no UId, regen uid
1427+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1428+
QTypeState0#{uids := RaUIds#{Node => NewRaUId}}
1429+
end,
1430+
Q = amqqueue:set_type_state(Q0, QTypeState),
13921431
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
13931432
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13941433
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1478,7 +1517,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
14781517
Fun = fun(Q1) ->
14791518
update_type_state(
14801519
Q1,
1481-
fun(#{nodes := Nodes} = Ts) ->
1520+
fun(#{nodes := Nodes,
1521+
uids := UIds} = Ts) ->
1522+
Ts#{nodes => lists:delete(Node, Nodes),
1523+
uids => maps:remove(Node, UIds)};
1524+
(#{nodes := Nodes} = Ts) ->
14821525
Ts#{nodes => lists:delete(Node, Nodes)}
14831526
end)
14841527
end,
@@ -1986,7 +2029,15 @@ make_ra_conf(Q, ServerId, TickTimeout,
19862029
#resource{name = QNameBin} = QName,
19872030
RaMachine = ra_machine(Q),
19882031
[{ClusterName, _} | _] = Members = members(Q),
1989-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
2032+
{_, Node} = ServerId,
2033+
UId = case amqqueue:get_type_state(Q) of
2034+
#{uids := #{Node := Id}} ->
2035+
Id;
2036+
_ ->
2037+
%% Queue was declared on an older version of RabbitMQ
2038+
%% and does not have the node to uid mappings
2039+
ra:new_uid(ra_lib:to_binary(ClusterName))
2040+
end,
19902041
FName = rabbit_misc:rs(QName),
19912042
Formatter = {?MODULE, format_ra_event, [QName]},
19922043
LogCfg = #{uid => UId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 149 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ groups() ->
105105
force_checkpoint,
106106
policy_repair,
107107
gh_12635,
108-
replica_states
108+
replica_states,
109+
restart_after_queue_reincarnation,
110+
no_messages_after_queue_reincarnation
109111
]
110112
++ all_tests()},
111113
{cluster_size_5, [], [start_queue,
@@ -2792,15 +2794,21 @@ add_member_wrong_type(Config) ->
27922794
[<<"/">>, SQ, Server, voter, 5000])).
27932795

27942796
add_member_already_a_member(Config) ->
2795-
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2797+
[Server, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
27962798
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
27972799
QQ = ?config(queue_name, Config),
27982800
?assertEqual({'queue.declare_ok', QQ, 0, 0},
27992801
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2802+
R1 = rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}]),
28002803
%% idempotent by design
28012804
?assertEqual(ok,
28022805
rpc:call(Server, rabbit_quorum_queue, add_member,
2803-
[<<"/">>, QQ, Server, voter, 5000])).
2806+
[<<"/">>, QQ, Server, voter, 5000])),
2807+
?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])),
2808+
?assertEqual(ok,
2809+
rpc:call(Server, rabbit_quorum_queue, add_member,
2810+
[<<"/">>, QQ, Server2, voter, 5000])),
2811+
?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])).
28042812

28052813
add_member_not_found(Config) ->
28062814
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -4872,6 +4880,140 @@ replica_states(Config) ->
48724880
end
48734881
end, Result2).
48744882

4883+
% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/discussions/13131
4884+
restart_after_queue_reincarnation(Config) ->
4885+
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4886+
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
4887+
QName = <<"QQ">>,
4888+
4889+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4890+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4891+
4892+
[Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4893+
VHost = amqqueue:get_vhost(Q),
4894+
4895+
MessagesPublished = 1000,
4896+
publish_many(Ch, QName, MessagesPublished),
4897+
4898+
%% Trigger a snapshot by purging the queue.
4899+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, purge, [Q]),
4900+
4901+
%% Stop S3
4902+
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3),
4903+
?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)),
4904+
4905+
%% Delete and re-declare queue with the same name.
4906+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]),
4907+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4908+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4909+
4910+
% Now S3 should have the old queue state, and S1 and S2 a new one.
4911+
St1 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]),
4912+
Status0 = [{proplists:get_value(<<"Node Name">>, S), S} || S <- St1],
4913+
S3_Status1 = proplists:get_value(S3, Status0),
4914+
Others_Status1 = [V || {_K, V} <- proplists:delete(S3, Status0)],
4915+
4916+
S3_LastLogIndex = proplists:get_value(<<"Last Log Index">>, S3_Status1),
4917+
S3_LastWritten = proplists:get_value(<<"Last Written">>, S3_Status1),
4918+
S3_LastApplied = proplists:get_value(<<"Last Applied">>, S3_Status1),
4919+
S3_CommitIndex = proplists:get_value(<<"Commit Index">>, S3_Status1),
4920+
S3_Term = proplists:get_value(<<"Term">>, S3_Status1),
4921+
4922+
?assertEqual(noproc, proplists:get_value(<<"Raft State">>, S3_Status1)),
4923+
?assertEqual(unknown, proplists:get_value(<<"Membership">>, S3_Status1)),
4924+
[begin
4925+
?assert(S3_LastLogIndex > proplists:get_value(<<"Last Log Index">>, O)),
4926+
?assert(S3_LastWritten > proplists:get_value(<<"Last Written">>, O)),
4927+
?assert(S3_LastApplied > proplists:get_value(<<"Last Applied">>, O)),
4928+
?assert(S3_CommitIndex > proplists:get_value(<<"Commit Index">>, O)),
4929+
?assertEqual(S3_Term, proplists:get_value(<<"Term">>, O))
4930+
end || O <- Others_Status1],
4931+
4932+
%% Bumping term in online nodes
4933+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]),
4934+
4935+
%% Restart S3
4936+
?assertEqual(ok, rabbit_control_helper:command(start_app, S3)),
4937+
4938+
timer:sleep(1000),
4939+
4940+
%% Now all three nodes should have the new state.
4941+
Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]),
4942+
% They are either leader or follower.
4943+
?assert(
4944+
lists:all(
4945+
fun(NodeStatus) ->
4946+
NodeRaftState = proplists:get_value(<<"Raft State">>, NodeStatus),
4947+
lists:member(NodeRaftState, [leader, follower])
4948+
end, Status2)),
4949+
% Remove "Node Name" and "Raft State" from the status.
4950+
Status3 = [NE1, NE2, NE3]= [
4951+
begin
4952+
R = proplists:delete(<<"Node Name">>, NodeEntry),
4953+
proplists:delete(<<"Raft State">>, R)
4954+
end || NodeEntry <- Status2],
4955+
% Check all other properties have same value on all nodes.
4956+
ct:pal("Status3: ~tp", [Status3]),
4957+
[
4958+
begin
4959+
?assertEqual(V, proplists:get_value(K, NE2)),
4960+
?assertEqual(V, proplists:get_value(K, NE3))
4961+
end || {K, V} <- NE1
4962+
].
4963+
4964+
% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/issues/12366
4965+
no_messages_after_queue_reincarnation(Config) ->
4966+
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4967+
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
4968+
QName = <<"QQ">>,
4969+
4970+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4971+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4972+
4973+
[Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4974+
4975+
publish(Ch, QName, <<"msg1">>),
4976+
publish(Ch, QName, <<"msg2">>),
4977+
4978+
%% Stop S3
4979+
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3),
4980+
?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)),
4981+
4982+
qos(Ch, 1, false),
4983+
subscribe(Ch, QName, false, <<"tag0">>, [], 500),
4984+
DeliveryTag = receive
4985+
{#'basic.deliver'{delivery_tag = DT}, #amqp_msg{}} ->
4986+
receive
4987+
{#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} ->
4988+
ct:fail("did not expect the second one")
4989+
after 500 ->
4990+
DT
4991+
end
4992+
after 500 ->
4993+
ct:fail("Expected some delivery, but got none")
4994+
end,
4995+
4996+
%% Delete and re-declare queue with the same name.
4997+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]),
4998+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4999+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
5000+
5001+
%% Bumping term in online nodes
5002+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]),
5003+
5004+
%% Restart S3
5005+
?assertEqual(ok, rabbit_control_helper:command(start_app, S3)),
5006+
5007+
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
5008+
multiple = false}),
5009+
%% No message should be delivered after reincarnation
5010+
receive
5011+
{#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} ->
5012+
ct:fail("Expected no deliveries, but got one")
5013+
after 500 ->
5014+
ok
5015+
end.
5016+
48755017
%%----------------------------------------------------------------------------
48765018

48775019
same_elements(L1, L2)
@@ -4941,7 +5083,10 @@ consume_empty(Ch, Queue, NoAck) ->
49415083
subscribe(Ch, Queue, NoAck) ->
49425084
subscribe(Ch, Queue, NoAck, <<"ctag">>, []).
49435085

5086+
49445087
subscribe(Ch, Queue, NoAck, Tag, Args) ->
5088+
subscribe(Ch, Queue, NoAck, Tag, Args, ?TIMEOUT).
5089+
subscribe(Ch, Queue, NoAck, Tag, Args, Timeout) ->
49455090
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
49465091
no_ack = NoAck,
49475092
arguments = Args,
@@ -4950,7 +5095,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) ->
49505095
receive
49515096
#'basic.consume_ok'{consumer_tag = Tag} ->
49525097
ok
4953-
after ?TIMEOUT ->
5098+
after Timeout ->
49545099
flush(100),
49555100
exit(subscribe_timeout)
49565101
end.

0 commit comments

Comments
 (0)