|
2 | 2 | * Express server implementation used for standby Actor mode. |
3 | 3 | */ |
4 | 4 |
|
| 5 | +import { randomUUID } from 'node:crypto'; |
| 6 | + |
5 | 7 | import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; |
| 8 | +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; |
6 | 9 | import type { Request, Response } from 'express'; |
7 | 10 | import express from 'express'; |
8 | 11 |
|
9 | 12 | import log from '@apify/log'; |
10 | 13 |
|
11 | | -import { HEADER_READINESS_PROBE, Routes } from './const.js'; |
12 | 14 | import { type ActorsMcpServer } from '../mcp/server.js'; |
13 | | -import { getActorRunData } from './utils.js'; |
14 | 15 | import { processParamsGetTools } from '../mcp/utils.js'; |
| 16 | +import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js'; |
| 17 | +import { getActorRunData } from './utils.js'; |
15 | 18 |
|
16 | 19 | export function createExpressApp( |
17 | 20 | host: string, |
18 | 21 | mcpServer: ActorsMcpServer, |
19 | 22 | ): express.Express { |
20 | | - const HELP_MESSAGE = `Connect to the server with GET request to ${host}/sse?token=YOUR-APIFY-TOKEN` |
21 | | - + ` and then send POST requests to ${host}/message?token=YOUR-APIFY-TOKEN`; |
22 | | - |
23 | 23 | const app = express(); |
| 24 | + app.use(express.json()); |
| 25 | + let transportSSE: SSEServerTransport; |
| 26 | + const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; |
24 | 27 |
|
25 | | - let transport: SSEServerTransport; |
| 28 | + function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) { |
| 29 | + log.error(`${logMessage}: ${error}`); |
| 30 | + if (!res.headersSent) { |
| 31 | + res.status(statusCode).json({ |
| 32 | + jsonrpc: '2.0', |
| 33 | + error: { |
| 34 | + code: statusCode === 500 ? -32603 : -32000, |
| 35 | + message: statusCode === 500 ? 'Internal server error' : 'Bad Request', |
| 36 | + }, |
| 37 | + id: null, |
| 38 | + }); |
| 39 | + } |
| 40 | + } |
26 | 41 |
|
27 | | - app.route(Routes.ROOT) |
28 | | - .get(async (req: Request, res: Response) => { |
29 | | - if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) { |
30 | | - log.debug('Received readiness probe'); |
31 | | - res.status(200).json({ message: 'Server is ready' }).end(); |
32 | | - return; |
| 42 | + app.get(Routes.ROOT, async (req: Request, res: Response) => { |
| 43 | + if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) { |
| 44 | + log.debug('Received readiness probe'); |
| 45 | + res.status(200).json({ message: 'Server is ready' }).end(); |
| 46 | + return; |
| 47 | + } |
| 48 | + try { |
| 49 | + log.info(`Received GET message at: ${Routes.ROOT}`); |
| 50 | + const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string); |
| 51 | + if (tools) { |
| 52 | + mcpServer.updateTools(tools); |
33 | 53 | } |
34 | | - try { |
35 | | - log.info(`Received GET message at: ${Routes.ROOT}`); |
36 | | - const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string); |
37 | | - if (tools) { |
38 | | - mcpServer.updateTools(tools); |
39 | | - } |
40 | | - res.setHeader('Content-Type', 'text/event-stream'); |
41 | | - res.setHeader('Cache-Control', 'no-cache'); |
42 | | - res.setHeader('Connection', 'keep-alive'); |
43 | | - res.status(200).json({ message: `Actor is using Model Context Protocol. ${HELP_MESSAGE}`, data: getActorRunData() }).end(); |
44 | | - } catch (error) { |
45 | | - log.error(`Error in GET ${Routes.ROOT} ${error}`); |
46 | | - res.status(500).json({ message: 'Internal Server Error' }).end(); |
| 54 | + res.setHeader('Content-Type', 'text/event-stream'); |
| 55 | + res.setHeader('Cache-Control', 'no-cache'); |
| 56 | + res.setHeader('Connection', 'keep-alive'); |
| 57 | + res.status(200).json({ message: `Actor is using Model Context Protocol. ${getHelpMessage(host)}`, data: getActorRunData() }).end(); |
| 58 | + } catch (error) { |
| 59 | + respondWithError(res, error, `Error in GET ${Routes.ROOT}`); |
| 60 | + } |
| 61 | + }); |
| 62 | + |
| 63 | + app.head(Routes.ROOT, (_req: Request, res: Response) => { |
| 64 | + res.status(200).end(); |
| 65 | + }); |
| 66 | + |
| 67 | + app.get(Routes.SSE, async (req: Request, res: Response) => { |
| 68 | + try { |
| 69 | + log.info(`Received GET message at: ${Routes.SSE}`); |
| 70 | + const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string); |
| 71 | + if (tools) { |
| 72 | + mcpServer.updateTools(tools); |
47 | 73 | } |
48 | | - }) |
49 | | - .head((_req: Request, res: Response) => { |
50 | | - res.status(200).end(); |
51 | | - }); |
52 | | - |
53 | | - app.route(Routes.SSE) |
54 | | - .get(async (req: Request, res: Response) => { |
55 | | - try { |
56 | | - log.info(`Received GET message at: ${Routes.SSE}`); |
57 | | - const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string); |
58 | | - if (tools) { |
59 | | - mcpServer.updateTools(tools); |
60 | | - } |
61 | | - transport = new SSEServerTransport(Routes.MESSAGE, res); |
62 | | - await mcpServer.connect(transport); |
63 | | - } catch (error) { |
64 | | - log.error(`Error in GET ${Routes.SSE}: ${error}`); |
65 | | - res.status(500).json({ message: 'Internal Server Error' }).end(); |
| 74 | + transportSSE = new SSEServerTransport(Routes.MESSAGE, res); |
| 75 | + await mcpServer.connect(transportSSE); |
| 76 | + } catch (error) { |
| 77 | + respondWithError(res, error, `Error in GET ${Routes.SSE}`); |
| 78 | + } |
| 79 | + }); |
| 80 | + |
| 81 | + app.post(Routes.MESSAGE, async (req: Request, res: Response) => { |
| 82 | + try { |
| 83 | + log.info(`Received POST message at: ${Routes.MESSAGE}`); |
| 84 | + if (transportSSE) { |
| 85 | + await transportSSE.handlePostMessage(req, res); |
| 86 | + } else { |
| 87 | + log.error('Server is not connected to the client.'); |
| 88 | + res.status(400).json({ |
| 89 | + jsonrpc: '2.0', |
| 90 | + error: { |
| 91 | + code: -32000, |
| 92 | + message: 'Bad Request: Server is not connected to the client. ' |
| 93 | + + 'Connect to the server with GET request to /sse endpoint', |
| 94 | + }, |
| 95 | + id: null, |
| 96 | + }); |
66 | 97 | } |
67 | | - }); |
68 | | - |
69 | | - app.route(Routes.MESSAGE) |
70 | | - .post(async (req: Request, res: Response) => { |
71 | | - try { |
72 | | - log.info(`Received POST message at: ${Routes.MESSAGE}`); |
73 | | - if (transport) { |
74 | | - await transport.handlePostMessage(req, res); |
75 | | - } else { |
76 | | - res.status(400).json({ |
77 | | - message: 'Server is not connected to the client. ' |
78 | | - + 'Connect to the server with GET request to /sse endpoint', |
79 | | - }); |
| 98 | + } catch (error) { |
| 99 | + respondWithError(res, error, `Error in POST ${Routes.MESSAGE}`); |
| 100 | + } |
| 101 | + }); |
| 102 | + |
| 103 | + app.post(Routes.MCP, async (req: Request, res: Response) => { |
| 104 | + log.info('Received MCP request:', req.body); |
| 105 | + try { |
| 106 | + // Check for existing session ID |
| 107 | + const sessionId = req.headers['mcp-session-id'] as string | undefined; |
| 108 | + let transport: StreamableHTTPServerTransport; |
| 109 | + |
| 110 | + if (sessionId && transports[sessionId]) { |
| 111 | + // Reuse existing transport |
| 112 | + transport = transports[sessionId]; |
| 113 | + } else if (!sessionId && isInitializeRequest(req.body)) { |
| 114 | + // New initialization request - use JSON response mode |
| 115 | + transport = new StreamableHTTPServerTransport({ |
| 116 | + sessionIdGenerator: () => randomUUID(), |
| 117 | + enableJsonResponse: true, // Enable JSON response mode |
| 118 | + }); |
| 119 | + |
| 120 | + // Connect the transport to the MCP server BEFORE handling the request |
| 121 | + await mcpServer.connect(transport); |
| 122 | + |
| 123 | + // After handling the request, if we get a session ID back, store the transport |
| 124 | + await transport.handleRequest(req, res, req.body); |
| 125 | + |
| 126 | + // Store the transport by session ID for future requests |
| 127 | + if (transport.sessionId) { |
| 128 | + transports[transport.sessionId] = transport; |
80 | 129 | } |
81 | | - } catch (error) { |
82 | | - log.error(`Error in POST ${Routes.MESSAGE}: ${error}`); |
83 | | - res.status(500).json({ message: 'Internal Server Error' }).end(); |
| 130 | + return; // Already handled |
| 131 | + } else { |
| 132 | + // Invalid request - no session ID or not initialization request |
| 133 | + res.status(400).json({ |
| 134 | + jsonrpc: '2.0', |
| 135 | + error: { |
| 136 | + code: -32000, |
| 137 | + message: 'Bad Request: No valid session ID provided or not initialization request', |
| 138 | + }, |
| 139 | + id: null, |
| 140 | + }); |
| 141 | + return; |
84 | 142 | } |
85 | | - }); |
| 143 | + |
| 144 | + // Handle the request with existing transport - no need to reconnect |
| 145 | + await transport.handleRequest(req, res, req.body); |
| 146 | + } catch (error) { |
| 147 | + respondWithError(res, error, 'Error handling MCP request'); |
| 148 | + } |
| 149 | + }); |
| 150 | + |
| 151 | + // Handle GET requests for SSE streams according to spec |
| 152 | + app.get(Routes.MCP, async (_req: Request, res: Response) => { |
| 153 | + // We don't support GET requests for this server |
| 154 | + // The spec requires returning 405 Method Not Allowed in this case |
| 155 | + res.status(405).set('Allow', 'POST').send('Method Not Allowed'); |
| 156 | + }); |
86 | 157 |
|
87 | 158 | // Catch-all for undefined routes |
88 | 159 | app.use((req: Request, res: Response) => { |
89 | | - res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${HELP_MESSAGE}` }).end(); |
| 160 | + res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end(); |
90 | 161 | }); |
91 | 162 |
|
92 | 163 | return app; |
93 | 164 | } |
| 165 | + |
| 166 | +// Helper function to detect initialize requests |
| 167 | +function isInitializeRequest(body: unknown): boolean { |
| 168 | + if (Array.isArray(body)) { |
| 169 | + return body.some((msg) => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize'); |
| 170 | + } |
| 171 | + return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize'; |
| 172 | +} |
0 commit comments