Skip to content

Commit 8328cfe

Browse files
committed
example of sse server
1 parent 4d2968c commit 8328cfe

File tree

1 file changed

+170
-0
lines changed

1 file changed

+170
-0
lines changed
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import express, { Request, Response } from 'express';
2+
import { randomUUID } from "node:crypto";
3+
import { McpServer } from '../../server/mcp.js';
4+
import { SSEServerTransport } from '../../server/sse.js';
5+
import { z } from 'zod';
6+
import { CallToolResult } from '../../types.js';
7+
8+
/**
9+
* This example server demonstrates the deprecated HTTP+SSE transport
10+
* (protocol version 2024-11-05). It mainly used for testing backward compatible clients.
11+
*
12+
* The server exposes two endpoints:
13+
* - /sse: For establishing the SSE stream (GET)
14+
* - /messages: For receiving client messages (POST)
15+
*
16+
*/
17+
18+
// Create an MCP server instance
19+
const server = new McpServer({
20+
name: 'simple-sse-server',
21+
version: '1.0.0',
22+
}, { capabilities: { logging: {} } });
23+
24+
server.tool(
25+
'start-notification-stream',
26+
'Starts sending periodic notifications',
27+
{
28+
interval: z.number().describe('Interval in milliseconds between notifications').default(1000),
29+
count: z.number().describe('Number of notifications to send').default(10),
30+
},
31+
async ({ interval, count }, { sendNotification }): Promise<CallToolResult> => {
32+
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
33+
let counter = 0;
34+
35+
// Send the initial notification
36+
await sendNotification({
37+
method: "notifications/message",
38+
params: {
39+
level: "info",
40+
data: `Starting notification stream with ${count} messages every ${interval}ms`
41+
}
42+
});
43+
44+
// Send periodic notifications
45+
while (counter < count) {
46+
counter++;
47+
await sleep(interval);
48+
49+
try {
50+
await sendNotification({
51+
method: "notifications/message",
52+
params: {
53+
level: "info",
54+
data: `Notification #${counter} at ${new Date().toISOString()}`
55+
}
56+
});
57+
}
58+
catch (error) {
59+
console.error("Error sending notification:", error);
60+
}
61+
}
62+
63+
return {
64+
content: [
65+
{
66+
type: 'text',
67+
text: `Completed sending ${count} notifications every ${interval}ms`,
68+
}
69+
],
70+
};
71+
}
72+
);
73+
74+
const app = express();
75+
app.use(express.json());
76+
77+
// Store transports by session ID
78+
const transports: Record<string, SSEServerTransport> = {};
79+
80+
// SSE endpoint for establishing the stream
81+
app.get('/sse', async (req: Request, res: Response) => {
82+
console.log('Received GET request to /sse (establishing SSE stream)');
83+
84+
try {
85+
// Create a new SSE transport for the client
86+
// The endpoint for POST messages is '/messages'
87+
const transport = new SSEServerTransport('/messages', res);
88+
89+
// Store the transport by session ID
90+
const sessionId = transport.sessionId;
91+
transports[sessionId] = transport;
92+
93+
// Set up onclose handler to clean up transport when closed
94+
transport.onclose = () => {
95+
console.log(`SSE transport closed for session ${sessionId}`);
96+
delete transports[sessionId];
97+
};
98+
99+
// Connect the transport to the MCP server
100+
await server.connect(transport);
101+
102+
// Start the SSE transport to begin streaming
103+
// This sends an initial 'endpoint' event with the session ID in the URL
104+
await transport.start();
105+
106+
console.log(`Established SSE stream with session ID: ${sessionId}`);
107+
} catch (error) {
108+
console.error('Error establishing SSE stream:', error);
109+
if (!res.headersSent) {
110+
res.status(500).send('Error establishing SSE stream');
111+
}
112+
}
113+
});
114+
115+
// Messages endpoint for receiving client JSON-RPC requests
116+
app.post('/messages', async (req: Request, res: Response) => {
117+
console.log('Received POST request to /messages');
118+
119+
// Extract session ID from URL query parameter
120+
// In the SSE protocol, this is added by the client based on the endpoint event
121+
const sessionId = req.query.sessionId as string | undefined;
122+
123+
if (!sessionId) {
124+
console.error('No session ID provided in request URL');
125+
res.status(400).send('Missing sessionId parameter');
126+
return;
127+
}
128+
129+
const transport = transports[sessionId];
130+
if (!transport) {
131+
console.error(`No active transport found for session ID: ${sessionId}`);
132+
res.status(404).send('Session not found');
133+
return;
134+
}
135+
136+
try {
137+
// Handle the POST message with the transport
138+
await transport.handlePostMessage(req, res, req.body);
139+
} catch (error) {
140+
console.error('Error handling request:', error);
141+
if (!res.headersSent) {
142+
res.status(500).send('Error handling request');
143+
}
144+
}
145+
});
146+
147+
// Start the server
148+
const PORT = 3000;
149+
app.listen(PORT, () => {
150+
console.log(`Simple SSE Server (deprecated protocol version 2024-11-05) listening on port ${PORT}`);
151+
});
152+
153+
// Handle server shutdown
154+
process.on('SIGINT', async () => {
155+
console.log('Shutting down server...');
156+
157+
// Close all active transports to properly clean up resources
158+
for (const sessionId in transports) {
159+
try {
160+
console.log(`Closing transport for session ${sessionId}`);
161+
await transports[sessionId].close();
162+
delete transports[sessionId];
163+
} catch (error) {
164+
console.error(`Error closing transport for session ${sessionId}:`, error);
165+
}
166+
}
167+
await server.close();
168+
console.log('Server shutdown complete');
169+
process.exit(0);
170+
});

0 commit comments

Comments
 (0)