diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 070205fa0b64..791124d7e2de 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -10,6 +10,7 @@ -include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). +-include("rabbit_amqp_reader.hrl"). -include("rabbit_amqp.hrl"). -export([init/1, @@ -79,7 +80,8 @@ pending_recv :: boolean(), buf :: list(), buf_len :: non_neg_integer(), - tracked_channels :: #{channel_number() => Session :: pid()} + tracked_channels :: #{channel_number() => Session :: pid()}, + stats_timer :: rabbit_event:state() }). -type state() :: #v1{}. @@ -90,7 +92,7 @@ unpack_from_0_9_1( {Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket, - ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt}, + ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}, Parent) -> logger:update_process_metadata(#{connection => ConnectionName}), #v1{parent = Parent, @@ -106,6 +108,7 @@ unpack_from_0_9_1( tracked_channels = maps:new(), writer = none, connection_state = received_amqp3100, + stats_timer = StatsTimer, connection = #v1_connection{ name = ConnectionName, container_id = none, @@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other(ensure_stats_timer, State) -> + ensure_stats_timer(State); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'", [Reason]), @@ -247,8 +254,16 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> end, gen_server:reply(From, Reply), State; -handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> - State; +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) -> + case ?IS_RUNNING(State) of + true -> + Infos = infos(?CONNECTION_EVENT_KEYS, State), + rabbit_event:notify(connection_created, Infos, Ref), + rabbit_event:init_stats_timer(State, #v1.stats_timer); + false -> + %% Ignore, we will emit a connection_created event once we start running. + State + end; handle_other(terminate_connection, _State) -> stop; handle_other({set_credential, Cred}, State) -> @@ -527,6 +542,7 @@ handle_connection_frame( proplists:get_value(pid, Infos), Infos), ok = rabbit_event:notify(connection_created, Infos), + ok = maybe_emit_stats(State), ok = rabbit_amqp1_0:register_connection(self()), Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306 <<"LINK_PAIR_V1_0">>, @@ -629,25 +645,26 @@ handle_input(handshake, switch_callback(State, {frame_header, amqp}, 8); handle_input({frame_header, Mode}, Header = <>, - State) when DOff >= 2 -> + State0) when DOff >= 2 -> case {Mode, Type} of {amqp, 0} -> ok; {sasl, 1} -> ok; - _ -> throw({bad_1_0_header_type, Header, Mode}) + _ -> throw({bad_1_0_header_type, Header, Mode}) end, - MaxFrameSize = State#v1.connection#v1_connection.incoming_max_frame_size, - if Size =:= 8 -> - %% heartbeat - State; - Size > MaxFrameSize -> - handle_exception( - State, Channel, error_frame( - ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, - "frame size (~b bytes) > maximum frame size (~b bytes)", - [Size, MaxFrameSize])); - true -> - switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8) - end; + MaxFrameSize = State0#v1.connection#v1_connection.incoming_max_frame_size, + State = if Size =:= 8 -> + %% heartbeat + State0; + Size > MaxFrameSize -> + Err = error_frame( + ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + "frame size (~b bytes) > maximum frame size (~b bytes)", + [Size, MaxFrameSize]), + handle_exception(State0, Channel, Err); + true -> + switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8) + end, + ensure_stats_timer(State); handle_input({frame_header, _Mode}, Malformed, _State) -> throw({bad_1_0_header, Malformed}); handle_input({frame_body, Mode, DOff, Channel}, @@ -1013,13 +1030,18 @@ i(peer_host, #v1{connection = #v1_connection{peer_host = Val}}) -> Val; i(peer_port, #v1{connection = #v1_connection{peer_port = Val}}) -> Val; -i(SockStat, S) when SockStat =:= recv_oct; - SockStat =:= recv_cnt; - SockStat =:= send_oct; - SockStat =:= send_cnt; - SockStat =:= send_pend -> - socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end, - fun ([{_, I}]) -> I end, S); +i(SockStat, #v1{sock = Sock}) + when SockStat =:= recv_oct; + SockStat =:= recv_cnt; + SockStat =:= send_oct; + SockStat =:= send_cnt; + SockStat =:= send_pend -> + case rabbit_net:getstat(Sock, [SockStat]) of + {ok, [{SockStat, Val}]} -> + Val; + {error, _} -> + '' + end; i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); i(SSL, #v1{sock = Sock, proxy_socket = ProxySock}) when SSL =:= ssl_protocol; @@ -1045,15 +1067,37 @@ i(channels, #v1{tracked_channels = Channels}) -> maps:size(Channels); i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) -> Max; +i(reductions = Item, _State) -> + {Item, Reductions} = erlang:process_info(self(), Item), + Reductions; +i(garbage_collection, _State) -> + rabbit_misc:get_gc_info(self()); i(Item, #v1{}) -> throw({bad_argument, Item}). -%% From rabbit_reader -socket_info(Get, Select, #v1{sock = Sock}) -> - case Get(Sock) of - {ok, T} -> Select(T); - {error, _} -> '' - end. +maybe_emit_stats(State) -> + ok = rabbit_event:if_enabled( + State, + #v1.stats_timer, + fun() -> emit_stats(State) end). + +emit_stats(State) -> + [{_, Pid}, + {_, RecvOct}, + {_, SendOct}, + {_, Reductions}] = infos(?SIMPLE_METRICS, State), + Infos = infos(?OTHER_METRICS, State), + rabbit_core_metrics:connection_stats(Pid, Infos), + rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions), + %% NB: Don't call ensure_stats_timer because it becomes expensive + %% if all idle non-hibernating connections emit stats. + rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +ensure_stats_timer(State) + when ?IS_RUNNING(State) -> + rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); +ensure_stats_timer(State) -> + State. ignore_maintenance({map, Properties}) -> lists:member( diff --git a/deps/rabbit/src/rabbit_amqp_reader.hrl b/deps/rabbit/src/rabbit_amqp_reader.hrl new file mode 100644 index 000000000000..7c71b21dc90f --- /dev/null +++ b/deps/rabbit/src/rabbit_amqp_reader.hrl @@ -0,0 +1,17 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-define(SIMPLE_METRICS, [pid, + recv_oct, + send_oct, + reductions]). + +-define(OTHER_METRICS, [recv_cnt, + send_cnt, + send_pend, + state, + channels, + garbage_collection]). diff --git a/deps/rabbit/src/rabbit_amqp_writer.erl b/deps/rabbit/src/rabbit_amqp_writer.erl index 7b239a10a107..4750e9954007 100644 --- a/deps/rabbit/src/rabbit_amqp_writer.erl +++ b/deps/rabbit/src/rabbit_amqp_writer.erl @@ -31,7 +31,8 @@ pending :: iolist(), %% This field is just an optimisation to minimize the cost of erlang:iolist_size/1 pending_size :: non_neg_integer(), - monitored_sessions :: #{pid() => true} + monitored_sessions :: #{pid() => true}, + stats_timer :: rabbit_event:state() }). -define(HIBERNATE_AFTER, 6_000). @@ -100,7 +101,8 @@ init({Sock, ReaderPid}) -> reader = ReaderPid, pending = [], pending_size = 0, - monitored_sessions = #{}}, + monitored_sessions = #{}, + stats_timer = rabbit_event:init_stats_timer()}, process_flag(message_queue_data, off_heap), {ok, State}. @@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) -> State = flush(State1), {reply, ok, State}. +handle_info(emit_stats, State0 = #state{reader = ReaderPid}) -> + ReaderPid ! ensure_stats_timer, + State = rabbit_event:reset_stats_timer(State0, #state.stats_timer), + no_reply(State); handle_info(timeout, State0) -> State = flush(State0), {noreply, State}; @@ -223,18 +229,19 @@ tcp_send(Sock, Data) -> maybe_flush(State = #state{pending_size = PendingSize}) -> case PendingSize > ?FLUSH_THRESHOLD of - true -> flush(State); + true -> flush(State); false -> State end. flush(State = #state{pending = []}) -> State; -flush(State = #state{sock = Sock, - pending = Pending}) -> +flush(State0 = #state{sock = Sock, + pending = Pending}) -> case rabbit_net:send(Sock, lists:reverse(Pending)) of ok -> - State#state{pending = [], - pending_size = 0}; + State = State0#state{pending = [], + pending_size = 0}, + rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats); {error, Reason} -> exit({writer, send_failed, Reason}) end. diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 42e7e70a75fe..4ce9c0fec829 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -42,6 +42,7 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include("rabbit_amqp_reader.hrl"). -export([start_link/2, info/2, force_event_refresh/2, shutdown/2]). @@ -116,10 +117,6 @@ connection_blocked_message_sent }). --define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). --define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, - garbage_collection]). - -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, peer_host, ssl, peer_cert_subject, peer_cert_issuer, @@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState, end; i(garbage_collection, _State) -> rabbit_misc:get_gc_info(self()); -i(reductions, _State) -> - {reductions, Reductions} = erlang:process_info(self(), reductions), +i(reductions = Item, _State) -> + {Item, Reductions} = erlang:process_info(self(), Item), Reductions; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). @@ -1623,12 +1620,12 @@ maybe_emit_stats(State) -> emit_stats(State) -> [{_, Pid}, - {_, Recv_oct}, - {_, Send_oct}, + {_, RecvOct}, + {_, SendOct}, {_, Reductions}] = infos(?SIMPLE_METRICS, State), Infos = infos(?OTHER_METRICS, State), rabbit_core_metrics:connection_stats(Pid, Infos), - rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), + rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions), State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer), ensure_stats_timer(State1). @@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock, pending_recv = PendingRecv, helper_sup = {_HelperSup091, HelperSup10}, proxy_socket = ProxySocket, + stats_timer = StatsTimer, connection = #connection{ name = Name, host = Host, @@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock, peer_port = PeerPort, connected_at = ConnectedAt}}) -> {Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket, - Name, Host, PeerHost, Port, PeerPort, ConnectedAt}. + Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}. respond_and_close(State, Channel, Protocol, Reason, LogErr) -> log_hard_error(State, Channel, LogErr), diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 8b5430076f53..492809e520a8 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -141,9 +141,9 @@ connection_stats(Pid, Infos) -> ets:insert(connection_metrics, {Pid, Infos}), ok. -connection_stats(Pid, Recv_oct, Send_oct, Reductions) -> +connection_stats(Pid, RecvOct, SendOct, Reductions) -> %% Includes delete marker - ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}), + ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}), ok. channel_created(Pid, Infos) -> diff --git a/deps/rabbit_common/src/rabbit_event.erl b/deps/rabbit_common/src/rabbit_event.erl index ac584ed0819f..9412e940e0eb 100644 --- a/deps/rabbit_common/src/rabbit_event.erl +++ b/deps/rabbit_common/src/rabbit_event.erl @@ -10,7 +10,7 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/2, init_disabled_stats_timer/2, +-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). -export([notify/2, notify/3, notify_if/3]). @@ -89,23 +89,34 @@ start_link() -> %% Nowadays, instead of sending a message to rabbit_event via notify(stats), %% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics. -init_stats_timer(C, P) -> +-spec init_stats_timer() -> state(). +init_stats_timer() -> %% If the rabbit app is not loaded - use default none:5000 StatsLevel = application:get_env(rabbit, collect_statistics, none), - Interval = application:get_env(rabbit, collect_statistics_interval, 5000), - setelement(P, C, #state{level = StatsLevel, interval = Interval, - timer = undefined}). + Interval = application:get_env(rabbit, collect_statistics_interval, 5000), + #state{level = StatsLevel, + interval = Interval, + timer = undefined}. + +init_stats_timer(C, P) -> + State = init_stats_timer(), + setelement(P, C, State). init_disabled_stats_timer(C, P) -> - setelement(P, C, #state{level = none, interval = 0, timer = undefined}). + State = #state{level = none, + interval = 0, + timer = undefined}, + setelement(P, C, State). ensure_stats_timer(C, P, Msg) -> case element(P, C) of - #state{level = Level, interval = Interval, timer = undefined} = State + #state{level = Level, + interval = Interval, + timer = undefined} = State when Level =/= none -> TRef = erlang:send_after(Interval, self(), Msg), setelement(P, C, State#state{timer = TRef}); - #state{} -> + _State -> C end. @@ -156,5 +167,5 @@ event_cons(Type, Props, Ref) -> #event{type = Type, props = Props, reference = Ref, - timestamp = os:system_time(milli_seconds)}. + timestamp = os:system_time(millisecond)}. diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index d517cb4810a8..e30d532607c6 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -134,8 +134,6 @@ all_tests() -> [ permissions_validation_test, permissions_list_test, permissions_test, - connections_test_amqpl, - connections_test_amqp, multiple_invalid_connections_test, quorum_queues_test, stream_queues_have_consumers_field, @@ -201,7 +199,10 @@ all_tests() -> [ disabled_qq_replica_opers_test, qq_status_test, list_deprecated_features_test, - list_used_deprecated_features_test + list_used_deprecated_features_test, + connections_test_amqpl, + connections_test_amqp, + enable_plugin_amqp ]. %% ------------------------------------------------------------------- @@ -1068,6 +1069,33 @@ connections_test_amqp(Config) -> eventually(?_assertEqual([], http_get(Config, "/connections")), 10, 5), ?assertEqual(0, length(rpc(Config, rabbit_amqp1_0, list_local, []))). +%% Test that AMQP 1.0 connection can be listed if the rabbitmq_management plugin gets enabled +%% after the connection was established. +enable_plugin_amqp(Config) -> + ?assertEqual(0, length(http_get(Config, "/connections"))), + + ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management), + ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management_agent), + + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpnConf = #{address => ?config(rmq_hostname, Config), + port => Port, + container_id => <<"my container">>, + sasl => anon}, + {ok, Conn} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Conn, opened}} -> ok + after 5000 -> ct:fail(opened_timeout) + end, + + ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management_agent), + ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management), + eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + + ok = amqp10_client:close_connection(Conn), + receive {amqp10_event, {connection, Conn, {closed, normal}}} -> ok + after 5000 -> ct:fail({connection_close_timeout, Conn}) + end. + flush(Prefix) -> receive Msg -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 94925d75fb9c..5372491753d4 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -37,7 +37,7 @@ rabbit_mqtt_processor:state(), connection_state :: running | blocked, conserve :: boolean(), - stats_timer :: option(rabbit_event:state()), + stats_timer :: rabbit_event:state(), keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(), conn_name :: binary() }). @@ -87,9 +87,9 @@ init(Ref) -> await_recv = false, connection_state = running, conserve = false, - parse_state = rabbit_mqtt_packet:init_state()}, - State1 = control_throttle(State0), - State = rabbit_event:init_stats_timer(State1, #state.stats_timer), + parse_state = rabbit_mqtt_packet:init_state(), + stats_timer = rabbit_event:init_stats_timer()}, + State = control_throttle(State0), gen_server:enter_loop(?MODULE, [], State); {error, Reason = enotconn} -> ?LOG_INFO("MQTT could not get connection string: ~s", [Reason]), @@ -440,8 +440,6 @@ maybe_process_deferred_recv(State = #state{ deferred_recv = Data, socket = Sock handle_info({tcp, Sock, Data}, State#state{ deferred_recv = undefined }). -maybe_emit_stats(#state{stats_timer = undefined}) -> - ok; maybe_emit_stats(State) -> rabbit_event:if_enabled(State, #state.stats_timer, fun() -> emit_stats(State) end). diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 67e99400b500..607df88c2498 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -42,7 +42,7 @@ stats_timer :: option(rabbit_event:state()), keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(), conn_name :: option(binary()) - }). + }). -type state() :: #state{}. @@ -79,13 +79,12 @@ init(Req, Opts) -> false -> no_supported_sub_protocol(Protocol, Req); true -> + Req1 = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), + State = #state{socket = maps:get(proxy_header, Req, undefined), + stats_timer = rabbit_event:init_stats_timer()}, WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), WsOpts = maps:merge(#{compress => true}, WsOpts0), - - {?MODULE, - cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), - #state{socket = maps:get(proxy_header, Req, undefined)}, - WsOpts} + {?MODULE, Req1, State, WsOpts} end end. @@ -112,8 +111,7 @@ websocket_init(State0 = #state{socket = Sock}) -> ConnName = rabbit_data_coercion:to_binary(ConnStr), ?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]), _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - State1 = State0#state{conn_name = ConnName}, - State = rabbit_event:init_stats_timer(State1, #state.stats_timer), + State = State0#state{conn_name = ConnName}, process_flag(trap_exit, true), {[], State, hibernate}; {error, Reason} -> diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index 294aabe37ffc..781b64b0e99d 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -29,6 +29,15 @@ This annotation allows publishers to specify a [list](https://docs.oasis-open.or This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity. If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection. +### Metrics for AMQP 1.0 Connections +[PR #12638](https://github.com/rabbitmq/rabbitmq-server/pull/12638) exposes the following AMQP 1.0 connection metrics in the RabbitMQ Management UI and the [/metrics/per-object](https://www.rabbitmq.com/docs/prometheus#per-object-endpoint) Prometheus endpoint: +* Bytes received and sent +* Reductions +* Garbage collections +* Number of channels/sessions + +These metrics have already been emitted for AMQP 0.9.1 connections prior to RabbitMQ 4.1. + ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).