Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ public static class HttpMcpServerBuilderExtensions
{
/// <summary>
/// Adds the services necessary for <see cref="M:McpEndpointRouteBuilderExtensions.MapMcp"/>
/// to handle MCP requests and sessions using the MCP HTTP Streaming transport. For more information on configuring the underlying HTTP server
/// to handle MCP requests and sessions using the MCP Streamable HTTP transport. For more information on configuring the underlying HTTP server
/// to control things like port binding custom TLS certificates, see the <see href="https://learn.microsoft.com/aspnet/core/fundamentals/minimal-apis">Minimal APIs quick reference</see>.
/// </summary>
/// <param name="builder">The builder instance.</param>
/// <param name="configureOptions">Configures options for the HTTP Streaming transport. This allows configuring per-session
/// <param name="configureOptions">Configures options for the Streamable HTTP transport. This allows configuring per-session
/// <see cref="McpServerOptions"/> and running logic before and after a session.</param>
/// <returns>The builder provided in <paramref name="builder"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="builder"/> is <see langword="null"/>.</exception>
public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action<HttpServerTransportOptions>? configureOptions = null)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.TryAddSingleton<StreamableHttpHandler>();
builder.Services.TryAddSingleton<SseHandler>();
builder.Services.AddHostedService<IdleTrackingBackgroundService>();

