Skip to content

Commit 1e62449

Browse files
committed
Add list_stream_consumers CLI command
1 parent a559c79 commit 1e62449

9 files changed

+496
-177
lines changed

deps/rabbitmq_stream/include/rabbit_stream.hrl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,13 @@
7171
connected_at
7272
]).
7373

74+
-define(CONSUMER_INFO_ITEMS, [
75+
stream,
76+
connection_pid,
77+
subscription_id,
78+
credits,
79+
messages,
80+
offset
81+
]).
82+
7483
-define(STREAM_GUIDE_URL, <<"https://rabbitmq.com/stream.html">>).

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ aliases() ->
4747
[{'V', verbose}].
4848

4949
description() ->
50-
<<"Lists stream connections on the target node">>.
50+
<<"Lists stream connections">>.
5151

5252
help_section() ->
5353
{plugin, stream}.
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
17+
18+
-include("rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-export([formatter/0,
23+
scopes/0,
24+
switches/0,
25+
aliases/0,
26+
usage/0,
27+
usage_additional/0,
28+
usage_doc_guides/0,
29+
banner/2,
30+
validate/2,
31+
merge_defaults/2,
32+
run/2,
33+
output/2,
34+
description/0,
35+
help_section/0]).
36+
37+
formatter() ->
38+
'Elixir.RabbitMQ.CLI.Formatters.Table'.
39+
40+
scopes() ->
41+
[ctl, diagnostics, streams].
42+
43+
switches() ->
44+
[{verbose, boolean}].
45+
46+
aliases() ->
47+
[{'V', verbose}].
48+
49+
description() ->
50+
<<"Lists all stream consumers for a vhost">>.
51+
52+
help_section() ->
53+
{plugin, stream}.
54+
55+
validate(Args, _) ->
56+
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
57+
?CONSUMER_INFO_ITEMS)
58+
of
59+
{ok, _} ->
60+
ok;
61+
Error ->
62+
Error
63+
end.
64+
65+
merge_defaults([], Opts) ->
66+
merge_defaults([rabbit_data_coercion:to_binary(Item)
67+
|| Item <- ?CONSUMER_INFO_ITEMS],
68+
Opts);
69+
merge_defaults(Args, Opts) ->
70+
{Args, maps:merge(#{verbose => false, vhost => <<"/">>}, Opts)}.
71+
72+
usage() ->
73+
<<"list_stream_consumers [--vhost <vhost>] [<column> "
74+
"...]">>.
75+
76+
usage_additional() ->
77+
Prefix = <<" must be one of ">>,
78+
InfoItems =
79+
'Elixir.Enum':join(
80+
lists:usort(?INFO_ITEMS), <<", ">>),
81+
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
82+
83+
usage_doc_guides() ->
84+
[?STREAM_GUIDE_URL].
85+
86+
run(Args,
87+
#{node := NodeName,
88+
vhost := VHost,
89+
timeout := Timeout,
90+
verbose := Verbose}) ->
91+
InfoKeys =
92+
case Verbose of
93+
true ->
94+
?CONSUMER_INFO_ITEMS;
95+
false ->
96+
'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
97+
end,
98+
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
99+
100+
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName,
101+
rabbit_stream,
102+
emit_consumer_info_all,
103+
[Nodes, VHost,
104+
InfoKeys],
105+
Timeout,
106+
InfoKeys,
107+
length(Nodes)).
108+
109+
banner(_, _) ->
110+
<<"Listing stream consumers ...">>.
111+
112+
output(Result, _Opts) ->
113+
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
-export([stop/1]).
2626
-export([emit_connection_info_local/3,
2727
emit_connection_info_all/4,
28-
list/0]).
28+
emit_consumer_info_all/5,
29+
emit_consumer_info_local/4,
30+
list/1]).
2931

3032
-include_lib("rabbit_common/include/rabbit.hrl").
3133

@@ -108,9 +110,28 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
108110
rabbit_stream_reader:info(Pid,
109111
Items)
110112
end,
111-
list()).
113+
list(undefined)).
112114

