Skip to content

Commit b5a26e4

Browse files
authored
HttpListener based SSE Server Support (#73)
1 parent 4597bbd commit b5a26e4

13 files changed

+1197
-11
lines changed

mcpdotnet.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{02EA
2121
EndProject
2222
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{2A77AF5C-138A-4EBB-9A13-9205DCD67928}"
2323
EndProject
24+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "mcpdotnet.TestSseServer", "tests\mcpdotnet.TestSseServer\mcpdotnet.TestSseServer.csproj", "{79B94BF9-E557-33DB-3F19-B2C7D9BF8C56}"
25+
EndProject
2426
Global
2527
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2628
Debug|Any CPU = Debug|Any CPU
@@ -55,6 +57,10 @@ Global
5557
{6499876E-2F76-44A8-B6EB-5B889C6E9B7F}.Debug|Any CPU.Build.0 = Debug|Any CPU
5658
{6499876E-2F76-44A8-B6EB-5B889C6E9B7F}.Release|Any CPU.ActiveCfg = Release|Any CPU
5759
{6499876E-2F76-44A8-B6EB-5B889C6E9B7F}.Release|Any CPU.Build.0 = Release|Any CPU
60+
{79B94BF9-E557-33DB-3F19-B2C7D9BF8C56}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
61+
{79B94BF9-E557-33DB-3F19-B2C7D9BF8C56}.Debug|Any CPU.Build.0 = Debug|Any CPU
62+
{79B94BF9-E557-33DB-3F19-B2C7D9BF8C56}.Release|Any CPU.ActiveCfg = Release|Any CPU
63+
{79B94BF9-E557-33DB-3F19-B2C7D9BF8C56}.Release|Any CPU.Build.0 = Release|Any CPU
5864
EndGlobalSection
5965
GlobalSection(SolutionProperties) = preSolution
6066
HideSolutionNode = FALSE

src/mcpdotnet/Configuration/McpServerBuilderExtensions.Transports.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,19 @@ public static IMcpServerBuilder WithStdioServerTransport(this IMcpServerBuilder
2121
builder.Services.AddSingleton<IServerTransport, StdioServerTransport>();
2222
return builder;
2323
}
24+
25+
/// <summary>
26+
/// Adds a server transport that uses SSE via a HttpListener for communication.
27+
/// </summary>
28+
/// <param name="builder">The builder instance.</param>
29+
public static IMcpServerBuilder WithHttpListenerSseServerTransport(this IMcpServerBuilder builder)
30+
{
31+
if (builder is null)
32+
{
33+
throw new ArgumentNullException(nameof(builder));
34+
}
35+
36+
builder.Services.AddSingleton<IServerTransport, HttpListenerSseServerTransport>();
37+
return builder;
38+
}
2439
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
using McpDotNet.Server;
2+
using System.Diagnostics.CodeAnalysis;
3+
using System.Net;
4+
using System.Text;
5+
6+
namespace McpDotNet.Protocol.Transport;
7+
8+
/// <summary>
9+
/// HTTP server provider using HttpListener.
10+
/// </summary>
11+
[ExcludeFromCodeCoverage]
12+
internal class HttpListenerServerProvider : IDisposable
13+
{
14+
private readonly int _port;
15+
private readonly string _sseEndpoint = "/sse";
16+
private readonly string _messageEndpoint = "/message";
17+
private HttpListener? _listener;
18+
private CancellationTokenSource? _cts;
19+
private Func<string, CancellationToken, bool>? _messageHandler;
20+
private StreamWriter? _streamWriter;
21+
private bool _isRunning;
22+
23+
/// <summary>
24+
/// Creates a new instance of the HTTP server provider.
25+
/// </summary>
26+
/// <param name="port">The port to listen on</param>
27+
public HttpListenerServerProvider(int port)
28+
{
29+
_port = port;
30+
}
31+
32+
public Task<string> GetSseEndpointUri()
33+
{
34+
return Task.FromResult($"http://localhost:{_port}{_sseEndpoint}");
35+
}
36+
37+
public Task InitializeMessageHandler(Func<string, CancellationToken, bool> messageHandler)
38+
{
39+
_messageHandler = messageHandler;
40+
return Task.CompletedTask;
41+
}
42+
43+
public async Task SendEvent(string data, string eventId)
44+
{
45+
if (_streamWriter == null)
46+
{
47+
throw new McpServerException("Stream writer not initialized");
48+
}
49+
if (eventId != null)
50+
{
51+
await _streamWriter.WriteLineAsync($"id: {eventId}").ConfigureAwait(false);
52+
}
53+
await _streamWriter.WriteLineAsync($"data: {data}").ConfigureAwait(false);
54+
await _streamWriter.WriteLineAsync().ConfigureAwait(false); // Empty line to finish the event
55+
await _streamWriter.FlushAsync().ConfigureAwait(false);
56+
}
57+
58+
/// <inheritdoc/>
59+
public Task StartAsync(CancellationToken cancellationToken = default)
60+
{
61+
if (_isRunning)
62+
return Task.CompletedTask;
63+
64+
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
65+
_listener = new HttpListener();
66+
_listener.Prefixes.Add($"http://localhost:{_port}/");
67+
_listener.Start();
68+
_isRunning = true;
69+
70+
// Start listening for connections
71+
_ = Task.Run(() => ListenForConnectionsAsync(_cts.Token), cancellationToken).ConfigureAwait(false);
72+
return Task.CompletedTask;
73+
}
74+
75+
/// <inheritdoc/>
76+
public Task StopAsync(CancellationToken cancellationToken = default)
77+
{
78+
if (!_isRunning)
79+
return Task.CompletedTask;
80+
81+
_cts?.Cancel();
82+
_listener?.Stop();
83+
84+
_streamWriter?.Close();
85+
86+
_isRunning = false;
87+
return Task.CompletedTask;
88+
}
89+
90+
private async Task ListenForConnectionsAsync(CancellationToken cancellationToken)
91+
{
92+
if (_listener == null)
93+
{
94+
throw new McpServerException("Listener not initialized");
95+
}
96+
97+
while (!cancellationToken.IsCancellationRequested)
98+
{
99+
try
100+
{
101+
var context = await _listener.GetContextAsync().ConfigureAwait(false);
102+
103+
// Process the request in a separate task
104+
_ = Task.Run(() => ProcessRequestAsync(context, cancellationToken), cancellationToken);
105+
}
106+
catch (Exception) when (cancellationToken.IsCancellationRequested)
107+
{
108+
// Shutdown requested, exit gracefully
109+
break;
110+
}
111+
catch (Exception)
112+
{
113+
// Log error but continue listening
114+
if (!cancellationToken.IsCancellationRequested)
115+
{
116+
// Continue listening if not shutting down
117+
continue;
118+
}
119+
}
120+
}
121+
}
122+
123+
private async Task ProcessRequestAsync(HttpListenerContext context, CancellationToken cancellationToken)
124+
{
125+
try
126+
{
127+
var request = context.Request;
128+
var response = context.Response;
129+
130+
if (request == null)
131+
throw new McpServerException("Request is null");
132+
133+
// Handle SSE connection
134+
if (request.HttpMethod == "GET" && request.Url?.LocalPath == _sseEndpoint)
135+
{
136+
await HandleSseConnectionAsync(context, cancellationToken).ConfigureAwait(false);
137+
}
138+
// Handle message POST
139+
else if (request.HttpMethod == "POST" && request.Url?.LocalPath == _messageEndpoint)
140+
{
141+
await HandleMessageAsync(context, cancellationToken).ConfigureAwait(false);
142+
}
143+
else
144+
{
145+
// Not found
146+
response.StatusCode = 404;
147+
response.Close();
148+
}
149+
}
150+
catch (Exception)
151+
{
152+
try
153+
{
154+
context.Response.StatusCode = 500;
155+
context.Response.Close();
156+
}
157+
catch { /* Ignore errors during error handling */ }
158+
}
159+
}
160+
161+
private async Task HandleSseConnectionAsync(HttpListenerContext context, CancellationToken cancellationToken)
162+
{
163+
var response = context.Response;
164+
165+
// Set SSE headers
166+
response.ContentType = "text/event-stream";
167+
response.Headers.Add("Cache-Control", "no-cache");
168+
response.Headers.Add("Connection", "keep-alive");
169+
170+
// Create a unique ID for this client
171+
var clientId = Guid.NewGuid().ToString();
172+
173+
// Get the output stream and create a StreamWriter
174+
var outputStream = response.OutputStream;
175+
_streamWriter = new StreamWriter(outputStream, Encoding.UTF8) { AutoFlush = true };
176+
177+
// Keep the connection open until cancelled
178+
try
179+
{
180+
// Immediately send the "endpoint" event with the POST URL
181+
await _streamWriter.WriteLineAsync("event: endpoint").ConfigureAwait(false);
182+
await _streamWriter.WriteLineAsync($"data: {_messageEndpoint}").ConfigureAwait(false);
183+
await _streamWriter.WriteLineAsync().ConfigureAwait(false); // blank line to end an SSE message
184+
await _streamWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
185+
186+
// Keep the connection open by "pinging" or just waiting
187+
// until the client disconnects or the server is canceled.
188+
while (!cancellationToken.IsCancellationRequested && response.OutputStream.CanWrite)
189+
{
190+
// Do a periodic no-op to keep connection alive:
191+
await _streamWriter.WriteLineAsync(": keep-alive").ConfigureAwait(false);
192+
await _streamWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
193+
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken).ConfigureAwait(false);
194+
}
195+
}
196+
catch (TaskCanceledException)
197+
{
198+
// Normal shutdown
199+
}
200+
catch (Exception)
201+
{
202+
// Client disconnected or other error
203+
}
204+
finally
205+
{
206+
// Remove client on disconnect
207+
try
208+
{
209+
_streamWriter.Close();
210+
response.Close();
211+
}
212+
catch { /* Ignore errors during cleanup */ }
213+
}
214+
}
215+
216+
private async Task HandleMessageAsync(HttpListenerContext context, CancellationToken cancellationToken)
217+
{
218+
var request = context.Request;
219+
var response = context.Response;
220+
221+
// Read the request body
222+
string requestBody;
223+
using (var reader = new StreamReader(request.InputStream, request.ContentEncoding))
224+
{
225+
// TODO: Add cancellation token and netstandard2.0 support (polyfill?)
226+
#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods
227+
requestBody = await reader.ReadToEndAsync().ConfigureAwait(false);
228+
#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods
229+
}
230+
231+
// Process the message asynchronously
232+
if (_messageHandler != null && _messageHandler(requestBody, cancellationToken))
233+
{
234+
// Return 202 Accepted
235+
response.StatusCode = 202;
236+
// Write "accepted" response
237+
// TODO: Use WriteAsync, add cancellation token and netstandard2.0 support (polyfill?)
238+
#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods
239+
byte[] buffer = Encoding.UTF8.GetBytes("Accepted");
240+
response.OutputStream.Write(buffer, 0, buffer.Length);
241+
#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods
242+
}
243+
else
244+
{
245+
// Return 400 Bad Request
246+
response.StatusCode = 400;
247+
}
248+
249+
response.Close();
250+
}
251+
252+
253+
/// <inheritdoc/>
254+
public void Dispose()
255+
{
256+
Dispose(true);
257+
GC.SuppressFinalize(this);
258+
}
259+
260+
/// <inheritdoc/>
261+
protected virtual void Dispose(bool disposing)
262+
{
263+
StopAsync().GetAwaiter().GetResult();
264+
_cts?.Dispose();
265+
_listener?.Close();
266+
}
267+
}

0 commit comments

Comments
 (0)