if (configureOptions is not null)
{
Expand Down
66 changes: 60 additions & 6 deletions src/ModelContextProtocol.AspNetCore/HttpMcpSession.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,61 @@
using ModelContextProtocol.Protocol.Transport;
using ModelContextProtocol.Server;
using System.Security.Claims;

namespace ModelContextProtocol.AspNetCore;

internal class HttpMcpSession
internal sealed class HttpMcpSession<TTransport>(string sessionId, TTransport transport, ClaimsPrincipal user, TimeProvider timeProvider) : IAsyncDisposable
where TTransport : ITransport
{
public HttpMcpSession(SseResponseStreamTransport transport, ClaimsPrincipal user)
private int _referenceCount;
private int _getRequestStarted;
private CancellationTokenSource _disposeCts = new();

public string Id { get; } = sessionId;
public TTransport Transport { get; } = transport;
public (string Type, string Value, string Issuer)? UserIdClaim { get; } = GetUserIdClaim(user);

public CancellationToken SessionClosed => _disposeCts.Token;

public bool IsActive => !SessionClosed.IsCancellationRequested && _referenceCount > 0;
public long LastActivityTicks { get; private set; } = timeProvider.GetUtcNow().UtcTicks;

public bool TryStartGetRequest() => Interlocked.Exchange(ref _getRequestStarted, 1) == 0;

public IMcpServer? Server { get; set; }
public Task? ServerRunTask { get; set; }

public IDisposable AcquireReference()
{
Transport = transport;
UserIdClaim = GetUserIdClaim(user);
Interlocked.Increment(ref _referenceCount);
return new UnreferenceDisposable(this, timeProvider);
}

public SseResponseStreamTransport Transport { get; }
public (string Type, string Value, string Issuer)? UserIdClaim { get; }
public async ValueTask DisposeAsync()
{
try
{
await _disposeCts.CancelAsync();

if (ServerRunTask is not null)
{
await ServerRunTask;
}
}
catch (OperationCanceledException)
{
}
finally
{
if (Server is not null)
{
await Server.DisposeAsync();
}

await Transport.DisposeAsync();
_disposeCts.Dispose();
}
}

public bool HasSameUserId(ClaimsPrincipal user)
=> UserIdClaim == GetUserIdClaim(user);
Expand All @@ -36,4 +79,15 @@ private static (string Type, string Value, string Issuer)? GetUserIdClaim(Claims

return null;
}

private sealed class UnreferenceDisposable(HttpMcpSession<TTransport> session, TimeProvider timeProvider) : IDisposable
{
public void Dispose()
{
if (Interlocked.Decrement(ref session._referenceCount) == 0)
{
session.LastActivityTicks = timeProvider.GetUtcNow().UtcTicks;
}
}
}
}
14 changes: 14 additions & 0 deletions src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,18 @@ public class HttpServerTransportOptions
/// This is useful for running logic before a sessions starts and after it completes.
/// </summary>
public Func<HttpContext, IMcpServer, CancellationToken, Task>? RunSessionHandler { get; set; }

/// <summary>
/// Represents the duration of time the server will wait between any active requests before timing out an
/// MCP session. This is checked in background every 5 seconds. A client trying to resume a session will
/// receive a 404 status code and should restart their session. A client can keep their session open by
/// keeping a GET request open. The default value is set to 2 minutes.
/// minutes.
/// </summary>
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromMinutes(2);

/// <summary>
/// Used for testing the <see cref="IdleTimeout"/>.
/// </summary>
public TimeProvider TimeProvider { get; set; } = TimeProvider.System;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ModelContextProtocol.Protocol.Transport;

namespace ModelContextProtocol.AspNetCore;

internal sealed partial class IdleTrackingBackgroundService(
StreamableHttpHandler handler,
IOptions<HttpServerTransportOptions> options,
ILogger<IdleTrackingBackgroundService> logger) : BackgroundService
{
// The compiler will complain about the parameter being unused otherwise despite the source generator.
private ILogger _ = logger;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var timeProvider = options.Value.TimeProvider;
var timer = new PeriodicTimer(TimeSpan.FromSeconds(5), timeProvider);

try
{
while (!stoppingToken.IsCancellationRequested && await timer.WaitForNextTickAsync(stoppingToken))
{
var idleActivityCutoff = timeProvider.GetUtcNow().Ticks - options.Value.IdleTimeout.Ticks;

foreach (var (_, session) in handler.Sessions)
{
if (session.IsActive || session.LastActivityTicks > idleActivityCutoff)
{
continue;
}

if (handler.Sessions.TryRemove(session.Id, out var removedSession))
{
LogSessionIdle(removedSession.Id);
await DisposeSessionAsync(removedSession);
}
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
finally
{
if (stoppingToken.IsCancellationRequested)
{
List<Task> disposeSessionTasks = [];

foreach (var (sessionKey, _) in handler.Sessions)
{
if (handler.Sessions.TryRemove(sessionKey, out var session))
{
disposeSessionTasks.Add(DisposeSessionAsync(session));
}
}

await Task.WhenAll(disposeSessionTasks);
}
}
}

private async Task DisposeSessionAsync(HttpMcpSession<StreamableHttpServerTransport> session)
{
try
{
await session.DisposeAsync();
}
catch (Exception ex)
{
LogSessionDisposeError(session.Id, ex);
}
}

[LoggerMessage(Level = LogLevel.Information, Message = "Closing idle session {sessionId}.")]
private partial void LogSessionIdle(string sessionId);

[LoggerMessage(Level = LogLevel.Error, Message = "Error disposing the IMcpServer for session {sessionId}.")]
private partial void LogSessionDisposeError(string sessionId, Exception ex);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Metadata;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.AspNetCore;
using ModelContextProtocol.Protocol.Messages;
using System.Diagnostics.CodeAnalysis;

namespace Microsoft.AspNetCore.Builder;
Expand All @@ -11,21 +14,42 @@ namespace Microsoft.AspNetCore.Builder;
public static class McpEndpointRouteBuilderExtensions
{
/// <summary>
/// Sets up endpoints for handling MCP HTTP Streaming transport.
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the protocol specification</see> for details about the Streamable HTTP transport.
/// Sets up endpoints for handling MCP Streamable HTTP transport.
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the 2025-03-26 protocol specification</see> for details about the Streamable HTTP transport.
/// Also maps legacy SSE endpoints for backward compatibility at the path "/sse" and "/message". <see href="https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">the 2024-11-05 protocol specification</see> for details about the HTTP with SSE transport.
/// </summary>
/// <param name="endpoints">The web application to attach MCP HTTP endpoints.</param>
/// <param name="pattern">The route pattern prefix to map to.</param>
/// <returns>Returns a builder for configuring additional endpoint conventions like authorization policies.</returns>
public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpoints, [StringSyntax("Route")] string pattern = "")
{
var handler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
var streamableHttpHandler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
throw new InvalidOperationException("You must call WithHttpTransport(). Unable to find required services. Call builder.Services.AddMcpServer().WithHttpTransport() in application startup code.");

var routeGroup = endpoints.MapGroup(pattern);
routeGroup.MapGet("", handler.HandleRequestAsync);
routeGroup.MapGet("/sse", handler.HandleRequestAsync);
routeGroup.MapPost("/message", handler.HandleRequestAsync);
return routeGroup;
var mcpGroup = endpoints.MapGroup(pattern);
var streamableHttpGroup = mcpGroup.MapGroup("")
.WithDisplayName(b => $"MCP Streamable HTTP | {b.DisplayName}")
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status404NotFound, typeof(JsonRpcError), contentTypes: ["application/json"]));

streamableHttpGroup.MapPost("", streamableHttpHandler.HandlePostRequestAsync)
.WithMetadata(new AcceptsMetadata(["application/json"]))
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]))
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
streamableHttpGroup.MapDelete("", streamableHttpHandler.HandleDeleteRequestAsync);

