Skip to content

Commit f70bd12

Browse files
committed
Adjust for latest changes to ISseEventStreamStore
1 parent e344927 commit f70bd12

File tree

3 files changed

+36
-193
lines changed

3 files changed

+36
-193
lines changed

src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStore.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ internal sealed class StoredEvent
114114
private sealed class DistributedCacheEventStreamWriter : ISseEventStreamWriter
115115
{
116116
private readonly IDistributedCache _cache;
117+
private readonly string _sessionId;
118+
private readonly string _streamId;
119+
private SseEventStreamMode _mode;
117120
private readonly DistributedCacheEventStreamStoreOptions _options;
118121
private long _sequence;
119122
private bool _disposed;
@@ -126,19 +129,15 @@ public DistributedCacheEventStreamWriter(
126129
DistributedCacheEventStreamStoreOptions options)
127130
{
128131
_cache = cache;
129-
SessionId = sessionId;
130-
StreamId = streamId;
131-
Mode = mode;
132+
_sessionId = sessionId;
133+
_streamId = streamId;
134+
_mode = mode;
132135
_options = options;
133136
}
134137

135-
public string SessionId { get; }
136-
public string StreamId { get; }
137-
public SseEventStreamMode Mode { get; private set; }
138-
139138
public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken cancellationToken = default)
140139
{
141-
Mode = mode;
140+
_mode = mode;
142141
await UpdateMetadataAsync(cancellationToken).ConfigureAwait(false);
143142
}
144143

@@ -152,7 +151,7 @@ public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken c
152151

153152
// Generate a new sequence number and event ID
154153
var sequence = Interlocked.Increment(ref _sequence);
155-
var eventId = DistributedCacheEventIdFormatter.Format(SessionId, StreamId, sequence);
154+
var eventId = DistributedCacheEventIdFormatter.Format(_sessionId, _streamId, sequence);
156155
var newItem = sseItem with { EventId = eventId };
157156

158157
// Store the event in the cache
@@ -182,13 +181,13 @@ private async ValueTask UpdateMetadataAsync(CancellationToken cancellationToken)
182181
{
183182
var metadata = new StreamMetadata
184183
{
185-
Mode = Mode,
184+
Mode = _mode,
186185
IsCompleted = _disposed,
187186
LastSequence = Interlocked.Read(ref _sequence),
188187
};
189188

190189
var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadata, McpJsonUtilities.JsonContext.Default.StreamMetadata);
191-
var metadataKey = CacheKeys.StreamMetadata(SessionId, StreamId);
190+
var metadataKey = CacheKeys.StreamMetadata(_sessionId, _streamId);
192191

193192
await _cache.SetAsync(metadataKey, metadataBytes, new DistributedCacheEntryOptions
194193
{

src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStoreOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public sealed class DistributedCacheEventStreamStoreOptions
4141

4242
/// <summary>
4343
/// Gets or sets the interval between polling attempts when a reader is waiting for new events
44-
/// in <see cref="Server.SseEventStreamMode.Default"/> mode.
44+
/// in <see cref="SseEventStreamMode.Polling"/> mode.
4545
/// </summary>
4646
/// <remarks>
4747
/// This only affects readers. A shorter interval provides lower latency for new events

0 commit comments

Comments
 (0)