Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion deps/rabbit/src/rabbit_msg_size_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

-export([init/1,
observe/2,
prometheus_format/0]).
prometheus_format/0,
local_summary/0,
cluster_summary/0,
cluster_summary_for_cli/0 ]).

%% Integration tests.
-export([raw_buckets/1,
Expand Down Expand Up @@ -52,6 +55,9 @@
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
NumObservations :: non_neg_integer()}].

-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}.
-type summary() :: [summary_entry()].

-spec init(atom()) -> ok.
init(Protocol) ->
Size = ?POS_MSG_SIZE_SUM,
Expand Down Expand Up @@ -133,6 +139,109 @@ get_labels_counters() ->
[{[{protocol, Protocol}], Counters}
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].

get_protocols() ->
[Protocol
|| {{?MODULE, Protocol}, _} <- persistent_term:get()].

%% Aggregates data for all protocols on the local node
-spec local_summary() -> summary().
local_summary() ->
PerProtocolBuckets = lists:map(fun(Protocol) ->
raw_buckets(Protocol)
end, get_protocols()),

%% Sum buckets for all protocols
Buckets0 = [{?BUCKET_1, 0}, {?BUCKET_2, 0}, {?BUCKET_3, 0}, {?BUCKET_4, 0},
{?BUCKET_5, 0}, {?BUCKET_6, 0}, {?BUCKET_7, 0}, {?BUCKET_8, 0}, {?BUCKET_9, 0}],
Buckets = lists:foldl(fun sum_protocol_buckets/2,
Buckets0,
PerProtocolBuckets),

Total = lists:sum([Count || {_UpperBound, Count} <- Buckets]),

Ranges = lists:map(fun({UpperBound, Count}) ->
Percentage = case Total of
0 -> 0.0;
_ -> (Count / Total) * 100
end,
{bucket_range(UpperBound), {Count, Percentage}}
end, Buckets),

Ranges.

sum_protocol_buckets(ProtocolBuckets, Acc) ->
lists:map(fun({UpperBound, AccCount}) ->
ProtocolCount = proplists:get_value(UpperBound, ProtocolBuckets, 0),
{UpperBound, AccCount + ProtocolCount}
end, Acc).

%% Aggregates sumamries from all nodes
-spec cluster_summary() -> summary().
cluster_summary() ->
RemoteNodes = [Node || Node <- rabbit_nodes:list_running(), Node =/= node()],
RemoteSummaries = [ Summary || {ok, Summary} <- erpc:multicall(RemoteNodes,
?MODULE,
local_summary,
[],
5000)],
lists:foldl(fun merge_summaries/2, local_summary(), RemoteSummaries).

bucket_name({_, ?BUCKET_1}) -> <<"below 100B">>;
bucket_name({_, ?BUCKET_2}) -> <<"between 100B and 1KB">>;
bucket_name({_, ?BUCKET_3}) -> <<"between 1KB and 10KB">>;
bucket_name({_, ?BUCKET_4}) -> <<"between 10KB and 100KB">>;
bucket_name({_, ?BUCKET_5}) -> <<"between 100KB and 1MB">>;
bucket_name({_, ?BUCKET_6}) -> <<"between 1MB and 10MB">>;
bucket_name({_, ?BUCKET_7}) -> <<"between 10MB and 50MB">>;
bucket_name({_, ?BUCKET_8}) -> <<"between 50MB and 100MB">>;
bucket_name({_, ?BUCKET_9}) -> <<"above 100MB">>.

cluster_summary_for_cli() ->
[[{<<"Message Size">>, bucket_name(Range)},
{<<"Count">>, Count},
{<<"Percentage">>, iolist_to_binary(io_lib:format("~.2f", [Percentage]))}]
|| {Range, {Count, Percentage}} <- cluster_summary()].

get_count_for_range(Range, SummaryList) ->
case proplists:get_value(Range, SummaryList) of
{Count, _} -> Count;
undefined -> 0
end.

%% Merges two summary lists by adding their counts and recalculating percentages
merge_summaries(Summary1, Summary2) ->
%% Get all bucket ranges
AllRanges = lists:usort([Range || {Range, _} <- Summary1] ++ [Range || {Range, _} <- Summary2]),

MergedRanges = lists:map(fun(Range) ->
Count1 = get_count_for_range(Range, Summary1),
Count2 = get_count_for_range(Range, Summary2),
NewCount = Count1 + Count2,
{Range, NewCount}
end, AllRanges),

%% Calculate total and percentages
NewTotal = lists:sum([Count || {_, Count} <- MergedRanges]),
FinalRanges = lists:map(fun({Range, Count}) ->
NewPercentage = case NewTotal of
0 -> 0.0;
_ -> (Count / NewTotal) * 100
end,
{Range, {Count, NewPercentage}}
end, MergedRanges),

FinalRanges.

bucket_range(?BUCKET_1) -> {0, 100};
bucket_range(?BUCKET_2) -> {101, 1000};
bucket_range(?BUCKET_3) -> {1001, 10000};
bucket_range(?BUCKET_4) -> {10001, 100000};
bucket_range(?BUCKET_5) -> {100001, 1000000};
bucket_range(?BUCKET_6) -> {1000001, 10000000};
bucket_range(?BUCKET_7) -> {10000001, 50000000};
bucket_range(?BUCKET_8) -> {50000001, 100000000};
bucket_range(?BUCKET_9) -> {100000001, infinity}.

