Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-in-memory-event-store-underscore-streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@modelcontextprotocol/examples-server": patch
"@modelcontextprotocol/test-integration": patch
---

fix: resume in-memory event streams with underscored ids
19 changes: 5 additions & 14 deletions examples/server/src/inMemoryEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ export class InMemoryEventStore implements EventStore {
return `${streamId}_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
}

/**
* Extracts the stream ID from an event ID
*/
private getStreamIdFromEventId(eventId: string): string {
const parts = eventId.split('_');
return parts.length > 0 ? parts[0]! : '';
}

/**
* Stores an event with a generated event ID
* Implements EventStore.storeEvent
Expand All @@ -45,18 +37,17 @@ export class InMemoryEventStore implements EventStore {
return '';
}

// Extract the stream ID from the event ID
const streamId = this.getStreamIdFromEventId(lastEventId);
const streamId = this.events.get(lastEventId)?.streamId ?? '';
if (!streamId) {
return '';
}

let foundLastEvent = false;

// Sort events by eventId for chronological ordering
const sortedEvents = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0]));

for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
// Map preserves insertion order, which is the event creation order. The
// generated IDs include a random suffix, so lexicographic sorting can
// reorder events created in the same millisecond.
for (const [eventId, { streamId: eventStreamId, message }] of this.events) {
// Only include events from the same stream
if (eventStreamId !== streamId) {
continue;
Expand Down
39 changes: 36 additions & 3 deletions test/integration/test/taskResumability.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ class InMemoryEventStore implements EventStore {
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
): Promise<string> {
if (!lastEventId || !this.events.has(lastEventId)) return '';
const streamId = lastEventId.split('_')[0] ?? '';
const streamId = this.events.get(lastEventId)?.streamId ?? '';
if (!streamId) return '';

let found = false;
const sorted = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0]));
for (const [eventId, { streamId: sid, message }] of sorted) {
for (const [eventId, { streamId: sid, message }] of this.events) {
if (sid !== streamId) continue;
if (eventId === lastEventId) {
found = true;
Expand Down Expand Up @@ -158,6 +157,40 @@ describe('Zod v4', () => {
await transport.close();
});

it('should replay events for standalone SSE stream IDs', async () => {
const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(1_777_506_150_663);
const randomSpy = vi.spyOn(Math, 'random').mockReturnValueOnce(0.9).mockReturnValueOnce(0.1);

const message1: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'notifications/message',
params: { level: 'info', data: 'first' }
};
const message2: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'notifications/message',
params: { level: 'info', data: 'second' }
};
const replayed: Array<{ eventId: string; message: JSONRPCMessage }> = [];

try {
const firstEventId = await eventStore.storeEvent('_GET_stream', message1);
const secondEventId = await eventStore.storeEvent('_GET_stream', message2);

const streamId = await eventStore.replayEventsAfter(firstEventId, {
send: async (eventId, message) => {
replayed.push({ eventId, message });
}
});

expect(streamId).toBe('_GET_stream');
expect(replayed).toEqual([{ eventId: secondEventId, message: message2 }]);
} finally {
nowSpy.mockRestore();
randomSpy.mockRestore();
}
});

it('should have session ID functionality', async () => {
// The ability to store a session ID when connecting
const client = new Client({
Expand Down
Loading