forked from modelcontextprotocol/typescript-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstandaloneSseWithGetStreamableHttp.ts
More file actions
129 lines (113 loc) · 4.57 KB
/
standaloneSseWithGetStreamableHttp.ts
File metadata and controls
129 lines (113 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import { randomUUID } from 'node:crypto';
import { createMcpExpressApp } from '@modelcontextprotocol/express';
import { NodeStreamableHTTPServerTransport } from '@modelcontextprotocol/node';
import type { ReadResourceResult } from '@modelcontextprotocol/server';
import { isInitializeRequest, McpServer } from '@modelcontextprotocol/server';
import type { Request, Response } from 'express';
// The MCP server instance used by every session in this example, identified by name and version
const server = new McpServer({ name: 'resource-list-changed-notification-server', version: '1.0.0' });

// Active transports keyed by session ID, so follow-up requests can be routed to their session's transport
const transports: Record<string, NodeStreamableHTTPServerTransport> = {};
/**
 * Registers a plain-text resource on the MCP server.
 *
 * @param name - Resource name; it is also URL-encoded into the resource URI.
 * @param content - Text returned whenever the resource is read.
 */
const addResource = (name: string, content: string) => {
    const resourceUri = `https://mcp-example.com/dynamic/${encodeURIComponent(name)}`;
    const metadata = { mimeType: 'text/plain', description: `Dynamic resource: ${name}` };

    // Read callback: always answers with the content captured at registration time
    const readContents = async (): Promise<ReadResourceResult> => ({
        contents: [{ uri: resourceUri, text: content }]
    });

    server.registerResource(name, resourceUri, metadata, readContents);
};
// Seed the server with one resource at startup
addResource('example-resource', 'Initial content for example-resource');

// For testing: every 5 seconds register one more resource under a freshly generated UUID name.
// NOTE(review): presumably this is what makes the server emit resource-list-changed notifications
// (the server name suggests so) — confirm against McpServer.registerResource.
const resourceChangeInterval = setInterval(() => {
    const resourceName = randomUUID();
    addResource(resourceName, `Content for ${resourceName}`);
}, 5000);

// Express application created by the SDK helper; the /mcp routes are attached to it
const app = createMcpExpressApp();
/**
 * POST /mcp — entry point for client-to-server messages.
 *
 * Routing:
 * - Request carries an `mcp-session-id` header that matches a stored transport:
 *   the request is handed to that transport.
 * - Request has no session header and its body is an initialize request:
 *   a new transport is created, connected to the server, and stored under its
 *   session ID once the session is initialized.
 * - Anything else: rejected with HTTP 400 and a JSON-RPC error body.
 *
 * Unexpected failures are logged and answered with HTTP 500, unless the
 * response has already started.
 */
app.post('/mcp', async (req: Request, res: Response) => {
    console.log('Received MCP request:', req.body);
    try {
        const sessionId = req.headers['mcp-session-id'] as string | undefined;

        // Known session: reuse its transport
        const existingTransport = sessionId ? transports[sessionId] : undefined;
        if (existingTransport) {
            await existingTransport.handleRequest(req, res, req.body);
            return;
        }

        // Unknown session ID, or a session-less request that is not an initialize request
        if (sessionId || !isInitializeRequest(req.body)) {
            res.status(400).json({
                jsonrpc: '2.0',
                error: {
                    code: -32_000,
                    message: 'Bad Request: No valid session ID provided'
                },
                id: null
            });
            return;
        }

        // New initialization request: create a transport for the new session
        const transport: NodeStreamableHTTPServerTransport = new NodeStreamableHTTPServerTransport({
            sessionIdGenerator: () => randomUUID(),
            onsessioninitialized: newSessionId => {
                // Store the transport as soon as the session exists, so follow-up requests
                // for this session can find it
                console.log(`Session initialized with ID: ${newSessionId}`);
                transports[newSessionId] = transport;
            }
        });

        // Drop the transport from the session map when it closes; otherwise the map grows
        // forever and keeps routing requests to dead transports.
        // NOTE(review): assigned before connect() on the assumption that the server wraps,
        // rather than replaces, an existing onclose handler — confirm against the SDK.
        transport.onclose = () => {
            const closedSessionId = transport.sessionId;
            if (closedSessionId && transports[closedSessionId] === transport) {
                console.log(`Transport closed for session ${closedSessionId}, removing it from the session map`);
                delete transports[closedSessionId];
            }
        };

        // Connect the transport to the MCP server, then let it answer the initialize request;
        // the onsessioninitialized callback stores the transport
        await server.connect(transport);
        await transport.handleRequest(req, res, req.body);
    } catch (error) {
        console.error('Error handling MCP request:', error);
        // Only send an error body if nothing has been written to the response yet
        if (!res.headersSent) {
            res.status(500).json({
                jsonrpc: '2.0',
                error: {
                    code: -32_603,
                    message: 'Internal server error'
                },
                id: null
            });
        }
    }
});
// GET /mcp — opens the standalone SSE stream for an existing session.
// The stream itself is produced by the session's Streamable HTTP transport.
app.get('/mcp', async (req: Request, res: Response) => {
    const sessionId = req.headers['mcp-session-id'] as string | undefined;

    // Look the transport up once; a missing header and an unknown session are rejected alike
    const sessionTransport = sessionId ? transports[sessionId] : undefined;
    if (!sessionTransport) {
        res.status(400).send('Invalid or missing session ID');
        return;
    }

    console.log(`Establishing SSE stream for session ${sessionId}`);
    await sessionTransport.handleRequest(req, res);
});
// Start listening for HTTP requests on a fixed port
const PORT = 3000;
app.listen(PORT, startupError => {
    if (!startupError) {
        console.log(`Server listening on port ${PORT}`);
        return;
    }

    // Startup failed: report the error and terminate with a non-zero exit code
    console.error('Failed to start server:', startupError);
    // eslint-disable-next-line unicorn/no-process-exit
    process.exit(1);
});
// Handle server shutdown (Ctrl+C): stop generating resources, close every open session
// transport, close the server, and always terminate the process — even if closing fails.
process.on('SIGINT', async () => {
    console.log('Shutting down server...');
    clearInterval(resourceChangeInterval);

    // 0 on a clean shutdown, 1 if anything failed while closing
    let exitCode = 0;
    try {
        // Work on a snapshot of the current transports so the loop is unaffected by any
        // changes made to the session map while transports are closing
        const closeResults = await Promise.allSettled(Object.values(transports).map(transport => transport.close()));
        for (const result of closeResults) {
            if (result.status === 'rejected') {
                exitCode = 1;
                console.error('Error closing transport:', result.reason);
            }
        }

        await server.close();
    } catch (error) {
        exitCode = 1;
        console.error('Error during server shutdown:', error);
    } finally {
        // Exit from finally so a failed close can never leave the process hanging
        process.exit(exitCode);
    }
});