// Map legacy HTTP with SSE endpoints.
var sseHandler = endpoints.ServiceProvider.GetRequiredService<SseHandler>();
var sseGroup = mcpGroup.MapGroup("")
.WithDisplayName(b => $"MCP HTTP with SSE | {b.DisplayName}");

sseGroup.MapGet("/sse", sseHandler.HandleSseRequestAsync)
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
sseGroup.MapPost("/message", sseHandler.HandleMessageRequestAsync)
.WithMetadata(new AcceptsMetadata(["application/json"]))
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));

return mcpGroup;
}
}
110 changes: 110 additions & 0 deletions src/ModelContextProtocol.AspNetCore/SseHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ModelContextProtocol.Protocol.Messages;
using ModelContextProtocol.Protocol.Transport;
using ModelContextProtocol.Server;
using ModelContextProtocol.Utils.Json;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace ModelContextProtocol.AspNetCore;

internal sealed class SseHandler(
IOptions<McpServerOptions> mcpServerOptionsSnapshot,
IOptionsFactory<McpServerOptions> mcpServerOptionsFactory,
IOptions<HttpServerTransportOptions> httpMcpServerOptions,
IHostApplicationLifetime hostApplicationLifetime,
ILoggerFactory loggerFactory)
{
private readonly ConcurrentDictionary<string, HttpMcpSession<SseResponseStreamTransport>> _sessions = new(StringComparer.Ordinal);

public async Task HandleSseRequestAsync(HttpContext context)
{
var sessionId = StreamableHttpHandler.MakeNewSessionId();

// If the server is shutting down, we need to cancel all SSE connections immediately without waiting for HostOptions.ShutdownTimeout
// which defaults to 30 seconds.
using var sseCts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted, hostApplicationLifetime.ApplicationStopping);
var cancellationToken = sseCts.Token;

StreamableHttpHandler.InitializeSseResponse(context);

await using var transport = new SseResponseStreamTransport(context.Response.Body, $"message?sessionId={sessionId}");
await using var httpMcpSession = new HttpMcpSession<SseResponseStreamTransport>(sessionId, transport, context.User, httpMcpServerOptions.Value.TimeProvider);
if (!_sessions.TryAdd(sessionId, httpMcpSession))
{
throw new UnreachableException($"Unreachable given good entropy! Session with ID '{sessionId}' has already been created.");
}

try
{
var mcpServerOptions = mcpServerOptionsSnapshot.Value;
if (httpMcpServerOptions.Value.ConfigureSessionOptions is { } configureSessionOptions)
{
mcpServerOptions = mcpServerOptionsFactory.Create(Options.DefaultName);
await configureSessionOptions(context, mcpServerOptions, cancellationToken);
}

var transportTask = transport.RunAsync(cancellationToken);

try
{
await using var mcpServer = McpServerFactory.Create(transport, mcpServerOptions, loggerFactory, context.RequestServices);
httpMcpSession.Server = mcpServer;
context.Features.Set(mcpServer);

var runSessionAsync = httpMcpServerOptions.Value.RunSessionHandler ?? StreamableHttpHandler.RunSessionAsync;
httpMcpSession.ServerRunTask = runSessionAsync(context, mcpServer, cancellationToken);
await httpMcpSession.ServerRunTask;
}
finally
{
await transport.DisposeAsync();
await transportTask;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// RequestAborted always triggers when the client disconnects before a complete response body is written,
// but this is how SSE connections are typically closed.
}
finally
{
_sessions.TryRemove(sessionId, out _);
}
}

public async Task HandleMessageRequestAsync(HttpContext context)
{
if (!context.Request.Query.TryGetValue("sessionId", out var sessionId))
{
await Results.BadRequest("Missing sessionId query parameter.").ExecuteAsync(context);
return;
}

if (!_sessions.TryGetValue(sessionId.ToString(), out var httpMcpSession))
{
await Results.BadRequest($"Session ID not found.").ExecuteAsync(context);
return;
}

if (!httpMcpSession.HasSameUserId(context.User))
{
await Results.Forbid().ExecuteAsync(context);
return;
}

var message = (JsonRpcMessage?)await context.Request.ReadFromJsonAsync(McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage)), context.RequestAborted);
if (message is null)
{
await Results.BadRequest("No message in request body.").ExecuteAsync(context);
return;
}

await httpMcpSession.Transport.OnMessageReceivedAsync(message, context.RequestAborted);
context.Response.StatusCode = StatusCodes.Status202Accepted;
await context.Response.WriteAsync("Accepted");
}
}
Loading