2020 attach /2 ,
2121 detach /2 ,
2222 transfer /3 ,
23- flow / 4 ,
24- disposition / 5
23+ disposition / 5 ,
24+ flow_link / 4
2525 ]).
2626
27+ % % Manual session flow control is currently only used in tests.
28+ -export ([flow /3 ]).
29+
2730% % Private API
2831-export ([start_link /4 ,
2932 socket_ready /2
5154 [add /2 ,
5255 diff /2 ]).
5356
54- -define (MAX_SESSION_WINDOW_SIZE , 65535 ).
57+ % % By default, we want to keep the server's remote-incoming-window large at all times.
58+ -define (DEFAULT_MAX_INCOMING_WINDOW , 100_000 ).
5559-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
5660-define (INITIAL_OUTGOING_DELIVERY_ID , ? UINT_MAX ).
5761% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
129133 available = 0 :: non_neg_integer (),
130134 drain = false :: boolean (),
131135 partial_transfers :: undefined | {# 'v1_0.transfer' {}, [binary ()]},
132- auto_flow :: never | {auto , RenewWhenBelow :: pos_integer (), Credit :: pos_integer ()},
136+ auto_flow :: never | {RenewWhenBelow :: pos_integer (),
137+ Credit :: pos_integer ()},
133138 incoming_unsettled = #{} :: #{delivery_number () => ok },
134139 footer_opt :: footer_opt () | undefined
135140 }).
140145
141146 % % session flow control, see section 2.5.6
142147 next_incoming_id :: transfer_number () | undefined ,
143- incoming_window = ? MAX_SESSION_WINDOW_SIZE :: non_neg_integer (),
148+ % % Can become negative if the peer overshoots our window.
149+ incoming_window :: integer (),
150+ auto_flow :: never | {RenewWhenBelow :: pos_integer (),
151+ NewWindowSize :: pos_integer ()},
144152 next_outgoing_id = ? INITIAL_OUTGOING_TRANSFER_ID :: transfer_number (),
145153 remote_incoming_window = 0 :: non_neg_integer (),
146154 remote_outgoing_window = 0 :: non_neg_integer (),
@@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
200208 [Transfer | Sections ] = amqp10_msg :to_amqp_records (Amqp10Msg ),
201209 gen_statem :call (Session , {transfer , Transfer , Sections }, Timeout ).
202210
203- flow (Session , Handle , Flow , RenewWhenBelow ) ->
211+ -spec flow (pid (), non_neg_integer (), never | pos_integer ()) -> ok .
212+ flow (Session , IncomingWindow , RenewWhenBelow ) when
213+ % % Check that the RenewWhenBelow value make sense.
214+ RenewWhenBelow =:= never orelse
215+ is_integer (RenewWhenBelow ) andalso
216+ RenewWhenBelow > 0 andalso
217+ RenewWhenBelow =< IncomingWindow ->
218+ gen_statem :cast (Session , {flow_session , IncomingWindow , RenewWhenBelow }).
219+
220+ -spec flow_link (pid (), link_handle (), # 'v1_0.flow' {}, never | pos_integer ()) -> ok .
221+ flow_link (Session , Handle , Flow , RenewWhenBelow ) ->
204222 gen_statem :cast (Session , {flow_link , Handle , Flow , RenewWhenBelow }).
205223
206224% % Sending a disposition on a sender link (with receiver-settle-mode = second)
@@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
239257 channel = Channel ,
240258 reader = Reader ,
241259 connection_config = ConnConfig ,
260+ incoming_window = ? DEFAULT_MAX_INCOMING_WINDOW ,
261+ auto_flow = {? DEFAULT_MAX_INCOMING_WINDOW div 2 ,
262+ ? DEFAULT_MAX_INCOMING_WINDOW },
242263 early_attach_requests = []},
243264 {ok , unmapped , State }.
244265
@@ -282,15 +303,15 @@ mapped(cast, 'end', State) ->
282303mapped (cast , {flow_link , OutHandle , Flow0 , RenewWhenBelow }, State0 ) ->
283304 State = send_flow_link (OutHandle , Flow0 , RenewWhenBelow , State0 ),
284305 {keep_state , State };
285- mapped (cast , {flow_session , Flow0 = # 'v1_0.flow' { incoming_window = { uint , IncomingWindow }}},
286- # state { next_incoming_id = NII ,
287- next_outgoing_id = NOI } = State ) ->
288- Flow = Flow0 # 'v1_0.flow' {
289- next_incoming_id = maybe_uint ( NII ) ,
290- next_outgoing_id = uint ( NOI ) ,
291- outgoing_window = ? UINT_OUTGOING_WINDOW },
292- ok = send ( Flow , State ),
293- {keep_state , State # state { incoming_window = IncomingWindow } };
306+ mapped (cast , {flow_session , IncomingWindow , RenewWhenBelow }, State0 ) ->
307+ AutoFlow = case RenewWhenBelow of
308+ never -> never ;
309+ _ -> { RenewWhenBelow , IncomingWindow }
310+ end ,
311+ State = State0 # state { incoming_window = IncomingWindow ,
312+ auto_flow = AutoFlow },
313+ send_flow_session ( State ),
314+ {keep_state , State };
294315mapped (cast , # 'v1_0.end' {} = End , State ) ->
295316 % % We receive the first end frame, reply and terminate.
296317 _ = send_end (State ),
@@ -656,35 +677,44 @@ is_bare_message_section(_Section) ->
656677
657678send_flow_link (OutHandle ,
658679 # 'v1_0.flow' {link_credit = {uint , Credit }} = Flow0 , RenewWhenBelow ,
659- # state {links = Links ,
660- next_incoming_id = NII ,
661- next_outgoing_id = NOI ,
662- incoming_window = InWin } = State ) ->
680+ # state {links = Links } = State ) ->
663681 AutoFlow = case RenewWhenBelow of
664682 never -> never ;
665- Limit -> {auto , Limit , Credit }
683+ _ -> {RenewWhenBelow , Credit }
666684 end ,
667685 #{OutHandle := # link {output_handle = H ,
668686 role = receiver ,
669687 delivery_count = DeliveryCount ,
670688 available = Available } = Link } = Links ,
671- Flow = Flow0 # 'v1_0.flow' {
672- handle = uint (H ),
673- % % "This value MUST be set if the peer has received the begin
674- % % frame for the session, and MUST NOT be set if it has not." [2.7.4]
675- next_incoming_id = maybe_uint (NII ),
676- next_outgoing_id = uint (NOI ),
677- outgoing_window = ? UINT_OUTGOING_WINDOW ,
678- incoming_window = uint (InWin ),
679- % % "In the event that the receiving link endpoint has not yet seen the
680- % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
681- delivery_count = maybe_uint (DeliveryCount ),
682- available = uint (Available )},
689+ Flow1 = Flow0 # 'v1_0.flow' {
690+ handle = uint (H ),
691+ % % "In the event that the receiving link endpoint has not yet seen the
692+ % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
693+ delivery_count = maybe_uint (DeliveryCount ),
694+ available = uint (Available )},
695+ Flow = set_flow_session_fields (Flow1 , State ),
683696 ok = send (Flow , State ),
684697 State # state {links = Links #{OutHandle =>
685698 Link # link {link_credit = Credit ,
686699 auto_flow = AutoFlow }}}.
687700
701+ send_flow_session (State ) ->
702+ Flow = set_flow_session_fields (# 'v1_0.flow' {}, State ),
703+ ok = send (Flow , State ).
704+
705+ set_flow_session_fields (Flow , # state {next_incoming_id = NID ,
706+ incoming_window = IW ,
707+ next_outgoing_id = NOI }) ->
708+ Flow # 'v1_0.flow' {
709+ % % "This value MUST be set if the peer has received the begin
710+ % % frame for the session, and MUST NOT be set if it has not." [2.7.4]
711+ next_incoming_id = maybe_uint (NID ),
712+ % % IncomingWindow0 can be negative when the sending server overshoots our window.
713+ % % We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
714+ incoming_window = uint (max (0 , IW )),
715+ next_outgoing_id = uint (NOI ),
716+ outgoing_window = ? UINT_OUTGOING_WINDOW }.
717+
688718build_frames (Channel , Trf , Bin , MaxPayloadSize , Acc )
689719 when byte_size (Bin ) =< MaxPayloadSize ->
690720 T = amqp10_framing :encode_bin (Trf # 'v1_0.transfer' {more = false }),
@@ -1059,17 +1089,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
10591089 links = Links #{Handle => book_link_transfer_send (Link )}}.
10601090
10611091book_partial_transfer_received (# state {next_incoming_id = NID ,
1062- remote_outgoing_window = ROW } = State ) ->
1063- State # state {next_incoming_id = add (NID , 1 ),
1064- remote_outgoing_window = ROW - 1 }.
1092+ incoming_window = IW ,
1093+ remote_outgoing_window = ROW } = State0 ) ->
1094+ State = State0 # state {next_incoming_id = add (NID , 1 ),
1095+ incoming_window = IW - 1 ,
1096+ remote_outgoing_window = ROW - 1 },
1097+ maybe_widen_incoming_window (State ).
10651098
10661099book_transfer_received (State = # state {connection_config =
10671100 #{transfer_limit_margin := Margin }},
10681101 # link {link_credit = Margin } = Link ) ->
10691102 {transfer_limit_exceeded , Link , State };
10701103book_transfer_received (# state {next_incoming_id = NID ,
1104+ incoming_window = IW ,
10711105 remote_outgoing_window = ROW ,
1072- links = Links } = State ,
1106+ links = Links } = State0 ,
10731107 # link {output_handle = OutHandle ,
10741108 delivery_count = DC ,
10751109 link_credit = LC ,
@@ -1079,19 +1113,31 @@ book_transfer_received(#state{next_incoming_id = NID,
10791113 % % "the receiver MUST maintain a floor of zero in its
10801114 % % calculation of the value of available" [2.6.7]
10811115 available = max (0 , Avail - 1 )},
1082- State1 = State # state {links = Links #{OutHandle => Link1 },
1083- next_incoming_id = add (NID , 1 ),
1084- remote_outgoing_window = ROW - 1 },
1116+ State1 = State0 # state {links = Links #{OutHandle => Link1 },
1117+ next_incoming_id = add (NID , 1 ),
1118+ incoming_window = IW - 1 ,
1119+ remote_outgoing_window = ROW - 1 },
1120+ State = maybe_widen_incoming_window (State1 ),
10851121 case Link1 of
10861122 # link {link_credit = 0 ,
10871123 auto_flow = never } ->
1088- {credit_exhausted , Link1 , State1 };
1124+ {credit_exhausted , Link1 , State };
10891125 _ ->
1090- {ok , Link1 , State1 }
1126+ {ok , Link1 , State }
10911127 end .
10921128
1129+ maybe_widen_incoming_window (
1130+ State0 = # state {incoming_window = IncomingWindow ,
1131+ auto_flow = {RenewWhenBelow , NewWindowSize }})
1132+ when IncomingWindow < RenewWhenBelow ->
1133+ State = State0 # state {incoming_window = NewWindowSize },
1134+ send_flow_session (State ),
1135+ State ;
1136+ maybe_widen_incoming_window (State ) ->
1137+ State .
1138+
10931139auto_flow (# link {link_credit = LC ,
1094- auto_flow = {auto , RenewWhenBelow , Credit },
1140+ auto_flow = {RenewWhenBelow , Credit },
10951141 output_handle = OutHandle ,
10961142 incoming_unsettled = Unsettled },
10971143 State )
@@ -1230,6 +1276,7 @@ format_status(Status = #{data := Data0}) ->
12301276 remote_channel = RemoteChannel ,
12311277 next_incoming_id = NextIncomingId ,
12321278 incoming_window = IncomingWindow ,
1279+ auto_flow = SessionAutoFlow ,
12331280 next_outgoing_id = NextOutgoingId ,
12341281 remote_incoming_window = RemoteIncomingWindow ,
12351282 remote_outgoing_window = RemoteOutgoingWindow ,
@@ -1294,6 +1341,7 @@ format_status(Status = #{data := Data0}) ->
12941341 remote_channel => RemoteChannel ,
12951342 next_incoming_id => NextIncomingId ,
12961343 incoming_window => IncomingWindow ,
1344+ auto_flow => SessionAutoFlow ,
12971345 next_outgoing_id => NextOutgoingId ,
12981346 remote_incoming_window => RemoteIncomingWindow ,
12991347 remote_outgoing_window => RemoteOutgoingWindow ,
0 commit comments