Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,94 +19,125 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
/// <seealso href="https://datatracker.ietf.org/doc/html/rfc9113#name-flow-control"/>
internal sealed class InputFlowControl
{
private readonly int _initialWindowSize;
private struct FlowControlState
{
private const long AbortedBitMask = 1L << 32; // uint MaxValue + 1
internal long _state;

public FlowControlState(int size, bool isAborted)
{
_state = (uint)size; // Casted first to uint before assigning it to a long field to the address negative values
if (isAborted)
{
_state |= AbortedBitMask;
}
}

public int Available => (int)_state;

public bool IsAborted => _state > uint.MaxValue;
}

private readonly uint _initialWindowSize;
private readonly int _minWindowSizeIncrement;

private FlowControl _flow;
private FlowControlState _flow;
private int _pendingUpdateSize;
private bool _windowUpdatesDisabled;
private readonly Lock _flowLock = new();

public InputFlowControl(uint initialWindowSize, uint minWindowSizeIncrement)
{
Debug.Assert(initialWindowSize >= minWindowSizeIncrement, "minWindowSizeIncrement is greater than the window size.");

_flow = new FlowControl(initialWindowSize);
_initialWindowSize = (int)initialWindowSize;
_flow = new FlowControlState((int)initialWindowSize, isAborted: false);
_initialWindowSize = initialWindowSize;
_minWindowSizeIncrement = (int)minWindowSizeIncrement;
}

public bool IsAvailabilityLow => _flow.Available < _minWindowSizeIncrement;

// Test hook
internal int Available => _flow.Available;
internal bool IsAborted => _flow.IsAborted;

public void Reset()
{
_flow = new FlowControl((uint)_initialWindowSize);
_flow = new FlowControlState((int)_initialWindowSize, isAborted: false);
_pendingUpdateSize = 0;
_windowUpdatesDisabled = false;
}

public bool TryAdvance(int bytes)
{
lock (_flowLock)
FlowControlState currentFlow, computedFlow;
do
{
currentFlow = _flow; // Copy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be a volatile read or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have based the implementation on the documentation: https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.compareexchange?view=net-8.0 (see the examples)

I know there is another implementation, which honestly 'reads' safer to me, happy to update if that is preferred:

do
{
      startValue = currentValue; 
      desiredValue = f(startVal); 
      currentValue = Interlocked.CompareExchange(ref target, desiredValue, startValue); 
}
while (startValue != currentValue); 


// Even if the stream is aborted, the client should never send more data than was available in the
// flow-control window at the time of the abort.
if (bytes > _flow.Available)
if (bytes > currentFlow.Available)
{
throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorFlowControlWindowExceeded, Http2ErrorCode.FLOW_CONTROL_ERROR, ConnectionEndReason.FlowControlWindowExceeded);
}

if (_flow.IsAborted)
if (currentFlow.IsAborted)
{
// This data won't be read by the app, so tell the caller to count the data as already consumed.
return false;
}

_flow.Advance(bytes);
return true;
}
computedFlow = new FlowControlState(currentFlow.Available - bytes, isAborted: false);
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));

return true;
}

public bool TryUpdateWindow(int bytes, out int updateSize)
{
lock (_flowLock)
FlowControlState currentFlow, computedFlow;
do
{
updateSize = 0;

if (_flow.IsAborted)
currentFlow = _flow; // Copy
if (currentFlow.IsAborted)
{
// All data received by stream has already been returned to the connection window.
return false;
}

if (!_flow.TryUpdateWindow(bytes))
{
// We only try to update the window back to its initial size after the app consumes data.
// It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
Debug.Assert(false, $"{nameof(TryUpdateWindow)} attempted to grow window past max size.");
}
var maxUpdate = Http2PeerSettings.MaxWindowSize - currentFlow.Available;

if (_windowUpdatesDisabled)
{
// Continue returning space to the connection window. The end of the stream has already
// been received, so don't send window updates for the stream window.
return true;
}
// We only try to update the window back to its initial size after the app consumes data.
// It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
Debug.Assert(bytes <= maxUpdate, $"{nameof(TryUpdateWindow)} attempted to grow window past max size.");
computedFlow = new FlowControlState(currentFlow.Available + bytes, isAborted: false);
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));

var potentialUpdateSize = _pendingUpdateSize + bytes;
if (_windowUpdatesDisabled)
{
// Continue returning space to the connection window. The end of the stream has already
// been received, so don't send window updates for the stream window.
return true;
}

int computedPendingUpdateSize, currentPendingSize;
do
{
updateSize = 0;
currentPendingSize = _pendingUpdateSize;
var potentialUpdateSize = currentPendingSize + bytes;
if (potentialUpdateSize > _minWindowSizeIncrement)
{
_pendingUpdateSize = 0;
computedPendingUpdateSize = 0;
updateSize = potentialUpdateSize;
}
else
{
_pendingUpdateSize = potentialUpdateSize;
computedPendingUpdateSize = potentialUpdateSize;
}
} while (currentPendingSize != Interlocked.CompareExchange(ref _pendingUpdateSize, computedPendingUpdateSize, currentPendingSize));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the locking version, the same thread that updated _flow updated _pendingUpdateSize; now it seems like different threads could update them. It's not obvious to me whether that matters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell it does not, but I am not sure if I missed any use-case.


return true;
}
return true;
}

public void StopWindowUpdates()
Expand All @@ -116,18 +147,21 @@ public void StopWindowUpdates()

public int Abort()
{
lock (_flowLock)
FlowControlState currentFlow, computedFlow;
do
{
if (_flow.IsAborted)
currentFlow = _flow; // Copy
if (currentFlow.IsAborted)
{
return 0;
}

_flow.Abort();
computedFlow = new FlowControlState(currentFlow.Available, isAborted: true);
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));

// Tell caller to return connection window space consumed by this stream. Even if window updates have
// been disabled at the stream level, connection-level window updates may still be necessary.
return _initialWindowSize - _flow.Available;
}
// Tell caller to return connection window space consumed by this stream. Even if window updates have
// been disabled at the stream level, connection-level window updates may still be necessary.
return (int)(_initialWindowSize - computedFlow.Available);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -630,16 +630,11 @@ public void DecrementActiveClientStreamCount()
{
// Decrement can be called twice, via calling CompleteAsync and then Abort on the HttpContext.
// Only decrement once total.
lock (_completionLock)
if (Interlocked.CompareExchange(ref _decrementCalled, true, false))
{
if (_decrementCalled)
{
return;
}

_decrementCalled = true;
}

_context.StreamLifetimeHandler.DecrementActiveClientStreamCount();
}

Expand Down
Loading
Loading