Skip to content

Commit 9b4db35

Browse files
authored
Write "event: message" to SSE response (#192)
* Write "event: message" to SSE response * Rename McpJsonRpcEndpoint to McpEndpoint * Remove unused McpServerConfig.Arguments property
1 parent f1af251 commit 9b4db35

File tree

7 files changed

+57
-13
lines changed

7 files changed

+57
-13
lines changed

src/ModelContextProtocol/Client/McpClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
namespace ModelContextProtocol.Client;
1111

1212
/// <inheritdoc/>
13-
internal sealed class McpClient : McpJsonRpcEndpoint, IMcpClient
13+
internal sealed class McpClient : McpEndpoint, IMcpClient
1414
{
1515
private readonly IClientTransport _clientTransport;
1616
private readonly McpClientOptions _options;

src/ModelContextProtocol/Configuration/McpServerConfig.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,6 @@ public record McpServerConfig
2727
/// </summary>
2828
public string? Location { get; set; }
2929

30-
/// <summary>
31-
/// Arguments (if any) to pass to the executable.
32-
/// </summary>
33-
public string[]? Arguments { get; init; }
34-
3530
/// <summary>
3631
/// Additional transport-specific configuration.
3732
/// </summary>

src/ModelContextProtocol/Protocol/Transport/SseResponseStreamTransport.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
7676
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
7777
}
7878

79-
await _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message), cancellationToken);
79+
// Emit redundant "event: message" lines for better compatibility with other SDKs.
80+
await _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message, SseParser.EventTypeDefault), cancellationToken);
8081
}
8182

8283
/// <summary>

src/ModelContextProtocol/Server/McpServer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
namespace ModelContextProtocol.Server;
1010

1111
/// <inheritdoc />
12-
internal sealed class McpServer : McpJsonRpcEndpoint, IMcpServer
12+
internal sealed class McpServer : McpEndpoint, IMcpServer
1313
{
1414
private readonly EventHandler? _toolsChangedDelegate;
1515
private readonly EventHandler? _promptsChangedDelegate;

src/ModelContextProtocol/Shared/McpJsonRpcEndpoint.cs renamed to src/ModelContextProtocol/Shared/McpEndpoint.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace ModelContextProtocol.Shared;
1717
/// This is especially true as a client represents a connection to one and only one server, and vice versa.
1818
/// Any multi-client or multi-server functionality should be implemented at a higher level of abstraction.
1919
/// </summary>
20-
internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
20+
internal abstract class McpEndpoint : IAsyncDisposable
2121
{
2222
private readonly RequestHandlers _requestHandlers = [];
2323
private readonly NotificationHandlers _notificationHandlers = [];
@@ -31,10 +31,10 @@ internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
3131
protected readonly ILogger _logger;
3232

3333
/// <summary>
34-
/// Initializes a new instance of the <see cref="McpJsonRpcEndpoint"/> class.
34+
/// Initializes a new instance of the <see cref="McpEndpoint"/> class.
3535
/// </summary>
3636
/// <param name="loggerFactory">The logger factory.</param>
37-
protected McpJsonRpcEndpoint(ILoggerFactory? loggerFactory = null)
37+
protected McpEndpoint(ILoggerFactory? loggerFactory = null)
3838
{
3939
_logger = loggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;
4040
}
@@ -64,7 +64,7 @@ public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancella
6464
/// <summary>
6565
/// Task that processes incoming messages from the transport.
6666
/// </summary>
67-
protected Task? MessageProcessingTask { get; set; }
67+
protected Task? MessageProcessingTask { get; private set; }
6868

6969
[MemberNotNull(nameof(MessageProcessingTask))]
7070
protected void StartSession(ITransport sessionTransport)

src/ModelContextProtocol/Shared/McpSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ internal sealed class McpSession : IDisposable
4545
/// </summary>
4646
private readonly ConcurrentDictionary<RequestId, CancellationTokenSource> _handlingRequests = new();
4747
private readonly ILogger _logger;
48-
48+
4949
private readonly string _id = Guid.NewGuid().ToString("N");
5050
private long _nextRequestId;
5151

tests/ModelContextProtocol.Tests/SseServerIntegrationTests.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using ModelContextProtocol.Client;
22
using ModelContextProtocol.Protocol.Types;
33
using ModelContextProtocol.Tests.Utils;
4+
using System.Net;
5+
using System.Text;
46

57
namespace ModelContextProtocol.Tests;
68

@@ -30,6 +32,12 @@ private Task<IMcpClient> GetClientAsync(McpClientOptions? options = null)
3032
cancellationToken: TestContext.Current.CancellationToken);
3133
}
3234

35+
private HttpClient GetHttpClient() =>
36+
new()
37+
{
38+
BaseAddress = new(_fixture.DefaultConfig.Location!),
39+
};
40+
3341
[Fact]
3442
public async Task ConnectAndPing_Sse_TestServer()
3543
{
@@ -271,4 +279,44 @@ public async Task CallTool_Sse_EchoServer_Concurrently()
271279
Assert.Equal($"Echo: Hello MCP! {i}", textContent.Text);
272280
}
273281
}
282+
283+
[Fact]
284+
public async Task EventSourceStream_Includes_MessageEventType()
285+
{
286+
// Simulate our own MCP client handshake using a plain HttpClient so we can look for "event: message"
287+
// in the raw SSE response stream which is not exposed by the real MCP client.
288+
using var httpClient = GetHttpClient();
289+
await using var sseResponse = await httpClient.GetStreamAsync("", TestContext.Current.CancellationToken);
290+
using var streamReader = new StreamReader(sseResponse);
291+
292+
var endpointEvent = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
293+
Assert.Equal("event: endpoint", endpointEvent);
294+
295+
var endpointData = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
296+
Assert.NotNull(endpointData);
297+
Assert.StartsWith("data: ", endpointData);
298+
var messageEndpoint = endpointData["data: ".Length..];
299+
300+
const string initializeRequest = """
301+
{"jsonrpc":"2.0","id":"1","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"IntegrationTestClient","version":"1.0.0"}}}
302+
""";
303+
using (var initializeRequestBody = new StringContent(initializeRequest, Encoding.UTF8, "application/json"))
304+
{
305+
var response = await httpClient.PostAsync(messageEndpoint, initializeRequestBody, TestContext.Current.CancellationToken);
306+
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
307+
}
308+
309+
const string initializedNotification = """
310+
{"jsonrpc":"2.0","method":"notifications/initialized"}
311+
""";
312+
using (var initializedNotificationBody = new StringContent(initializedNotification, Encoding.UTF8, "application/json"))
313+
{
314+
var response = await httpClient.PostAsync(messageEndpoint, initializedNotificationBody, TestContext.Current.CancellationToken);
315+
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
316+
}
317+
318+
Assert.Equal("", await streamReader.ReadLineAsync(TestContext.Current.CancellationToken));
319+
var messageEvent = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
320+
Assert.Equal("event: message", messageEvent);
321+
}
274322
}

0 commit comments

Comments
 (0)