Skip to content

Commit 5502522

Browse files
committed
PR feedback
1 parent e8c3032 commit 5502522

File tree

6 files changed

+77
-77
lines changed

6 files changed

+77
-77
lines changed

src/Common/ServerSentEvents/ArrayBuffer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
// Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/src/System/Net/ArrayBuffer.cs
4+
// Copied from https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/Common/src/System/Net/ArrayBuffer.cs
55

66
using System.Buffers;
77
using System.Diagnostics;
@@ -26,7 +26,7 @@ internal struct ArrayBuffer : IDisposable
2626
#if NET
2727
private static int ArrayMaxLength => Array.MaxLength;
2828
#else
29-
private const int ArrayMaxLength = 0X7FFFFFC7;
29+
private const int ArrayMaxLength = 0X7FFFFFC7;
3030
#endif
3131

3232
private readonly bool _usePool;

src/Common/ServerSentEvents/PooledByteBufferWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
// Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs
4+
// Copied from https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs
55

66
using System.Buffers;
77
using System.Diagnostics;

src/Common/ServerSentEvents/SseEventWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
// Based on https://github.com/dotnet/runtime/blob/main/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs
4+
// Based on https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs
55

66
using System.Buffers;
77
using System.Diagnostics;

src/Common/ServerSentEvents/SseEventWriterHelpers.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
// Copied from https://github.com/dotnet/runtime/blob/main/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs
4+
// Copied from https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs
55

66
using System.Buffers;
77
using System.Diagnostics;

src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,18 @@ namespace ModelContextProtocol.Server;
1010
/// Handles processing the request/response body pairs for the Streamable HTTP transport.
1111
/// This is typically used via <see cref="JsonRpcMessageContext.RelatedTransport"/>.
1212
/// </summary>
13-
internal sealed class StreamableHttpPostTransport(StreamableHttpServerTransport parentTransport, Stream responseStream) : ITransport
13+
internal sealed class StreamableHttpPostTransport(StreamableHttpServerTransport parentTransport, Stream responseStream, CancellationToken sessionCancellationToken) : ITransport
1414
{
1515
private readonly SemaphoreSlim _messageLock = new(1, 1);
16-
private readonly TaskCompletionSource<bool> _responseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
17-
private readonly SseEventWriter _sseResponseWriter = new(responseStream);
16+
private readonly TaskCompletionSource<bool> _httpResponseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
17+
private readonly SseEventWriter _httpSseWriter = new(responseStream);
18+
19+
private TaskCompletionSource<bool>? _storeStreamTcs;
20+
private ISseEventStreamWriter? _storeSseWriter;
1821

19-
private TaskCompletionSource<bool>? _streamTcs;
20-
private ISseEventStreamWriter? _sseEventStreamWriter;
2122
private RequestId _pendingRequest;
2223
private bool _finalResponseMessageSent;
23-
private bool _originalResponseCompleted;
24+
private bool _httpResponseCompleted;
2425

2526
public ChannelReader<JsonRpcMessage> MessageReader => throw new NotSupportedException("JsonRpcMessage.Context.RelatedTransport should only be used for sending messages.");
2627

@@ -31,7 +32,7 @@ internal sealed class StreamableHttpPostTransport(StreamableHttpServerTransport
3132
/// False, if nothing was written because the request body did not contain any <see cref="JsonRpcRequest"/> messages to respond to.
3233
/// The HTTP application should typically respond with an empty "202 Accepted" response in this scenario.
3334
/// </returns>
34-
public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, CancellationToken postCancellationToken, CancellationToken sessionCancellationToken)
35+
public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, CancellationToken cancellationToken)
3536
{
3637
Debug.Assert(_pendingRequest.Id is null);
3738

@@ -57,25 +58,25 @@ public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, Cancellatio
5758

5859
if (_pendingRequest.Id is null)
5960
{
60-
await parentTransport.MessageWriter.WriteAsync(message, postCancellationToken).ConfigureAwait(false);
61+
await parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false);
6162
return false;
6263
}
6364

