Skip to content

Commit 9fd8231

Browse files
Merge pull request #14597 from rabbitmq/mergify/bp/v4.2.x/pr-14560
Introduce a 'rabbitmq-diagnostics message_size_stats' for reasoning about message size distribution (backport #14560)
2 parents 24a175d + 81933e9 commit 9fd8231

File tree

4 files changed

+306
-38
lines changed

4 files changed

+306
-38
lines changed

deps/rabbit/src/rabbit_msg_size_metrics.erl

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111

1212
-export([init/1,
1313
observe/2,
14-
prometheus_format/0]).
14+
prometheus_format/0,
15+
local_summary/0,
16+
cluster_summary/0,
17+
cluster_summary_for_cli/0 ]).
1518

1619
%% Integration tests.
1720
-export([raw_buckets/1,
@@ -52,6 +55,9 @@
5255
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
5356
NumObservations :: non_neg_integer()}].
5457

58+
-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}.
59+
-type summary() :: [summary_entry()].
60+
5561
-spec init(atom()) -> ok.
5662
init(Protocol) ->
5763
Size = ?POS_MSG_SIZE_SUM,
@@ -133,6 +139,109 @@ get_labels_counters() ->
133139
[{[{protocol, Protocol}], Counters}
134140
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
135141

142+
%% Returns the Protocol part of every persistent_term entry whose key has
%% the shape {?MODULE, Protocol}; all other entries are ignored.
get_protocols() ->
    lists:filtermap(fun({{?MODULE, Protocol}, _Counters}) -> {true, Protocol};
                       (_OtherTerm) -> false
                    end,
                    persistent_term:get()).
145+
146+
%% Summarises the message sizes observed on the local node.
%%
%% The raw bucket counters of every protocol returned by get_protocols/0
%% are added together per bucket. Each resulting entry has the form
%% {bucket_range(UpperBound), {Count, Percentage}}, where Percentage is
%% Count's share of all local observations (0.0 when there are none).
-spec local_summary() -> summary().
local_summary() ->
    BucketsPerProtocol = [raw_buckets(Protocol) || Protocol <- get_protocols()],

    %% Start every known bucket at zero, then fold each protocol's counts in.
    EmptyBuckets = [{UpperBound, 0}
                    || UpperBound <- [?BUCKET_1, ?BUCKET_2, ?BUCKET_3,
                                      ?BUCKET_4, ?BUCKET_5, ?BUCKET_6,
                                      ?BUCKET_7, ?BUCKET_8, ?BUCKET_9]],
    Summed = lists:foldl(fun sum_protocol_buckets/2,
                         EmptyBuckets,
                         BucketsPerProtocol),

    Total = lists:sum([Count || {_UpperBound, Count} <- Summed]),

    %% Guard against division by zero when nothing has been observed yet.
    ToPercentage = fun(_Count) when Total =:= 0 -> 0.0;
                      (Count) -> (Count / Total) * 100
                   end,

    [{bucket_range(UpperBound), {Count, ToPercentage(Count)}}
     || {UpperBound, Count} <- Summed].
171+
172+
%% Adds one protocol's bucket counts onto the accumulated buckets.
%%
%% The accumulator decides which buckets appear in the result and in which
%% order; a bucket that is missing from ProtocolBuckets contributes 0.
sum_protocol_buckets(_ProtocolBuckets, []) ->
    [];
sum_protocol_buckets(ProtocolBuckets, [{UpperBound, SoFar} | Rest]) ->
    Extra = proplists:get_value(UpperBound, ProtocolBuckets, 0),
    [{UpperBound, SoFar + Extra} | sum_protocol_buckets(ProtocolBuckets, Rest)].
177+
178+
%% Aggregates summaries from all running nodes.
%%
%% local_summary/0 is called on every other running node via
%% erpc:multicall/5 with a 5000 timeout, and each successful reply is
%% merged into this node's own summary. Replies that are not {ok, Summary}
%% are left out of the result.
-spec cluster_summary() -> summary().
cluster_summary() ->
    Self = node(),
    OtherNodes = lists:filter(fun(Node) -> Node =/= Self end,
                              rabbit_nodes:list_running()),
    Replies = erpc:multicall(OtherNodes, ?MODULE, local_summary, [], 5000),
    lists:foldl(fun({ok, RemoteSummary}, Merged) ->
                        merge_summaries(RemoteSummary, Merged);
                   (_FailedReply, Merged) ->
                        Merged
                end,
                local_summary(),
                Replies).
188+
189+
%% Maps a summary range to the human-readable label shown by the CLI.
%% Only the second element of the range tuple is inspected.
bucket_name({_LowerBound, UpperBound}) ->
    case UpperBound of
        ?BUCKET_1 -> <<"below 100B">>;
        ?BUCKET_2 -> <<"between 100B and 1KB">>;
        ?BUCKET_3 -> <<"between 1KB and 10KB">>;
        ?BUCKET_4 -> <<"between 10KB and 100KB">>;
        ?BUCKET_5 -> <<"between 100KB and 1MB">>;
        ?BUCKET_6 -> <<"between 1MB and 10MB">>;
        ?BUCKET_7 -> <<"between 10MB and 50MB">>;
        ?BUCKET_8 -> <<"between 50MB and 100MB">>;
        ?BUCKET_9 -> <<"above 100MB">>
    end.
198+
199+
%% Returns the cluster-wide summary as a list of rows, one per bucket.
%% Every row is a list of {ColumnName, Value} pairs with binary column
%% names; the percentage is rendered as a binary with two decimals.
cluster_summary_for_cli() ->
    [cli_summary_row(Range, Count, Percentage)
     || {Range, {Count, Percentage}} <- cluster_summary()].

%% Builds a single table row for cluster_summary_for_cli/0.
cli_summary_row(Range, Count, Percentage) ->
    Label = bucket_name(Range),
    Formatted = iolist_to_binary(io_lib:format("~.2f", [Percentage])),
    [{<<"Message Size">>, Label},
     {<<"Count">>, Count},
     {<<"Percentage">>, Formatted}].
204+
205+
%% Looks up the observation count stored for Range in a summary.
%% Returns 0 when the summary has no entry for that range.
get_count_for_range(Range, SummaryList) ->
    Entry = proplists:get_value(Range, SummaryList, undefined),
    case Entry of
        undefined -> 0;
        {Count, _Percentage} -> Count
    end.
210+
211+
%% Merges two summary lists into one.
%%
%% Counts are added per range (a range missing from one side counts as 0)
%% and the percentages are recomputed from the combined total; the
%% percentages stored in the inputs are not used. The result is sorted by
%% range, and every percentage is 0.0 when the combined total is zero.
merge_summaries(Summary1, Summary2) ->
    %% Every range that occurs in either summary, sorted and de-duplicated.
    Ranges = lists:usort([Range || {Range, _} <- Summary1 ++ Summary2]),

    Counts = [{Range,
               get_count_for_range(Range, Summary1) +
                   get_count_for_range(Range, Summary2)}
              || Range <- Ranges],

    Total = lists:sum([Count || {_, Count} <- Counts]),

    %% Guard against division by zero when both summaries are empty.
    ToPercentage = fun(_Count) when Total =:= 0 -> 0.0;
                      (Count) -> (Count / Total) * 100
                   end,

    [{Range, {Count, ToPercentage(Count)}} || {Range, Count} <- Counts].
234+
235+
%% Translates a bucket identifier (one of the ?BUCKET_N macros) into the
%% {From, To} range used as the key of a summary entry.
bucket_range(Bucket) ->
    case Bucket of
        ?BUCKET_1 -> {0, 100};
        ?BUCKET_2 -> {101, 1000};
        ?BUCKET_3 -> {1001, 10000};
        ?BUCKET_4 -> {10001, 100000};
        ?BUCKET_5 -> {100001, 1000000};
        ?BUCKET_6 -> {1000001, 10000000};
        ?BUCKET_7 -> {10000001, 50000000};
        ?BUCKET_8 -> {50000001, 100000000};
        ?BUCKET_9 -> {100000001, infinity}
    end.
244+
136245
-ifdef(TEST).
137246
%% "Counters are not tied to the current process and are automatically
138247
%% garbage collected when they are no longer referenced."

deps/rabbit/test/msg_size_metrics_SUITE.erl

Lines changed: 85 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
-include_lib("amqp_client/include/amqp_client.hrl").
1414

1515
-import(rabbit_ct_broker_helpers,
16-
[rpc/4]).
16+
[rpc/4, rpc/5]).
1717

