Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageVersion Include="Microsoft.Extensions.AI.Abstractions" Version="$(MicrosoftExtensionsAIVersion)" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsVersion)" />
<PackageVersion Include="System.Net.ServerSentEvents" Version="$(SystemVersion)" />
<PackageVersion Include="System.Net.ServerSentEvents" Version="$(System10Version)" />
<PackageVersion Include="System.Text.Json" Version="$(SystemVersion)" />
<PackageVersion Include="System.Threading.Channels" Version="$(SystemVersion)" />

Expand Down
4 changes: 0 additions & 4 deletions samples/AspNetCoreSseServer/AspNetCoreSseServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,4 @@
<ProjectReference Include="..\..\src\ModelContextProtocol\ModelContextProtocol.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Net.ServerSentEvents" VersionOverride="10.0.0-preview.1.25080.5" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using ModelContextProtocol.Server;
using ModelContextProtocol.Utils.Json;
using Microsoft.Extensions.Options;
using ModelContextProtocol.Protocol.Transport;

namespace AspNetCoreSseServer;

Expand All @@ -10,15 +11,15 @@ public static class McpEndpointRouteBuilderExtensions
public static IEndpointConventionBuilder MapMcpSse(this IEndpointRouteBuilder endpoints)
{
IMcpServer? server = null;
SseServerStreamTransport? transport = null;
SseResponseStreamTransport? transport = null;
var loggerFactory = endpoints.ServiceProvider.GetRequiredService<ILoggerFactory>();
var mcpServerOptions = endpoints.ServiceProvider.GetRequiredService<IOptions<McpServerOptions>>();

var routeGroup = endpoints.MapGroup("");

routeGroup.MapGet("/sse", async (HttpResponse response, CancellationToken requestAborted) =>
{
await using var localTransport = transport = new SseServerStreamTransport(response.Body);
await using var localTransport = transport = new SseResponseStreamTransport(response.Body);
await using var localServer = server = McpServerFactory.Create(transport, mcpServerOptions.Value, loggerFactory, endpoints.ServiceProvider);

await localServer.StartAsync(requestAborted);
Expand All @@ -37,7 +38,7 @@ public static IEndpointConventionBuilder MapMcpSse(this IEndpointRouteBuilder en
}
});

routeGroup.MapPost("/message", async (HttpContext context) =>
routeGroup.MapPost("/message", async context =>
{
if (transport is null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text.Json;
using System.Net;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using ModelContextProtocol.Protocol.Messages;
using ModelContextProtocol.Utils.Json;
Expand All @@ -9,7 +10,7 @@
namespace ModelContextProtocol.Protocol.Transport;

/// <summary>
/// Implements the MCP transport protocol over standard input/output streams.
/// Implements the MCP transport protocol using <see cref="HttpListener"/>.
/// </summary>
public sealed class HttpListenerSseServerTransport : TransportBase, IServerTransport
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,31 @@
using System.Text.Json;
using System.Threading.Channels;
using ModelContextProtocol.Protocol.Messages;
using ModelContextProtocol.Protocol.Transport;
using ModelContextProtocol.Utils.Json;

namespace AspNetCoreSseServer;
namespace ModelContextProtocol.Protocol.Transport;

public class SseServerStreamTransport(Stream sseResponseStream) : ITransport
/// <summary>
/// Implements the MCP SSE server transport protocol using the SSE response <see cref="Stream"/>.
/// </summary>
/// <param name="sseResponseStream">The stream to write the SSE response body to.</param>
public sealed class SseResponseStreamTransport(Stream sseResponseStream) : ITransport
{
private readonly Channel<IJsonRpcMessage> _incomingChannel = CreateSingleItemChannel<IJsonRpcMessage>();
private readonly Channel<SseItem<IJsonRpcMessage?>> _outgoingSseChannel = CreateSingleItemChannel<SseItem<IJsonRpcMessage?>>();

private Task? _sseWriteTask;
private Utf8JsonWriter? _jsonWriter;

/// <inherityydoc/>
public bool IsConnected => _sseWriteTask?.IsCompleted == false;

/// <summary>
/// Starts the transport and writes the JSON-RPC messages sent via <see cref="SendMessageAsync(IJsonRpcMessage, CancellationToken)"/>
/// to the SSE response stream until cancelled or disposed.
/// </summary>
/// <param name="cancellationToken">A token to cancel writing to the SSE response stream.</param>
/// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns>
public Task RunAsync(CancellationToken cancellationToken)
{
void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<byte> writer)
Expand All @@ -28,7 +38,7 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
return;
}

JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions);
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
}

// The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type,
Expand All @@ -39,23 +49,40 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
return _sseWriteTask = SseFormatter.WriteAsync(sseItems, sseResponseStream, WriteJsonRpcMessageToBuffer, cancellationToken);
}

/// <inheritdoc/>
public ChannelReader<IJsonRpcMessage> MessageReader => _incomingChannel.Reader;

/// <inheritdoc/>
public ValueTask DisposeAsync()
{
_incomingChannel.Writer.TryComplete();
_outgoingSseChannel.Writer.TryComplete();
return new ValueTask(_sseWriteTask ?? Task.CompletedTask);
}

public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken = default) =>
_outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message), cancellationToken).AsTask();
/// <inheritdoc/>
public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken = default)
{
if (_sseWriteTask is null)
{
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
}

return _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message), cancellationToken).AsTask();
}

/// <summary>
/// Handles incoming JSON-RPC messages received on the /message endpoint.
/// </summary>
/// <param name="message">The JSON-RPC message received.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task representing the potentially asynchronous operation to buffer or process the JSON-RPC message.</returns>
/// <exception cref="InvalidOperationException">Thrown when there is an attempt to process a message before calling <see cref="RunAsync(CancellationToken)"/>.</exception>
public Task OnMessageReceivedAsync(IJsonRpcMessage message, CancellationToken cancellationToken)
{
if (!IsConnected)
if (_sseWriteTask is null)
{
throw new McpTransportException("Transport is not connected");
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
}

return _incomingChannel.Writer.WriteAsync(message, cancellationToken).AsTask();
Expand Down