64-
using (await _messageLock.LockAsync(postCancellationToken).ConfigureAwait(false))
65+
using (await _messageLock.LockAsync(cancellationToken).ConfigureAwait(false))
6566
{
66-
var primingItem = await TryStartSseEventStreamAsync(_pendingRequest, sessionCancellationToken).ConfigureAwait(false);
67+
var primingItem = await TryStartSseEventStreamAsync(_pendingRequest).ConfigureAwait(false);
6768
if (primingItem.HasValue)
6869
{
69-
await _sseResponseWriter.WriteAsync(primingItem.Value, postCancellationToken).ConfigureAwait(false);
70+
await _httpSseWriter.WriteAsync(primingItem.Value, cancellationToken).ConfigureAwait(false);
7071
}
7172

7273
// Ensure that we've sent the priming event before processing the incoming request.
73-
await parentTransport.MessageWriter.WriteAsync(message, postCancellationToken).ConfigureAwait(false);
74+
await parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false);
7475
}
7576

7677
// Wait for the response to be written before returning from the handler.
7778
// This keeps the HTTP response open until the final response message is sent.
78-
await _responseTcs.Task.WaitAsync(postCancellationToken).ConfigureAwait(false);
79+
await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
7980

8081
return true;
8182
}
@@ -104,22 +105,22 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
104105

105106
var item = new SseItem<JsonRpcMessage?>(message, SseParser.EventTypeDefault);
106107

107-
if (_sseEventStreamWriter is not null)
108+
if (_storeSseWriter is not null)
108109
{
109-
item = await _sseEventStreamWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false);
110+
item = await _storeSseWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false);
110111
}
111112

112-
if (!_originalResponseCompleted)
113+
if (!_httpResponseCompleted)
113114
{
114115
// Only write the message to the response if the response has not completed.
115116

116117
try
117118
{
118-
await _sseResponseWriter.WriteAsync(item, cancellationToken).ConfigureAwait(false);
119+
await _httpSseWriter.WriteAsync(item, cancellationToken).ConfigureAwait(false);
119120
}
120121
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
121122
{
122-
_responseTcs.TrySetException(ex);
123+
_httpResponseTcs.TrySetException(ex);
123124
}
124125
}
125126
}
@@ -129,8 +130,8 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
129130
if ((message is JsonRpcResponse or JsonRpcError) && ((JsonRpcMessageWithId)message).Id == _pendingRequest)
130131
{
131132
_finalResponseMessageSent = true;
132-
_responseTcs.TrySetResult(true);
133-
_streamTcs?.TrySetResult(true);
133+
_httpResponseTcs.TrySetResult(true);
134+
_storeStreamTcs?.TrySetResult(true);
134135
}
135136
}
136137
}
@@ -144,61 +145,61 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo
144145

145146
using var _ = await _messageLock.LockAsync(cancellationToken).ConfigureAwait(false);
146147

147-
if (_sseEventStreamWriter is null)
148+
if (_storeSseWriter is null)
148149
{
149150
throw new InvalidOperationException($"Polling requires an event stream store to be configured.");
150151
}
151152

152153
// Send the priming event with the new retry interval.
153-
var primingItem = await _sseEventStreamWriter.WriteEventAsync(
154+
var primingItem = await _storeSseWriter.WriteEventAsync(
154155
sseItem: new SseItem<JsonRpcMessage?>() { ReconnectionInterval = retryInterval },
155156
cancellationToken)
156157
.ConfigureAwait(false);
157158

158159
// Write to the response stream if it still exists.
159-
if (!_originalResponseCompleted)
160+
if (!_httpResponseCompleted)
160161
{
161-
await _sseResponseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false);
162+
await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false);
162163
}
163164

164165
// Set the mode to 'Polling' so that the replay stream ends as soon as all available messages have been sent.
165166
// This prevents the client from immediately establishing another long-lived connection.
166-
await _sseEventStreamWriter.SetModeAsync(SseEventStreamMode.Polling, cancellationToken).ConfigureAwait(false);
167+
await _storeSseWriter.SetModeAsync(SseEventStreamMode.Polling, cancellationToken).ConfigureAwait(false);
167168

168169
// Signal completion so HandlePostAsync can return.
169-
_responseTcs.TrySetResult(true);
170+
_httpResponseTcs.TrySetResult(true);
170171
}
171172