113-
list() ->
115+
emit_consumer_info_all(Nodes, VHost, Items, Ref, AggregatorPid) ->
116+
Pids =
117+
[spawn_link(Node,
118+
rabbit_stream,
119+
emit_consumer_info_local,
120+
[VHost, Items, Ref, AggregatorPid])
121+
|| Node <- Nodes],
122+
rabbit_control_misc:await_emitters_termination(Pids),
123+
ok.
124+
125+
emit_consumer_info_local(VHost, Items, Ref, AggregatorPid) ->
126+
rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid,
127+
Ref,
128+
fun(Pid) ->
129+
rabbit_stream_reader:consumers_info(Pid,
130+
Items)
131+
end,
132+
list(VHost)).
133+
134+
list(VHost) ->
114135
[Client
115136
|| {_, ListSupPid, _, _}
116137
<- supervisor2:which_children(rabbit_stream_sup),
@@ -120,4 +141,5 @@ list() ->
120141
<- supervisor:which_children(RanchSup),
121142
{_, CliSup, _, _} <- supervisor:which_children(ConnSup),
122143
{rabbit_stream_reader, Client, _, _}
123-
<- supervisor:which_children(CliSup)].
144+
<- supervisor:which_children(CliSup),
145+
rabbit_stream_reader:in_vhost(Client, VHost)].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@
139139
%% API
140140
-export([start_link/4,
141141
init/1,
142-
info/2]).
142+
info/2,
143+
consumers_info/2,
144+
in_vhost/2]).
143145

144146
start_link(KeepaliveSup, Transport, Ref, Opts) ->
145147
Pid = proc_lib:spawn_link(?MODULE, init,
@@ -658,6 +660,9 @@ listen_loop_post_auth(Transport,
658660
{'$gen_call', From, {info, Items}} ->
659661
gen_server:reply(From, infos(Items, Connection, State)),
660662
listen_loop_post_auth(Transport, Connection, State, Configuration);
663+
{'$gen_call', From, {consumers_info, Items}} ->
664+
gen_server:reply(From, consumers_infos(Items, State)),
665+
listen_loop_post_auth(Transport, Connection, State, Configuration);
661666
emit_stats ->
662667
Connection1 = emit_stats(Connection, State),
663668
listen_loop_post_auth(Transport, Connection1, State, Configuration);
@@ -2397,6 +2402,42 @@ ensure_stats_timer(Connection = #stream_connection{}) ->
23972402
rabbit_event:ensure_stats_timer(Connection,
23982403
#stream_connection.stats_timer, emit_stats).
23992404

2405+
in_vhost(_Pid, undefined) ->
2406+
true;
2407+
in_vhost(Pid, VHost) ->
2408+
case info(Pid, [vhost]) of
2409+
[{vhost, VHost}] ->
2410+
true;
2411+
_ ->
2412+
false
2413+
end.
2414+
2415+
consumers_info(Pid, InfoItems) ->
2416+
case InfoItems -- ?CONSUMER_INFO_ITEMS of
2417+
[] ->
2418+
gen_server2:call(Pid, {consumers_info, InfoItems});
2419+
UnknownItems ->
2420+
throw({bad_argument, UnknownItems})
2421+
end.
2422+
2423+
consumers_infos(Items,
2424+
#stream_connection_state{consumers = Consumers}) ->
2425+
[[{Item, consumer_i(Item, Consumer)} || Item <- Items]
2426+
|| Consumer <- maps:values(Consumers)].
2427+
2428+
consumer_i(subscription_id, #consumer{subscription_id = SubId}) ->
2429+
SubId;
2430+
consumer_i(credits, #consumer{credit = Credits}) ->
2431+
Credits;
2432+
consumer_i(messages, #consumer{counters = Counters}) ->
2433+
messages_consumed(Counters);
2434+
consumer_i(offset, #consumer{counters = Counters}) ->
2435+
consumer_offset(Counters);
2436+
consumer_i(connection_pid, _) ->
2437+
self();
2438+
consumer_i(stream, #consumer{stream = S}) ->
2439+
S.
2440+
24002441
info(Pid, InfoItems) ->
24012442
case InfoItems -- ?INFO_ITEMS of
24022443
[] ->

deps/rabbitmq_stream/test/command_SUITE.erl

Lines changed: 0 additions & 145 deletions
This file was deleted.

0 commit comments

Comments
 (0)