- 
                Notifications
    
You must be signed in to change notification settings  - Fork 10.5k
 
Using Interlocked in InputFlowControl and Http2Stream #57968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -19,94 +19,125 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; | |
| /// <seealso href="https://datatracker.ietf.org/doc/html/rfc9113#name-flow-control"/> | ||
| internal sealed class InputFlowControl | ||
| { | ||
| private readonly int _initialWindowSize; | ||
| private struct FlowControlState | ||
| { | ||
| private const long AbortedBitMask = 1L << 32; // uint MaxValue + 1 | ||
| internal long _state; | ||
| 
     | 
||
| public FlowControlState(int size, bool isAborted) | ||
| { | ||
| _state = (uint)size; // Casted first to uint before assigning it to a long field to the address negative values | ||
| if (isAborted) | ||
| { | ||
| _state |= AbortedBitMask; | ||
| } | ||
| } | ||
| 
     | 
||
| public int Available => (int)_state; | ||
| 
     | 
||
| public bool IsAborted => _state > uint.MaxValue; | ||
| } | ||
| 
     | 
||
| private readonly uint _initialWindowSize; | ||
| private readonly int _minWindowSizeIncrement; | ||
| 
     | 
||
| private FlowControl _flow; | ||
| private FlowControlState _flow; | ||
| private int _pendingUpdateSize; | ||
| private bool _windowUpdatesDisabled; | ||
| private readonly Lock _flowLock = new(); | ||
| 
     | 
||
| public InputFlowControl(uint initialWindowSize, uint minWindowSizeIncrement) | ||
| { | ||
| Debug.Assert(initialWindowSize >= minWindowSizeIncrement, "minWindowSizeIncrement is greater than the window size."); | ||
| 
     | 
||
| _flow = new FlowControl(initialWindowSize); | ||
| _initialWindowSize = (int)initialWindowSize; | ||
| _flow = new FlowControlState((int)initialWindowSize, isAborted: false); | ||
| _initialWindowSize = initialWindowSize; | ||
| _minWindowSizeIncrement = (int)minWindowSizeIncrement; | ||
| } | ||
| 
     | 
||
| public bool IsAvailabilityLow => _flow.Available < _minWindowSizeIncrement; | ||
| 
     | 
||
| // Test hook | ||
| internal int Available => _flow.Available; | ||
| internal bool IsAborted => _flow.IsAborted; | ||
| 
     | 
||
| public void Reset() | ||
| { | ||
| _flow = new FlowControl((uint)_initialWindowSize); | ||
| _flow = new FlowControlState((int)_initialWindowSize, isAborted: false); | ||
| _pendingUpdateSize = 0; | ||
| _windowUpdatesDisabled = false; | ||
| } | ||
| 
     | 
||
| public bool TryAdvance(int bytes) | ||
| { | ||
| lock (_flowLock) | ||
| FlowControlState currentFlow, computedFlow; | ||
| do | ||
| { | ||
| currentFlow = _flow; // Copy | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't need to be a volatile read or something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have based the implementation on the documentation: https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.compareexchange?view=net-8.0 (see the examples) I know there is another implementation, which honestly 'reads' safer to me, happy to update if that is preferred: do
{
      startValue = currentValue; 
      desiredValue = f(startVal); 
      currentValue = Interlocked.CompareExchange(ref target, desiredValue, startValue); 
}
while (startValue != currentValue);  | 
||
| 
     | 
||
| // Even if the stream is aborted, the client should never send more data than was available in the | ||
| // flow-control window at the time of the abort. | ||
| if (bytes > _flow.Available) | ||
| if (bytes > currentFlow.Available) | ||
| { | ||
| throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorFlowControlWindowExceeded, Http2ErrorCode.FLOW_CONTROL_ERROR, ConnectionEndReason.FlowControlWindowExceeded); | ||
| } | ||
| 
     | 
||
| if (_flow.IsAborted) | ||
| if (currentFlow.IsAborted) | ||
| { | ||
| // This data won't be read by the app, so tell the caller to count the data as already consumed. | ||
| return false; | ||
| } | ||
| 
     | 
||
| _flow.Advance(bytes); | ||
| return true; | ||
| } | ||
| computedFlow = new FlowControlState(currentFlow.Available - bytes, isAborted: false); | ||
| } while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state)); | ||
| 
     | 
