Skip to content

Commit 8ee7894

Browse files
committed
Add backpressure when rapidly creating new stateful Streamable HTTP sessions
1 parent 5e5b1af commit 8ee7894

File tree

2 files changed

+78
-12
lines changed

2 files changed

+78
-12
lines changed

src/ModelContextProtocol.AspNetCore/HttpMcpSession.cs

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,27 @@
11
using ModelContextProtocol.AspNetCore.Stateless;
22
using ModelContextProtocol.Protocol;
33
using ModelContextProtocol.Server;
4+
using System.Diagnostics;
45
using System.Security.Claims;
6+
using System.Threading;
57

68
namespace ModelContextProtocol.AspNetCore;
79

810
internal sealed class HttpMcpSession<TTransport>(
911
string sessionId,
1012
TTransport transport,
1113
UserIdClaim? userId,
12-
TimeProvider timeProvider) : IAsyncDisposable
14+
TimeProvider timeProvider,
15+
SemaphoreSlim? idleSessionSemaphore = null) : IAsyncDisposable
1316
where TTransport : ITransport
1417
{
1518
private int _referenceCount;
1619
private int _getRequestStarted;
17-
private CancellationTokenSource _disposeCts = new();
20+
private bool _isDisposed;
21+
22+
private readonly SemaphoreSlim? _idleSessionSemaphore = idleSessionSemaphore;
23+
private readonly CancellationTokenSource _disposeCts = new();
24+
private readonly object _referenceCountLock = new();
1825

1926
public string Id { get; } = sessionId;
2027
public TTransport Transport { get; } = transport;
@@ -30,16 +37,39 @@ internal sealed class HttpMcpSession<TTransport>(
3037
public IMcpServer? Server { get; set; }
3138
public Task? ServerRunTask { get; set; }
3239

33-
public IDisposable AcquireReference()
40+
public IAsyncDisposable AcquireReference()
3441
{
35-
Interlocked.Increment(ref _referenceCount);
42+
Debug.Assert(_idleSessionSemaphore is not null, "Only StreamableHttpHandler should call AcquireReference.");
43+
44+
lock (_referenceCountLock)
45+
{
46+
if (!_isDisposed && ++_referenceCount == 1)
47+
{
48+
// Non-idle sessions should not prevent session creation.
49+
_idleSessionSemaphore.Release();
50+
}
51+
}
52+
3653
return new UnreferenceDisposable(this);
3754
}
3855

3956
public bool TryStartGetRequest() => Interlocked.Exchange(ref _getRequestStarted, 1) == 0;
4057

4158
public async ValueTask DisposeAsync()
4259
{
60+
bool shouldReleaseIdleSessionSemaphore;
61+
62+
lock (_referenceCountLock)
63+
{
64+
if (_isDisposed)
65+
{
66+
return;
67+
}
68+
69+
_isDisposed = true;
70+
shouldReleaseIdleSessionSemaphore = _referenceCount == 0;
71+
}
72+
4373
try
4474
{
4575
await _disposeCts.CancelAsync();
@@ -65,20 +95,39 @@ public async ValueTask DisposeAsync()
6595
{
6696
await Transport.DisposeAsync();
6797
_disposeCts.Dispose();
98+
99+
// If the session was disposed while it was inactive, we need to release the semaphore
100+
// to allow new sessions to be created.
101+
if (_idleSessionSemaphore is not null && shouldReleaseIdleSessionSemaphore)
102+
{
103+
_idleSessionSemaphore.Release();
104+
}
68105
}
69106
}
70107
}
71108

72-
public bool HasSameUserId(ClaimsPrincipal user)
73-
=> UserIdClaim == StreamableHttpHandler.GetUserIdClaim(user);
109+
public bool HasSameUserId(ClaimsPrincipal user) => UserIdClaim == StreamableHttpHandler.GetUserIdClaim(user);
74110

75-
private sealed class UnreferenceDisposable(HttpMcpSession<TTransport> session) : IDisposable
111+
private sealed class UnreferenceDisposable(HttpMcpSession<TTransport> session) : IAsyncDisposable
76112
{
77-
public void Dispose()
113+
public async ValueTask DisposeAsync()
78114
{
79-
if (Interlocked.Decrement(ref session._referenceCount) == 0)
115+
Debug.Assert(session._idleSessionSemaphore is not null, "Only StreamableHttpHandler should call AcquireReference.");
116+
117+
bool shouldMarkSessionIdle;
118+
119+
lock (session._referenceCountLock)
120+
{
121+
shouldMarkSessionIdle = !session._isDisposed && --session._referenceCount == 0;
122+
}
123+
124+
if (shouldMarkSessionIdle)
80125
{
81126
session.LastActivityTicks = session.TimeProvider.GetTimestamp();
127+
128+
// Acquire semaphore when session becomes inactive (reference count goes to 0) to slow
129+
// down session creation until idle sessions are disposed by the background service.
130+
await session._idleSessionSemaphore.WaitAsync();
82131
}
83132
}
84133
}

src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ internal sealed class StreamableHttpHandler(
3131

3232
public ConcurrentDictionary<string, HttpMcpSession<StreamableHttpServerTransport>> Sessions { get; } = new(StringComparer.Ordinal);
3333

34+
// Semaphore to control session creation backpressure when there are too many idle sessions
35+
// Initial and max count is 10% more than MaxIdleSessionCount (or 100 more if that's higher)
36+
private readonly SemaphoreSlim _idleSessionSemaphore = CreateIdleSessionSemaphore(httpServerTransportOptions.Value);
37+
3438
public HttpServerTransportOptions HttpServerTransportOptions => httpServerTransportOptions.Value;
3539

3640
private IDataProtector Protector { get; } = dataProtection.CreateProtector("Microsoft.AspNetCore.StreamableHttpHandler.StatelessSessionId");
@@ -58,7 +62,7 @@ await WriteJsonRpcErrorAsync(context,
5862

5963
try
6064
{
61-
using var _ = session.AcquireReference();
65+
await using var _ = session.AcquireReference();
6266

6367
InitializeSseResponse(context);
6468
var wroteResponse = await session.Transport.HandlePostRequest(new HttpDuplexPipe(context), context.RequestAborted);
@@ -106,7 +110,7 @@ await WriteJsonRpcErrorAsync(context,
106110
return;
107111
}
108112

109-
using var _ = session.AcquireReference();
113+
await using var _ = session.AcquireReference();
110114
InitializeSseResponse(context);
111115

112116
// We should flush headers to indicate a 200 success quickly, because the initialization response
@@ -184,6 +188,11 @@ private async ValueTask<HttpMcpSession<StreamableHttpServerTransport>> StartNewS
184188

185189
if (!HttpServerTransportOptions.Stateless)
186190
{
191+
// Acquire semaphore before creating stateful sessions to create backpressure.
192+
// This semaphore represents "slots" for idle sessions, and we may need to wait on the
193+
// IdleTrackingBackgroundService to dispose idle sessions before continuing.
194+
await _idleSessionSemaphore.WaitAsync(context.RequestAborted);
195+
187196
sessionId = MakeNewSessionId();
188197
transport = new()
189198
{
@@ -248,7 +257,8 @@ private async ValueTask<HttpMcpSession<StreamableHttpServerTransport>> CreateSes
248257
context.Features.Set(server);
249258

250259
var userIdClaim = statelessId?.UserIdClaim ?? GetUserIdClaim(context.User);
251-
var session = new HttpMcpSession<StreamableHttpServerTransport>(sessionId, transport, userIdClaim, HttpServerTransportOptions.TimeProvider)
260+
var semaphore = HttpServerTransportOptions.Stateless ? null : _idleSessionSemaphore;
261+
var session = new HttpMcpSession<StreamableHttpServerTransport>(sessionId, transport, userIdClaim, HttpServerTransportOptions.TimeProvider, semaphore)
252262
{
253263
Server = server,
254264
};
@@ -337,6 +347,13 @@ private static bool MatchesApplicationJsonMediaType(MediaTypeHeaderValue acceptH
337347
private static bool MatchesTextEventStreamMediaType(MediaTypeHeaderValue acceptHeaderValue)
338348
=> acceptHeaderValue.MatchesMediaType("text/event-stream");
339349

350+
private static SemaphoreSlim CreateIdleSessionSemaphore(HttpServerTransportOptions options)
351+
{
352+
var maxIdleSessionCount = options.MaxIdleSessionCount;
353+
var semaphoreCount = Math.Max(maxIdleSessionCount + 100, (int)(maxIdleSessionCount * 1.1));
354+
return new SemaphoreSlim(semaphoreCount, semaphoreCount);
355+
}
356+
340357
private sealed class HttpDuplexPipe(HttpContext context) : IDuplexPipe
341358
{
342359
public PipeReader Input => context.Request.BodyReader;

0 commit comments

Comments
 (0)