Skip to content

Commit 2acb576

Browse files
committed
Merge branch 'rabbitmq-stream-publisher-consumer-cli'
2 parents 098b69d + 2f90384 commit 2acb576

10 files changed

+763
-177
lines changed

deps/rabbitmq_stream/include/rabbit_stream.hrl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,23 @@
7171
connected_at
7272
]).
7373

74+
-define(CONSUMER_INFO_ITEMS, [
75+
stream,
76+
connection_pid,
77+
subscription_id,
78+
credits,
79+
messages,
80+
offset
81+
]).
82+
83+
-define(PUBLISHER_INFO_ITEMS, [
84+
stream,
85+
connection_pid,
86+
publisher_id,
87+
reference,
88+
messages_published,
89+
messages_confirmed,
90+
messages_errored
91+
]).
92+
7493
-define(STREAM_GUIDE_URL, <<"https://rabbitmq.com/stream.html">>).

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ aliases() ->
4747
[{'V', verbose}].
4848

4949
description() ->
50-
<<"Lists stream connections on the target node">>.
50+
<<"Lists stream connections">>.
5151

5252
help_section() ->
5353
{plugin, stream}.
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
17+
18+
-include("rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-export([formatter/0,
23+
scopes/0,
24+
switches/0,
25+
aliases/0,
26+
usage/0,
27+
usage_additional/0,
28+
usage_doc_guides/0,
29+
banner/2,
30+
validate/2,
31+
merge_defaults/2,
32+
run/2,
33+
output/2,
34+
description/0,
35+
help_section/0]).
36+
37+
formatter() ->
38+
'Elixir.RabbitMQ.CLI.Formatters.Table'.
39+
40+
scopes() ->
41+
[ctl, diagnostics, streams].
42+
43+
switches() ->
44+
[{verbose, boolean}].
45+
46+
aliases() ->
47+
[{'V', verbose}].
48+
49+
description() ->
50+
<<"Lists all stream consumers for a vhost">>.
51+
52+
help_section() ->
53+
{plugin, stream}.
54+
55+
validate(Args, _) ->
56+
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
57+
?CONSUMER_INFO_ITEMS)
58+
of
59+
{ok, _} ->
60+
ok;
61+
Error ->
62+
Error
63+
end.
64+
65+
merge_defaults([], Opts) ->
66+
merge_defaults([rabbit_data_coercion:to_binary(Item)
67+
|| Item <- ?CONSUMER_INFO_ITEMS],
68+
Opts);
69+
merge_defaults(Args, Opts) ->
70+
{Args, maps:merge(#{verbose => false, vhost => <<"/">>}, Opts)}.
71+
72+
usage() ->
73+
<<"list_stream_consumers [--vhost <vhost>] [<column> "
74+
"...]">>.
75+
76+
usage_additional() ->
77+
Prefix = <<" must be one of ">>,
78+
InfoItems =
79+
'Elixir.Enum':join(
80+
lists:usort(?CONSUMER_INFO_ITEMS), <<", ">>),
81+
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
82+
83+
usage_doc_guides() ->
84+
[?STREAM_GUIDE_URL].
85+
86+
run(Args,
87+
#{node := NodeName,
88+
vhost := VHost,
89+
timeout := Timeout,
90+
verbose := Verbose}) ->
91+
InfoKeys =
92+
case Verbose of
93+
true ->
94+
?CONSUMER_INFO_ITEMS;
95+
false ->
96+
'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
97+
end,
98+
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
99+
100+
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName,
101+
rabbit_stream,
102+
emit_consumer_info_all,
103+
[Nodes, VHost,
104+
InfoKeys],
105+
Timeout,
106+
InfoKeys,
107+
length(Nodes)).
108+
109+
banner(_, _) ->
110+
<<"Listing stream consumers ...">>.
111+
112+
output(Result, _Opts) ->
113+
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
17+
18+
-include("rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-export([formatter/0,
23+
scopes/0,
24+
switches/0,
25+
aliases/0,
26+
usage/0,
27+
usage_additional/0,
28+
usage_doc_guides/0,
29+
banner/2,
30+
validate/2,
31+
merge_defaults/2,
32+
run/2,
33+
output/2,
34+
description/0,
35+
help_section/0]).
36+
37+
formatter() ->
38+
'Elixir.RabbitMQ.CLI.Formatters.Table'.
39+
40+
scopes() ->
41+
[ctl, diagnostics, streams].
42+
43+
switches() ->
44+
[{verbose, boolean}].
45+
46+
aliases() ->
47+
[{'V', verbose}].
48+
49+
description() ->
50+
<<"Lists all stream publishers for a vhost">>.
51+
52+
help_section() ->
53+
{plugin, stream}.
54+
55+
validate(Args, _) ->
56+
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
57+
?PUBLISHER_INFO_ITEMS)
58+
of
59+
{ok, _} ->
60+
ok;
61+
Error ->
62+
Error
63+
end.
64+
65+
merge_defaults([], Opts) ->
66+
merge_defaults([rabbit_data_coercion:to_binary(Item)
67+
|| Item <- ?PUBLISHER_INFO_ITEMS],
68+
Opts);
69+
merge_defaults(Args, Opts) ->
70+
{Args, maps:merge(#{verbose => false, vhost => <<"/">>}, Opts)}.
71+
72+
usage() ->
73+
<<"list_stream_publishers [--vhost <vhost>] [<column> "
74+
"...]">>.
75+
76+
usage_additional() ->
77+
Prefix = <<" must be one of ">>,
78+
InfoItems =
79+
'Elixir.Enum':join(
80+
lists:usort(?CONSUMER_INFO_ITEMS), <<", ">>),
81+
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
82+
83+
usage_doc_guides() ->
84+
[?STREAM_GUIDE_URL].
85+
86+
run(Args,
87+
#{node := NodeName,
88+
vhost := VHost,
89+
timeout := Timeout,
90+
verbose := Verbose}) ->
91+
InfoKeys =
92+
case Verbose of
93+
true ->
94+
?PUBLISHER_INFO_ITEMS;
95+
false ->
96+
'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
97+
end,
98+
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
99+
100+
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName,
101+
rabbit_stream,
102+
emit_publisher_info_all,
103+
[Nodes, VHost,
104+
InfoKeys],
105+
Timeout,
106+
InfoKeys,
107+
length(Nodes)).
108+
109+
banner(_, _) ->
110+
<<"Listing stream publishers ...">>.
111+
112+
output(Result, _Opts) ->
113+
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
-export([stop/1]).
2626
-export([emit_connection_info_local/3,
2727
emit_connection_info_all/4,
28-
list/0]).
28+
emit_consumer_info_all/5,
29+
emit_consumer_info_local/4,
30+
emit_publisher_info_all/5,
31+
emit_publisher_info_local/4,
32+
list/1]).
2933