1818
all() ->
1919
[
@@ -22,9 +22,11 @@ all() ->
2222

2323
groups() ->
2424
[
25-
{tests, [shuffle],
26-
[message_size,
27-
over_max_message_size]}
25+
{tests, [],
26+
[summary, %% needs to run first
27+
message_size,
28+
over_max_message_size]
29+
}
2830
].
2931

3032
%% -------------------------------------------------------------------
@@ -34,14 +36,18 @@ groups() ->
3436
init_per_suite(Config) ->
3537
{ok, _} = application:ensure_all_started(amqp10_client),
3638
rabbit_ct_helpers:log_environment(),
37-
rabbit_ct_helpers:run_setup_steps(Config).
39+
Config.
3840

3941
end_per_suite(Config) ->
4042
rabbit_ct_helpers:run_teardown_steps(Config).
4143

4244
init_per_group(_Group, Config) ->
43-
rabbit_ct_helpers:run_steps(
44-
Config,
45+
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
46+
Config1 = rabbit_ct_helpers:set_config(
47+
Config, [{rmq_nodes_count, 3},
48+
{rmq_nodename_suffix, Suffix}]),
49+
rabbit_ct_helpers:run_setup_steps(
50+
Config1,
4551
rabbit_ct_broker_helpers:setup_steps() ++
4652
rabbit_ct_client_helpers:setup_steps()).
4753

@@ -51,6 +57,13 @@ end_per_group(_Group, Config) ->
5157
rabbit_ct_client_helpers:teardown_steps() ++
5258
rabbit_ct_broker_helpers:teardown_steps()).
5359

60+
%% Per-testcase setup.
%%
%% The summary testcase first tries to enable the 'rabbitmq_4.2.0' feature
%% flag. When that returns a {skip, _} tuple, the tuple is returned as is;
%% otherwise the testcase is registered as started under its own name.
init_per_testcase(summary = Testcase, Config) ->
    case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
        ok ->
            %% Pass the bound testcase name rather than a hand-typed atom,
            %% so it cannot drift from the clause head.
            rabbit_ct_helpers:testcase_started(Config, Testcase);
        {skip, _} = Skip ->
            Skip
    end;
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).
5669

