Skip to content

Commit c3f0856

Browse files
committed
share implementation of in memory store between examples and integration tests
1 parent 7341c9a commit c3f0856

File tree

4 files changed

+86
-213
lines changed

4 files changed

+86
-213
lines changed

src/examples/server/simpleStreamableHttp.ts

Lines changed: 3 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,10 @@
11
import express, { Request, Response } from 'express';
22
import { randomUUID } from 'node:crypto';
33
import { McpServer } from '../../server/mcp.js';
4-
import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
4+
import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
55
import { z } from 'zod';
6-
import { CallToolResult, GetPromptResult, isInitializeRequest, JSONRPCMessage, ReadResourceResult } from '../../types.js';
7-
8-
// Create a simple in-memory EventStore for resumability
9-
class InMemoryEventStore implements EventStore {
10-
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
11-
12-
/**
13-
* Generates a unique event ID for a given stream ID
14-
*/
15-
private generateEventId(streamId: string): string {
16-
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
17-
}
18-
19-
private getStreamIdFromEventId(eventId: string): string {
20-
const parts = eventId.split('_');
21-
return parts.length > 0 ? parts[0] : '';
22-
}
23-
24-
/**
25-
* Stores an event with a generated event ID
26-
* Implements EventStore.storeEvent
27-
*/
28-
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
29-
const eventId = this.generateEventId(streamId);
30-
console.log(`Storing event ${eventId} for stream ${streamId}`);
31-
this.events.set(eventId, { streamId, message });
32-
return eventId;
33-
}
34-
35-
/**
36-
* Replays events that occurred after a specific event ID
37-
* Implements EventStore.replayEventsAfter
38-
*/
39-
async replayEventsAfter(lastEventId: string,
40-
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
41-
): Promise<string> {
42-
if (!lastEventId || !this.events.has(lastEventId)) {
43-
console.log(`No events found for lastEventId: ${lastEventId}`);
44-
return '';
45-
}
46-
47-
// Extract the stream ID from the event ID
48-
const streamId = this.getStreamIdFromEventId(lastEventId);
49-
if (!streamId) {
50-
console.log(`Could not extract streamId from lastEventId: ${lastEventId}`);
51-
return '';
52-
}
53-
54-
let foundLastEvent = false;
55-
let eventCount = 0;
56-
57-
// Sort events by eventId for chronological ordering
58-
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
59-
60-
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
61-
// Only include events from the same stream
62-
if (eventStreamId !== streamId) {
63-
continue;
64-
}
65-
66-
// Start sending events after we find the lastEventId
67-
if (eventId === lastEventId) {
68-
foundLastEvent = true;
69-
continue;
70-
}
71-
72-
if (foundLastEvent) {
73-
await send(eventId, message);
74-
eventCount++;
75-
}
76-
}
77-
78-
console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`);
79-
return streamId;
80-
}
81-
}
6+
import { CallToolResult, GetPromptResult, isInitializeRequest, ReadResourceResult } from '../../types.js';
7+
import { InMemoryEventStore } from '../shared/inMemoryEventStore.js';
828

839
// Create an MCP server with implementation details
8410
const server = new McpServer({

src/examples/server/sseAndStreamableHttpCompatibleServer.ts

Lines changed: 3 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import express, { Request, Response } from 'express';
22
import { randomUUID } from "node:crypto";
33
import { McpServer } from '../../server/mcp.js';
4-
import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
4+
import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
55
import { SSEServerTransport } from '../../server/sse.js';
66
import { z } from 'zod';
7-
import { CallToolResult, isInitializeRequest, JSONRPCMessage } from '../../types.js';
7+
import { CallToolResult, isInitializeRequest } from '../../types.js';
8+
import { InMemoryEventStore } from '../shared/inMemoryEventStore.js';
89

910
/**
1011
* This example server demonstrates backwards compatibility with both:
@@ -17,81 +18,7 @@ import { CallToolResult, isInitializeRequest, JSONRPCMessage } from '../../types
1718
* - /request: The deprecated POST endpoint for older clients (POST to send messages)
1819
*/
1920

20-
// Simple in-memory event store for resumability
21-
class InMemoryEventStore implements EventStore {
22-
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
2321

24-
/**
25-
* Generates a unique event ID for a given stream ID
26-
*/
27-
private generateEventId(streamId: string): string {
28-
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
29-
}
30-
31-
private getStreamIdFromEventId(eventId: string): string {
32-
const parts = eventId.split('_');
33-
return parts.length > 0 ? parts[0] : '';
34-
}
35-
36-
/**
37-
* Stores an event with a generated event ID
38-
* Implements EventStore.storeEvent
39-
*/
40-
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
41-
const eventId = this.generateEventId(streamId);
42-
console.log(`Storing event ${eventId} for stream ${streamId}`);
43-
this.events.set(eventId, { streamId, message });
44-
return eventId;
45-
}
46-
47-
/**
48-
* Replays events that occurred after a specific event ID
49-
* Implements EventStore.replayEventsAfter
50-
*/
51-
async replayEventsAfter(lastEventId: string,
52-
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
53-
): Promise<string> {
54-
if (!lastEventId || !this.events.has(lastEventId)) {
55-
console.log(`No events found for lastEventId: ${lastEventId}`);
56-
return '';
57-
}
58-
59-
const streamId = this.getStreamIdFromEventId(lastEventId);
60-
if (!streamId) {
61-
console.log(`Could not extract streamId from lastEventId: ${lastEventId}`);
62-
return '';
63-
}
64-
65-
let foundLastEvent = false;
66-
let eventCount = 0;
67-
68-
// Sort events by eventId for chronological ordering
69-
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
70-
71-
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
72-
// Only include events from the same stream
73-
if (eventStreamId !== streamId) {
74-
continue;
75-
}
76-
77-
// Start sending events after we find the lastEventId
78-
if (eventId === lastEventId) {
79-
foundLastEvent = true;
80-
continue;
81-
}
82-
83-
if (foundLastEvent) {
84-
await send(eventId, message);
85-
eventCount++;
86-
}
87-
}
88-
89-
console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`);
90-
return streamId;
91-
}
92-
}
93-
94-
// Create a shared MCP server instance
9522
const server = new McpServer({
9623
name: 'backwards-compatible-server',
9724
version: '1.0.0',
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { JSONRPCMessage } from '../../types.js';
2+
import { EventStore } from '../../server/streamableHttp.js';
3+
4+
/**
5+
* Simple in-memory implementation of the EventStore interface for resumability
6+
* This is primarily intended for examples and testing, not for production use
7+
* where a persistent storage solution would be more appropriate.
8+
*/
9+
export class InMemoryEventStore implements EventStore {
10+
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
11+
12+
/**
13+
* Generates a unique event ID for a given stream ID
14+
*/
15+
private generateEventId(streamId: string): string {
16+
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
17+
}
18+
19+
/**
20+
* Extracts the stream ID from an event ID
21+
*/
22+
private getStreamIdFromEventId(eventId: string): string {
23+
const parts = eventId.split('_');
24+
return parts.length > 0 ? parts[0] : '';
25+
}
26+
27+
/**
28+
* Stores an event with a generated event ID
29+
* Implements EventStore.storeEvent
30+
*/
31+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
32+
const eventId = this.generateEventId(streamId);
33+
this.events.set(eventId, { streamId, message });
34+
return eventId;
35+
}
36+
37+
/**
38+
* Replays events that occurred after a specific event ID
39+
* Implements EventStore.replayEventsAfter
40+
*/
41+
async replayEventsAfter(lastEventId: string,
42+
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
43+
): Promise<string> {
44+
if (!lastEventId || !this.events.has(lastEventId)) {
45+
return '';
46+
}
47+
48+
// Extract the stream ID from the event ID
49+
const streamId = this.getStreamIdFromEventId(lastEventId);
50+
if (!streamId) {
51+
return '';
52+
}
53+
54+
let foundLastEvent = false;
55+
56+
// Sort events by eventId for chronological ordering
57+
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
58+
59+
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
60+
// Only include events from the same stream
61+
if (eventStreamId !== streamId) {
62+
continue;
63+
}
64+
65+
// Start sending events after we find the lastEventId
66+
if (eventId === lastEventId) {
67+
foundLastEvent = true;
68+
continue;
69+
}
70+
71+
if (foundLastEvent) {
72+
await send(eventId, message);
73+
}
74+
}
75+
return streamId;
76+
}
77+
}