172-
private async ValueTask<SseItem<JsonRpcMessage?>?> TryStartSseEventStreamAsync(RequestId requestId, CancellationToken cancellationToken)
173+
private async ValueTask<SseItem<JsonRpcMessage?>?> TryStartSseEventStreamAsync(RequestId requestId)
173174
{
174-
Debug.Assert(_sseEventStreamWriter is null);
175+
Debug.Assert(_storeSseWriter is null);
175176

176-
_sseEventStreamWriter = await parentTransport.TryCreateEventStreamAsync(
177+
_storeSseWriter = await parentTransport.TryCreateEventStreamAsync(
177178
streamId: requestId.Id!.ToString()!,
178-
cancellationToken: cancellationToken)
179+
cancellationToken: sessionCancellationToken)
179180
.ConfigureAwait(false);
180181

181-
if (_sseEventStreamWriter is null)
182+
if (_storeSseWriter is null)
182183
{
183184
return null;
184185
}
185186

186-
_streamTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
187-
_ = HandleStreamWriterDisposalAsync(_streamTcs.Task, cancellationToken);
187+
_storeStreamTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
188+
_ = HandleStoreStreamDisposalAsync(_storeStreamTcs.Task);
188189

189-
return await _sseEventStreamWriter.WriteEventAsync(SseItem.Prime<JsonRpcMessage>(), cancellationToken).ConfigureAwait(false);
190+
return await _storeSseWriter.WriteEventAsync(SseItem.Prime<JsonRpcMessage>(), sessionCancellationToken).ConfigureAwait(false);
190191

191-
async Task HandleStreamWriterDisposalAsync(Task streamTask, CancellationToken cancellationToken)
192+
async Task HandleStoreStreamDisposalAsync(Task streamTask)
192193
{
193194
try
194195
{
195-
await streamTask.WaitAsync(cancellationToken).ConfigureAwait(false);
196+
await streamTask.WaitAsync(sessionCancellationToken).ConfigureAwait(false);
196197
}
197198
finally
198199
{
199200
using var _ = await _messageLock.LockAsync().ConfigureAwait(false);
200201

201-
await _sseEventStreamWriter!.DisposeAsync().ConfigureAwait(false);
202+
await _storeSseWriter!.DisposeAsync().ConfigureAwait(false);
202203
}
203204
}
204205
}
@@ -207,16 +208,16 @@ public async ValueTask DisposeAsync()
207208
{
208209
using var _ = await _messageLock.LockAsync().ConfigureAwait(false);
209210

210-
if (_originalResponseCompleted)
211+
if (_httpResponseCompleted)
211212
{
212213
return;
213214
}
214215

215-
_originalResponseCompleted = true;
216+
_httpResponseCompleted = true;
216217

217-
_responseTcs.TrySetResult(true);
218+
_httpResponseTcs.TrySetResult(true);
218219

219-
_sseResponseWriter.Dispose();
220+
_httpSseWriter.Dispose();
220221

221222
// Don't dispose the event stream writer here, as we may continue to write to the event store
222223
// after disposal if there are pending messages.

src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ public sealed class StreamableHttpServerTransport : ITransport
3636
private readonly CancellationTokenSource _transportDisposedCts = new();
3737
private readonly SemaphoreSlim _unsolicitedMessageLock = new(1, 1);
3838

39-
private SseEventWriter? _sseResponseWriter;
40-
private ISseEventStreamWriter? _sseEventStreamWriter;
41-
private TaskCompletionSource<bool>? _responseTcs;
42-
private bool _getRequestStarted;
43-
private bool _originalResponseCompleted;
39+
private SseEventWriter? _httpSseWriter;
40+
private ISseEventStreamWriter? _storeSseWriter;
41+
private TaskCompletionSource<bool>? _httpResponseTcs;
42+
private bool _getHttpRequestStarted;
43+
private bool _getHttpResponseCompleted;
4444

4545
/// <inheritdoc/>
4646
public string? SessionId { get; init; }
@@ -111,25 +111,25 @@ public async Task HandleGetRequestAsync(Stream sseResponseStream, CancellationTo
111111

112112
using (await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false))
113113
{
114-
if (_getRequestStarted)
114+
if (_getHttpRequestStarted)
115115
{
116116
throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session.");
117117
}
118118

119-
_getRequestStarted = true;
120-
_sseResponseWriter = new SseEventWriter(sseResponseStream);
121-
_responseTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
122-
_sseEventStreamWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false);
123-
if (_sseEventStreamWriter is not null)
119+
_getHttpRequestStarted = true;
120+
_httpSseWriter = new SseEventWriter(sseResponseStream);
121+
_httpResponseTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
122+
_storeSseWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false);
123+
if (_storeSseWriter is not null)
124124
{
125-
var primingItem = await _sseEventStreamWriter.WriteEventAsync(SseItem.Prime<JsonRpcMessage>(), cancellationToken).ConfigureAwait(false);
126-
await _sseResponseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false);
125+
var primingItem = await _storeSseWriter.WriteEventAsync(SseItem.Prime<JsonRpcMessage>(), cancellationToken).ConfigureAwait(false);
126+
await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false);
127127
}
128128
}
129129