3034
-include_lib("rabbit_common/include/rabbit.hrl").
3135

@@ -108,9 +112,47 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
108112
rabbit_stream_reader:info(Pid,
109113
Items)
110114
end,
111-
list()).
115+
list(undefined)).
112116

113-
list() ->
117+
emit_consumer_info_all(Nodes, VHost, Items, Ref, AggregatorPid) ->
118+
Pids =
119+
[spawn_link(Node,
120+
rabbit_stream,
121+
emit_consumer_info_local,
122+
[VHost, Items, Ref, AggregatorPid])
123+
|| Node <- Nodes],
124+
rabbit_control_misc:await_emitters_termination(Pids),
125+
ok.
126+
127+
emit_consumer_info_local(VHost, Items, Ref, AggregatorPid) ->
128+
rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid,
129+
Ref,
130+
fun(Pid) ->
131+
rabbit_stream_reader:consumers_info(Pid,
132+
Items)
133+
end,
134+
list(VHost)).
135+
136+
emit_publisher_info_all(Nodes, VHost, Items, Ref, AggregatorPid) ->
137+
Pids =
138+
[spawn_link(Node,
139+
rabbit_stream,
140+
emit_publisher_info_local,
141+
[VHost, Items, Ref, AggregatorPid])
142+
|| Node <- Nodes],
143+
rabbit_control_misc:await_emitters_termination(Pids),
144+
ok.
145+
146+
emit_publisher_info_local(VHost, Items, Ref, AggregatorPid) ->
147+
rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid,
148+
Ref,
149+
fun(Pid) ->
150+
rabbit_stream_reader:publishers_info(Pid,
151+
Items)
152+
end,
153+
list(VHost)).
154+
155+
list(VHost) ->
114156
[Client
115157
|| {_, ListSupPid, _, _}
116158
<- supervisor2:which_children(rabbit_stream_sup),
@@ -120,4 +162,5 @@ list() ->
120162
<- supervisor:which_children(RanchSup),
121163
{_, CliSup, _, _} <- supervisor:which_children(ConnSup),
122164
{rabbit_stream_reader, Client, _, _}
123-
<- supervisor:which_children(CliSup)].
165+
<- supervisor:which_children(CliSup),
166+
rabbit_stream_reader:in_vhost(Client, VHost)].

0 commit comments

Comments
 (0)