Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session,
RenewWhenBelow =< Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
drain = Drain},
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
ok = amqp10_client_session:flow_link(Session, Handle, Flow, RenewWhenBelow).

%% @doc Stop a receiving link.
%% See AMQP 1.0 spec §2.6.10.
Expand All @@ -348,7 +348,7 @@ stop_receiver_link(#link_ref{role = receiver,
link_handle = Handle}) ->
Flow = #'v1_0.flow'{link_credit = {uint, 0},
echo = true},
ok = amqp10_client_session:flow(Session, Handle, Flow, never).
ok = amqp10_client_session:flow_link(Session, Handle, Flow, never).

%%% messages

Expand Down
132 changes: 90 additions & 42 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
attach/2,
detach/2,
transfer/3,
flow/4,
disposition/5
disposition/5,
flow_link/4
]).

%% Manual session flow control is currently only used in tests.
-export([flow/3]).

%% Private API
-export([start_link/4,
socket_ready/2
Expand Down Expand Up @@ -51,7 +54,8 @@
[add/2,
diff/2]).

-define(MAX_SESSION_WINDOW_SIZE, 65535).
%% By default, we want to keep the server's remote-incoming-window large at all times.
-define(DEFAULT_MAX_INCOMING_WINDOW, 100_000).
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
Expand Down Expand Up @@ -129,7 +133,8 @@
available = 0 :: non_neg_integer(),
drain = false :: boolean(),
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
Credit :: pos_integer()},
incoming_unsettled = #{} :: #{delivery_number() => ok},
footer_opt :: footer_opt() | undefined
}).
Expand All @@ -140,7 +145,10 @@

%% session flow control, see section 2.5.6
next_incoming_id :: transfer_number() | undefined,
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
%% Can become negative if the peer overshoots our window.
incoming_window :: integer(),
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
NewWindowSize :: pos_integer()},
next_outgoing_id = ?INITIAL_OUTGOING_TRANSFER_ID :: transfer_number(),
remote_incoming_window = 0 :: non_neg_integer(),
remote_outgoing_window = 0 :: non_neg_integer(),
Expand Down Expand Up @@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).

flow(Session, Handle, Flow, RenewWhenBelow) ->
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
flow(Session, IncomingWindow, RenewWhenBelow) when
%% Check that the RenewWhenBelow value make sense.
RenewWhenBelow =:= never orelse
is_integer(RenewWhenBelow) andalso
RenewWhenBelow > 0 andalso
RenewWhenBelow =< IncomingWindow ->
gen_statem:cast(Session, {flow_session, IncomingWindow, RenewWhenBelow}).

-spec flow_link(pid(), link_handle(), #'v1_0.flow'{}, never | pos_integer()) -> ok.
flow_link(Session, Handle, Flow, RenewWhenBelow) ->
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).

%% Sending a disposition on a sender link (with receiver-settle-mode = second)
Expand Down Expand Up @@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
channel = Channel,
reader = Reader,
connection_config = ConnConfig,
incoming_window = ?DEFAULT_MAX_INCOMING_WINDOW,
auto_flow = {?DEFAULT_MAX_INCOMING_WINDOW div 2,
?DEFAULT_MAX_INCOMING_WINDOW},
early_attach_requests = []},
{ok, unmapped, State}.

Expand Down Expand Up @@ -282,15 +303,15 @@ mapped(cast, 'end', State) ->
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
{keep_state, State};
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
#state{next_incoming_id = NII,
next_outgoing_id = NOI} = State) ->
Flow = Flow0#'v1_0.flow'{
next_incoming_id = maybe_uint(NII),
next_outgoing_id = uint(NOI),
outgoing_window = ?UINT_OUTGOING_WINDOW},
ok = send(Flow, State),
{keep_state, State#state{incoming_window = IncomingWindow}};
mapped(cast, {flow_session, IncomingWindow, RenewWhenBelow}, State0) ->
AutoFlow = case RenewWhenBelow of
never -> never;
_ -> {RenewWhenBelow, IncomingWindow}
end,
State = State0#state{incoming_window = IncomingWindow,
auto_flow = AutoFlow},
send_flow_session(State),
{keep_state, State};
mapped(cast, #'v1_0.end'{} = End, State) ->
%% We receive the first end frame, reply and terminate.
_ = send_end(State),
Expand Down Expand Up @@ -656,35 +677,44 @@ is_bare_message_section(_Section) ->

send_flow_link(OutHandle,
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
#state{links = Links,
next_incoming_id = NII,
next_outgoing_id = NOI,
incoming_window = InWin} = State) ->
#state{links = Links} = State) ->
AutoFlow = case RenewWhenBelow of
never -> never;
Limit -> {auto, Limit, Credit}
_ -> {RenewWhenBelow, Credit}
end,
#{OutHandle := #link{output_handle = H,
role = receiver,
delivery_count = DeliveryCount,
available = Available} = Link} = Links,
Flow = Flow0#'v1_0.flow'{
handle = uint(H),
%% "This value MUST be set if the peer has received the begin
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
next_incoming_id = maybe_uint(NII),
next_outgoing_id = uint(NOI),
outgoing_window = ?UINT_OUTGOING_WINDOW,
incoming_window = uint(InWin),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow1 = Flow0#'v1_0.flow'{
handle = uint(H),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow = set_flow_session_fields(Flow1, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}.

