Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -442,17 +442,25 @@ become_leader0(QName, Name) ->
all_replica_states() ->
Rows0 = ets:tab2list(ra_state),
Rows = lists:map(fun
({K, follower, promotable}) ->
{K, promotable};
({K, follower, non_voter}) ->
{K, non_voter};
({K, S, _}) ->
%% voter or unknown
{K, S};
(T) ->
T
(T = {K, _, _}) ->
case rabbit_process:is_registered_process_alive(K) of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whereis/1 will do here, you are already on the node where the process is running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, i avoided the rabbit_process:is_process_alive/1 which does the extra distribution work. But also noticed we have rabbit_process:is_registered_process_alive/1 which is already doing whereis/1 with is_pid/1 guard check on the local node, which i'd still have to do on the case statement. seems to be doing same thing (if i'm seeing correct)

Copy link
Contributor

@kjnilsson kjnilsson Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case whereis(K) of
    undefined ->
         ...
    Pid -> 
        to_replica_state(T)
end

I'm not sure using a helper function for the above is worth it, just causes read indirection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

k cool, definitely easier to read. sorry for the back-&-forth on this small item (tendency has been check/use rabbit helpers 1st 😊 after finding myself rewriting some existing utils in the past 😅 ). Yep, the read indirection not necessary here 👍

true ->
to_replica_state(T);
false ->
[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about returning nil here. is that expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I think the noproc issue, if my understanding this right, is when we have an entry in ra_state of a member which had already terminated, but wasnt cleaned up from ets from FSM terminate callback. Then its still listed here as if it were still active, yet not alive. But if alive=false, we should handle it as if it had been removed from ra_state, so returning nil here. (i'm not sure if we should also be doing a deletion from ra_state here when the fsm proc is not alive)

Copy link
Contributor

@kjnilsson kjnilsson Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all other code paths return a tuple {RegName, Status} so returning [] feels a bit odd, could you either return {K, noproc} or use lists:filtermap to remove the entry all together?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, lists:filtermap sounds good. this was removing/flattening out the [] on the conversion to a map, but no need for that extra step. updated 👍

end;
(_T) ->
[]
end, Rows0),
{node(), maps:from_list(Rows)}.
{node(), maps:from_list(lists:flatten(Rows))}.

to_replica_state({K, follower, promotable}) ->
{K, promotable};
to_replica_state({K, follower, non_voter}) ->
{K, non_voter};
to_replica_state({K, S, _}) ->
%% voter or unknown
{K, S}.

-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
list_with_minimum_quorum() ->
Expand Down
51 changes: 50 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ groups() ->
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
policy_repair,
gh_12635
gh_12635,
replica_states
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
Expand Down Expand Up @@ -4352,6 +4353,54 @@ requeue_multiple_false(Config) ->
?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})).

replica_states(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),

[?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
|| Q <- [<<"Q1">>, <<"Q2">>, <<"Q3">>]],

Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),

[Q1_ClusterName, Q2_ClusterName, Q3_ClusterName] =
[begin
{ClusterName, _} = amqqueue:get_pid(Q),
ClusterName
end
|| Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue],

Result1 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []),
ct:pal("all replica states: ~tp", [Result1]),

lists:map(fun({_Node, ReplicaStates}) ->
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q2_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q3_ClusterName, ReplicaStates))
end, Result1),

%% Unregister a few queues (same outcome of 'noproc')
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q2_ClusterName]),
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q3_ClusterName]),

?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q2_ClusterName])),
?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q3_ClusterName])),

Result2 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []),
ct:pal("replica states with a node missing Q1 and Q2: ~tp", [Result2]),

lists:map(fun({Node, ReplicaStates}) ->
if Node == Server ->
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
?assertNot(maps:is_key(Q2_ClusterName, ReplicaStates)),
?assertNot(maps:is_key(Q3_ClusterName, ReplicaStates));
true ->
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q2_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q3_ClusterName, ReplicaStates))
end
end, Result2).

%%----------------------------------------------------------------------------

same_elements(L1, L2)
Expand Down
Loading