diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 8605c7eabafb..b2926a545172 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -339,7 +339,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session, RenewWhenBelow =< Credit) -> Flow = #'v1_0.flow'{link_credit = {uint, Credit}, drain = Drain}, - ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow). + ok = amqp10_client_session:flow_link(Session, Handle, Flow, RenewWhenBelow). %% @doc Stop a receiving link. %% See AMQP 1.0 spec ยง2.6.10. @@ -348,7 +348,7 @@ stop_receiver_link(#link_ref{role = receiver, link_handle = Handle}) -> Flow = #'v1_0.flow'{link_credit = {uint, 0}, echo = true}, - ok = amqp10_client_session:flow(Session, Handle, Flow, never). + ok = amqp10_client_session:flow_link(Session, Handle, Flow, never). %%% messages diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 435cce8aed61..b0dc4ab44548 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -20,10 +20,13 @@ attach/2, detach/2, transfer/3, - flow/4, - disposition/5 + disposition/5, + flow_link/4 ]). +%% Manual session flow control is currently only used in tests. +-export([flow/3]). + %% Private API -export([start_link/4, socket_ready/2 @@ -51,7 +54,8 @@ [add/2, diff/2]). --define(MAX_SESSION_WINDOW_SIZE, 65535). +%% By default, we want to keep the server's remote-incoming-window large at all times. +-define(DEFAULT_MAX_INCOMING_WINDOW, 100_000). -define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}). -define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX). %% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6] @@ -129,7 +133,8 @@ available = 0 :: non_neg_integer(), drain = false :: boolean(), partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]}, - auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}, + auto_flow :: never | {RenewWhenBelow :: pos_integer(), + Credit :: pos_integer()}, incoming_unsettled = #{} :: #{delivery_number() => ok}, footer_opt :: footer_opt() | undefined }). @@ -140,7 +145,10 @@ %% session flow control, see section 2.5.6 next_incoming_id :: transfer_number() | undefined, - incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(), + %% Can become negative if the peer overshoots our window. + incoming_window :: integer(), + auto_flow :: never | {RenewWhenBelow :: pos_integer(), + NewWindowSize :: pos_integer()}, next_outgoing_id = ?INITIAL_OUTGOING_TRANSFER_ID :: transfer_number(), remote_incoming_window = 0 :: non_neg_integer(), remote_outgoing_window = 0 :: non_neg_integer(), @@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) -> [Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg), gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout). -flow(Session, Handle, Flow, RenewWhenBelow) -> +-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok. +flow(Session, IncomingWindow, RenewWhenBelow) when + %% Check that the RenewWhenBelow value make sense. + RenewWhenBelow =:= never orelse + is_integer(RenewWhenBelow) andalso + RenewWhenBelow > 0 andalso + RenewWhenBelow =< IncomingWindow -> + gen_statem:cast(Session, {flow_session, IncomingWindow, RenewWhenBelow}). + +-spec flow_link(pid(), link_handle(), #'v1_0.flow'{}, never | pos_integer()) -> ok. +flow_link(Session, Handle, Flow, RenewWhenBelow) -> gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}). %% Sending a disposition on a sender link (with receiver-settle-mode = second) @@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) -> channel = Channel, reader = Reader, connection_config = ConnConfig, + incoming_window = ?DEFAULT_MAX_INCOMING_WINDOW, + auto_flow = {?DEFAULT_MAX_INCOMING_WINDOW div 2, + ?DEFAULT_MAX_INCOMING_WINDOW}, early_attach_requests = []}, {ok, unmapped, State}. @@ -282,15 +303,15 @@ mapped(cast, 'end', State) -> mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) -> State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0), {keep_state, State}; -mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}}, - #state{next_incoming_id = NII, - next_outgoing_id = NOI} = State) -> - Flow = Flow0#'v1_0.flow'{ - next_incoming_id = maybe_uint(NII), - next_outgoing_id = uint(NOI), - outgoing_window = ?UINT_OUTGOING_WINDOW}, - ok = send(Flow, State), - {keep_state, State#state{incoming_window = IncomingWindow}}; +mapped(cast, {flow_session, IncomingWindow, RenewWhenBelow}, State0) -> + AutoFlow = case RenewWhenBelow of + never -> never; + _ -> {RenewWhenBelow, IncomingWindow} + end, + State = State0#state{incoming_window = IncomingWindow, + auto_flow = AutoFlow}, + send_flow_session(State), + {keep_state, State}; mapped(cast, #'v1_0.end'{} = End, State) -> %% We receive the first end frame, reply and terminate. _ = send_end(State), @@ -656,35 +677,44 @@ is_bare_message_section(_Section) -> send_flow_link(OutHandle, #'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow, - #state{links = Links, - next_incoming_id = NII, - next_outgoing_id = NOI, - incoming_window = InWin} = State) -> + #state{links = Links} = State) -> AutoFlow = case RenewWhenBelow of never -> never; - Limit -> {auto, Limit, Credit} + _ -> {RenewWhenBelow, Credit} end, #{OutHandle := #link{output_handle = H, role = receiver, delivery_count = DeliveryCount, available = Available} = Link} = Links, - Flow = Flow0#'v1_0.flow'{ - handle = uint(H), - %% "This value MUST be set if the peer has received the begin - %% frame for the session, and MUST NOT be set if it has not." [2.7.4] - next_incoming_id = maybe_uint(NII), - next_outgoing_id = uint(NOI), - outgoing_window = ?UINT_OUTGOING_WINDOW, - incoming_window = uint(InWin), - %% "In the event that the receiving link endpoint has not yet seen the - %% initial attach frame from the sender this field MUST NOT be set." [2.7.4] - delivery_count = maybe_uint(DeliveryCount), - available = uint(Available)}, + Flow1 = Flow0#'v1_0.flow'{ + handle = uint(H), + %% "In the event that the receiving link endpoint has not yet seen the + %% initial attach frame from the sender this field MUST NOT be set." [2.7.4] + delivery_count = maybe_uint(DeliveryCount), + available = uint(Available)}, + Flow = set_flow_session_fields(Flow1, State), ok = send(Flow, State), State#state{links = Links#{OutHandle => Link#link{link_credit = Credit, auto_flow = AutoFlow}}}. +send_flow_session(State) -> + Flow = set_flow_session_fields(#'v1_0.flow'{}, State), + ok = send(Flow, State). + +set_flow_session_fields(Flow, #state{next_incoming_id = NID, + incoming_window = IW, + next_outgoing_id = NOI}) -> + Flow#'v1_0.flow'{ + %% "This value MUST be set if the peer has received the begin + %% frame for the session, and MUST NOT be set if it has not." [2.7.4] + next_incoming_id = maybe_uint(NID), + %% IncomingWindow0 can be negative when the sending server overshoots our window. + %% We must set a floor of 0 in the FLOW frame because field incoming-window is an uint. + incoming_window = uint(max(0, IW)), + next_outgoing_id = uint(NOI), + outgoing_window = ?UINT_OUTGOING_WINDOW}. + build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc) when byte_size(Bin) =< MaxPayloadSize -> T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}), @@ -1059,17 +1089,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link, links = Links#{Handle => book_link_transfer_send(Link)}}. book_partial_transfer_received(#state{next_incoming_id = NID, - remote_outgoing_window = ROW} = State) -> - State#state{next_incoming_id = add(NID, 1), - remote_outgoing_window = ROW - 1}. + incoming_window = IW, + remote_outgoing_window = ROW} = State0) -> + State = State0#state{next_incoming_id = add(NID, 1), + incoming_window = IW - 1, + remote_outgoing_window = ROW - 1}, + maybe_widen_incoming_window(State). book_transfer_received(State = #state{connection_config = #{transfer_limit_margin := Margin}}, #link{link_credit = Margin} = Link) -> {transfer_limit_exceeded, Link, State}; book_transfer_received(#state{next_incoming_id = NID, + incoming_window = IW, remote_outgoing_window = ROW, - links = Links} = State, + links = Links} = State0, #link{output_handle = OutHandle, delivery_count = DC, link_credit = LC, @@ -1079,19 +1113,31 @@ book_transfer_received(#state{next_incoming_id = NID, %% "the receiver MUST maintain a floor of zero in its %% calculation of the value of available" [2.6.7] available = max(0, Avail - 1)}, - State1 = State#state{links = Links#{OutHandle => Link1}, - next_incoming_id = add(NID, 1), - remote_outgoing_window = ROW - 1}, + State1 = State0#state{links = Links#{OutHandle => Link1}, + next_incoming_id = add(NID, 1), + incoming_window = IW - 1, + remote_outgoing_window = ROW - 1}, + State = maybe_widen_incoming_window(State1), case Link1 of #link{link_credit = 0, auto_flow = never} -> - {credit_exhausted, Link1, State1}; + {credit_exhausted, Link1, State}; _ -> - {ok, Link1, State1} + {ok, Link1, State} end. +maybe_widen_incoming_window( + State0 = #state{incoming_window = IncomingWindow, + auto_flow = {RenewWhenBelow, NewWindowSize}}) + when IncomingWindow < RenewWhenBelow -> + State = State0#state{incoming_window = NewWindowSize}, + send_flow_session(State), + State; +maybe_widen_incoming_window(State) -> + State. + auto_flow(#link{link_credit = LC, - auto_flow = {auto, RenewWhenBelow, Credit}, + auto_flow = {RenewWhenBelow, Credit}, output_handle = OutHandle, incoming_unsettled = Unsettled}, State) @@ -1230,6 +1276,7 @@ format_status(Status = #{data := Data0}) -> remote_channel = RemoteChannel, next_incoming_id = NextIncomingId, incoming_window = IncomingWindow, + auto_flow = SessionAutoFlow, next_outgoing_id = NextOutgoingId, remote_incoming_window = RemoteIncomingWindow, remote_outgoing_window = RemoteOutgoingWindow, @@ -1294,6 +1341,7 @@ format_status(Status = #{data := Data0}) -> remote_channel => RemoteChannel, next_incoming_id => NextIncomingId, incoming_window => IncomingWindow, + auto_flow => SessionAutoFlow, next_outgoing_id => NextOutgoingId, remote_incoming_window => RemoteIncomingWindow, remote_outgoing_window => RemoteOutgoingWindow, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 6e75e9a8f1fe..35f7c9d5c198 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -163,6 +163,8 @@ groups() -> incoming_window_closed_rabbitmq_internal_flow_quorum_queue, tcp_back_pressure_rabbitmq_internal_flow_classic_queue, tcp_back_pressure_rabbitmq_internal_flow_quorum_queue, + session_flow_control_default_max_frame_size, + session_flow_control_small_max_frame_size, session_max_per_connection, link_max_per_session, reserved_annotation, @@ -1644,7 +1646,7 @@ server_closes_link(QType, Config) -> receive {amqp10_msg, Receiver, Msg} -> ?assertEqual([Body], amqp10_msg:body(Msg)) - after 30000 -> ct:fail("missing msg") + after 9000 -> ct:fail({missing_msg, ?LINE}) end, [SessionPid] = rpc(Config, rabbit_amqp_session, list_local, []), @@ -2994,7 +2996,7 @@ detach_requeues_two_connections(QType, Config) -> {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session1, <<"my link pair">>), QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), - flush(link_pair_attached), + flush(queue_declared), %% Attach 1 sender and 2 receivers. {ok, Sender} = amqp10_client:attach_sender_link(Session0, <<"sender">>, Address, settled), @@ -3004,7 +3006,7 @@ detach_requeues_two_connections(QType, Config) -> receive {amqp10_event, {link, Receiver0, attached}} -> ok after 30000 -> ct:fail({missing_event, ?LINE}) end, - ok = gen_statem:cast(Session0, {flow_session, #'v1_0.flow'{incoming_window = {uint, 1}}}), + ok = amqp10_client_session:flow(Session0, 1, never), ok = amqp10_client:flow_link_credit(Receiver0, 50, never), %% Wait for credit being applied to the queue. timer:sleep(100), @@ -4319,7 +4321,7 @@ available_messages(QType, Config) -> link_credit = {uint, 1}, %% Request sending queue to send us a FLOW including available messages. echo = true}, - ok = amqp10_client_session:flow(Session, OutputHandle, Flow0, never), + ok = amqp10_client_session:flow_link(Session, OutputHandle, Flow0, never), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok after 30000 -> ct:fail({missing_event, ?LINE}) @@ -4360,8 +4362,8 @@ available_messages(QType, Config) -> link_credit = {uint, 1}, echo = true}, %% Send both FLOW frames in sequence. - ok = amqp10_client_session:flow(Session, OutputHandle, Flow1, never), - ok = amqp10_client_session:flow(Session, OutputHandle, Flow2, never), + ok = amqp10_client_session:flow_link(Session, OutputHandle, Flow1, never), + ok = amqp10_client_session:flow_link(Session, OutputHandle, Flow2, never), receive_messages(Receiver, 1), receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok after 30000 -> ct:fail({missing_event, ?LINE}) @@ -5916,7 +5918,7 @@ incoming_window_closed_transfer_flow_order(Config) -> end, %% Open our incoming window - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + ok = amqp10_client_session:flow(Session, 5, never), %% Important: We should first receive the TRANSFER, %% and only thereafter the FLOW (and hence the credit_exhausted notification). receive First -> @@ -5969,7 +5971,7 @@ incoming_window_closed_stop_link(Config) -> end, %% Open our incoming window - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + ok = amqp10_client_session:flow(Session, 5, never), %% Since we decreased link credit dynamically, we may or may not receive the 1st message. receive {amqp10_msg, Receiver, Msg1} -> @@ -6015,7 +6017,7 @@ incoming_window_closed_close_link(Config) -> %% Close the link while our session incoming-window is closed. ok = detach_link_sync(Receiver), %% Open our incoming window. - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + ok = amqp10_client_session:flow(Session, 5, never), %% Given that both endpoints have now destroyed the link, we do not %% expect to receive any TRANSFER or FLOW frame referencing the destroyed link. receive Unexpected2 -> ct:fail({unexpected, Unexpected2}) @@ -6069,7 +6071,7 @@ incoming_window_closed_rabbitmq_internal_flow(QType, Config) -> ?assert(MsgsReady > 0), %% Open our incoming window. - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, Num}}}), + ok = amqp10_client_session:flow(Session, 100, 50), receive_messages(Receiver, Num), ok = detach_link_sync(Receiver), @@ -6168,6 +6170,122 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = close({Connection, Session, LinkPair}). +session_flow_control_default_max_frame_size(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {_, Session, LinkPair} = Init = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + Num = 1000, + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + + ok = amqp10_client_session:flow(Session, 2, never), + %% Grant link credit worth of all messages that we are going to receive + %% in this test case. + ok = amqp10_client:flow_link_credit(Receiver, Num * 2, never), + + [Msg1000, Msg999] = receive_messages(Receiver, 2), + ?assertEqual(<<"1000">>, amqp10_msg:body_bin(Msg1000)), + ?assertEqual(<<"999">>, amqp10_msg:body_bin(Msg999)), + receive {amqp10_msg, _, _} = Unexpected0 -> + ct:fail({unexpected_msg, Unexpected0, ?LINE}) + after 50 -> ok + end, + + ok = amqp10_client_session:flow(Session, 1, never), + [Msg998] = receive_messages(Receiver, 1), + ?assertEqual(<<"998">>, amqp10_msg:body_bin(Msg998)), + receive {amqp10_msg, _, _} = Unexpected1 -> + ct:fail({unexpected_msg, Unexpected1, ?LINE}) + after 50 -> ok + end, + + ok = amqp10_client_session:flow(Session, 0, never), + receive {amqp10_msg, _, _} = Unexpected2 -> + ct:fail({unexpected_msg, Unexpected2, ?LINE}) + after 50 -> ok + end, + + %% When the client automatically widens the session window, + %% we should receive all remaining messages. + ok = amqp10_client_session:flow(Session, 2, 1), + receive_messages(Receiver, Num - 3), + + %% Let's test with a different auto renew session flow config (100, 100). + ok = amqp10_client_session:flow(Session, 0, never), + ok = send_messages(Sender, Num, false), + ok = wait_for_accepts(Num), + receive {amqp10_msg, _, _} = Unexpected3 -> + ct:fail({unexpected_msg, Unexpected3, ?LINE}) + after 50 -> ok + end, + ok = amqp10_client_session:flow(Session, 100, 100), + receive_messages(Receiver, Num), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = close(Init). + +%% Test session flow control with large messages split into multiple transfer frames. +session_flow_control_small_max_frame_size(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{max_frame_size => 1000}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"pair">>), + + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + Suffix = binary:copy(<<"x">>, 2500), + Num = 10, + ok = send_messages(Sender, Num, false, Suffix), + ok = wait_for_accepts(Num), + + %% 1 message of size ~2500 bytes gets split into 3 transfer frames + %% because each transfer frame has max size of 1000 bytes. + %% Hence, if we set our incoming-window to 3, we should receive exactly 1 message. + ok = amqp10_client_session:flow(Session, 3, never), + %% Grant plenty of link credit. + ok = amqp10_client:flow_link_credit(Receiver, Num * 5, never), + receive {amqp10_msg, Receiver, Msg10} -> + ?assertEqual(<<"10", Suffix/binary>>, + amqp10_msg:body_bin(Msg10)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, _, _} = Unexpected0 -> + ct:fail({unexpected_msg, Unexpected0, ?LINE}) + after 50 -> ok + end, + + %% When the client automatically widens the session window, + %% we should receive all remaining messages. + ok = amqp10_client_session:flow(Session, 2, 1), + Msgs = receive_messages(Receiver, Num - 1), + Msg1 = lists:last(Msgs), + ?assertEqual(<<"1", Suffix/binary>>, + amqp10_msg:body_bin(Msg1)), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = close_connection_sync(Connection). + session_max_per_connection(Config) -> App = rabbit, Par = session_max_per_connection, @@ -6703,4 +6821,4 @@ find_event(Type, Props, Events) when is_list(Props), is_list(Events) -> end, Events). close_incoming_window(Session) -> - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}). + amqp10_client_session:flow(Session, 0, never). diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index 8e025951a2b5..42343270d58d 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -1015,7 +1015,7 @@ session_flow_control(Config) -> ok = amqp10_client:flow_link_credit(IncomingLink, 1, never), %% Close our incoming window. - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}), + amqp10_client_session:flow(Session, 0, never), Request0 = amqp10_msg:new(<<>>, #'v1_0.amqp_value'{content = null}, true), MessageId = <<1>>, @@ -1031,7 +1031,7 @@ session_flow_control(Config) -> end, %% Open our incoming window - gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}), + amqp10_client_session:flow(Session, 1, never), receive {amqp10_msg, IncomingLink, Response} -> ?assertMatch(#{correlation_id := MessageId,