Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public sealed class SseResponseStreamTransport(Stream sseResponseStream) : ITran
private Utf8JsonWriter? _jsonWriter;

/// <inheritdoc/>
public bool IsConnected => _sseWriteTask?.IsCompleted == false;
public bool IsConnected { get; private set; }

/// <summary>
/// Starts the transport and writes the JSON-RPC messages sent via <see cref="SendMessageAsync(IJsonRpcMessage, CancellationToken)"/>
Expand All @@ -41,6 +41,8 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
}

IsConnected = true;

// The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type,
// so we fib and special-case the "endpoint" event type in the formatter.
_outgoingSseChannel.Writer.TryWrite(new SseItem<IJsonRpcMessage?>(null, "endpoint"));
Expand All @@ -55,6 +57,7 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
/// <inheritdoc/>
public ValueTask DisposeAsync()
{
IsConnected = false;
_incomingChannel.Writer.TryComplete();
_outgoingSseChannel.Writer.TryComplete();
return new ValueTask(_sseWriteTask ?? Task.CompletedTask);
Expand All @@ -63,7 +66,7 @@ public ValueTask DisposeAsync()
/// <inheritdoc/>
public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken = default)
{
if (_sseWriteTask is null)
if (!IsConnected)
{
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
}
Expand All @@ -80,7 +83,7 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
/// <exception cref="InvalidOperationException">Thrown when there is an attempt to process a message before calling <see cref="RunAsync(CancellationToken)"/>.</exception>
public async Task OnMessageReceivedAsync(IJsonRpcMessage message, CancellationToken cancellationToken)
{
if (_sseWriteTask is null)
if (!IsConnected)
{
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
}
Expand Down