||
| return true; | ||
| } | ||
| 
     | 
||
| public bool TryUpdateWindow(int bytes, out int updateSize) | ||
| { | ||
| lock (_flowLock) | ||
| FlowControlState currentFlow, computedFlow; | ||
| do | ||
| { | ||
| updateSize = 0; | ||
| 
     | 
||
| if (_flow.IsAborted) | ||
| currentFlow = _flow; // Copy | ||
| if (currentFlow.IsAborted) | ||
| { | ||
| // All data received by stream has already been returned to the connection window. | ||
| return false; | ||
| } | ||
| 
     | 
||
| if (!_flow.TryUpdateWindow(bytes)) | ||
| { | ||
| // We only try to update the window back to its initial size after the app consumes data. | ||
| // It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize. | ||
| Debug.Assert(false, $"{nameof(TryUpdateWindow)} attempted to grow window past max size."); | ||
| } | ||
| var maxUpdate = Http2PeerSettings.MaxWindowSize - currentFlow.Available; | ||
| 
     | 
||
| if (_windowUpdatesDisabled) | ||
| { | ||
| // Continue returning space to the connection window. The end of the stream has already | ||
| // been received, so don't send window updates for the stream window. | ||
| return true; | ||
| } | ||
| // We only try to update the window back to its initial size after the app consumes data. | ||
| // It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize. | ||
| Debug.Assert(bytes <= maxUpdate, $"{nameof(TryUpdateWindow)} attempted to grow window past max size."); | ||
| computedFlow = new FlowControlState(currentFlow.Available + bytes, isAborted: false); | ||
| } while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state)); | ||
| 
     | 
||
| var potentialUpdateSize = _pendingUpdateSize + bytes; | ||
| if (_windowUpdatesDisabled) | ||
| { | ||
| // Continue returning space to the connection window. The end of the stream has already | ||
| // been received, so don't send window updates for the stream window. | ||
| return true; | ||
| } | ||
| 
     | 
||
| int computedPendingUpdateSize, currentPendingSize; | ||
| do | ||
| { | ||
| updateSize = 0; | ||
| currentPendingSize = _pendingUpdateSize; | ||
| var potentialUpdateSize = currentPendingSize + bytes; | ||
| if (potentialUpdateSize > _minWindowSizeIncrement) | ||
| { | ||
| _pendingUpdateSize = 0; | ||
| computedPendingUpdateSize = 0; | ||
| updateSize = potentialUpdateSize; | ||
| } | ||
| else | ||
| { | ||
| _pendingUpdateSize = potentialUpdateSize; | ||
| computedPendingUpdateSize = potentialUpdateSize; | ||
| } | ||
| } while (currentPendingSize != Interlocked.CompareExchange(ref _pendingUpdateSize, computedPendingUpdateSize, currentPendingSize)); | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the locking version, the same thread that updated  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can tell it does not, but I am not sure if I missed any use-case.  | 
||
| 
     | 
||
| return true; | ||
| } | ||
| return true; | ||
| } | ||
| 
     | 
||
| public void StopWindowUpdates() | ||
| 
        
          
        
         | 
    @@ -116,18 +147,21 @@ public void StopWindowUpdates() | |
| 
     | 
||
| public int Abort() | ||
| { | ||
| lock (_flowLock) | ||
| FlowControlState currentFlow, computedFlow; | ||
| do | ||
| { | ||
| if (_flow.IsAborted) | ||
| currentFlow = _flow; // Copy | ||
| if (currentFlow.IsAborted) | ||
| { | ||
| return 0; | ||
| } | ||
| 
     | 
||
| _flow.Abort(); | ||
| computedFlow = new FlowControlState(currentFlow.Available, isAborted: true); | ||
| } while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state)); | ||
| 
     | 
||
| // Tell caller to return connection window space consumed by this stream. Even if window updates have | ||
| // been disabled at the stream level, connection-level window updates may still be necessary. | ||
| return _initialWindowSize - _flow.Available; | ||
| } | ||
| // Tell caller to return connection window space consumed by this stream. Even if window updates have | ||
| // been disabled at the stream level, connection-level window updates may still be necessary. | ||
| return (int)(_initialWindowSize - computedFlow.Available); | ||
| } | ||
| } | ||
| 
     | 
||
Uh oh!
There was an error while loading. Please reload this page.