diff --git a/deps/rabbit/src/rabbit_msg_size_metrics.erl b/deps/rabbit/src/rabbit_msg_size_metrics.erl index 4b8104d6e7f9..e7b6425685a6 100644 --- a/deps/rabbit/src/rabbit_msg_size_metrics.erl +++ b/deps/rabbit/src/rabbit_msg_size_metrics.erl @@ -11,7 +11,10 @@ -export([init/1, observe/2, - prometheus_format/0]). + prometheus_format/0, + local_summary/0, + cluster_summary/0, + cluster_summary_for_cli/0 ]). %% Integration tests. -export([raw_buckets/1, @@ -52,6 +55,9 @@ -type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(), NumObservations :: non_neg_integer()}]. +-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}. +-type summary() :: [summary_entry()]. + -spec init(atom()) -> ok. init(Protocol) -> Size = ?POS_MSG_SIZE_SUM, @@ -133,6 +139,109 @@ get_labels_counters() -> [{[{protocol, Protocol}], Counters} || {{?MODULE, Protocol}, Counters} <- persistent_term:get()]. +get_protocols() -> + [Protocol + || {{?MODULE, Protocol}, _} <- persistent_term:get()]. + +%% Aggregates data for all protocols on the local node +-spec local_summary() -> summary(). +local_summary() -> + PerProtocolBuckets = lists:map(fun(Protocol) -> + raw_buckets(Protocol) + end, get_protocols()), + + %% Sum buckets for all protocols + Buckets0 = [{?BUCKET_1, 0}, {?BUCKET_2, 0}, {?BUCKET_3, 0}, {?BUCKET_4, 0}, + {?BUCKET_5, 0}, {?BUCKET_6, 0}, {?BUCKET_7, 0}, {?BUCKET_8, 0}, {?BUCKET_9, 0}], + Buckets = lists:foldl(fun sum_protocol_buckets/2, + Buckets0, + PerProtocolBuckets), + + Total = lists:sum([Count || {_UpperBound, Count} <- Buckets]), + + Ranges = lists:map(fun({UpperBound, Count}) -> + Percentage = case Total of + 0 -> 0.0; + _ -> (Count / Total) * 100 + end, + {bucket_range(UpperBound), {Count, Percentage}} + end, Buckets), + + Ranges. 
+ +sum_protocol_buckets(ProtocolBuckets, Acc) -> + lists:map(fun({UpperBound, AccCount}) -> + ProtocolCount = proplists:get_value(UpperBound, ProtocolBuckets, 0), + {UpperBound, AccCount + ProtocolCount} + end, Acc). + +%% Aggregates summaries from all nodes +-spec cluster_summary() -> summary(). +cluster_summary() -> + RemoteNodes = [Node || Node <- rabbit_nodes:list_running(), Node =/= node()], + RemoteSummaries = [ Summary || {ok, Summary} <- erpc:multicall(RemoteNodes, + ?MODULE, + local_summary, + [], + 5000)], + lists:foldl(fun merge_summaries/2, local_summary(), RemoteSummaries). + +bucket_name({_, ?BUCKET_1}) -> <<"below 100B">>; +bucket_name({_, ?BUCKET_2}) -> <<"between 100B and 1KB">>; +bucket_name({_, ?BUCKET_3}) -> <<"between 1KB and 10KB">>; +bucket_name({_, ?BUCKET_4}) -> <<"between 10KB and 100KB">>; +bucket_name({_, ?BUCKET_5}) -> <<"between 100KB and 1MB">>; +bucket_name({_, ?BUCKET_6}) -> <<"between 1MB and 10MB">>; +bucket_name({_, ?BUCKET_7}) -> <<"between 10MB and 50MB">>; +bucket_name({_, ?BUCKET_8}) -> <<"between 50MB and 100MB">>; +bucket_name({_, ?BUCKET_9}) -> <<"above 100MB">>. + +cluster_summary_for_cli() -> + [[{<<"Message Size">>, bucket_name(Range)}, + {<<"Count">>, Count}, + {<<"Percentage">>, iolist_to_binary(io_lib:format("~.2f", [Percentage]))}] + || {Range, {Count, Percentage}} <- cluster_summary()]. + +get_count_for_range(Range, SummaryList) -> + case proplists:get_value(Range, SummaryList) of + {Count, _} -> Count; + undefined -> 0 + end. 
+ +%% Merges two summary lists by adding their counts and recalculating percentages +merge_summaries(Summary1, Summary2) -> + %% Get all bucket ranges + AllRanges = lists:usort([Range || {Range, _} <- Summary1] ++ [Range || {Range, _} <- Summary2]), + + MergedRanges = lists:map(fun(Range) -> + Count1 = get_count_for_range(Range, Summary1), + Count2 = get_count_for_range(Range, Summary2), + NewCount = Count1 + Count2, + {Range, NewCount} + end, AllRanges), + + %% Calculate total and percentages + NewTotal = lists:sum([Count || {_, Count} <- MergedRanges]), + FinalRanges = lists:map(fun({Range, Count}) -> + NewPercentage = case NewTotal of + 0 -> 0.0; + _ -> (Count / NewTotal) * 100 + end, + {Range, {Count, NewPercentage}} + end, MergedRanges), + + FinalRanges. + +bucket_range(?BUCKET_1) -> {0, 100}; +bucket_range(?BUCKET_2) -> {101, 1000}; +bucket_range(?BUCKET_3) -> {1001, 10000}; +bucket_range(?BUCKET_4) -> {10001, 100000}; +bucket_range(?BUCKET_5) -> {100001, 1000000}; +bucket_range(?BUCKET_6) -> {1000001, 10000000}; +bucket_range(?BUCKET_7) -> {10000001, 50000000}; +bucket_range(?BUCKET_8) -> {50000001, 100000000}; +bucket_range(?BUCKET_9) -> {100000001, infinity}. + -ifdef(TEST). %% "Counters are not tied to the current process and are automatically %% garbage collected when they are no longer referenced." diff --git a/deps/rabbit/test/msg_size_metrics_SUITE.erl b/deps/rabbit/test/msg_size_metrics_SUITE.erl index 72af61b2194a..6f6ac217c5cf 100644 --- a/deps/rabbit/test/msg_size_metrics_SUITE.erl +++ b/deps/rabbit/test/msg_size_metrics_SUITE.erl @@ -13,7 +13,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -import(rabbit_ct_broker_helpers, - [rpc/4]). + [rpc/4, rpc/5]). all() -> [ @@ -22,9 +22,11 @@ all() -> groups() -> [ - {tests, [shuffle], - [message_size, - over_max_message_size]} + {tests, [], + [summary, %% needs to run first + message_size, + over_max_message_size] + } ]. 
%% ------------------------------------------------------------------- @@ -34,14 +36,18 @@ groups() -> init_per_suite(Config) -> {ok, _} = application:ensure_all_started(amqp10_client), rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + Config. end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). init_per_group(_Group, Config) -> - rabbit_ct_helpers:run_steps( - Config, + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). @@ -51,6 +57,13 @@ end_per_group(_Group, Config) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(summary, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of + ok -> + rabbit_ct_helpers:testcase_started(Config, summary); + {skip, _} = Skip -> + Skip + end; init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). 
@@ -65,32 +78,7 @@ message_size(Config) -> AmqplBefore = get_msg_size_metrics(amqp091, Config), AmqpBefore = get_msg_size_metrics(amqp10, Config), - Binary2B = <<"12">>, - Binary200K = binary:copy(<<"x">>, 200_000), - Payloads = [Binary2B, Binary200K, Binary2B], - - {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), - [amqp_channel:call(Ch, - #'basic.publish'{routing_key = <<"nowhere">>}, - #amqp_msg{payload = Payload}) - || Payload <- Payloads], - - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), - {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), - receive {amqp10_event, {link, Sender, credited}} -> ok - after 30_000 -> ct:fail(credited_timeout) - end, - - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)), - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)), - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)), - - ok = wait_for_settlement(released, <<"tag1">>), - ok = wait_for_settlement(released, <<"tag2">>), - ok = wait_for_settlement(released, <<"tag3">>), + publish_messages(Config), AmqplAfter = get_msg_size_metrics(amqp091, Config), AmqpAfter = get_msg_size_metrics(amqp10, Config), @@ -100,10 +88,7 @@ message_size(Config) -> ?assertEqual(ExpectedDiff, rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)), ?assertEqual(ExpectedDiff, - rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)), - - ok = amqp10_client:close_connection(Connection), - ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch). + rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)). 
over_max_message_size(Config) -> DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]), @@ -134,6 +119,39 @@ over_max_message_size(Config) -> ok = rabbit_ct_client_helpers:close_connection(Conn), ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). +summary(Config) -> + ZeroSummary = [{{0, 100}, {0, 0.0}}, + {{101, 1000}, {0, 0.0}}, + {{1001, 10000}, {0, 0.0}}, + {{10001, 100000}, {0, 0.0}}, + {{100001, 1000000}, {0, 0.0}}, + {{1000001, 10000000}, {0, 0.0}}, + {{10000001, 50000000}, {0, 0.0}}, + {{50000001, 100000000}, {0, 0.0}}, + {{100000001, infinity}, {0, 0.0}}], + + ?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])), + ?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])), + ?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])), + ?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])), + + publish_messages(Config), + + ExpectedSummary = [{{0, 100}, {4, 66.66666666666666}}, + {{101, 1000}, {0, 0.0}}, + {{1001, 10000}, {0, 0.0}}, + {{10001, 100000}, {0, 0.0}}, + {{100001, 1000000}, {2, 33.33333333333333}}, + {{1000001, 10000000}, {0, 0.0}}, + {{10000001, 50000000}, {0, 0.0}}, + {{50000001, 100000000}, {0, 0.0}}, + {{100000001, infinity}, {0, 0.0}}], + + ?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])), + ?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])), + ?assertEqual(ExpectedSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])), + ?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])). + get_msg_size_metrics(Protocol, Config) -> rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]). @@ -145,6 +163,36 @@ connection_config(Config) -> container_id => <<"my container">>, sasl => anon}. 
+publish_messages(Config) -> + Binary2B = <<"12">>, + Binary200K = binary:copy(<<"x">>, 200_000), + Payloads = [Binary2B, Binary200K, Binary2B], + + {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + [amqp_channel:call(Ch, + #'basic.publish'{routing_key = <<"nowhere">>}, + #amqp_msg{payload = Payload}) + || Payload <- Payloads], + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), + receive {amqp10_event, {link, Sender, credited}} -> ok + after 30_000 -> ct:fail(credited_timeout) + end, + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)), + + ok = wait_for_settlement(released, <<"tag1">>), + ok = wait_for_settlement(released, <<"tag2">>), + ok = wait_for_settlement(released, <<"tag3">>), + ok = amqp10_client:close_connection(Connection), + ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch). + wait_for_settlement(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/diagnostics/commands/message_size_stats_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/diagnostics/commands/message_size_stats_command.ex new file mode 100644 index 000000000000..f9299665d1ee --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/diagnostics/commands/message_size_stats_command.ex @@ -0,0 +1,56 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. 
All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do + alias RabbitMQ.CLI.Core.DocGuide + + @behaviour RabbitMQ.CLI.CommandBehaviour + + @default_timeout 60_000 + + def scopes(), do: [:diagnostics] + + def switches(), do: [timeout: :integer] + def aliases(), do: [t: :timeout] + + def merge_defaults(args, opts) do + timeout = + case opts[:timeout] do + nil -> @default_timeout + :infinity -> @default_timeout + other -> other + end + + {args, Map.merge(%{timeout: timeout}, opts)} + end + + use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + + def run([], %{node: node_name, timeout: timeout}) do + :rabbit_misc.rpc_call(node_name, :rabbit_msg_size_metrics, :cluster_summary_for_cli, [], timeout) + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable + + def usage, do: "message_size_stats" + + def usage_doc_guides() do + [ + DocGuide.monitoring() + ] + end + + def help_section(), do: :observability_and_health_checks + + def description(), + do: "Displays message size distribution statistics aggregated across all cluster nodes" + + def banner(_, %{node: node_name}), do: "Gathering message size statistics across the cluster using node #{node_name} ..." + +end diff --git a/deps/rabbitmq_cli/test/diagnostics/message_size_stats_command_test.exs b/deps/rabbitmq_cli/test/diagnostics/message_size_stats_command_test.exs new file mode 100644 index 000000000000..041bf12131dd --- /dev/null +++ b/deps/rabbitmq_cli/test/diagnostics/message_size_stats_command_test.exs @@ -0,0 +1,55 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. 
The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule MessageSizeStatsCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup do + {:ok, opts: %{node: get_rabbit_hostname(), timeout: 60_000}} + end + + test "validate: with extra arguments returns an arg count error", context do + assert @command.validate(["extra"], context[:opts]) == {:validation_failure, :too_many_args} + end + + test "validate: with no arguments succeeds", context do + assert @command.validate([], context[:opts]) == :ok + end + + test "run: request to a named, active node succeeds", context do + result = @command.run([], context[:opts]) + Enum.each(result, fn row -> + assert is_list(row) + assert Enum.any?(row, fn {key, _} -> key == "Message Size" end) + assert Enum.any?(row, fn {key, _} -> key == "Count" end) + assert Enum.any?(row, fn {key, _} -> key == "Percentage" end) + count_value = Enum.find_value(row, fn {key, value} -> if key == "Count", do: value end) + percentage_value = Enum.find_value(row, fn {key, value} -> if key == "Percentage", do: value end) + assert is_integer(count_value) + assert is_float(String.to_float(percentage_value)) + end) + end + + test "run: request to a non-existent node returns a badrpc" do + opts = %{node: :jake@thedog, timeout: 200} + assert match?({:badrpc, _}, @command.run([], opts)) + end + + test "banner", context do + banner = @command.banner([], context[:opts]) + assert banner =~ ~r/message size statistics/ + end + +end