1- using System . Text . Json ;
1+ using System . Net ;
2+ using System . Text . Json ;
23using Microsoft . Extensions . Logging ;
34using ModelContextProtocol . Protocol . Messages ;
45using ModelContextProtocol . Utils . Json ;
910namespace ModelContextProtocol . Protocol . Transport ;
1011
1112/// <summary>
12- /// Implements the MCP transport protocol over standard input/output streams .
13+ /// Implements the MCP transport protocol using <see cref="HttpListener"/> .
1314/// </summary>
1415public sealed class HttpListenerSseServerTransport : TransportBase , IServerTransport
1516{
1617 private readonly string _serverName ;
1718 private readonly HttpListenerServerProvider _httpServerProvider ;
1819 private readonly ILogger < HttpListenerSseServerTransport > _logger ;
19- private readonly JsonSerializerOptions _jsonOptions ;
20- private CancellationTokenSource ? _shutdownCts ;
21-
20+ private SseResponseStreamTransport ? _sseResponseStreamTransport ;
21+
2222 private string EndpointName => $ "Server (SSE) ({ _serverName } )";
2323
2424 /// <summary>
@@ -43,28 +43,23 @@ public HttpListenerSseServerTransport(string serverName, int port, ILoggerFactor
4343 {
4444 _serverName = serverName ;
4545 _logger = loggerFactory . CreateLogger < HttpListenerSseServerTransport > ( ) ;
46- _jsonOptions = McpJsonUtilities . DefaultOptions ;
47- _httpServerProvider = new HttpListenerServerProvider ( port ) ;
46+ _httpServerProvider = new HttpListenerServerProvider ( port )
47+ {
48+ OnSseConnectionAsync = OnSseConnectionAsync ,
49+ OnMessageAsync = OnMessageAsync ,
50+ } ;
4851 }
4952
5053 /// <inheritdoc/>
5154 public Task StartListeningAsync ( CancellationToken cancellationToken = default )
5255 {
53- _shutdownCts = new CancellationTokenSource ( ) ;
54-
55- _httpServerProvider . InitializeMessageHandler ( HttpMessageHandler ) ;
56- _httpServerProvider . StartAsync ( cancellationToken ) ;
57-
58- SetConnected ( true ) ;
59-
60- return Task . CompletedTask ;
56+ return _httpServerProvider . StartAsync ( cancellationToken ) ;
6157 }
6258
63-
6459 /// <inheritdoc/>
6560 public override async Task SendMessageAsync ( IJsonRpcMessage message , CancellationToken cancellationToken = default )
6661 {
67- if ( ! IsConnected )
62+ if ( ! IsConnected || _sseResponseStreamTransport is null )
6863 {
6964 _logger . TransportNotConnected ( EndpointName ) ;
7065 throw new McpTransportException ( "Transport is not connected" ) ;
@@ -78,10 +73,13 @@ public override async Task SendMessageAsync(IJsonRpcMessage message, Cancellatio
7873
7974 try
8075 {
81- var json = JsonSerializer . Serialize ( message , _jsonOptions . GetTypeInfo < IJsonRpcMessage > ( ) ) ;
82- _logger . TransportSendingMessage ( EndpointName , id , json ) ;
76+ if ( _logger . IsEnabled ( LogLevel . Debug ) )
77+ {
78+ var json = JsonSerializer . Serialize ( message , McpJsonUtilities . DefaultOptions . GetTypeInfo < IJsonRpcMessage > ( ) ) ;
79+ _logger . TransportSendingMessage ( EndpointName , id , json ) ;
80+ }
8381
84- await _httpServerProvider . SendEvent ( json , "message" ) . ConfigureAwait ( false ) ;
82+ await _sseResponseStreamTransport . SendMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
8583
8684 _logger . TransportSentMessage ( EndpointName , id ) ;
8785 }
@@ -99,49 +97,61 @@ public override async ValueTask DisposeAsync()
9997 GC . SuppressFinalize ( this ) ;
10098 }
10199
102- private async Task CleanupAsync ( CancellationToken cancellationToken )
100+ private Task CleanupAsync ( CancellationToken cancellationToken )
103101 {
104102 _logger . TransportCleaningUp ( EndpointName ) ;
105103
106- if ( _shutdownCts != null )
107- {
108- await _shutdownCts . CancelAsync ( ) . ConfigureAwait ( false ) ;
109- _shutdownCts . Dispose ( ) ;
110- _shutdownCts = null ;
111- }
112-
113104 _httpServerProvider . Dispose ( ) ;
114-
115105 SetConnected ( false ) ;
106+
116107 _logger . TransportCleanedUp ( EndpointName ) ;
108+ return Task . CompletedTask ;
109+ }
110+
111+ private async Task OnSseConnectionAsync ( Stream responseStream , CancellationToken cancellationToken )
112+ {
113+ await using var sseResponseStreamTransport = new SseResponseStreamTransport ( responseStream ) ;
114+ _sseResponseStreamTransport = sseResponseStreamTransport ;
115+ SetConnected ( true ) ;
116+ await sseResponseStreamTransport . RunAsync ( cancellationToken ) ;
117117 }
118118
119119 /// <summary>
120120 /// Handles HTTP messages received by the HTTP server provider.
121121 /// </summary>
122122 /// <returns>true if the message was accepted (return 202), false otherwise (return 400)</returns>
123- private bool HttpMessageHandler ( string request , CancellationToken cancellationToken )
123+ private async Task < bool > OnMessageAsync ( Stream requestStream , CancellationToken cancellationToken )
124124 {
125- _logger . TransportReceivedMessage ( EndpointName , request ) ;
125+ string request ;
126+ IJsonRpcMessage ? message = null ;
127+
128+ if ( _logger . IsEnabled ( LogLevel . Information ) )
129+ {
130+ using var reader = new StreamReader ( requestStream ) ;
131+ request = await reader . ReadToEndAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
132+ message = JsonSerializer . Deserialize ( request , McpJsonUtilities . DefaultOptions . GetTypeInfo < IJsonRpcMessage > ( ) ) ;
133+
134+ _logger . TransportReceivedMessage ( EndpointName , request ) ;
135+ }
136+ else
137+ {
138+ request = "(Enable information-level logs to see the request)" ;
139+ }
126140
127141 try
128142 {
129- var message = JsonSerializer . Deserialize ( request , _jsonOptions . GetTypeInfo < IJsonRpcMessage > ( ) ) ;
143+ message ??= await JsonSerializer . DeserializeAsync ( requestStream , McpJsonUtilities . DefaultOptions . GetTypeInfo < IJsonRpcMessage > ( ) ) ;
130144 if ( message != null )
131145 {
132- // Fire-and-forget the message to the message channel
133- Task . Run ( async ( ) =>
146+ string messageId = "(no id)" ;
147+ if ( message is IJsonRpcMessageWithId messageWithId )
134148 {
135- string messageId = "(no id)" ;
136- if ( message is IJsonRpcMessageWithId messageWithId )
137- {
138- messageId = messageWithId . Id . ToString ( ) ;
139- }
140-
141- _logger . TransportReceivedMessageParsed ( EndpointName , messageId ) ;
142- await WriteMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
143- _logger . TransportMessageWritten ( EndpointName , messageId ) ;
144- } , cancellationToken ) ;
149+ messageId = messageWithId . Id . ToString ( ) ;
150+ }
151+
152+ _logger . TransportReceivedMessageParsed ( EndpointName , messageId ) ;
153+ await WriteMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
154+ _logger . TransportMessageWritten ( EndpointName , messageId ) ;
145155
146156 return true ;
147157 }
0 commit comments