|
2 | 2 | * Express server implementation used for standby Actor mode. |
3 | 3 | */ |
4 | 4 |
|
| 5 | +import { randomUUID } from 'node:crypto'; |
| 6 | + |
5 | 7 | import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; |
| 8 | +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; |
6 | 9 | import type { Request, Response } from 'express'; |
7 | 10 | import express from 'express'; |
8 | 11 |
|
9 | 12 | import log from '@apify/log'; |
10 | 13 |
|
11 | | -import { HEADER_READINESS_PROBE, Routes } from './const.js'; |
12 | 14 | import { type ActorsMcpServer } from '../mcp-server.js'; |
| 15 | +import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js'; |
13 | 16 | import { getActorRunData, processParamsGetTools } from './utils.js'; |
14 | 17 |
|
15 | 18 | export function createExpressApp( |
16 | 19 | host: string, |
17 | 20 | mcpServer: ActorsMcpServer, |
18 | 21 | ): express.Express { |
19 | | - const HELP_MESSAGE = `Connect to the server with GET request to ${host}/sse?token=YOUR-APIFY-TOKEN` |
20 | | - + ` and then send POST requests to ${host}/message?token=YOUR-APIFY-TOKEN`; |
21 | | - |
22 | 22 | const app = express(); |
| 23 | + app.use(express.json()); |
| 24 | + let transportSSE: SSEServerTransport; |
| 25 | + const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; |
23 | 26 |
|
24 | | - let transport: SSEServerTransport; |
| 27 | + function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) { |
| 28 | + log.error(`${logMessage}: ${error}`); |
| 29 | + if (!res.headersSent) { |
| 30 | + res.status(statusCode).json({ |
| 31 | + jsonrpc: '2.0', |
| 32 | + error: { |
| 33 | + code: statusCode === 500 ? -32603 : -32000, |
| 34 | + message: statusCode === 500 ? 'Internal server error' : 'Bad Request', |
| 35 | + }, |
| 36 | + id: null, |
| 37 | + }); |
| 38 | + } |
| 39 | + } |
25 | 40 |
|
26 | | - app.route(Routes.ROOT) |
27 | | - .get(async (req: Request, res: Response) => { |
28 | | - if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) { |
29 | | - log.debug('Received readiness probe'); |
30 | | - res.status(200).json({ message: 'Server is ready' }).end(); |
31 | | - return; |
| 41 | + app.get(Routes.ROOT, async (req: Request, res: Response) => { |
| 42 | + if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) { |
| 43 | + log.debug('Received readiness probe'); |
| 44 | + res.status(200).json({ message: 'Server is ready' }).end(); |
| 45 | + return; |
| 46 | + } |
| 47 | + try { |
| 48 | + log.info(`Received GET message at: ${Routes.ROOT}`); |
| 49 | + const tools = await processParamsGetTools(req.url); |
| 50 | + if (tools) { |
| 51 | + mcpServer.updateTools(tools); |
32 | 52 | } |
33 | | - try { |
34 | | - log.info(`Received GET message at: ${Routes.ROOT}`); |
35 | | - const tools = await processParamsGetTools(req.url); |
36 | | - if (tools) { |
37 | | - mcpServer.updateTools(tools); |
38 | | - } |
39 | | - res.setHeader('Content-Type', 'text/event-stream'); |
40 | | - res.setHeader('Cache-Control', 'no-cache'); |
41 | | - res.setHeader('Connection', 'keep-alive'); |
42 | | - res.status(200).json({ message: `Actor is using Model Context Protocol. ${HELP_MESSAGE}`, data: getActorRunData() }).end(); |
43 | | - } catch (error) { |
44 | | - log.error(`Error in GET ${Routes.ROOT} ${error}`); |
45 | | - res.status(500).json({ message: 'Internal Server Error' }).end(); |
| 53 | + res.setHeader('Content-Type', 'text/event-stream'); |
| 54 | + res.setHeader('Cache-Control', 'no-cache'); |
| 55 | + res.setHeader('Connection', 'keep-alive'); |
| 56 | + res.status(200).json({ message: `Actor is using Model Context Protocol. ${getHelpMessage(host)}`, data: getActorRunData() }).end(); |
| 57 | + } catch (error) { |
| 58 | + respondWithError(res, error, `Error in GET ${Routes.ROOT}`); |
| 59 | + } |
| 60 | + }); |
| 61 | + |
| 62 | + app.head(Routes.ROOT, (_req: Request, res: Response) => { |
| 63 | + res.status(200).end(); |
| 64 | + }); |
| 65 | + |
| 66 | + app.get(Routes.SSE, async (req: Request, res: Response) => { |
| 67 | + try { |
| 68 | + log.info(`Received GET message at: ${Routes.SSE}`); |
| 69 | + const tools = await processParamsGetTools(req.url); |
| 70 | + if (tools) { |
| 71 | + mcpServer.updateTools(tools); |
46 | 72 | } |
47 | | - }) |
48 | | - .head((_req: Request, res: Response) => { |
49 | | - res.status(200).end(); |
50 | | - }); |
51 | | - |
52 | | - app.route(Routes.SSE) |
53 | | - .get(async (req: Request, res: Response) => { |
54 | | - try { |
55 | | - log.info(`Received GET message at: ${Routes.SSE}`); |
56 | | - const tools = await processParamsGetTools(req.url); |
57 | | - if (tools) { |
58 | | - mcpServer.updateTools(tools); |
59 | | - } |
60 | | - transport = new SSEServerTransport(Routes.MESSAGE, res); |
61 | | - await mcpServer.connect(transport); |
62 | | - } catch (error) { |
63 | | - log.error(`Error in GET ${Routes.SSE}: ${error}`); |
64 | | - res.status(500).json({ message: 'Internal Server Error' }).end(); |
| 73 | + transportSSE = new SSEServerTransport(Routes.MESSAGE, res); |
| 74 | + await mcpServer.connect(transportSSE); |
| 75 | + } catch (error) { |
| 76 | + respondWithError(res, error, `Error in GET ${Routes.SSE}`); |
| 77 | + } |
| 78 | + }); |
| 79 | + |
| 80 | + app.post(Routes.MESSAGE, async (req: Request, res: Response) => { |
| 81 | + try { |
| 82 | + log.info(`Received POST message at: ${Routes.MESSAGE}`); |
| 83 | + if (transportSSE) { |
| 84 | + await transportSSE.handlePostMessage(req, res); |
| 85 | + } else { |
| 86 | + log.error('Server is not connected to the client.'); |
| 87 | + res.status(400).json({ |
| 88 | + jsonrpc: '2.0', |
| 89 | + error: { |
| 90 | + code: -32000, |
| 91 | + message: 'Bad Request: Server is not connected to the client. ' |
| 92 | + + 'Connect to the server with GET request to /sse endpoint', |
| 93 | + }, |
| 94 | + id: null, |
| 95 | + }); |
65 | 96 | } |
66 | | - }); |
67 | | - |
68 | | - app.route(Routes.MESSAGE) |
69 | | - .post(async (req: Request, res: Response) => { |
70 | | - try { |
71 | | - log.info(`Received POST message at: ${Routes.MESSAGE}`); |
72 | | - if (transport) { |
73 | | - await transport.handlePostMessage(req, res); |
74 | | - } else { |
75 | | - res.status(400).json({ |
76 | | - message: 'Server is not connected to the client. ' |
77 | | - + 'Connect to the server with GET request to /sse endpoint', |
78 | | - }); |
| 97 | + } catch (error) { |
| 98 | + respondWithError(res, error, `Error in POST ${Routes.MESSAGE}`); |
| 99 | + } |
| 100 | + }); |
| 101 | + |
| 102 | + app.post(Routes.MCP, async (req: Request, res: Response) => { |
| 103 | + log.info('Received MCP request:', req.body); |
| 104 | + try { |
| 105 | + // Check for existing session ID |
| 106 | + const sessionId = req.headers['mcp-session-id'] as string | undefined; |
| 107 | + let transport: StreamableHTTPServerTransport; |
| 108 | + |
| 109 | + if (sessionId && transports[sessionId]) { |
| 110 | + // Reuse existing transport |
| 111 | + transport = transports[sessionId]; |
| 112 | + } else if (!sessionId && isInitializeRequest(req.body)) { |
| 113 | + // New initialization request - use JSON response mode |
| 114 | + transport = new StreamableHTTPServerTransport({ |
| 115 | + sessionIdGenerator: () => randomUUID(), |
| 116 | + enableJsonResponse: true, // Enable JSON response mode |
| 117 | + }); |
| 118 | + |
| 119 | + // Connect the transport to the MCP server BEFORE handling the request |
| 120 | + await mcpServer.connect(transport); |
| 121 | + |
| 122 | + // After handling the request, if we get a session ID back, store the transport |
| 123 | + await transport.handleRequest(req, res, req.body); |
| 124 | + |
| 125 | + // Store the transport by session ID for future requests |
| 126 | + if (transport.sessionId) { |
| 127 | + transports[transport.sessionId] = transport; |
79 | 128 | } |
80 | | - } catch (error) { |
81 | | - log.error(`Error in POST ${Routes.MESSAGE}: ${error}`); |
82 | | - res.status(500).json({ message: 'Internal Server Error' }).end(); |
| 129 | + return; // Already handled |
| 130 | + } else { |
| 131 | + // Invalid request - no session ID or not initialization request |
| 132 | + res.status(400).json({ |
| 133 | + jsonrpc: '2.0', |
| 134 | + error: { |
| 135 | + code: -32000, |
| 136 | + message: 'Bad Request: No valid session ID provided or not initialization request', |
| 137 | + }, |
| 138 | + id: null, |
| 139 | + }); |
| 140 | + return; |
83 | 141 | } |
84 | | - }); |
| 142 | + |
| 143 | + // Handle the request with existing transport - no need to reconnect |
| 144 | + await transport.handleRequest(req, res, req.body); |
| 145 | + } catch (error) { |
| 146 | + respondWithError(res, error, 'Error handling MCP request'); |
| 147 | + } |
| 148 | + }); |
| 149 | + |
| 150 | + // Handle GET requests for SSE streams according to spec |
| 151 | + app.get(Routes.MCP, async (_req: Request, res: Response) => { |
| 152 | + // We don't support GET requests for this server |
| 153 | + // The spec requires returning 405 Method Not Allowed in this case |
| 154 | + res.status(405).set('Allow', 'POST').send('Method Not Allowed'); |
| 155 | + }); |
85 | 156 |
|
86 | 157 | // Catch-all for undefined routes |
87 | 158 | app.use((req: Request, res: Response) => { |
88 | | - res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${HELP_MESSAGE}` }).end(); |
| 159 | + res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end(); |
89 | 160 | }); |
90 | 161 |
|
91 | 162 | return app; |
92 | 163 | } |
| 164 | + |
| 165 | +// Helper function to detect initialize requests |
| 166 | +function isInitializeRequest(body: unknown): boolean { |
| 167 | + if (Array.isArray(body)) { |
| 168 | + return body.some((msg) => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize'); |
| 169 | + } |
| 170 | + return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize'; |
| 171 | +} |
0 commit comments