From c3dcaca947970657cddac0b2b14b62ea36cc83d2 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 9 Dec 2024 15:49:34 +0100 Subject: [PATCH] Set a floor of zero for incoming-window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this commit, when the sending client overshot RabbitMQ's incoming-window (which is allowed in the event of a cluster wide memory or disk alarm), and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative incoming-window field in the FLOW frame causing the following crash in the writer proc: ``` crasher: initial call: rabbit_amqp_writer:init/1 pid: <0.19353.0> registered_name: [] exception error: bad argument in function iolist_size/1 called as iolist_size([<<112,0,0,23,120>>, [82,-15], <<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67, <<112,0,0,23,120>>, "Rª",64,64,64,64]) *** argument 1: not an iodata term in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141) in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88) in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79) in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206) in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189) in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110) in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121) ``` This commit fixes this crash by maintaning a floor of zero for incoming-window in the FLOW frame. Fixes #12816 --- .../src/amqp10_client_session.erl | 5 -- deps/rabbit/src/rabbit_amqp_session.erl | 5 +- deps/rabbit/test/amqp_client_SUITE.erl | 59 +++++++++++++++++++ 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index ba7218b84d7e..5738abace5ff 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -467,11 +467,6 @@ mapped({call, From}, #state{remote_incoming_window = Window}) when Window =< 0 -> {keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}}; -mapped({call, From}, - {transfer, _Transfer, _Sections}, - #state{remote_incoming_window = Window}) - when Window =< 0 -> - {keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}}; mapped({call, From = {Pid, _}}, {transfer, #'v1_0.transfer'{handle = {uint, OutHandle}, delivery_tag = {binary, DeliveryTag}, diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 5a15222c0c76..44f492e94456 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2003,7 +2003,10 @@ session_flow_fields(Frames, State) session_flow_fields(Flow = #'v1_0.flow'{}, #state{next_outgoing_id = NextOutgoingId, next_incoming_id = NextIncomingId, - incoming_window = IncomingWindow}) -> + incoming_window = IncomingWindow0}) -> + %% IncomingWindow0 can be negative when the sending client overshoots our window. + %% However, we must set a floor of 0 in the FLOW frame because field incoming-window is an uint. + IncomingWindow = max(0, IncomingWindow0), Flow#'v1_0.flow'{ next_outgoing_id = ?UINT(NextOutgoingId), outgoing_window = ?UINT_OUTGOING_WINDOW, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 97e8e6b079cf..ab4addfd6966 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -108,6 +108,7 @@ groups() -> detach_requeues_drop_head_classic_queue, resource_alarm_before_session_begin, resource_alarm_after_session_begin, + resource_alarm_send_many, max_message_size_client_to_server, max_message_size_server_to_client, global_counters, @@ -3207,6 +3208,42 @@ resource_alarm_after_session_begin(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). +%% Test case for +%% https://github.com/rabbitmq/rabbitmq-server/issues/12816 +resource_alarm_send_many(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + Address = rabbitmq_amqp_address:queue(QName), + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% Send many messages while a memory alarm kicks in. + %% Our expectations are: + %% 1. At some point, our client's remote-incoming-window should be exceeded because + %% RabbitMQ sets its incoming-window to 0 when the alarm kicks in. + %% 2. No crash. + {Pid, Ref} = spawn_monitor(?MODULE, + send_until_remote_incoming_window_exceeded, + [Session, Address]), + DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []), + ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]), + receive {'DOWN', Ref, process, Pid, Reason} -> + ?assertEqual(normal, Reason) + after 30_000 -> + ct:fail(send_timeout) + end, + + %% Clear memory alarm. + ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]), + timer:sleep(100), + + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection), + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + ok = rabbit_ct_client_helpers:close_channel(Ch). + auth_attempt_metrics(Config) -> open_and_close_connection(Config), [Attempt1] = rpc(Config, rabbit_core_metrics, get_auth_attempts, []), @@ -6286,6 +6323,28 @@ count_received_messages0(Receiver, Count) -> Count end. +send_until_remote_incoming_window_exceeded(Session, Address) -> + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled), + ok = wait_for_credit(Sender), + ok = send_until_remote_incoming_window_exceeded0(Sender, 100_000), + ok = amqp10_client:detach_link(Sender). + +send_until_remote_incoming_window_exceeded0(_Sender, 0) -> + ct:fail(remote_incoming_window_never_exceeded); +send_until_remote_incoming_window_exceeded0(Sender, Left) -> + Bin = integer_to_binary(Left), + Msg = amqp10_msg:new(Bin, Bin, true), + case amqp10_client:send_msg(Sender, Msg) of + ok -> + send_until_remote_incoming_window_exceeded0(Sender, Left - 1); + {error, insufficient_credit} -> + ok = wait_for_credit(Sender), + send_until_remote_incoming_window_exceeded0(Sender, Left); + {error, remote_incoming_window_exceeded = Reason} -> + ct:pal("~s: ~b messages left", [Reason, Left]), + ok + end. + assert_link_credit_runs_out(_Sender, 0) -> ct:fail(sufficient_link_credit); assert_link_credit_runs_out(Sender, Left) ->