Skip to content

Commit b4ea7e8

Browse files
author
ladeak
committed
Using Interlocked in InputFlowControl and Http2Stream
1 parent 4bc867d commit b4ea7e8

File tree

5 files changed

+509
-99
lines changed

5 files changed

+509
-99
lines changed

src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/FlowControl.cs

Lines changed: 0 additions & 57 deletions
This file was deleted.

src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/InputFlowControl.cs

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,94 +19,125 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
1919
/// <seealso href="https://datatracker.ietf.org/doc/html/rfc9113#name-flow-control"/>
2020
internal sealed class InputFlowControl
2121
{
22-
private readonly int _initialWindowSize;
22+
private struct FlowControlState
23+
{
24+
private const long AbortedBitMask = 1L << 32; // uint MaxValue + 1
25+
internal long _state;
26+
27+
public FlowControlState(uint initialWindowSize, bool isAborted)
28+
{
29+
_state = initialWindowSize;
30+
if (isAborted)
31+
{
32+
_state |= AbortedBitMask;
33+
}
34+
}
35+
36+
public uint Available => (uint)_state;
37+
38+
public bool IsAborted => _state > uint.MaxValue;
39+
}
40+
41+
private readonly uint _initialWindowSize;
2342
private readonly int _minWindowSizeIncrement;
2443

25-
private FlowControl _flow;
44+
private FlowControlState _flow;
2645
private int _pendingUpdateSize;
2746
private bool _windowUpdatesDisabled;
28-
private readonly object _flowLock = new object();
2947

3048
public InputFlowControl(uint initialWindowSize, uint minWindowSizeIncrement)
3149
{
3250
Debug.Assert(initialWindowSize >= minWindowSizeIncrement, "minWindowSizeIncrement is greater than the window size.");
3351

34-
_flow = new FlowControl(initialWindowSize);
35-
_initialWindowSize = (int)initialWindowSize;
52+
_flow = new FlowControlState(initialWindowSize, false);
53+
_initialWindowSize = initialWindowSize;
3654
_minWindowSizeIncrement = (int)minWindowSizeIncrement;
3755
}
3856

3957
public bool IsAvailabilityLow => _flow.Available < _minWindowSizeIncrement;
4058

59+
// Test hook, not participating in mutual exclusion
60+
internal uint Available => _flow.Available;
61+
4162
public void Reset()
4263
{
43-
_flow = new FlowControl((uint)_initialWindowSize);
64+
_flow = new FlowControlState(_initialWindowSize, false);
4465
_pendingUpdateSize = 0;
4566
_windowUpdatesDisabled = false;
4667
}
4768

4869
public bool TryAdvance(int bytes)
4970
{
50-
lock (_flowLock)
71+
FlowControlState currentFlow, computedFlow;
72+
do
5173
{
74+
currentFlow = _flow; // Copy
5275
// Even if the stream is aborted, the client should never send more data than was available in the
5376
// flow-control window at the time of the abort.
54-
if (bytes > _flow.Available)
77+
if (bytes > currentFlow.Available)
5578
{
5679
throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorFlowControlWindowExceeded, Http2ErrorCode.FLOW_CONTROL_ERROR, ConnectionEndReason.FlowControlWindowExceeded);
5780
}
5881

59-
if (_flow.IsAborted)
82+
if (currentFlow.IsAborted)
6083
{
61-
// This data won't be read by the app, so tell the caller to count the data as already consumed.
6284
return false;
6385
}
6486

65-
_flow.Advance(bytes);
66-
return true;
67-
}
87+
computedFlow = new FlowControlState(currentFlow.Available - (uint)bytes, isAborted: false);
88+
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));
89+
90+
return true;
6891
}
6992

