Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,6 @@ mapped({call, From},
#state{remote_incoming_window = Window})
when Window =< 0 ->
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
mapped({call, From},
{transfer, _Transfer, _Sections},
#state{remote_incoming_window = Window})
when Window =< 0 ->
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
mapped({call, From = {Pid, _}},
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = {binary, DeliveryTag},
Expand Down
5 changes: 4 additions & 1 deletion deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2003,7 +2003,10 @@ session_flow_fields(Frames, State)
session_flow_fields(Flow = #'v1_0.flow'{},
#state{next_outgoing_id = NextOutgoingId,
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow}) ->
incoming_window = IncomingWindow0}) ->
%% IncomingWindow0 can be negative when the sending client overshoots our window.
%% However, we must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
IncomingWindow = max(0, IncomingWindow0),
Flow#'v1_0.flow'{
next_outgoing_id = ?UINT(NextOutgoingId),
outgoing_window = ?UINT_OUTGOING_WINDOW,
Expand Down
59 changes: 59 additions & 0 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ groups() ->
detach_requeues_drop_head_classic_queue,
resource_alarm_before_session_begin,
resource_alarm_after_session_begin,
resource_alarm_send_many,
max_message_size_client_to_server,
max_message_size_server_to_client,
global_counters,
Expand Down Expand Up @@ -3207,6 +3208,42 @@ resource_alarm_after_session_begin(Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).

%% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816
resource_alarm_send_many(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),

%% Send many messages while a memory alarm kicks in.
%% Our expectations are:
%% 1. At some point, our client's remote-incoming-window should be exceeded because
%% RabbitMQ sets its incoming-window to 0 when the alarm kicks in.
%% 2. No crash.
{Pid, Ref} = spawn_monitor(?MODULE,
send_until_remote_incoming_window_exceeded,
[Session, Address]),
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
receive {'DOWN', Ref, process, Pid, Reason} ->
?assertEqual(normal, Reason)
after 30_000 ->
ct:fail(send_timeout)
end,

%% Clear memory alarm.
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),
timer:sleep(100),

ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).

auth_attempt_metrics(Config) ->
open_and_close_connection(Config),
[Attempt1] = rpc(Config, rabbit_core_metrics, get_auth_attempts, []),
Expand Down Expand Up @@ -6286,6 +6323,28 @@ count_received_messages0(Receiver, Count) ->
Count
end.

send_until_remote_incoming_window_exceeded(Session, Address) ->
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled),
ok = wait_for_credit(Sender),
ok = send_until_remote_incoming_window_exceeded0(Sender, 100_000),
ok = amqp10_client:detach_link(Sender).

send_until_remote_incoming_window_exceeded0(_Sender, 0) ->
ct:fail(remote_incoming_window_never_exceeded);
send_until_remote_incoming_window_exceeded0(Sender, Left) ->
Bin = integer_to_binary(Left),
Msg = amqp10_msg:new(Bin, Bin, true),
case amqp10_client:send_msg(Sender, Msg) of
ok ->
send_until_remote_incoming_window_exceeded0(Sender, Left - 1);
{error, insufficient_credit} ->
ok = wait_for_credit(Sender),
send_until_remote_incoming_window_exceeded0(Sender, Left);
{error, remote_incoming_window_exceeded = Reason} ->
ct:pal("~s: ~b messages left", [Reason, Left]),
ok
end.

assert_link_credit_runs_out(_Sender, 0) ->
ct:fail(sufficient_link_credit);
assert_link_credit_runs_out(Sender, Left) ->
Expand Down
Loading