src/integration-tests/taskResumability.test.ts

Lines changed: 3 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,68 +4,11 @@ import { randomUUID } from 'node:crypto';
44
import { Client } from '../client/index.js';
55
import { StreamableHTTPClientTransport } from '../client/streamableHttp.js';
66
import { McpServer } from '../server/mcp.js';
7-
import { EventStore, StreamableHTTPServerTransport } from '../server/streamableHttp.js';
8-
import { CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js';
7+
import { StreamableHTTPServerTransport } from '../server/streamableHttp.js';
8+
import { CallToolResultSchema, LoggingMessageNotificationSchema } from '../types.js';
99
import { z } from 'zod';
10+
import { InMemoryEventStore } from '../examples/shared/inMemoryEventStore.js';
1011

11-
/**
12-
* Simple in-memory event store implementation for resumability
13-
*/
14-
class InMemoryEventStore implements EventStore {
15-
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
16-
17-
private generateEventId(streamId: string): string {
18-
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
19-
}
20-
21-
private getStreamIdFromEventId(eventId: string): string {
22-
const parts = eventId.split('_');
23-
return parts.length > 0 ? parts[0] : '';
24-
}
25-
26-
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
27-
const eventId = this.generateEventId(streamId);
28-
this.events.set(eventId, { streamId, message });
29-
return eventId;
30-
}
31-
32-
async replayEventsAfter(lastEventId: string,
33-
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
34-
): Promise<string> {
35-
if (!lastEventId || !this.events.has(lastEventId)) {
36-
return '';
37-
}
38-
39-
// Extract the stream ID from the event ID
40-
const streamId = this.getStreamIdFromEventId(lastEventId);
41-
if (!streamId) {
42-
return '';
43-
}
44-
let foundLastEvent = false;
45-
46-
// Sort events by eventId for chronological ordering
47-
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
48-
49-
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
50-
// Only include events from the same stream
51-
if (eventStreamId !== streamId) {
52-
continue;
53-
}
54-
55-
// Start collecting events after we find the lastEventId
56-
if (eventId === lastEventId) {
57-
foundLastEvent = true;
58-
continue;
59-
}
60-
61-
if (foundLastEvent) {
62-
await send(eventId, message);
63-
}
64-
}
65-
66-
return streamId;
67-
}
68-
}
6912

7013

7114
describe('Transport resumability', () => {

0 commit comments

Comments
 (0)