diff --git a/.changeset/fix-in-memory-event-store-underscore-streams.md b/.changeset/fix-in-memory-event-store-underscore-streams.md new file mode 100644 index 000000000..aa7651a4f --- /dev/null +++ b/.changeset/fix-in-memory-event-store-underscore-streams.md @@ -0,0 +1,6 @@ +--- +"@modelcontextprotocol/examples-server": patch +"@modelcontextprotocol/test-integration": patch +--- + +fix: resume in-memory event streams with underscored ids diff --git a/examples/server/src/inMemoryEventStore.ts b/examples/server/src/inMemoryEventStore.ts index 604b84d39..2c98baecf 100644 --- a/examples/server/src/inMemoryEventStore.ts +++ b/examples/server/src/inMemoryEventStore.ts @@ -15,14 +15,6 @@ export class InMemoryEventStore implements EventStore { return `${streamId}_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`; } - /** - * Extracts the stream ID from an event ID - */ - private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split('_'); - return parts.length > 0 ? parts[0]! : ''; - } - /** * Stores an event with a generated event ID * Implements EventStore.storeEvent @@ -45,18 +37,17 @@ export class InMemoryEventStore implements EventStore { return ''; } - // Extract the stream ID from the event ID - const streamId = this.getStreamIdFromEventId(lastEventId); + const streamId = this.events.get(lastEventId)?.streamId ?? ''; if (!streamId) { return ''; } let foundLastEvent = false; - // Sort events by eventId for chronological ordering - const sortedEvents = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0])); - - for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { + // Map preserves insertion order, which is the event creation order. The + // generated IDs include a random suffix, so lexicographic sorting can + // reorder events created in the same millisecond. + for (const [eventId, { streamId: eventStreamId, message }] of this.events) { // Only include events from the same stream if (eventStreamId !== streamId) { continue; diff --git a/test/integration/test/taskResumability.test.ts b/test/integration/test/taskResumability.test.ts index f7b4174d1..bf0aa0eb1 100644 --- a/test/integration/test/taskResumability.test.ts +++ b/test/integration/test/taskResumability.test.ts @@ -26,12 +26,11 @@ class InMemoryEventStore implements EventStore { { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } ): Promise { if (!lastEventId || !this.events.has(lastEventId)) return ''; - const streamId = lastEventId.split('_')[0] ?? ''; + const streamId = this.events.get(lastEventId)?.streamId ?? ''; if (!streamId) return ''; let found = false; - const sorted = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0])); - for (const [eventId, { streamId: sid, message }] of sorted) { + for (const [eventId, { streamId: sid, message }] of this.events) { if (sid !== streamId) continue; if (eventId === lastEventId) { found = true; @@ -158,6 +157,40 @@ describe('Zod v4', () => { await transport.close(); }); + it('should replay events for standalone SSE stream IDs', async () => { + const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(1_777_506_150_663); + const randomSpy = vi.spyOn(Math, 'random').mockReturnValueOnce(0.9).mockReturnValueOnce(0.1); + + const message1: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/message', + params: { level: 'info', data: 'first' } + }; + const message2: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/message', + params: { level: 'info', data: 'second' } + }; + const replayed: Array<{ eventId: string; message: JSONRPCMessage }> = []; + + try { + const firstEventId = await eventStore.storeEvent('_GET_stream', message1); + const secondEventId = await eventStore.storeEvent('_GET_stream', message2); + + const streamId = await eventStore.replayEventsAfter(firstEventId, { + send: async (eventId, message) => { + replayed.push({ eventId, message }); + } + }); + + expect(streamId).toBe('_GET_stream'); + expect(replayed).toEqual([{ eventId: secondEventId, message: message2 }]); + } finally { + nowSpy.mockRestore(); + randomSpy.mockRestore(); + } + }); + it('should have session ID functionality', async () => { // The ability to store a session ID when connecting const client = new Client({