Skip to content

Commit 2f90384

Browse files
committed
Add list_stream_publishers CLI command
1 parent 1e62449 commit 2f90384

File tree

6 files changed

+270
-3
lines changed

6 files changed

+270
-3
lines changed

deps/rabbitmq_stream/include/rabbit_stream.hrl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,14 @@
8080
offset
8181
]).
8282

83+
-define(PUBLISHER_INFO_ITEMS, [
84+
stream,
85+
connection_pid,
86+
publisher_id,
87+
reference,
88+
messages_published,
89+
messages_confirmed,
90+
messages_errored
91+
]).
92+
8393
-define(STREAM_GUIDE_URL, <<"https://rabbitmq.com/stream.html">>).

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ usage_additional() ->
7777
Prefix = <<" must be one of ">>,
7878
InfoItems =
7979
'Elixir.Enum':join(
80-
lists:usort(?INFO_ITEMS), <<", ">>),
80+
lists:usort(?CONSUMER_INFO_ITEMS), <<", ">>),
8181
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
8282

8383
usage_doc_guides() ->
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
17+
18+
-include("rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-export([formatter/0,
23+
scopes/0,
24+
switches/0,
25+
aliases/0,
26+
usage/0,
27+
usage_additional/0,
28+
usage_doc_guides/0,
29+
banner/2,
30+
validate/2,
31+
merge_defaults/2,
32+
run/2,
33+
output/2,
34+
description/0,
35+
help_section/0]).
36+
37+
formatter() ->
38+
'Elixir.RabbitMQ.CLI.Formatters.Table'.
39+
40+
scopes() ->
41+
[ctl, diagnostics, streams].
42+
43+
switches() ->
44+
[{verbose, boolean}].
45+
46+
aliases() ->
47+
[{'V', verbose}].
48+
49+
description() ->
50+
<<"Lists all stream publishers for a vhost">>.
51+
52+
help_section() ->
53+
{plugin, stream}.
54+
55+
validate(Args, _) ->
56+
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
57+
?PUBLISHER_INFO_ITEMS)
58+
of
59+
{ok, _} ->
60+
ok;
61+
Error ->
62+
Error
63+
end.
64+
65+
merge_defaults([], Opts) ->
66+
merge_defaults([rabbit_data_coercion:to_binary(Item)
67+
|| Item <- ?PUBLISHER_INFO_ITEMS],
68+
Opts);
69+
merge_defaults(Args, Opts) ->
70+
{Args, maps:merge(#{verbose => false, vhost => <<"/">>}, Opts)}.
71+
72+
usage() ->
73+
<<"list_stream_publishers [--vhost <vhost>] [<column> "
74+
"...]">>.
75+
76+
usage_additional() ->
77+
Prefix = <<" must be one of ">>,
78+
InfoItems =
79+
'Elixir.Enum':join(
80+
lists:usort(?CONSUMER_INFO_ITEMS), <<", ">>),
81+
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
82+
83+
usage_doc_guides() ->
84+
[?STREAM_GUIDE_URL].
85+
86+
run(Args,
87+
#{node := NodeName,
88+
vhost := VHost,
89+
timeout := Timeout,
90+
verbose := Verbose}) ->
91+
InfoKeys =
92+
case Verbose of
93+
true ->
94+
?PUBLISHER_INFO_ITEMS;
95+
false ->
96+
'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
97+
end,
98+
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
99+
100+
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName,
101+
rabbit_stream,
102+
emit_publisher_info_all,
103+
[Nodes, VHost,
104+
InfoKeys],
105+
Timeout,
106+
InfoKeys,
107+
length(Nodes)).
108+
109+
banner(_, _) ->
110+
<<"Listing stream publishers ...">>.
111+
112+
output(Result, _Opts) ->
113+
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
emit_connection_info_all/4,
2828
emit_consumer_info_all/5,
2929
emit_consumer_info_local/4,
30+
emit_publisher_info_all/5,
31+
emit_publisher_info_local/4,
3032
list/1]).
3133

3234
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -131,6 +133,25 @@ emit_consumer_info_local(VHost, Items, Ref, AggregatorPid) ->
131133
end,
132134
list(VHost)).
133135

