2020 attach /2 ,
2121 detach /2 ,
2222 transfer /3 ,
23- flow / 4 ,
24- disposition / 5
23+ disposition / 5 ,
24+ flow_link / 4
2525 ]).
2626
27+ % % Manual session flow control is currently only used in tests.
28+ -export ([flow /3 ]).
29+
2730% % Private API
2831-export ([start_link /4 ,
2932 socket_ready /2
5154 [add /2 ,
5255 diff /2 ]).
5356
54- -define (MAX_SESSION_WINDOW_SIZE , 65535 ).
57+ % % By default, we want to keep the server's remote-incoming-window large at all times.
58+ -define (DEFAULT_MAX_INCOMING_WINDOW , 100_000 ).
5559-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
5660-define (INITIAL_OUTGOING_DELIVERY_ID , ? UINT_MAX ).
5761% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
129133 available = 0 :: non_neg_integer (),
130134 drain = false :: boolean (),
131135 partial_transfers :: undefined | {# 'v1_0.transfer' {}, [binary ()]},
132- auto_flow :: never | {auto , RenewWhenBelow :: pos_integer (), Credit :: pos_integer ()},
136+ auto_flow :: never | {RenewWhenBelow :: pos_integer (),
137+ Credit :: pos_integer ()},
133138 incoming_unsettled = #{} :: #{delivery_number () => ok },
134139 footer_opt :: footer_opt () | undefined
135140 }).
140145
141146 % % session flow control, see section 2.5.6
142147 next_incoming_id :: transfer_number () | undefined ,
143- incoming_window = ? MAX_SESSION_WINDOW_SIZE :: non_neg_integer (),
148+ % % Can become negative if the peer overshoots our window.
149+ incoming_window :: integer (),
150+ auto_flow :: never | {RenewWhenBelow :: pos_integer (),
151+ NewWindowSize :: pos_integer ()},
144152 next_outgoing_id = ? INITIAL_OUTGOING_TRANSFER_ID :: transfer_number (),
145153 remote_incoming_window = 0 :: non_neg_integer (),
146154 remote_outgoing_window = 0 :: non_neg_integer (),
@@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
200208 [Transfer | Sections ] = amqp10_msg :to_amqp_records (Amqp10Msg ),
201209 gen_statem :call (Session , {transfer , Transfer , Sections }, Timeout ).
202210
203- flow (Session , Handle , Flow , RenewWhenBelow ) ->
211+ -spec flow (pid (), non_neg_integer (), never | pos_integer ()) -> ok .
212+ flow (Session , IncomingWindow , RenewWhenBelow ) when
213+ % % Check that the RenewWhenBelow value make sense.
214+ RenewWhenBelow =:= never orelse
215+ is_integer (RenewWhenBelow ) andalso
216+ RenewWhenBelow > 0 andalso
217+ RenewWhenBelow =< IncomingWindow ->
218+ gen_statem :cast (Session , {flow_session , IncomingWindow , RenewWhenBelow }).
219+
220+ -spec flow_link (pid (), link_handle (), # 'v1_0.flow' {}, never | pos_integer ()) -> ok .
221+ flow_link (Session , Handle , Flow , RenewWhenBelow ) ->
204222 gen_statem :cast (Session , {flow_link , Handle , Flow , RenewWhenBelow }).
205223
206224% % Sending a disposition on a sender link (with receiver-settle-mode = second)
@@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
239257 channel = Channel ,
240258 reader = Reader ,
241259 connection_config = ConnConfig ,
260+ incoming_window = ? DEFAULT_MAX_INCOMING_WINDOW ,
261+ auto_flow = {? DEFAULT_MAX_INCOMING_WINDOW div 2 ,
262+ ? DEFAULT_MAX_INCOMING_WINDOW },
242263 early_attach_requests = []},
243264 {ok , unmapped , State }.
244265
@@ -282,6 +303,7 @@ mapped(cast, 'end', State) ->
282303mapped (cast , {flow_link , OutHandle , Flow0 , RenewWhenBelow }, State0 ) ->
283304 State = send_flow_link (OutHandle , Flow0 , RenewWhenBelow , State0 ),
284305 {keep_state , State };
306+ <<<<<<< HEAD
285307mapped (cast , {flow_session , Flow0 = # 'v1_0.flow' {incoming_window = {uint , IncomingWindow }}},
286308 # state {next_incoming_id = NII ,
287309 next_outgoing_id = NOI } = State ) ->
@@ -292,6 +314,18 @@ mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, Incomi
292314 ok = send (Flow , State ),
293315 {keep_state , State # state {incoming_window = IncomingWindow }};
294316mapped (cast , # 'v1_0.end' {error = Err }, State ) ->
317+ =======
318+ mapped (cast , {flow_session , IncomingWindow , RenewWhenBelow }, State0 ) ->
319+ AutoFlow = case RenewWhenBelow of
320+ never -> never ;
321+ _ -> {RenewWhenBelow , IncomingWindow }
322+ end ,
323+ State = State0 # state {incoming_window = IncomingWindow ,
324+ auto_flow = AutoFlow },
325+ send_flow_session (State ),
326+ {keep_state , State };
327+ mapped (cast , # 'v1_0.end' {} = End , State ) ->
328+ >>>>>>> 35394625 a (Auto widen session incoming - window in AMQP 1.0 client )
295329 % % We receive the first end frame, reply and terminate.
296330 _ = send_end (State ),
297331 % TODO: send notifications for links?
@@ -660,35 +694,44 @@ is_bare_message_section(_Section) ->
660694
661695send_flow_link (OutHandle ,
662696 # 'v1_0.flow' {link_credit = {uint , Credit }} = Flow0 , RenewWhenBelow ,
663- # state {links = Links ,
664- next_incoming_id = NII ,
665- next_outgoing_id = NOI ,
666- incoming_window = InWin } = State ) ->
697+ # state {links = Links } = State ) ->
667698 AutoFlow = case RenewWhenBelow of
668699 never -> never ;
669- Limit -> {auto , Limit , Credit }
700+ _ -> {RenewWhenBelow , Credit }
670701 end ,
671702 #{OutHandle := # link {output_handle = H ,
672703 role = receiver ,
673704 delivery_count = DeliveryCount ,
674705 available = Available } = Link } = Links ,
675- Flow = Flow0 # 'v1_0.flow' {
676- handle = uint (H ),
677- % % "This value MUST be set if the peer has received the begin
678- % % frame for the session, and MUST NOT be set if it has not." [2.7.4]
679- next_incoming_id = maybe_uint (NII ),
680- next_outgoing_id = uint (NOI ),
681- outgoing_window = ? UINT_OUTGOING_WINDOW ,
682- incoming_window = uint (InWin ),
683- % % "In the event that the receiving link endpoint has not yet seen the
684- % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
685- delivery_count = maybe_uint (DeliveryCount ),
686- available = uint (Available )},
706+ Flow1 = Flow0 # 'v1_0.flow' {
707+ handle = uint (H ),
708+ % % "In the event that the receiving link endpoint has not yet seen the
709+ % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
710+ delivery_count = maybe_uint (DeliveryCount ),
711+ available = uint (Available )},
712+ Flow = set_flow_session_fields (Flow1 , State ),
687713 ok = send (Flow , State ),
688714 State # state {links = Links #{OutHandle =>
689715 Link # link {link_credit = Credit ,
690716 auto_flow = AutoFlow }}}.
691717
718+ send_flow_session (State ) ->
719+ Flow = set_flow_session_fields (# 'v1_0.flow' {}, State ),
720+ ok = send (Flow , State ).
721+
722+ set_flow_session_fields (Flow , # state {next_incoming_id = NID ,
723+ incoming_window = IW ,
724+ next_outgoing_id = NOI }) ->
725+ Flow # 'v1_0.flow' {
726+ % % "This value MUST be set if the peer has received the begin
727+ % % frame for the session, and MUST NOT be set if it has not." [2.7.4]
728+ next_incoming_id = maybe_uint (NID ),
729+ % % IncomingWindow0 can be negative when the sending server overshoots our window.
730+ % % We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
731+ incoming_window = uint (max (0 , IW )),
732+ next_outgoing_id = uint (NOI ),
733+ outgoing_window = ? UINT_OUTGOING_WINDOW }.
734+
692735build_frames (Channel , Trf , Bin , MaxPayloadSize , Acc )
693736 when byte_size (Bin ) =< MaxPayloadSize ->
694737 T = amqp10_framing :encode_bin (Trf # 'v1_0.transfer' {more = false }),
@@ -1020,17 +1063,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
10201063 links = Links #{Handle => book_link_transfer_send (Link )}}.
10211064
10221065book_partial_transfer_received (# state {next_incoming_id = NID ,
1023- remote_outgoing_window = ROW } = State ) ->
1024- State # state {next_incoming_id = add (NID , 1 ),
1025- remote_outgoing_window = ROW - 1 }.
1066+ incoming_window = IW ,
1067+ remote_outgoing_window = ROW } = State0 ) ->
1068+ State = State0 # state {next_incoming_id = add (NID , 1 ),
1069+ incoming_window = IW - 1 ,
1070+ remote_outgoing_window = ROW - 1 },
1071+ maybe_widen_incoming_window (State ).
10261072
10271073book_transfer_received (State = # state {connection_config =
10281074 #{transfer_limit_margin := Margin }},
10291075 # link {link_credit = Margin } = Link ) ->
10301076 {transfer_limit_exceeded , Link , State };
10311077book_transfer_received (# state {next_incoming_id = NID ,
1078+ incoming_window = IW ,
10321079 remote_outgoing_window = ROW ,
1033- links = Links } = State ,
1080+ links = Links } = State0 ,
10341081 # link {output_handle = OutHandle ,
10351082 delivery_count = DC ,
10361083 link_credit = LC ,
@@ -1040,19 +1087,31 @@ book_transfer_received(#state{next_incoming_id = NID,
10401087 % % "the receiver MUST maintain a floor of zero in its
10411088 % % calculation of the value of available" [2.6.7]
10421089 available = max (0 , Avail - 1 )},
1043- State1 = State # state {links = Links #{OutHandle => Link1 },
1044- next_incoming_id = add (NID , 1 ),
1045- remote_outgoing_window = ROW - 1 },
1090+ State1 = State0 # state {links = Links #{OutHandle => Link1 },
1091+ next_incoming_id = add (NID , 1 ),
1092+ incoming_window = IW - 1 ,
1093+ remote_outgoing_window = ROW - 1 },
1094+ State = maybe_widen_incoming_window (State1 ),
10461095 case Link1 of
10471096 # link {link_credit = 0 ,
10481097 auto_flow = never } ->
1049- {credit_exhausted , Link1 , State1 };
1098+ {credit_exhausted , Link1 , State };
10501099 _ ->
1051- {ok , Link1 , State1 }
1100+ {ok , Link1 , State }
10521101 end .
10531102
1103+ maybe_widen_incoming_window (
1104+ State0 = # state {incoming_window = IncomingWindow ,
1105+ auto_flow = {RenewWhenBelow , NewWindowSize }})
1106+ when IncomingWindow < RenewWhenBelow ->
1107+ State = State0 # state {incoming_window = NewWindowSize },
1108+ send_flow_session (State ),
1109+ State ;
1110+ maybe_widen_incoming_window (State ) ->
1111+ State .
1112+
10541113auto_flow (# link {link_credit = LC ,
1055- auto_flow = {auto , RenewWhenBelow , Credit },
1114+ auto_flow = {RenewWhenBelow , Credit },
10561115 output_handle = OutHandle ,
10571116 incoming_unsettled = Unsettled },
10581117 State )
@@ -1218,6 +1277,7 @@ format_status(Status = #{data := Data0}) ->
12181277 remote_channel = RemoteChannel ,
12191278 next_incoming_id = NextIncomingId ,
12201279 incoming_window = IncomingWindow ,
1280+ auto_flow = SessionAutoFlow ,
12211281 next_outgoing_id = NextOutgoingId ,
12221282 remote_incoming_window = RemoteIncomingWindow ,
12231283 remote_outgoing_window = RemoteOutgoingWindow ,
@@ -1282,6 +1342,7 @@ format_status(Status = #{data := Data0}) ->
12821342 remote_channel => RemoteChannel ,
12831343 next_incoming_id => NextIncomingId ,
12841344 incoming_window => IncomingWindow ,
1345+ auto_flow => SessionAutoFlow ,
12851346 next_outgoing_id => NextOutgoingId ,
12861347 remote_incoming_window => RemoteIncomingWindow ,
12871348 remote_outgoing_window => RemoteOutgoingWindow ,
0 commit comments