Skip to content

Commit bcbf247

Browse files
committed
Store retry interval
1 parent 80b2cb4 commit bcbf247

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStore.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ internal sealed class StoredEvent
118118
{
119119
public string? EventType { get; set; }
120120
public string? EventId { get; set; }
121+
public int? ReconnectionIntervalMs { get; set; }
121122
public JsonRpcMessage? Data { get; set; }
122123
}
123124

@@ -174,6 +175,9 @@ public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken c
174175
{
175176
EventType = newItem.EventType,
176177
EventId = eventId,
178+
ReconnectionIntervalMs = newItem.ReconnectionInterval.HasValue
179+
? (int)newItem.ReconnectionInterval.Value.TotalMilliseconds
180+
: null,
177181
Data = newItem.Data,
178182
};
179183

@@ -299,6 +303,9 @@ public DistributedCacheEventStreamReader(
299303
yield return new SseItem<JsonRpcMessage?>(storedEvent.Data, storedEvent.EventType)
300304
{
301305
EventId = storedEvent.EventId,
306+
ReconnectionInterval = storedEvent.ReconnectionIntervalMs.HasValue
307+
? TimeSpan.FromMilliseconds(storedEvent.ReconnectionIntervalMs.Value)
308+
: null,
302309
};
303310
}
304311
}

tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,95 @@ public async Task WriteEventAsync_PreservesEventTypeProperty_InReturnedItem()
130130
Assert.Equal("custom-event-type", result.EventType);
131131
}
132132

133+
[Fact]
134+
public async Task WriteEventAsync_PreservesReconnectionIntervalProperty_InStoredEvent()
135+
{
136+
// Arrange
137+
var cache = CreateMemoryCache();
138+
var store = new DistributedCacheEventStreamStore(cache);
139+
var writer = await store.CreateStreamAsync(new SseEventStreamOptions
140+
{
141+
SessionId = "session-1",
142+
StreamId = "stream-1",
143+
Mode = SseEventStreamMode.Polling
144+
}, CancellationToken);
145+
146+
var expectedInterval = TimeSpan.FromSeconds(5);
147+
var item = new SseItem<JsonRpcMessage?>(null) { ReconnectionInterval = expectedInterval };
148+
149+
// Act
150+
var result = await writer.WriteEventAsync(item, CancellationToken);
151+
152+
// Assert - ReconnectionInterval should be preserved in returned item
153+
Assert.Equal(expectedInterval, result.ReconnectionInterval);
154+
155+
// Get a reader and verify ReconnectionInterval is preserved after round-trip
156+
var reader = await store.GetStreamReaderAsync(result.EventId!, CancellationToken);
157+
Assert.NotNull(reader);
158+
159+
var events = new List<SseItem<JsonRpcMessage?>>();
160+
await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
161+
{
162+
events.Add(evt);
163+
}
164+
165+
// Reader should not return the event we just wrote (it starts after lastEventId)
166+
Assert.Empty(events);
167+
168+
// Write another event and verify it can be read with correct ReconnectionInterval
169+
var secondItem = new SseItem<JsonRpcMessage?>(null) { ReconnectionInterval = TimeSpan.FromSeconds(10) };
170+
_ = await writer.WriteEventAsync(secondItem, CancellationToken);
171+
172+
// Re-fetch reader using the first event ID to get the second event
173+
reader = await store.GetStreamReaderAsync(result.EventId!, CancellationToken);
174+
Assert.NotNull(reader);
175+
176+
await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
177+
{
178+
events.Add(evt);
179+
}
180+
181+
Assert.Single(events);
182+
Assert.Equal(TimeSpan.FromSeconds(10), events[0].ReconnectionInterval);
183+
}
184+
185+
[Fact]
186+
public async Task WriteEventAsync_HandlesNullReconnectionInterval_InStoredEvent()
187+
{
188+
// Arrange
189+
var cache = CreateMemoryCache();
190+
var store = new DistributedCacheEventStreamStore(cache);
191+
var writer = await store.CreateStreamAsync(new SseEventStreamOptions
192+
{
193+
SessionId = "session-1",
194+
StreamId = "stream-1",
195+
Mode = SseEventStreamMode.Polling
196+
}, CancellationToken);
197+
198+
// Write an event WITH a reconnection interval first
199+
var firstItem = new SseItem<JsonRpcMessage?>(null) { ReconnectionInterval = TimeSpan.FromSeconds(5) };
200+
var firstResult = await writer.WriteEventAsync(firstItem, CancellationToken);
201+
202+
// Write an event WITHOUT a reconnection interval
203+
var secondItem = new SseItem<JsonRpcMessage?>(null);
204+
var secondResult = await writer.WriteEventAsync(secondItem, CancellationToken);
205+
Assert.Null(secondResult.ReconnectionInterval);
206+
207+
// Get a reader starting after the first event
208+
var reader = await store.GetStreamReaderAsync(firstResult.EventId!, CancellationToken);
209+
Assert.NotNull(reader);
210+
211+
var events = new List<SseItem<JsonRpcMessage?>>();
212+
await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
213+
{
214+
events.Add(evt);
215+
}
216+
217+
// Should get the second event with null ReconnectionInterval
218+
Assert.Single(events);
219+
Assert.Null(events[0].ReconnectionInterval);
220+
}
221+
133222
[Fact]
134223
public async Task WriteEventAsync_HandlesNullData_AssignsEventIdAndStoresEvent()
135224
{

0 commit comments

Comments
 (0)