136+
emit_publisher_info_all(Nodes, VHost, Items, Ref, AggregatorPid) ->
137+
Pids =
138+
[spawn_link(Node,
139+
rabbit_stream,
140+
emit_publisher_info_local,
141+
[VHost, Items, Ref, AggregatorPid])
142+
|| Node <- Nodes],
143+
rabbit_control_misc:await_emitters_termination(Pids),
144+
ok.
145+
146+
emit_publisher_info_local(VHost, Items, Ref, AggregatorPid) ->
147+
rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid,
148+
Ref,
149+
fun(Pid) ->
150+
rabbit_stream_reader:publishers_info(Pid,
151+
Items)
152+
end,
153+
list(VHost)).
154+
134155
list(VHost) ->
135156
[Client
136157
|| {_, ListSupPid, _, _}

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@
141141
init/1,
142142
info/2,
143143
consumers_info/2,
144+
publishers_info/2,
144145
in_vhost/2]).
145146

146147
start_link(KeepaliveSup, Transport, Ref, Opts) ->
@@ -663,6 +664,9 @@ listen_loop_post_auth(Transport,
663664
{'$gen_call', From, {consumers_info, Items}} ->
664665
gen_server:reply(From, consumers_infos(Items, State)),
665666
listen_loop_post_auth(Transport, Connection, State, Configuration);
667+
{'$gen_call', From, {publishers_info, Items}} ->
668+
gen_server:reply(From, publishers_infos(Items, Connection)),
669+
listen_loop_post_auth(Transport, Connection, State, Configuration);
666670
emit_stats ->
667671
Connection1 = emit_stats(Connection, State),
668672
listen_loop_post_auth(Transport, Connection1, State, Configuration);
@@ -2438,6 +2442,39 @@ consumer_i(connection_pid, _) ->
24382442
consumer_i(stream, #consumer{stream = S}) ->
24392443
S.
24402444

2445+
publishers_info(Pid, InfoItems) ->
2446+
case InfoItems -- ?PUBLISHER_INFO_ITEMS of
2447+
[] ->
2448+
gen_server2:call(Pid, {publishers_info, InfoItems});
2449+
UnknownItems ->
2450+
throw({bad_argument, UnknownItems})
2451+
end.
2452+
2453+
publishers_infos(Items,
2454+
#stream_connection{publishers = Publishers}) ->
2455+
[[{Item, publisher_i(Item, Publisher)} || Item <- Items]
2456+
|| Publisher <- maps:values(Publishers)].
2457+
2458+
publisher_i(stream, #publisher{stream = S}) ->
2459+
S;
2460+
publisher_i(connection_pid, _) ->
2461+
self();
2462+
publisher_i(publisher_id, #publisher{publisher_id = Id}) ->
2463+
Id;
2464+
publisher_i(reference, #publisher{reference = undefined}) ->
2465+
<<"">>;
2466+
publisher_i(reference, #publisher{reference = Ref}) ->
2467+
Ref;
2468+
publisher_i(messages_published,
2469+
#publisher{message_counters = Counters}) ->
2470+
messages_published(Counters);
2471+
publisher_i(messages_confirmed,
2472+
#publisher{message_counters = Counters}) ->
2473+
messages_confirmed(Counters);
2474+
publisher_i(messages_errored,
2475+
#publisher{message_counters = Counters}) ->
2476+
messages_errored(Counters).
2477+
24412478
info(Pid, InfoItems) ->
24422479
case InfoItems -- ?INFO_ITEMS of
24432480
[] ->

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@
1919
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
2020
-define(COMMAND_LIST_CONSUMERS,
2121
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
22+
-define(COMMAND_LIST_PUBLISHERS,
23+
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
2224

2325
all() ->
24-
[{group, list_connections}, {group, list_consumers}].
26+
[{group, list_connections}, {group, list_consumers},
27+
{group, list_publishers}].
2528

2629
groups() ->
2730
[{list_connections, [],
2831
[list_connections_merge_defaults, list_connections_run]},
2932
{list_consumers, [],
30-
[list_consumers_merge_defaults, list_consumers_run]}].
33+
[list_consumers_merge_defaults, list_consumers_run]},
34+
{list_publishers, [],
35+
[list_publishers_merge_defaults, list_publishers_run]}].
3136

3237
init_per_suite(Config) ->
3338
Config1 =
@@ -195,12 +200,84 @@ list_consumers_run(Config) ->
195200
wait_until_consumer_count(Config, 0),
196201
ok.
197202

203+
list_publishers_merge_defaults(_Config) ->
204+
DefaultItems =
205+
[rabbit_data_coercion:to_binary(Item)
206+
|| Item <- ?PUBLISHER_INFO_ITEMS],
207+
{DefaultItems, #{verbose := false}} =
208+
?COMMAND_LIST_PUBLISHERS:merge_defaults([], #{}),
209+
210+
{[<<"other_key">>], #{verbose := true}} =
211+
?COMMAND_LIST_PUBLISHERS:merge_defaults([<<"other_key">>],
212+
#{verbose => true}),
213+
214+
{[<<"other_key">>], #{verbose := false}} =
215+
?COMMAND_LIST_PUBLISHERS:merge_defaults([<<"other_key">>],
216+
#{verbose => false}).
217+
218+
list_publishers_run(Config) ->
219+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
220+
Opts =
221+
#{node => Node,
222+
timeout => 10000,
223+
verbose => false,
224+
vhost => <<"/">>},
225+
226+
%% No connections, no publishers
227+
[] = to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts)),
228+
229+
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
230+
S1 = start_stream_connection(StreamPort),
231+
wait_until_connection_count(Config, 1),
232+
233+
Stream = <<"list_publishers_run">>,
234+
create_stream(S1, Stream),
235+
PubId = 42,
236+
declare_publisher(S1, PubId, Stream),
237+
238+
wait_until_publisher_count(Config, 1),
239+
240+
S2 = start_stream_connection(StreamPort),
241+
wait_until_connection_count(Config, 2),
242+
declare_publisher(S2, PubId, Stream),
243+
244+
wait_until_publisher_count(Config, 2),
245+
246+
%% Verbose returns all keys
247+
InfoItems = ?PUBLISHER_INFO_ITEMS,
248+
Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, InfoItems),
249+
AllKeys = to_list(?COMMAND_LIST_PUBLISHERS:run(Infos, Opts)),
250+
Verbose =
251+
to_list(?COMMAND_LIST_PUBLISHERS:run([], Opts#{verbose => true})),
252+
?assertEqual(AllKeys, Verbose),
253+
%% There are two consumers
254+
[[First], [_Second]] = AllKeys,
255+
256+
%% Keys are info items
257+
?assertEqual(length(InfoItems), length(First)),
258+
259+
{Keys, _} = lists:unzip(First),
260+
261+
?assertEqual([], Keys -- InfoItems),
262+
?assertEqual([], InfoItems -- Keys),
263+
264+
delete_stream(S1, Stream),
265+
metadata_update_stream_deleted(S1, Stream),
266+
metadata_update_stream_deleted(S2, Stream),
267+
close(S1),
268+
close(S2),
269+
wait_until_publisher_count(Config, 0),
270+
ok.
271+
198272
create_stream(S, Stream) ->
199273
rabbit_stream_SUITE:test_create_stream(S, Stream).
200274

201275
subscribe(S, SubId, Stream) ->
202276
rabbit_stream_SUITE:test_subscribe(S, SubId, Stream).
203277

278+
declare_publisher(S, PubId, Stream) ->
279+
rabbit_stream_SUITE:test_declare_publisher(S, PubId, Stream).
280+
204281
delete_stream(S, Stream) ->
205282
rabbit_stream_SUITE:test_delete_stream(S, Stream).
206283

@@ -243,6 +320,15 @@ wait_until_consumer_count(Config, Expected) ->
243320
test_utils:wait_until(fun() -> consumer_count(Config) == Expected
244321
end).
245322

323+
publisher_count(Config) ->
324+
command_result_count(?COMMAND_LIST_PUBLISHERS:run([<<"stream">>],
325+
options(Config))).
326+
327+
wait_until_publisher_count(Config, Expected) ->
328+
ok =
329+
test_utils:wait_until(fun() -> publisher_count(Config) == Expected
330+
end).
331+
246332
start_stream_connection(Port) ->
247333
{ok, S} =
248334
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),

0 commit comments

Comments
 (0)