@@ -65,32 +78,7 @@ message_size(Config) ->
6578
AmqplBefore = get_msg_size_metrics(amqp091, Config),
6679
AmqpBefore = get_msg_size_metrics(amqp10, Config),
6780

68-
Binary2B = <<"12">>,
69-
Binary200K = binary:copy(<<"x">>, 200_000),
70-
Payloads = [Binary2B, Binary200K, Binary2B],
71-
72-
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
73-
[amqp_channel:call(Ch,
74-
#'basic.publish'{routing_key = <<"nowhere">>},
75-
#amqp_msg{payload = Payload})
76-
|| Payload <- Payloads],
77-
78-
OpnConf = connection_config(Config),
79-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
80-
{ok, Session} = amqp10_client:begin_session_sync(Connection),
81-
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
82-
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
83-
receive {amqp10_event, {link, Sender, credited}} -> ok
84-
after 30_000 -> ct:fail(credited_timeout)
85-
end,
86-
87-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
88-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
89-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
90-
91-
ok = wait_for_settlement(released, <<"tag1">>),
92-
ok = wait_for_settlement(released, <<"tag2">>),
93-
ok = wait_for_settlement(released, <<"tag3">>),
81+
publish_messages(Config),
9482

9583
AmqplAfter = get_msg_size_metrics(amqp091, Config),
9684
AmqpAfter = get_msg_size_metrics(amqp10, Config),
@@ -100,10 +88,7 @@ message_size(Config) ->
10088
?assertEqual(ExpectedDiff,
10189
rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)),
10290
?assertEqual(ExpectedDiff,
103-
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)),
104-
105-
ok = amqp10_client:close_connection(Connection),
106-
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
91+
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)).
10792

10893
over_max_message_size(Config) ->
10994
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
@@ -134,6 +119,39 @@ over_max_message_size(Config) ->
134119
ok = rabbit_ct_client_helpers:close_connection(Conn),
135120
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
136121

122+
%% Checks local_summary/0 and cluster_summary/0 before and after publishing.
%%
%% Before publishing, both nodes queried must report an all-zero summary.
%% After publish_messages/1, node 0's local summary and the cluster summary
%% seen from nodes 0 and 1 must show four messages in the first bucket and
%% two in the {100001, 1000000} bucket, while node 1's local summary must
%% still be all zeros.
summary(Config) ->
    Ranges = [{0, 100},
              {101, 1000},
              {1001, 10000},
              {10001, 100000},
              {100001, 1000000},
              {1000001, 10000000},
              {10000001, 50000000},
              {50000001, 100000000},
              {100000001, infinity}],
    ZeroSummary = [{Range, {0, 0.0}} || Range <- Ranges],

    LocalOn = fun(Node) ->
                      rpc(Config, Node, rabbit_msg_size_metrics, local_summary, [])
              end,
    ClusterOn = fun(Node) ->
                        rpc(Config, Node, rabbit_msg_size_metrics, cluster_summary, [])
                end,

    %% The zero checks are deliberately performed twice, as in the
    %% original sequence of calls.
    ?assertEqual(ZeroSummary, LocalOn(0)),
    ?assertEqual(ZeroSummary, ClusterOn(1)),
    ?assertEqual(ZeroSummary, LocalOn(0)),
    ?assertEqual(ZeroSummary, ClusterOn(1)),

    publish_messages(Config),

    NonZero = #{{0, 100} => {4, 66.66666666666666},
                {100001, 1000000} => {2, 33.33333333333333}},
    ExpectedSummary = [{Range, maps:get(Range, NonZero, {0, 0.0})}
                       || Range <- Ranges],

    ?assertEqual(ExpectedSummary, LocalOn(0)),
    ?assertEqual(ExpectedSummary, ClusterOn(0)),
    ?assertEqual(ExpectedSummary, ClusterOn(1)),
    ?assertEqual(ZeroSummary, LocalOn(1)).