130130
// Wait for the response to be written before returning from the handler.
131131
// This keeps the HTTP response open until the final response message is sent.
132-
await _responseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
132+
await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
133133
}
134134

135135
/// <summary>
@@ -155,14 +155,13 @@ public async Task<bool> HandlePostRequestAsync(JsonRpcMessage message, Stream re
155155
Throw.IfNull(message);
156156
Throw.IfNull(responseStream);
157157

158+
var postTransport = new StreamableHttpPostTransport(this, responseStream, _transportDisposedCts.Token);
158159
using var postCts = CancellationTokenSource.CreateLinkedTokenSource(_transportDisposedCts.Token, cancellationToken);
159-
var postTransport = new StreamableHttpPostTransport(this, responseStream);
160160
await using (postTransport.ConfigureAwait(false))
161161
{
162162
return await postTransport.HandlePostAsync(
163163
message,
164-
postCancellationToken: postCts.Token,
165-
sessionCancellationToken: _transportDisposedCts.Token)
164+
cancellationToken: postCts.Token)
166165
.ConfigureAwait(false);
167166
}
168167
}
@@ -179,34 +178,34 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
179178

180179
using var _ = await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false);
181180

182-
if (!_getRequestStarted)
181+
if (!_getHttpRequestStarted)
183182
{
184183
// Clients are not required to make a GET request for unsolicited messages.
185184
// If no GET request has been made, drop the message.
186185
return;
187186
}
188187

189-
Debug.Assert(_sseResponseWriter is not null);
190-
Debug.Assert(_responseTcs is not null);
188+
Debug.Assert(_httpSseWriter is not null);
189+
Debug.Assert(_httpResponseTcs is not null);
191190

192191
var item = SseItem.Message(message);
193192

194-
if (_sseEventStreamWriter is not null)
193+
if (_storeSseWriter is not null)
195194
{
196-
item = await _sseEventStreamWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false);
195+
item = await _storeSseWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false);
197196
}
198197

199-
if (!_originalResponseCompleted)
198+
if (!_getHttpResponseCompleted)
200199
{
201200
// Only write the message to the response if the response has not completed.
202201

203202
try
204203
{
205-
await _sseResponseWriter!.WriteAsync(item, cancellationToken).ConfigureAwait(false);
204+
await _httpSseWriter!.WriteAsync(item, cancellationToken).ConfigureAwait(false);
206205
}
207206
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
208207
{
209-
_responseTcs!.TrySetException(ex);
208+
_httpResponseTcs!.TrySetException(ex);
210209
}
211210
}
212211
}
@@ -216,12 +215,12 @@ public async ValueTask DisposeAsync()
216215
{
217216
using var _ = await _unsolicitedMessageLock.LockAsync().ConfigureAwait(false);
218217

219-
if (_originalResponseCompleted)
218+
if (_getHttpResponseCompleted)
220219
{
221220
return;
222221
}
223222

224-
_originalResponseCompleted = true;
223+
_getHttpResponseCompleted = true;
225224

226225
try
227226
{
@@ -232,12 +231,12 @@ public async ValueTask DisposeAsync()
232231
{
233232
try
234233
{
235-
_responseTcs?.TrySetResult(true);
236-
_sseResponseWriter?.Dispose();
234+
_httpResponseTcs?.TrySetResult(true);
235+
_httpSseWriter?.Dispose();
237236

238-
if (_sseEventStreamWriter is not null)
237+
if (_storeSseWriter is not null)
239238
{
240-
await _sseEventStreamWriter.DisposeAsync().ConfigureAwait(false);
239+
await _storeSseWriter.DisposeAsync().ConfigureAwait(false);
241240
}
242241
}
243242
finally

0 commit comments

Comments
 (0)