send_flow_session(State) ->
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
ok = send(Flow, State).

set_flow_session_fields(Flow, #state{next_incoming_id = NID,
incoming_window = IW,
next_outgoing_id = NOI}) ->
Flow#'v1_0.flow'{
%% "This value MUST be set if the peer has received the begin
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
next_incoming_id = maybe_uint(NID),
%% IncomingWindow0 can be negative when the sending server overshoots our window.
%% We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
incoming_window = uint(max(0, IW)),
next_outgoing_id = uint(NOI),
outgoing_window = ?UINT_OUTGOING_WINDOW}.

build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc)
when byte_size(Bin) =< MaxPayloadSize ->
T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}),
Expand Down Expand Up @@ -1059,17 +1089,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
links = Links#{Handle => book_link_transfer_send(Link)}}.

book_partial_transfer_received(#state{next_incoming_id = NID,
remote_outgoing_window = ROW} = State) ->
State#state{next_incoming_id = add(NID, 1),
remote_outgoing_window = ROW - 1}.
incoming_window = IW,
remote_outgoing_window = ROW} = State0) ->
State = State0#state{next_incoming_id = add(NID, 1),
incoming_window = IW - 1,
remote_outgoing_window = ROW - 1},
maybe_widen_incoming_window(State).

book_transfer_received(State = #state{connection_config =
#{transfer_limit_margin := Margin}},
#link{link_credit = Margin} = Link) ->
{transfer_limit_exceeded, Link, State};
book_transfer_received(#state{next_incoming_id = NID,
incoming_window = IW,
remote_outgoing_window = ROW,
links = Links} = State,
links = Links} = State0,
#link{output_handle = OutHandle,
delivery_count = DC,
link_credit = LC,
Expand All @@ -1079,19 +1113,31 @@ book_transfer_received(#state{next_incoming_id = NID,
%% "the receiver MUST maintain a floor of zero in its
%% calculation of the value of available" [2.6.7]
available = max(0, Avail - 1)},
State1 = State#state{links = Links#{OutHandle => Link1},
next_incoming_id = add(NID, 1),
remote_outgoing_window = ROW - 1},
State1 = State0#state{links = Links#{OutHandle => Link1},
next_incoming_id = add(NID, 1),
incoming_window = IW - 1,
remote_outgoing_window = ROW - 1},
State = maybe_widen_incoming_window(State1),
case Link1 of
#link{link_credit = 0,
auto_flow = never} ->
{credit_exhausted, Link1, State1};
{credit_exhausted, Link1, State};
_ ->
{ok, Link1, State1}
{ok, Link1, State}
end.

maybe_widen_incoming_window(
State0 = #state{incoming_window = IncomingWindow,
auto_flow = {RenewWhenBelow, NewWindowSize}})
when IncomingWindow < RenewWhenBelow ->
State = State0#state{incoming_window = NewWindowSize},
send_flow_session(State),
State;
maybe_widen_incoming_window(State) ->
State.

auto_flow(#link{link_credit = LC,
auto_flow = {auto, RenewWhenBelow, Credit},
auto_flow = {RenewWhenBelow, Credit},
output_handle = OutHandle,
incoming_unsettled = Unsettled},
State)
Expand Down Expand Up @@ -1230,6 +1276,7 @@ format_status(Status = #{data := Data0}) ->
remote_channel = RemoteChannel,
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow,
auto_flow = SessionAutoFlow,
next_outgoing_id = NextOutgoingId,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
Expand Down Expand Up @@ -1294,6 +1341,7 @@ format_status(Status = #{data := Data0}) ->
remote_channel => RemoteChannel,
next_incoming_id => NextIncomingId,
incoming_window => IncomingWindow,
auto_flow => SessionAutoFlow,
next_outgoing_id => NextOutgoingId,
remote_incoming_window => RemoteIncomingWindow,
remote_outgoing_window => RemoteOutgoingWindow,
Expand Down
Loading
Loading