Skip to content

Commit 4d2968c

Browse files
committed
backward compatible server example
1 parent c053c2e commit 4d2968c

File tree

1 file changed

+316
-0
lines changed

1 file changed

+316
-0
lines changed
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
import express, { Request, Response } from 'express';
2+
import { randomUUID } from "node:crypto";
3+
import { McpServer } from '../../server/mcp.js';
4+
import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
5+
import { SSEServerTransport } from '../../server/sse.js';
6+
import { z } from 'zod';
7+
import { CallToolResult, isInitializeRequest, JSONRPCMessage } from '../../types.js';
8+
9+
/**
10+
* This example server demonstrates backwards compatibility with both:
11+
* 1. The deprecated HTTP+SSE transport (protocol version 2024-11-05)
12+
* 2. The Streamable HTTP transport (protocol version 2025-03-26)
13+
*
14+
* It maintains a single MCP server instance but exposes two transport options:
15+
* - /mcp: The new Streamable HTTP endpoint (supports GET/POST/DELETE)
16+
* - /sse: The deprecated SSE endpoint for older clients (GET to establish stream)
17+
* - /request: The deprecated POST endpoint for older clients (POST to send messages)
18+
*/
19+
20+
// Simple in-memory event store for resumability
21+
class InMemoryEventStore implements EventStore {
22+
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
23+
24+
/**
25+
* Generates a unique event ID for a given stream ID
26+
*/
27+
private generateEventId(streamId: string): string {
28+
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
29+
}
30+
31+
private getStreamIdFromEventId(eventId: string): string {
32+
const parts = eventId.split('_');
33+
return parts.length > 0 ? parts[0] : '';
34+
}
35+
36+
/**
37+
* Stores an event with a generated event ID
38+
* Implements EventStore.storeEvent
39+
*/
40+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
41+
const eventId = this.generateEventId(streamId);
42+
console.log(`Storing event ${eventId} for stream ${streamId}`);
43+
this.events.set(eventId, { streamId, message });
44+
return eventId;
45+
}
46+
47+
/**
48+
* Replays events that occurred after a specific event ID
49+
* Implements EventStore.replayEventsAfter
50+
*/
51+
async replayEventsAfter(lastEventId: string,
52+
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
53+
): Promise<string> {
54+
if (!lastEventId || !this.events.has(lastEventId)) {
55+
console.log(`No events found for lastEventId: ${lastEventId}`);
56+
return '';
57+
}
58+
59+
const streamId = this.getStreamIdFromEventId(lastEventId);
60+
if (!streamId) {
61+
console.log(`Could not extract streamId from lastEventId: ${lastEventId}`);
62+
return '';
63+
}
64+
65+
let foundLastEvent = false;
66+
let eventCount = 0;
67+
68+
// Sort events by eventId for chronological ordering
69+
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
70+
71+
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
72+
// Only include events from the same stream
73+
if (eventStreamId !== streamId) {
74+
continue;
75+
}
76+
77+
// Start sending events after we find the lastEventId
78+
if (eventId === lastEventId) {
79+
foundLastEvent = true;
80+
continue;
81+
}
82+
83+
if (foundLastEvent) {
84+
await send(eventId, message);
85+
eventCount++;
86+
}
87+
}
88+
89+
console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`);
90+
return streamId;
91+
}
92+
}
93+
94+
// Create a shared MCP server instance
95+
const server = new McpServer({
96+
name: 'backwards-compatible-server',
97+
version: '1.0.0',
98+
}, { capabilities: { logging: {} } });
99+
100+
// Register a simple tool that sends notifications over time
101+
server.tool(
102+
'start-notification-stream',
103+
'Starts sending periodic notifications for testing resumability',
104+
{
105+
interval: z.number().describe('Interval in milliseconds between notifications').default(100),
106+
count: z.number().describe('Number of notifications to send (0 for 100)').default(50),
107+
},
108+
async ({ interval, count }, { sendNotification }): Promise<CallToolResult> => {
109+
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
110+
let counter = 0;
111+
112+
while (count === 0 || counter < count) {
113+
counter++;
114+
try {
115+
await sendNotification({
116+
method: "notifications/message",
117+
params: {
118+
level: "info",
119+
data: `Periodic notification #${counter} at ${new Date().toISOString()}`
120+
}
121+
});
122+
}
123+
catch (error) {
124+
console.error("Error sending notification:", error);
125+
}
126+
// Wait for the specified interval
127+
await sleep(interval);
128+
}
129+
130+
return {
131+
content: [
132+
{
133+
type: 'text',
134+
text: `Started sending periodic notifications every ${interval}ms`,
135+
}
136+
],
137+
};
138+
}
139+
);
140+
141+
// Create Express application
142+
const app = express();
143+
app.use(express.json());
144+
145+
// Store transports by session ID
146+
const transports: Record<string, StreamableHTTPServerTransport | SSEServerTransport> = {};
147+
148+
//=============================================================================
149+
// STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26)
150+
//=============================================================================
151+
152+
// Handle all MCP Streamable HTTP requests (GET, POST, DELETE) on a single endpoint
153+
app.all('/mcp', async (req: Request, res: Response) => {
154+
console.log(`Received ${req.method} request to /mcp`);
155+
156+
try {
157+
// Check for existing session ID
158+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
159+
let transport: StreamableHTTPServerTransport;
160+
161+
if (sessionId && transports[sessionId]) {
162+
// Check if the transport is of the correct type
163+
const existingTransport = transports[sessionId];
164+
if (existingTransport instanceof StreamableHTTPServerTransport) {
165+
// Reuse existing transport
166+
transport = existingTransport;
167+
} else {
168+
// Transport exists but is not a StreamableHTTPServerTransport (could be SSEServerTransport)
169+
res.status(400).json({
170+
jsonrpc: '2.0',
171+
error: {
172+
code: -32000,
173+
message: 'Bad Request: Session exists but uses a different transport protocol',
174+
},
175+
id: null,
176+
});
177+
return;
178+
}
179+
} else if (!sessionId && req.method === 'POST' && isInitializeRequest(req.body)) {
180+
const eventStore = new InMemoryEventStore();
181+
transport = new StreamableHTTPServerTransport({
182+
sessionIdGenerator: () => randomUUID(),
183+
eventStore, // Enable resumability
184+
onsessioninitialized: (sessionId) => {
185+
// Store the transport by session ID when session is initialized
186+
console.log(`StreamableHTTP session initialized with ID: ${sessionId}`);
187+
transports[sessionId] = transport;
188+
}
189+
});
190+
191+
// Set up onclose handler to clean up transport when closed
192+
transport.onclose = () => {
193+
const sid = transport.sessionId;
194+
if (sid && transports[sid]) {
195+
console.log(`Transport closed for session ${sid}, removing from transports map`);
196+
delete transports[sid];
197+
}
198+
};
199+
200+
// Connect the transport to the MCP server
201+
await server.connect(transport);
202+
} else {
203+
// Invalid request - no session ID or not initialization request
204+
res.status(400).json({
205+
jsonrpc: '2.0',
206+
error: {
207+
code: -32000,
208+
message: 'Bad Request: No valid session ID provided',
209+
},
210+
id: null,
211+
});
212+
return;
213+
}
214+
215+
// Handle the request with the transport
216+
await transport.handleRequest(req, res, req.body);
217+
} catch (error) {
218+
console.error('Error handling MCP request:', error);
219+
if (!res.headersSent) {
220+
res.status(500).json({
221+
jsonrpc: '2.0',
222+
error: {
223+
code: -32603,
224+
message: 'Internal server error',
225+
},
226+
id: null,
227+
});
228+
}
229+
}
230+
});
231+
232+
//=============================================================================
233+
// DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05)
234+
//=============================================================================
235+
236+
app.get('/sse', async (req: Request, res: Response) => {
237+
console.log('Received GET request to /sse (deprecated SSE transport)');
238+
const transport = new SSEServerTransport('/messages', res);
239+
transports[transport.sessionId] = transport;
240+
res.on("close", () => {
241+
delete transports[transport.sessionId];
242+
});
243+
await server.connect(transport);
244+
});
245+
246+
app.post("/messages", async (req: Request, res: Response) => {
247+
const sessionId = req.query.sessionId as string;
248+
let transport: SSEServerTransport;
249+
const existingTransport = transports[sessionId];
250+
if (existingTransport instanceof SSEServerTransport) {
251+
// Reuse existing transport
252+
transport = existingTransport;
253+
} else {
254+
// Transport exists but is not a SSEServerTransport (could be StreamableHTTPServerTransport)
255+
res.status(400).json({
256+
jsonrpc: '2.0',
257+
error: {
258+
code: -32000,
259+
message: 'Bad Request: Session exists but uses a different transport protocol',
260+
},
261+
id: null,
262+
});
263+
return;
264+
}
265+
if (transport) {
266+
await transport.handlePostMessage(req, res);
267+
} else {
268+
res.status(400).send('No transport found for sessionId');
269+
}
270+
});
271+
272+
273+
// Start the server
274+
const PORT = 3000;
275+
app.listen(PORT, () => {
276+
console.log(`Backwards compatible MCP server listening on port ${PORT}`);
277+
console.log(`
278+
==============================================
279+
SUPPORTED TRANSPORT OPTIONS:
280+
281+
1. Streamable Http(Protocol version: 2025-03-26)
282+
Endpoint: /mcp
283+
Methods: GET, POST, DELETE
284+
Usage:
285+
- Initialize with POST to /mcp
286+
- Establish SSE stream with GET to /mcp
287+
- Send requests with POST to /mcp
288+
- Terminate session with DELETE to /mcp
289+
290+
2. Http + SSE (Protocol version: 2024-11-05)
291+
Endpoints: /sse (GET) and /request (POST)
292+
Usage:
293+
- Establish SSE stream with GET to /sse
294+
- Send requests with POST to /message?sessionId=<id>
295+
==============================================
296+
`);
297+
});
298+
299+
// Handle server shutdown
300+
process.on('SIGINT', async () => {
301+
console.log('Shutting down server...');
302+
303+
// Close all active transports to properly clean up resources
304+
for (const sessionId in transports) {
305+
try {
306+
console.log(`Closing transport for session ${sessionId}`);
307+
await transports[sessionId].close();
308+
delete transports[sessionId];
309+
} catch (error) {
310+
console.error(`Error closing transport for session ${sessionId}:`, error);
311+
}
312+
}
313+
await server.close();
314+
console.log('Server shutdown complete');
315+
process.exit(0);
316+
});

0 commit comments

Comments
 (0)