-ifdef(TEST).
%% "Counters are not tied to the current process and are automatically
%% garbage collected when they are no longer referenced."
Expand Down
122 changes: 85 additions & 37 deletions deps/rabbit/test/msg_size_metrics_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").

-import(rabbit_ct_broker_helpers,
[rpc/4]).
[rpc/4, rpc/5]).

all() ->
[
Expand All @@ -22,9 +22,11 @@ all() ->

groups() ->
[
{tests, [shuffle],
[message_size,
over_max_message_size]}
{tests, [],
[summary, %% needs to run first
message_size,
over_max_message_size]
}
].

%% -------------------------------------------------------------------
Expand All @@ -34,14 +36,18 @@ groups() ->
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
Config.

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(
Config,
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, 3},
{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

Expand All @@ -51,6 +57,13 @@ end_per_group(_Group, Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(summary, Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
ok ->
rabbit_ct_helpers:testcase_started(Config, sumary);
{skip, _} = Skip ->
Skip
end;
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

Expand All @@ -65,32 +78,7 @@ message_size(Config) ->
AmqplBefore = get_msg_size_metrics(amqp091, Config),
AmqpBefore = get_msg_size_metrics(amqp10, Config),

Binary2B = <<"12">>,
Binary200K = binary:copy(<<"x">>, 200_000),
Payloads = [Binary2B, Binary200K, Binary2B],

{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[amqp_channel:call(Ch,
#'basic.publish'{routing_key = <<"nowhere">>},
#amqp_msg{payload = Payload})
|| Payload <- Payloads],

OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 30_000 -> ct:fail(credited_timeout)
end,

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),

ok = wait_for_settlement(released, <<"tag1">>),
ok = wait_for_settlement(released, <<"tag2">>),
ok = wait_for_settlement(released, <<"tag3">>),
publish_messages(Config),

AmqplAfter = get_msg_size_metrics(amqp091, Config),
AmqpAfter = get_msg_size_metrics(amqp10, Config),
Expand All @@ -100,10 +88,7 @@ message_size(Config) ->
?assertEqual(ExpectedDiff,
rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)),
?assertEqual(ExpectedDiff,
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)),

ok = amqp10_client:close_connection(Connection),
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)).

over_max_message_size(Config) ->
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
Expand Down Expand Up @@ -134,6 +119,39 @@ over_max_message_size(Config) ->
ok = rabbit_ct_client_helpers:close_connection(Conn),
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).

summary(Config) ->
ZeroSummary = [{{0, 100}, {0, 0.0}},
{{101, 1000}, {0, 0.0}},
{{1001, 10000}, {0, 0.0}},
{{10001, 100000}, {0, 0.0}},
{{100001, 1000000}, {0, 0.0}},
{{1000001, 10000000}, {0, 0.0}},
{{10000001, 50000000}, {0, 0.0}},
{{50000001, 100000000}, {0, 0.0}},
{{100000001, infinity}, {0, 0.0}}],

?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),

publish_messages(Config),

ExpectedSummary = [{{0, 100}, {4, 66.66666666666666}},
{{101, 1000}, {0, 0.0}},
{{1001, 10000}, {0, 0.0}},
{{10001, 100000}, {0, 0.0}},
{{100001, 1000000}, {2, 33.33333333333333}},
{{1000001, 10000000}, {0, 0.0}},
{{10000001, 50000000}, {0, 0.0}},
{{50000001, 100000000}, {0, 0.0}},
{{100000001, infinity}, {0, 0.0}}],

?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])),
?assertEqual(ExpectedSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])).

get_msg_size_metrics(Protocol, Config) ->
rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]).

Expand All @@ -145,6 +163,36 @@ connection_config(Config) ->
container_id => <<"my container">>,
sasl => anon}.

publish_messages(Config) ->
Binary2B = <<"12">>,
Binary200K = binary:copy(<<"x">>, 200_000),
Payloads = [Binary2B, Binary200K, Binary2B],

{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[amqp_channel:call(Ch,
#'basic.publish'{routing_key = <<"nowhere">>},
#amqp_msg{payload = Payload})
|| Payload <- Payloads],

OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 30_000 -> ct:fail(credited_timeout)
end,

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),

ok = wait_for_settlement(released, <<"tag1">>),
ok = wait_for_settlement(released, <<"tag2">>),
ok = wait_for_settlement(released, <<"tag3">>),
ok = amqp10_client:close_connection(Connection),
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).

wait_for_settlement(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do
alias RabbitMQ.CLI.Core.DocGuide

@behaviour RabbitMQ.CLI.CommandBehaviour

@default_timeout 60_000

def scopes(), do: [:diagnostics]

def switches(), do: [timeout: :integer]
def aliases(), do: [t: :timeout]

def merge_defaults(args, opts) do
timeout =
case opts[:timeout] do
nil -> @default_timeout
:infinity -> @default_timeout
other -> other
end

{args, Map.merge(%{timeout: timeout}, opts)}
end

use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

def run([], %{node: node_name, timeout: timeout}) do
:rabbit_misc.rpc_call(node_name, :rabbit_msg_size_metrics, :cluster_summary_for_cli, [], timeout)
end

use RabbitMQ.CLI.DefaultOutput

def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

def usage, do: "message_size_stats"

def usage_doc_guides() do
[
DocGuide.monitoring()
]
end

def help_section(), do: :observability_and_health_checks

def description(),
do: "Displays message size distribution statistics aggregated across all cluster nodes"

def banner(_, %{node: node_name}), do: "Gathering message size statistics across the cluster using node #{node_name} ..."

end
Loading
Loading