Skip to content

Commit 3c4709e

Browse files
committed
fix: resume in-memory event streams with underscored ids
1 parent db83829 commit 3c4709e

2 files changed

Lines changed: 41 additions & 17 deletions

File tree

examples/server/src/inMemoryEventStore.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,6 @@ export class InMemoryEventStore implements EventStore {
1515
return `${streamId}_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
1616
}
1717

18-
/**
19-
* Extracts the stream ID from an event ID
20-
*/
21-
private getStreamIdFromEventId(eventId: string): string {
22-
const parts = eventId.split('_');
23-
return parts.length > 0 ? parts[0]! : '';
24-
}
25-
2618
/**
2719
* Stores an event with a generated event ID
2820
* Implements EventStore.storeEvent
@@ -45,18 +37,17 @@ export class InMemoryEventStore implements EventStore {
4537
return '';
4638
}
4739

48-
// Extract the stream ID from the event ID
49-
const streamId = this.getStreamIdFromEventId(lastEventId);
40+
const streamId = this.events.get(lastEventId)?.streamId ?? '';
5041
if (!streamId) {
5142
return '';
5243
}
5344

5445
let foundLastEvent = false;
5546

56-
// Sort events by eventId for chronological ordering
57-
const sortedEvents = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0]));
58-
59-
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
47+
// Map preserves insertion order, which is the event creation order. The
48+
// generated IDs include a random suffix, so lexicographic sorting can
49+
// reorder events created in the same millisecond.
50+
for (const [eventId, { streamId: eventStreamId, message }] of this.events) {
6051
// Only include events from the same stream
6152
if (eventStreamId !== streamId) {
6253
continue;

test/integration/test/taskResumability.test.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@ class InMemoryEventStore implements EventStore {
2626
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
2727
): Promise<string> {
2828
if (!lastEventId || !this.events.has(lastEventId)) return '';
29-
const streamId = lastEventId.split('_')[0] ?? '';
29+
const streamId = this.events.get(lastEventId)?.streamId ?? '';
3030
if (!streamId) return '';
3131

3232
let found = false;
33-
const sorted = [...this.events.entries()].toSorted((a, b) => a[0].localeCompare(b[0]));
34-
for (const [eventId, { streamId: sid, message }] of sorted) {
33+
for (const [eventId, { streamId: sid, message }] of this.events) {
3534
if (sid !== streamId) continue;
3635
if (eventId === lastEventId) {
3736
found = true;
@@ -158,6 +157,40 @@ describe('Zod v4', () => {
158157
await transport.close();
159158
});
160159

160+
it('should replay events for standalone SSE stream IDs', async () => {
161+
const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(1_777_506_150_663);
162+
const randomSpy = vi.spyOn(Math, 'random').mockReturnValueOnce(0.9).mockReturnValueOnce(0.1);
163+
164+
const message1: JSONRPCMessage = {
165+
jsonrpc: '2.0',
166+
method: 'notifications/message',
167+
params: { level: 'info', data: 'first' }
168+
};
169+
const message2: JSONRPCMessage = {
170+
jsonrpc: '2.0',
171+
method: 'notifications/message',
172+
params: { level: 'info', data: 'second' }
173+
};
174+
const replayed: Array<{ eventId: string; message: JSONRPCMessage }> = [];
175+
176+
try {
177+
const firstEventId = await eventStore.storeEvent('_GET_stream', message1);
178+
const secondEventId = await eventStore.storeEvent('_GET_stream', message2);
179+
180+
const streamId = await eventStore.replayEventsAfter(firstEventId, {
181+
send: async (eventId, message) => {
182+
replayed.push({ eventId, message });
183+
}
184+
});
185+
186+
expect(streamId).toBe('_GET_stream');
187+
expect(replayed).toEqual([{ eventId: secondEventId, message: message2 }]);
188+
} finally {
189+
nowSpy.mockRestore();
190+
randomSpy.mockRestore();
191+
}
192+
});
193+
161194
it('should have session ID functionality', async () => {
162195
// The ability to store a session ID when connecting
163196
const client = new Client({

0 commit comments

Comments
 (0)