7093
public bool TryUpdateWindow(int bytes, out int updateSize)
7194
{
72-
lock (_flowLock)
95+
FlowControlState currentFlow, computedFlow;
96+
do
7397
{
7498
updateSize = 0;
75-
76-
if (_flow.IsAborted)
99+
currentFlow = _flow; // Copy
100+
if (currentFlow.IsAborted)
77101
{
78102
// All data received by stream has already been returned to the connection window.
79103
return false;
80104
}
81105

82-
if (!_flow.TryUpdateWindow(bytes))
106+
var maxUpdate = int.MaxValue - currentFlow.Available;
107+
if (bytes > maxUpdate)
83108
{
84109
// We only try to update the window back to its initial size after the app consumes data.
85110
// It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
86111
Debug.Assert(false, $"{nameof(TryUpdateWindow)} attempted to grow window past max size.");
87112
}
113+
computedFlow = new FlowControlState(currentFlow.Available + (uint)bytes, isAborted: false);
114+
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));
88115

89-
if (_windowUpdatesDisabled)
90-
{
91-
// Continue returning space to the connection window. The end of the stream has already
92-
// been received, so don't send window updates for the stream window.
93-
return true;
94-
}
95-
96-
var potentialUpdateSize = _pendingUpdateSize + bytes;
116+
if (_windowUpdatesDisabled)
117+
{
118+
// Continue returning space to the connection window. The end of the stream has already
119+
// been received, so don't send window updates for the stream window.
120+
return true;
121+
}
97122

123+
int computedPendingUpdateSize, currentPendingSize;
124+
do
125+
{
126+
updateSize = 0;
127+
currentPendingSize = _pendingUpdateSize;
128+
var potentialUpdateSize = currentPendingSize + bytes;
98129
if (potentialUpdateSize > _minWindowSizeIncrement)
99130
{
100-
_pendingUpdateSize = 0;
131+
computedPendingUpdateSize = 0;
101132
updateSize = potentialUpdateSize;
102133
}
103134
else
104135
{
105-
_pendingUpdateSize = potentialUpdateSize;
136+
computedPendingUpdateSize = potentialUpdateSize;
106137
}
138+
} while (currentPendingSize != Interlocked.CompareExchange(ref _pendingUpdateSize, computedPendingUpdateSize, currentPendingSize));
107139

108-
return true;
109-
}
140+
return true;
110141
}
111142

112143
public void StopWindowUpdates()
@@ -116,18 +147,21 @@ public void StopWindowUpdates()
116147

117148
public int Abort()
118149
{
119-
lock (_flowLock)
150+
FlowControlState currentFlow, computedFlow;
151+
do
120152
{
121-
if (_flow.IsAborted)
153+
currentFlow = _flow; // Copy
154+
if (currentFlow.IsAborted)
122155
{
123156
return 0;
124157
}
125158

126-
_flow.Abort();
159+
computedFlow = new FlowControlState(currentFlow.Available, isAborted: true);
160+
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));
127161

128-
// Tell caller to return connection window space consumed by this stream. Even if window updates have
129-
// been disabled at the stream level, connection-level window updates may still be necessary.
130-
return _initialWindowSize - _flow.Available;
131-
}
162+
// Tell caller to return connection window space consumed by this stream. Even if window updates have
163+
// been disabled at the stream level, connection-level window updates may still be necessary.
164+
return (int)(_initialWindowSize - computedFlow.Available);
132165
}
133166
}
167+

src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -630,16 +630,11 @@ public void DecrementActiveClientStreamCount()
630630
{
631631
// Decrement can be called twice, via calling CompleteAsync and then Abort on the HttpContext.
632632
// Only decrement once total.
633-
lock (_completionLock)
633+
if (Interlocked.CompareExchange(ref _decrementCalled, true, false))
634634
{
635-
if (_decrementCalled)
636-
{
637635
return;
638636
}
639637

640-
_decrementCalled = true;
641-
}
642-
643638
_context.StreamLifetimeHandler.DecrementActiveClientStreamCount();
644639
}
645640

0 commit comments

Comments
 (0)