Skip to content

Commit 467f14d

Browse files
Merge pull request #2165 from rabbitmq/mgmt-less-improvements
Add format/1 for classic queues and totals/type_specific info for all queue types (cherry picked from commit ddc9878)
1 parent cca5d9b commit 467f14d

File tree

3 files changed

+66
-4
lines changed

3 files changed

+66
-4
lines changed

src/rabbit_amqqueue.erl

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
-export([mark_local_durable_queues_stopped/1]).
5555

5656
-export([rebalance/3]).
57+
-export([collect_info_all/2]).
5758

5859
%% internal
5960
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -1097,7 +1098,7 @@ is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) ->
10971098
end.
10981099

10991100
format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);
1100-
format(_) -> [].
1101+
format(Q) -> rabbit_amqqueue_process:format(Q).
11011102

11021103
-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
11031104

@@ -1128,7 +1129,7 @@ info_down(Q, DownReason) ->
11281129
info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason).
11291130

11301131
info_down(Q, Items, DownReason) ->
1131-
[{Item, i_down(Item, Q, DownReason)} || Item <- Items].
1132+
[{Item, i_down(Item, Q, DownReason)} || Item <- Items, Item =/= totals, Item =/= type_specific].
11321133

11331134
i_down(name, Q, _) -> amqqueue:get_name(Q);
11341135
i_down(durable, Q, _) -> amqqueue:is_durable(Q);
@@ -1165,6 +1166,26 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
11651166
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
11661167
rabbit_control_misc:await_emitters_termination(Pids).
11671168

1169+
collect_info_all(VHostPath, Items) ->
1170+
Nodes = rabbit_mnesia:cluster_nodes(running),
1171+
Ref = make_ref(),
1172+
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
1173+
rabbit_control_misc:await_emitters_termination(Pids),
1174+
wait_for_queues(Ref, length(Pids), []).
1175+
1176+
wait_for_queues(Ref, N, Acc) ->
1177+
receive
1178+
{Ref, finished} when N == 1 ->
1179+
Acc;
1180+
{Ref, finished} ->
1181+
wait_for_queues(Ref, N - 1, Acc);
1182+
{Ref, Items, continue} ->
1183+
wait_for_queues(Ref, N, [Items | Acc])
1184+
after
1185+
1000 ->
1186+
Acc
1187+
end.
1188+
11681189
emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
11691190
rabbit_control_misc:emitting_map_with_exit_handler(
11701191
AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,

src/rabbit_amqqueue_process.erl

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
3333
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
3434
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
35+
-export([format/1]).
3536

3637
%% Queue's state
3738
-record(q, {
@@ -1066,7 +1067,16 @@ stop(State) -> stop(noreply, State).
10661067
stop(noreply, State) -> {stop, normal, State};
10671068
stop(Reply, State) -> {stop, normal, Reply, State}.
10681069

1069-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
1070+
infos(Items, #q{q = Q} = State) ->
1071+
lists:foldr(fun(totals, Acc) ->
1072+
[{messages_ready, i(messages_ready, State)},
1073+
{messages, i(messages, State)},
1074+
{messages_unacknowledged, i(messages_unacknowledged, State)}] ++ Acc;
1075+
(type_specific, Acc) ->
1076+
format(Q) ++ Acc;
1077+
(Item, Acc) ->
1078+
[{Item, i(Item, State)} | Acc]
1079+
end, [], Items).
10701080

10711081
i(name, #q{q = Q}) -> amqqueue:get_name(Q);
10721082
i(durable, #q{q = Q}) -> amqqueue:is_durable(Q);
@@ -1761,6 +1771,18 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
17611771

17621772
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
17631773

1774+
format(Q) when ?is_amqqueue(Q) ->
1775+
case rabbit_mirror_queue_misc:is_mirrored(Q) of
1776+
false ->
1777+
[{node, node(amqqueue:get_pid(Q))}];
1778+
true ->
1779+
Slaves = amqqueue:get_slave_pids(Q),
1780+
SSlaves = amqqueue:get_sync_slave_pids(Q),
1781+
[{slave_nodes, [node(S) || S <- Slaves]},
1782+
{synchronised_slave_nodes, [node(S) || S <- SSlaves]},
1783+
{node, node(amqqueue:get_pid(Q))}]
1784+
end.
1785+
17641786
log_delete_exclusive({ConPid, _ConRef}, State) ->
17651787
log_delete_exclusive(ConPid, State);
17661788
log_delete_exclusive(ConPid, #q{ q = Q }) ->

src/rabbit_quorum_queue.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,13 @@ infos(QName) ->
592592
-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
593593

594594
info(Q, Items) ->
595-
[{Item, i(Item, Q)} || Item <- Items].
595+
lists:foldr(fun(totals, Acc) ->
596+
i_totals(Q) ++ Acc;
597+
(type_specific, Acc) ->
598+
format(Q) ++ Acc;
599+
(Item, Acc) ->
600+
[{Item, i(Item, Q)} | Acc]
601+
end, [], Items).
596602

597603
-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
598604

@@ -973,6 +979,19 @@ find_quorum_queues(VHost) ->
973979
amqqueue:qnode(Q) == Node]))
974980
end).
975981

982+
i_totals(Q) when ?is_amqqueue(Q) ->
983+
QName = amqqueue:get_name(Q),
984+
case ets:lookup(queue_coarse_metrics, QName) of
985+
[{_, MR, MU, M, _}] ->
986+
[{messages_ready, MR},
987+
{messages_unacknowledged, MU},
988+
{messages, M}];
989+
[] ->
990+
[{messages_ready, 0},
991+
{messages_unacknowledged, 0},
992+
{messages, 0}]
993+
end.
994+
976995
i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q);
977996
i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q);
978997
i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q);

0 commit comments

Comments
 (0)