Skip to content

Commit cfc3a6c

Browse files
committed
Set a floor of zero for incoming-window
Prior to this commit, when the sending client overshot RabbitMQ's incoming-window (which is allowed in the event of a cluster wide memory or disk alarm), and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative incoming-window field in the FLOW frame causing the following crash in the writer proc: ``` crasher: initial call: rabbit_amqp_writer:init/1 pid: <0.19353.0> registered_name: [] exception error: bad argument in function iolist_size/1 called as iolist_size([<<112,0,0,23,120>>, [82,-15], <<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67, <<112,0,0,23,120>>, "Rª",64,64,64,64]) *** argument 1: not an iodata term in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141) in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88) in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79) in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206) in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189) in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110) in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121) ``` This commit fixes this crash by maintaning a floor of zero for incoming-window in the FLOW frame. Fixes #12816
1 parent c15ba8e commit cfc3a6c

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2003,7 +2003,10 @@ session_flow_fields(Frames, State)
20032003
session_flow_fields(Flow = #'v1_0.flow'{},
20042004
#state{next_outgoing_id = NextOutgoingId,
20052005
next_incoming_id = NextIncomingId,
2006-
incoming_window = IncomingWindow}) ->
2006+
incoming_window = IncomingWindow0}) ->
2007+
%% IncomingWindow0 can be negative when the sending client overshoots our window.
2008+
%% However, we must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
2009+
IncomingWindow = max(0, IncomingWindow0),
20072010
Flow#'v1_0.flow'{
20082011
next_outgoing_id = ?UINT(NextOutgoingId),
20092012
outgoing_window = ?UINT_OUTGOING_WINDOW,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ groups() ->
108108
detach_requeues_drop_head_classic_queue,
109109
resource_alarm_before_session_begin,
110110
resource_alarm_after_session_begin,
111+
resource_alarm_send_many,
111112
max_message_size_client_to_server,
112113
max_message_size_server_to_client,
113114
global_counters,
@@ -3207,6 +3208,51 @@ resource_alarm_after_session_begin(Config) ->
32073208
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
32083209
ok = rabbit_ct_client_helpers:close_channel(Ch).
32093210

3211+
%% Test case for
3212+
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816
3213+
resource_alarm_send_many(Config) ->
3214+
QName = atom_to_binary(?FUNCTION_NAME),
3215+
Ch = rabbit_ct_client_helpers:open_channel(Config),
3216+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
3217+
Address = rabbitmq_amqp_address:queue(QName),
3218+
OpnConf = connection_config(Config),
3219+
3220+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
3221+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
3222+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed),
3223+
ok = wait_for_credit(Sender),
3224+
3225+
%% Send many messages while a memory alarm kicks in.
3226+
spawn_link(
3227+
fun() ->
3228+
ct:pal("sending first message..."),
3229+
[begin
3230+
Bin = integer_to_binary(I),
3231+
_ = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin, true))
3232+
end || I <- lists:seq(1, 50_000)],
3233+
ct:pal("sent last message")
3234+
end),
3235+
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
3236+
ct:pal("setting memory alarm..."),
3237+
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
3238+
ct:pal("set memory alarm"),
3239+
timer:sleep(100),
3240+
%% At some point, RabbitMQ blocked our client sending messages.
3241+
%% However, once the memory alarm clears, our client should be able to send a message again.
3242+
ct:pal("clearing memory alarm..."),
3243+
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),
3244+
ct:pal("cleared memory alarm"),
3245+
timer:sleep(100),
3246+
?assertEqual(ok,
3247+
amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag">>, <<"msg">>, false))),
3248+
ok = wait_for_accepted(<<"tag">>),
3249+
3250+
ok = amqp10_client:detach_link(Sender),
3251+
ok = end_session_sync(Session),
3252+
ok = amqp10_client:close_connection(Connection),
3253+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
3254+
ok = rabbit_ct_client_helpers:close_channel(Ch).
3255+
32103256
auth_attempt_metrics(Config) ->
32113257
open_and_close_connection(Config),
32123258
[Attempt1] = rpc(Config, rabbit_core_metrics, get_auth_attempts, []),

0 commit comments

Comments
 (0)