Skip to content

Commit a5776a0

Browse files
Merge pull request #8270 from cloudamqp/fix_stream_status2
Don't crash stream_status cmd if member node is down
2 parents: 465ad3c + 23b31e1; commit a5776a0

File tree

2 files changed: 23 additions (+23) and 18 deletions (-18)

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -271,7 +271,7 @@ query_pid(StreamId, MFA) when is_list(StreamId) ->
271271
-spec stream_overview(stream_id()) ->
272272
{ok, #{epoch := osiris:epoch(),
273273
members := #{node() := #{state := term(),
274-
role := writer | replica,
274+
role := {writer | replica, osiris:epoch()},
275275
current := term(),
276276
target := running | stopped}},
277277
num_listeners := non_neg_integer(),

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 22 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -650,49 +650,54 @@ status(Vhost, QueueName) ->
650650
{error, quorum_queue_not_supported};
651651
{ok, Q} when ?amqqueue_is_stream(Q) ->
652652
[begin
653-
[{role, Role},
653+
[get_key(role, C),
654654
get_key(node, C),
655655
get_key(epoch, C),
656656
get_key(offset, C),
657657
get_key(committed_offset, C),
658658
get_key(first_offset, C),
659659
get_key(readers, C),
660660
get_key(segments, C)]
661-
end || {Role, C} <- get_counters(Q)];
661+
end || C <- get_counters(Q)];
662662
{error, not_found} = E ->
663663
E
664664
end.
665665

666666
get_key(Key, Cnt) ->
667667
{Key, maps:get(Key, Cnt, undefined)}.
668668

669-
-spec is_writer({pid() | undefined, writer | replica}) -> boolean().
670-
is_writer({_, writer}) -> true;
671-
is_writer(_Member) -> false.
669+
-spec get_role({pid() | undefined, writer | replica}) -> writer | replica.
670+
get_role({_, Role}) -> Role.
672671

673672
get_counters(Q) ->
674673
#{name := StreamId} = amqqueue:get_type_state(Q),
675-
{ok, #{members := Members}} = rabbit_stream_coordinator:stream_overview(StreamId),
674+
{ok, Members} = rabbit_stream_coordinator:members(StreamId),
676675
%% split members to query the writer last
677676
%% this minimizes the risk of confusing output where replicas are ahead of the writer
678-
Writer = maps:keys(maps:filter(fun (_, M) -> is_writer(M) end, Members)),
679-
Replicas = maps:keys(maps:filter(fun (_, M) -> not is_writer(M) end, Members)),
677+
NodeRoles = [{Node, get_role(M)} || {Node, M} <- maps:to_list(Members)],
678+
{Writer, Replicas} = lists:partition(fun({_, Role}) -> Role =:= writer end, NodeRoles),
680679
QName = amqqueue:get_name(Q),
681-
Counters0 = [begin
682-
safe_get_overview(Node, QName)
683-
end || Node <- lists:append(Replicas, Writer)],
684-
Counters1 = lists:filter(fun (X) -> X =/= undefined end, Counters0),
680+
Counters = [safe_get_overview(Node, QName, Role)
681+
|| {Node, Role} <- lists:append(Replicas, Writer)],
685682
%% sort again in the original order (by node)
686-
lists:sort(fun ({_, M1}, {_, M2}) -> maps:get(node, M1) < maps:get(node, M2) end, Counters1).
683+
lists:sort(fun (M1, M2) -> maps:get(node, M1) < maps:get(node, M2) end, Counters).
687684

688-
safe_get_overview(Node, QName) ->
685+
-spec safe_get_overview(node(), rabbit_amqqueue:name(), writer | reader) ->
686+
map().
687+
safe_get_overview(Node, QName, Role) ->
689688
case rpc:call(Node, ?MODULE, get_overview, [QName]) of
690689
{badrpc, _} ->
691-
#{node => Node};
692-
Result ->
693-
Result
690+
#{role => Role,
691+
node => Node};
692+
undefined ->
693+
#{role => Role,
694+
node => Node};
695+
{Role, C} ->
696+
C#{role => Role}
694697
end.
695698

699+
-spec get_overview(rabbit_amqqueue:name()) ->
700+
{writer | reader, map()} | undefined.
696701
get_overview(QName) ->
697702
case osiris_counters:overview({osiris_writer, QName}) of
698703
undefined ->

0 commit comments

Comments
 (0)