154+
137155
get_msg_size_metrics(Protocol, Config) ->
138156
rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]).
139157

@@ -145,6 +163,36 @@ connection_config(Config) ->
145163
container_id => <<"my container">>,
146164
sasl => anon}.
147165

166+
%% Publishes three messages (2 bytes, 200_000 bytes, 2 bytes) over an
%% AMQP 0.9.1 channel and the same three payloads over an AMQP 1.0 sender
%% link, waits until each AMQP 1.0 message is settled as released, and then
%% closes both connections.
publish_messages(Config) ->
    Small = <<"12">>,
    Large = binary:copy(<<"x">>, 200_000),

    %% AMQP 0.9.1
    {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    lists:foreach(
      fun(Payload) ->
              amqp_channel:call(Ch,
                                #'basic.publish'{routing_key = <<"nowhere">>},
                                #amqp_msg{payload = Payload})
      end,
      [Small, Large, Small]),

    %% AMQP 1.0
    {ok, Connection} = amqp10_client:open_connection(connection_config(Config)),
    {ok, Session} = amqp10_client:begin_session_sync(Connection),
    Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
    {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
    receive
        {amqp10_event, {link, Sender, credited}} -> ok
    after 30_000 ->
            ct:fail(credited_timeout)
    end,

    Tagged = [{<<"tag1">>, Small},
              {<<"tag2">>, Large},
              {<<"tag3">>, Small}],

    %% Send everything first, then wait for the settlements in tag order.
    lists:foreach(
      fun({Tag, Body}) ->
              ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, Body))
      end,
      Tagged),
    lists:foreach(
      fun({Tag, _Body}) ->
              ok = wait_for_settlement(released, Tag)
      end,
      Tagged),

    ok = amqp10_client:close_connection(Connection),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
195+
148196
wait_for_settlement(State, Tag) ->
149197
receive
150198
{amqp10_disposition, {State, Tag}} ->
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do
  # Implements the `message_size_stats` diagnostics command: it calls
  # `:rabbit_msg_size_metrics.cluster_summary_for_cli/0` on the target node
  # and renders the returned rows with the PrettyTable formatter.
  alias RabbitMQ.CLI.Core.DocGuide

  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Timeout used when the caller supplied none or `:infinity`
  # (presumably milliseconds, as it is handed to the RPC call; confirm).
  @default_timeout 60_000

  def scopes(), do: [:diagnostics]

  def switches(), do: [timeout: :integer]
  def aliases(), do: [t: :timeout]

  # Ensures `opts` always carries a finite `:timeout`.
  #
  # A missing (`nil`) or `:infinity` timeout is replaced by @default_timeout;
  # any other value is kept. The computed value is written INTO `opts`
  # (Map.put/3) so that it wins over whatever `opts` held before. Merging in
  # the other direction would let a `timeout: :infinity` already present in
  # `opts` override the default and make this normalisation a no-op.
  def merge_defaults(args, opts) do
    timeout =
      case opts[:timeout] do
        nil -> @default_timeout
        :infinity -> @default_timeout
        other -> other
      end

    {args, Map.put(opts, :timeout, timeout)}
  end

  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  # Fetches the CLI-formatted cluster summary from `node_name`.
  def run([], %{node: node_name, timeout: timeout}) do
    :rabbit_misc.rpc_call(
      node_name,
      :rabbit_msg_size_metrics,
      :cluster_summary_for_cli,
      [],
      timeout
    )
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

  def usage, do: "message_size_stats"

  def usage_doc_guides() do
    [
      DocGuide.monitoring()
    ]
  end

  def help_section(), do: :observability_and_health_checks

  def description(),
    do: "Displays message size distribution statistics aggregated across all cluster nodes"

  def banner(_, %{node: node_name}),
    do: "Gathering message size statistics across the cluster using node #{node_name} ..."
end

0 commit comments

Comments
 (0)