@@ -19,94 +19,125 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
1919/// <seealso href="https://datatracker.ietf.org/doc/html/rfc9113#name-flow-control"/>
2020internal sealed class InputFlowControl
2121{
22- private readonly int _initialWindowSize ;
22+ private record struct FlowControlState
23+ {
24+ private const long AbortedBitMask = 1L << 32 ; // uint MaxValue + 1
25+ internal long _state ;
26+
27+ public FlowControlState ( uint initialWindowSize , bool isAborted )
28+ {
29+ _state = initialWindowSize ;
30+ if ( isAborted )
31+ {
32+ _state |= AbortedBitMask ;
33+ }
34+ }
35+
36+ public uint Available => ( uint ) _state ;
37+
38+ public bool IsAborted => ( _state & AbortedBitMask ) > 0 ;
39+ }
40+
41+ private readonly uint _initialWindowSize ;
2342 private readonly int _minWindowSizeIncrement ;
2443
25- private FlowControl _flow ;
44+ private FlowControlState _flow ;
2645 private int _pendingUpdateSize ;
2746 private bool _windowUpdatesDisabled ;
28- private readonly object _flowLock = new ( ) ;
2947
3048 public InputFlowControl ( uint initialWindowSize , uint minWindowSizeIncrement )
3149 {
3250 Debug . Assert ( initialWindowSize >= minWindowSizeIncrement , "minWindowSizeIncrement is greater than the window size." ) ;
3351
34- _flow = new FlowControl ( initialWindowSize ) ;
35- _initialWindowSize = ( int ) initialWindowSize ;
52+ _flow = new FlowControlState ( initialWindowSize , false ) ;
53+ _initialWindowSize = initialWindowSize ;
3654 _minWindowSizeIncrement = ( int ) minWindowSizeIncrement ;
3755 }
3856
3957 public bool IsAvailabilityLow => _flow . Available < _minWindowSizeIncrement ;
4058
59+ // Test hook, not participating in mutual exclusion
60+ internal uint Available => _flow . Available ;
61+
4162 public void Reset ( )
4263 {
43- _flow = new FlowControl ( ( uint ) _initialWindowSize ) ;
64+ _flow = new FlowControlState ( _initialWindowSize , false ) ;
4465 _pendingUpdateSize = 0 ;
4566 _windowUpdatesDisabled = false ;
4667 }
4768
4869 public bool TryAdvance ( int bytes )
4970 {
50- lock ( _flowLock )
71+ FlowControlState currentFlow , computedFlow ;
72+ do
5173 {
74+ currentFlow = _flow ; // Copy
5275 // Even if the stream is aborted, the client should never send more data than was available in the
5376 // flow-control window at the time of the abort.
54- if ( bytes > _flow . Available )
77+ if ( bytes > currentFlow . Available )
5578 {
5679 throw new Http2ConnectionErrorException ( CoreStrings . Http2ErrorFlowControlWindowExceeded , Http2ErrorCode . FLOW_CONTROL_ERROR , ConnectionEndReason . FlowControlWindowExceeded ) ;
5780 }
5881
59- if ( _flow . IsAborted )
82+ if ( currentFlow . IsAborted )
6083 {
61- // This data won't be read by the app, so tell the caller to count the data as already consumed.
6284 return false ;
6385 }
6486
65- _flow . Advance ( bytes ) ;
66- return true ;
67- }
87+ computedFlow = new FlowControlState ( currentFlow . Available - ( uint ) bytes , currentFlow . IsAborted ) ;
88+ } while ( currentFlow . _state != Interlocked . CompareExchange ( ref _flow . _state , computedFlow . _state , currentFlow . _state ) ) ;
89+
90+ return true ;
6891 }
6992
7093 public bool TryUpdateWindow ( int bytes , out int updateSize )
7194 {
72- lock ( _flowLock )
95+ FlowControlState currentFlow , computedFlow ;
96+ do
7397 {
7498 updateSize = 0 ;
75-
76- if ( _flow . IsAborted )
99+ currentFlow = _flow ; // Copy
100+ if ( currentFlow . IsAborted )
77101 {
78102 // All data received by stream has already been returned to the connection window.
79103 return false ;
80104 }
81105
82- if ( ! _flow . TryUpdateWindow ( bytes ) )
106+ var maxUpdate = int . MaxValue - currentFlow . Available ;
107+ if ( bytes > maxUpdate )
83108 {
84109 // We only try to update the window back to its initial size after the app consumes data.
85110 // It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
86111 Debug . Assert ( false , $ "{ nameof ( TryUpdateWindow ) } attempted to grow window past max size.") ;
87112 }
113+ computedFlow = new FlowControlState ( currentFlow . Available + ( uint ) bytes , currentFlow . IsAborted ) ;
114+ } while ( currentFlow . _state != Interlocked . CompareExchange ( ref _flow . _state , computedFlow . _state , currentFlow . _state ) ) ;
88115
89- if ( _windowUpdatesDisabled )
90- {
91- // Continue returning space to the connection window. The end of the stream has already
92- // been received, so don't send window updates for the stream window.
93- return true ;
94- }
95-
96- var potentialUpdateSize = _pendingUpdateSize + bytes ;
116+ if ( _windowUpdatesDisabled )
117+ {
118+ // Continue returning space to the connection window. The end of the stream has already
119+ // been received, so don't send window updates for the stream window.
120+ return true ;
121+ }
97122
123+ int computedPendingUpdateSize , currentPendingSize ;
124+ do
125+ {
126+ updateSize = 0 ;
127+ currentPendingSize = _pendingUpdateSize ;
128+ var potentialUpdateSize = currentPendingSize + bytes ;
98129 if ( potentialUpdateSize > _minWindowSizeIncrement )
99130 {
100- _pendingUpdateSize = 0 ;
131+ computedPendingUpdateSize = 0 ;
101132 updateSize = potentialUpdateSize ;
102133 }
103134 else
104135 {
105- _pendingUpdateSize = potentialUpdateSize ;
136+ computedPendingUpdateSize = potentialUpdateSize ;
106137 }
138+ } while ( currentPendingSize != Interlocked . CompareExchange ( ref _pendingUpdateSize , computedPendingUpdateSize , currentPendingSize ) ) ;
107139
108- return true ;
109- }
140+ return true ;
110141 }
111142
112143 public void StopWindowUpdates ( )
@@ -116,18 +147,21 @@ public void StopWindowUpdates()
116147
117148 public int Abort ( )
118149 {
119- lock ( _flowLock )
150+ FlowControlState currentFlow , computedFlow ;
151+ do
120152 {
121- if ( _flow . IsAborted )
153+ currentFlow = _flow ; // Copy
154+ if ( currentFlow . IsAborted )
122155 {
123156 return 0 ;
124157 }
125158
126- _flow . Abort ( ) ;
159+ computedFlow = new FlowControlState ( currentFlow . Available , true ) ;
160+ } while ( currentFlow . _state != Interlocked . CompareExchange ( ref _flow . _state , computedFlow . _state , currentFlow . _state ) ) ;
127161
128- // Tell caller to return connection window space consumed by this stream. Even if window updates have
129- // been disabled at the stream level, connection-level window updates may still be necessary.
130- return _initialWindowSize - _flow . Available ;
131- }
162+ // Tell caller to return connection window space consumed by this stream. Even if window updates have
163+ // been disabled at the stream level, connection-level window updates may still be necessary.
164+ return ( int ) ( _initialWindowSize - computedFlow . Available ) ;
132165 }
133166}
167+
0 commit comments