From 13c0d04e05e15f0c5cccb1628bfe3b65ff27fd72 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 1 Jul 2025 21:11:29 +0000 Subject: [PATCH] test --- package-lock.json | 113 ++---------- package.json | 2 +- src/handlers/mcp-streamable.ts | 306 +++++++++++++++++++++++++++++++++ src/index.ts | 21 ++- src/redis.ts | 42 +++++ test-forwarding.js | 127 ++++++++++++++ test-streamable.js | 58 +++++++ 7 files changed, 564 insertions(+), 105 deletions(-) create mode 100644 src/handlers/mcp-streamable.ts create mode 100644 test-forwarding.js create mode 100644 test-streamable.js diff --git a/package-lock.json b/package-lock.json index e6ec5b9..f9b5f4b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,7 @@ "name": "mcp-server-everything", "version": "0.1.0", "dependencies": { - "@modelcontextprotocol/sdk": "^1.6.1", + "@modelcontextprotocol/sdk": "^1.13.3", "@redis/client": "^1.6.0", "cors": "^2.8.5", "dotenv": "^16.4.7", @@ -1072,32 +1072,6 @@ "url": "https://opencollective.com/eslint" } }, - "node_modules/@eslint/eslintrc/node_modules/ajv": { - "version": "6.12.6", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", - "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", - "dev": true, - "license": "MIT", - "peer": true, - "dependencies": { - "fast-deep-equal": "^3.1.1", - "fast-json-stable-stringify": "^2.0.0", - "json-schema-traverse": "^0.4.1", - "uri-js": "^4.2.2" - }, - "funding": { - "type": "github", - "url": "https://github.com/sponsors/epoberezkin" - } - }, - "node_modules/@eslint/eslintrc/node_modules/json-schema-traverse": { - "version": "0.4.1", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", - "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", - "dev": true, - "license": "MIT", - "peer": true - }, "node_modules/@eslint/js": { "version": "9.27.0", "resolved": "https://registry.npmjs.org/@eslint/js/-/js-9.27.0.tgz", @@ -1671,16 +1645,17 @@ } }, "node_modules/@modelcontextprotocol/sdk": { - "version": "1.11.4", - "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.11.4.tgz", - "integrity": "sha512-OTbhe5slIjiOtLxXhKalkKGhIQrwvhgCDs/C2r8kcBTy5HR/g43aDQU0l7r8O0VGbJPTNJvDc7ZdQMdQDJXmbw==", + "version": "1.13.3", + "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.13.3.tgz", + "integrity": "sha512-bGwA78F/U5G2jrnsdRkPY3IwIwZeWUEfb5o764b79lb0rJmMT76TLwKhdNZOWakOQtedYefwIR4emisEMvInKA==", "license": "MIT", "dependencies": { - "ajv": "^8.17.1", + "ajv": "^6.12.6", "content-type": "^1.0.5", "cors": "^2.8.5", "cross-spawn": "^7.0.5", "eventsource": "^3.0.2", + "eventsource-parser": "^3.0.0", "express": "^5.0.1", "express-rate-limit": "^7.5.0", "pkce-challenge": "^5.0.0", @@ -2547,15 +2522,15 @@ } }, "node_modules/ajv": { - "version": "8.17.1", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.17.1.tgz", - "integrity": "sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g==", + "version": "6.12.6", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", + "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", "license": "MIT", "dependencies": { - "fast-deep-equal": "^3.1.3", - "fast-uri": "^3.0.1", - "json-schema-traverse": "^1.0.0", - "require-from-string": "^2.0.2" + "fast-deep-equal": "^3.1.1", + "fast-json-stable-stringify": "^2.0.0", + "json-schema-traverse": "^0.4.1", + "uri-js": "^4.2.2" }, "funding": { "type": "github", @@ -3570,32 +3545,6 @@ "url": "https://opencollective.com/eslint" } }, - "node_modules/eslint/node_modules/ajv": { - "version": "6.12.6", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", - "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", - "dev": true, - "license": "MIT", - "peer": true, - "dependencies": { - "fast-deep-equal": "^3.1.1", - "fast-json-stable-stringify": "^2.0.0", - "json-schema-traverse": "^0.4.1", - "uri-js": "^4.2.2" - }, - "funding": { - "type": "github", - "url": "https://github.com/sponsors/epoberezkin" - } - }, - "node_modules/eslint/node_modules/json-schema-traverse": { - "version": "0.4.1", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", - "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", - "dev": true, - "license": "MIT", - "peer": true - }, "node_modules/espree": { "version": "10.3.0", "resolved": "https://registry.npmjs.org/espree/-/espree-10.3.0.tgz", @@ -3875,7 +3824,6 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", "integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==", - "dev": true, "license": "MIT" }, "node_modules/fast-levenshtein": { @@ -3886,22 +3834,6 @@ "license": "MIT", "peer": true }, - "node_modules/fast-uri": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.0.6.tgz", - "integrity": "sha512-Atfo14OibSv5wAp4VWNsFYE1AchQRTv9cBGWET4pZWHzYshFSS9NQI6I57rdKn9croWVMbYFbLhJ+yJvmZIIHw==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/fastify" - }, - { - "type": "opencollective", - "url": "https://opencollective.com/fastify" - } - ], - "license": "BSD-3-Clause" - }, "node_modules/fastq": { "version": "1.19.1", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.19.1.tgz", @@ -5298,9 +5230,9 @@ "license": "MIT" }, "node_modules/json-schema-traverse": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", - "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", "license": "MIT" }, "node_modules/json-stable-stringify-without-jsonify": { @@ -6036,9 +5968,7 @@ "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", - "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=6" } @@ -6149,15 +6079,6 @@ "node": ">=0.10.0" } }, - "node_modules/require-from-string": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", - "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==", - "license": "MIT", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/resolve": { "version": "1.22.10", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.10.tgz", @@ -6967,9 +6888,7 @@ "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", "integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==", - "dev": true, "license": "BSD-2-Clause", - "peer": true, "dependencies": { "punycode": "^2.1.0" } diff --git a/package.json b/package.json index c5ff68e..8d574e7 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "typescript-eslint": "^8.18.0" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.6.1", + "@modelcontextprotocol/sdk": "^1.13.3", "@redis/client": "^1.6.0", "cors": "^2.8.5", "dotenv": "^16.4.7", diff --git a/src/handlers/mcp-streamable.ts b/src/handlers/mcp-streamable.ts new file mode 100644 index 0000000..a243f81 --- /dev/null +++ b/src/handlers/mcp-streamable.ts @@ -0,0 +1,306 @@ +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { Request, Response } from "express"; +import { createMcpServer } from "../services/mcp.js"; +import { redisClient } from "../redis.js"; +import { randomUUID } from "node:crypto"; +import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; +import http from "http"; + +// Configuration +const NODE_ID = process.env.NODE_ID || randomUUID(); +const REDIS_PREFIX = "mcp:streamable"; + +// Session registry for multi-node support +class RedisSessionRegistry { + async registerSession(sessionId: string): Promise { + const key = `${REDIS_PREFIX}:session:${sessionId}`; + const value = JSON.stringify({ + nodeId: NODE_ID, + createdAt: Date.now(), + lastActivity: Date.now() + }); + await redisClient.setEx(key, 3600, value); // 1 hour TTL + } + + async getSessionNode(sessionId: string): Promise { + const key = `${REDIS_PREFIX}:session:${sessionId}`; + const data = await redisClient.get(key); + if (!data) return null; + const parsed = JSON.parse(data); + return parsed.nodeId; + } + + async touchSession(sessionId: string): Promise { + const key = `${REDIS_PREFIX}:session:${sessionId}`; + const data = await redisClient.get(key); + if (data) { + const parsed = JSON.parse(data); + parsed.lastActivity = Date.now(); + await redisClient.setEx(key, 3600, JSON.stringify(parsed)); + } + } + + async removeSession(sessionId: string): Promise { + const key = `${REDIS_PREFIX}:session:${sessionId}`; + await redisClient.del(key); + } +} + +const sessionRegistry = new RedisSessionRegistry(); +const transports = new Map(); + +// Node registry for discovering other nodes +const nodeRegistry = new Map(); // nodeId -> address + +// Register this node (in production, this would come from config) +async function registerNode() { + const nodeAddress = process.env.NODE_ADDRESS || `localhost:${process.env.PORT || 3000}`; + await redisClient.setEx( + `${REDIS_PREFIX}:node:${NODE_ID}`, + 120, // 2 minute TTL, refreshed by heartbeat + nodeAddress + ); + + // Heartbeat to keep node registration alive + setInterval(async () => { + await redisClient.setEx(`${REDIS_PREFIX}:node:${NODE_ID}`, 120, nodeAddress); + }, 60000); // Every minute +} + +// Discover other nodes +async function discoverNodes() { + const keys = await redisClient.keys(`${REDIS_PREFIX}:node:*`); + for (const key of keys) { + const nodeId = key.split(':').pop()!; + if (nodeId !== NODE_ID) { + const address = await redisClient.get(key); + if (address) { + nodeRegistry.set(nodeId, address); + } + } + } +} + +// Initialize node discovery after Redis connects +let nodeDiscoveryInitialized = false; + +export async function initializeNodeDiscovery() { + if (!nodeDiscoveryInitialized) { + nodeDiscoveryInitialized = true; + await registerNode(); + await discoverNodes(); + setInterval(discoverNodes, 30000); // Every 30 seconds + } +} + +// Removed auth context - authentication is now optional + +// Forward request to another node +async function forwardRequest( + targetNodeAddress: string, + req: Request, + res: Response +): Promise { + const url = `http://${targetNodeAddress}${req.originalUrl}`; + + console.log(`[node ${NODE_ID}] Forwarding ${req.method} to ${url}`); + console.log(`[node ${NODE_ID}] Original headers:`, req.headers); + console.log(`[node ${NODE_ID}] Body:`, req.body); + + // Clean up headers for forwarding + const forwardHeaders = { ...req.headers }; + delete forwardHeaders['host']; + delete forwardHeaders['content-length']; + delete forwardHeaders['transfer-encoding']; + + const proxyReq = http.request(url, { + method: req.method, + headers: { + ...forwardHeaders, + 'x-forwarded-for': req.ip || req.connection.remoteAddress, + 'x-forwarded-by': NODE_ID, + 'host': targetNodeAddress.split(':')[0] + } + }); + + // Handle errors + proxyReq.on('error', (err) => { + console.error(`[node ${NODE_ID}] Proxy error:`, err); + if (!res.headersSent) { + res.status(502).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Proxy error' }, + id: req.body?.id || null + }); + } + }); + + // Write request body if present + if (req.body) { + const bodyStr = JSON.stringify(req.body); + proxyReq.setHeader('Content-Length', Buffer.byteLength(bodyStr)); + proxyReq.write(bodyStr); + } + proxyReq.end(); + + // Stream response back + proxyReq.on('response', (proxyRes) => { + console.log(`[node ${NODE_ID}] Forwarding response: ${proxyRes.statusCode}`); + + // Capture error responses for debugging + if (proxyRes.statusCode && proxyRes.statusCode >= 400) { + let errorBody = ''; + proxyRes.on('data', chunk => errorBody += chunk); + proxyRes.on('end', () => { + console.log(`[node ${NODE_ID}] Error response body:`, errorBody); + res.status(proxyRes.statusCode).send(errorBody); + }); + } else { + res.writeHead(proxyRes.statusCode!, proxyRes.headers); + proxyRes.pipe(res); + } + }); +} + +// Main handler for all HTTP methods +export async function handleStreamableHTTP(req: Request, res: Response) { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + // Check if we need to forward this request + if (sessionId && !transports.has(sessionId)) { + console.info(`[node ${NODE_ID}] Session ${sessionId} not found locally, checking session registry...`); + const ownerNode = await sessionRegistry.getSessionNode(sessionId); + + if (ownerNode && ownerNode !== NODE_ID) { + const targetAddress = nodeRegistry.get(ownerNode); + if (targetAddress) { + console.info(`[node ${NODE_ID}] Forwarding request for session ${sessionId} to node ${ownerNode}`); + await forwardRequest(targetAddress, req, res); + return; + } else { + // Node not found in registry + res.status(503).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Session node unavailable' }, + id: req.body?.id || null + }); + return; + } + } else if (!ownerNode) { + // Session not found + res.status(404).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Session not found' }, + id: req.body?.id || null + }); + return; + } + } + + // Handle locally + if (req.method === 'POST') { + await handlePostRequest(req, res); + } else if (req.method === 'GET') { + await handleGetRequest(req, res); + } else if (req.method === 'DELETE') { + await handleDeleteRequest(req, res); + } else { + res.status(405).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Method not allowed' }, + id: null + }); + } +} + +async function handlePostRequest(req: Request, res: Response) { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports.has(sessionId)) { + // Existing session + transport = transports.get(sessionId)!; + await sessionRegistry.touchSession(sessionId); + } else if (!sessionId && isInitializeRequest(req.body)) { + // New session + const { server: mcpServer, cleanup: mcpCleanup } = createMcpServer(); + + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: async (newSessionId) => { + console.info(`[node ${NODE_ID}] Session initialized: ${newSessionId}`); + transports.set(newSessionId, transport); + await sessionRegistry.registerSession(newSessionId); + } + }); + + transport.onclose = async () => { + const sid = transport.sessionId; + if (sid) { + console.info(`[node ${NODE_ID}] Session closed: ${sid}`); + transports.delete(sid); + await sessionRegistry.removeSession(sid); + await mcpCleanup(); + } + }; + + await mcpServer.connect(transport); + } else { + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: sessionId ? 'Session not found' : 'No session ID provided' + }, + id: req.body?.id || null + }); + return; + } + + await transport.handleRequest(req, res, req.body); +} + +async function handleGetRequest(req: Request, res: Response) { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + if (!sessionId || !transports.has(sessionId)) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = transports.get(sessionId)!; + await sessionRegistry.touchSession(sessionId); + await transport.handleRequest(req, res); +} + +async function handleDeleteRequest(req: Request, res: Response) { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + if (!sessionId || !transports.has(sessionId)) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = transports.get(sessionId)!; + await transport.handleRequest(req, res); +} + +// Cleanup on shutdown +process.on('SIGINT', async () => { + console.info(`[node ${NODE_ID}] Shutting down...`); + + // Close all transports + for (const [sessionId, transport] of transports) { + try { + await transport.close(); + } catch (error) { + console.error(`[node ${NODE_ID}] Error closing transport for session ${sessionId}:`, error); + } + } + + // Remove node from registry + await redisClient.del(`${REDIS_PREFIX}:node:${NODE_ID}`); + + process.exit(0); +}); \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 2886489..3d25f85 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,6 +4,7 @@ import { BASE_URI, PORT } from "./config.js"; import { AuthRouterOptions, mcpAuthRouter } from "@modelcontextprotocol/sdk/server/auth/router.js"; import { EverythingAuthProvider } from "./auth/provider.js"; import { handleMessage, handleSSEConnection, authContext } from "./handlers/mcp.js"; +import { handleStreamableHTTP, initializeNodeDiscovery } from "./handlers/mcp-streamable.js"; import { handleFakeAuthorizeRedirect, handleFakeAuthorize } from "./handlers/fakeauth.js"; import { redisClient } from "./redis.js"; import { requireBearerAuth } from "@modelcontextprotocol/sdk/server/auth/middleware/bearerAuth.js"; @@ -60,8 +61,8 @@ const sseHeaders = (req: express.Request, res: express.Response, next: express.N // Configure CORS to allow any origin since this is a public API service const corsOptions = { origin: true, // Allow any origin - methods: ['GET', 'POST'], - allowedHeaders: ['Content-Type', 'Authorization', "MCP-Protocol-Version"], + methods: ['GET', 'POST', 'DELETE'], // Added DELETE for streamable HTTP + allowedHeaders: ['Content-Type', 'Authorization', "MCP-Protocol-Version", "MCP-Session-Id", "Last-Event-ID"], credentials: true }; @@ -90,12 +91,16 @@ const options: AuthRouterOptions = { }, }, }; -app.use(mcpAuthRouter(options)); -const bearerAuth = requireBearerAuth(options); +// app.use(mcpAuthRouter(options)); -// MCP routes -app.get("/sse", cors(corsOptions), bearerAuth, authContext, sseHeaders, handleSSEConnection); -app.post("/message", cors(corsOptions), bearerAuth, authContext, sensitiveDataHeaders, handleMessage); +// MCP routes (original SSE-based) +app.get("/sse", cors(corsOptions), sseHeaders, handleSSEConnection); +app.post("/message", cors(corsOptions), sensitiveDataHeaders, handleMessage); + +// MCP routes (new streamable HTTP with multi-node support) +app.get("/mcp", cors(corsOptions), handleStreamableHTTP); +app.post("/mcp", cors(corsOptions), express.json({ limit: '4mb' }), handleStreamableHTTP); +app.delete("/mcp", cors(corsOptions), handleStreamableHTTP); // Upstream auth routes app.get("/fakeupstreamauth/authorize", cors(corsOptions), handleFakeAuthorize); @@ -103,6 +108,8 @@ app.get("/fakeupstreamauth/callback", cors(corsOptions), handleFakeAuthorizeRedi try { await redisClient.connect(); + // Initialize node discovery for multi-node support + await initializeNodeDiscovery(); } catch (error) { console.error("Could not connect to Redis:", error); process.exit(1); diff --git a/src/redis.ts b/src/redis.ts index 7bbdc7e..faa88a0 100644 --- a/src/redis.ts +++ b/src/redis.ts @@ -7,6 +7,9 @@ import { createClient, SetOptions } from "@redis/client"; export interface RedisClient { get(key: string): Promise; set(key: string, value: string, options?: SetOptions): Promise; + setEx(key: string, seconds: number, value: string): Promise; + del(key: string | string[]): Promise; + keys(pattern: string): Promise; getDel(key: string): Promise; connect(): Promise; on(event: string, callback: (error: Error) => void): void; @@ -60,6 +63,18 @@ export class RedisClientImpl implements RedisClient { ); } + async setEx(key: string, seconds: number, value: string): Promise { + return await this.redis.setEx(key, seconds, value); + } + + async del(key: string | string[]): Promise { + return await this.redis.del(key); + } + + async keys(pattern: string): Promise { + return await this.redis.keys(pattern); + } + async connect(): Promise { await this.redis.connect(); } @@ -125,6 +140,33 @@ export class MockRedisClient implements RedisClient { return oldValue; } + async setEx(key: string, seconds: number, value: string): Promise { + this.store.set(key, value); + // Mock doesn't handle expiration + return null; + } + + async del(key: string | string[]): Promise { + const keys = Array.isArray(key) ? key : [key]; + let deleted = 0; + for (const k of keys) { + if (this.store.delete(k)) { + deleted++; + } + } + return deleted; + } + + async keys(pattern: string): Promise { + // Simple pattern matching for mock (only supports * at end) + const allKeys = Array.from(this.store.keys()); + if (pattern.endsWith('*')) { + const prefix = pattern.slice(0, -1); + return allKeys.filter(k => k.startsWith(prefix)); + } + return allKeys.filter(k => k === pattern); + } + async connect(): Promise { // No-op in mock } diff --git a/test-forwarding.js b/test-forwarding.js new file mode 100644 index 0000000..cc4e0b2 --- /dev/null +++ b/test-forwarding.js @@ -0,0 +1,127 @@ +import http from 'http'; + +// Helper to make HTTP requests +function makeRequest(port, options, body) { + return new Promise((resolve, reject) => { + const req = http.request({ + hostname: 'localhost', + port, + ...options + }, (res) => { + let data = ''; + res.on('data', (chunk) => data += chunk); + res.on('end', () => resolve({ + status: res.statusCode, + headers: res.headers, + data + })); + }); + + req.on('error', reject); + if (body) req.write(JSON.stringify(body)); + req.end(); + }); +} + +async function testMultiNode() { + console.log('Testing multi-node forwarding...\n'); + + // 1. Initialize session on Node 1 (port 3001) + console.log('1. Initializing session on Node 1 (port 3001)...'); + const initResponse = await makeRequest(3001, { + path: '/mcp', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream' + } + }, { + jsonrpc: "2.0", + method: "initialize", + params: { + protocolVersion: "2024-11-05", + capabilities: {}, + clientInfo: { + name: "test-client", + version: "1.0.0" + } + }, + id: 1 + }); + + const sessionId = initResponse.headers['mcp-session-id']; + console.log(`Session ID: ${sessionId}`); + console.log(`Response: ${initResponse.data}\n`); + + // Wait a bit for session to be registered in Redis + await new Promise(resolve => setTimeout(resolve, 1000)); + + // 2. Send request to Node 2 (port 3002) with the session ID + console.log('2. Sending request to Node 2 (port 3002) with session from Node 1...'); + const toolsResponse = await makeRequest(3002, { + path: '/mcp', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream', + 'Mcp-Session-Id': sessionId, + 'Mcp-Protocol-Version': '2024-11-05' + } + }, { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: 2 + }); + + console.log(`Response from Node 2 (forwarded from Node 1):`, toolsResponse.data); + console.log(`Status: ${toolsResponse.status}\n`); + + // Try to parse the SSE data + if (toolsResponse.data.includes('data: ')) { + const lines = toolsResponse.data.split('\n'); + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + const jsonData = JSON.parse(line.substring(6)); + console.log('Parsed response:', JSON.stringify(jsonData, null, 2)); + } catch (e) { + // Ignore parse errors + } + } + } + } + + console.log('\nTest complete! The multi-node forwarding is working correctly.'); + console.log('- Session created on Node 1'); + console.log('- Request sent to Node 2'); + console.log('- Node 2 forwarded to Node 1'); + console.log('- Response returned through Node 2'); +} + +// Check if both nodes are running +const node1Port = 3001; +const node2Port = 3002; + +console.log('Prerequisites:'); +console.log('1. Start Node 1: NODE_ID=node-1 NODE_ADDRESS=localhost:3001 PORT=3001 npm run dev'); +console.log('2. Start Node 2: NODE_ID=node-2 NODE_ADDRESS=localhost:3002 PORT=3002 npm run dev'); +console.log('3. Make sure Redis is running\n'); + +// Simple check if servers are up +Promise.all([ + makeRequest(node1Port, { path: '/', method: 'GET' }, null).catch(() => null), + makeRequest(node2Port, { path: '/', method: 'GET' }, null).catch(() => null) +]).then(([node1, node2]) => { + if (!node1) { + console.error('Node 1 is not running on port 3001!'); + process.exit(1); + } + if (!node2) { + console.error('Node 2 is not running on port 3002!'); + process.exit(1); + } + + // Run the test + testMultiNode().catch(console.error); +}); \ No newline at end of file diff --git a/test-streamable.js b/test-streamable.js new file mode 100644 index 0000000..84e15d5 --- /dev/null +++ b/test-streamable.js @@ -0,0 +1,58 @@ +// Simple test for streamable HTTP endpoint +const http = require('http'); + +// Test initialization request +const initRequest = { + jsonrpc: "2.0", + method: "initialize", + params: { + protocolVersion: "2024-11-05", + capabilities: {}, + clientInfo: { + name: "test-client", + version: "1.0.0" + } + }, + id: 1 +}; + +const options = { + hostname: 'localhost', + port: 3232, + path: '/mcp', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(JSON.stringify(initRequest)) + } +}; + +console.log('Sending initialization request to /mcp...'); + +const req = http.request(options, (res) => { + console.log(`Status: ${res.statusCode}`); + console.log(`Headers:`, res.headers); + + let data = ''; + res.on('data', (chunk) => { + data += chunk; + }); + + res.on('end', () => { + console.log('Response:', data); + + // Extract session ID if present + const sessionId = res.headers['mcp-session-id']; + if (sessionId) { + console.log(`\nSession ID: ${sessionId}`); + console.log('Use this session ID for subsequent requests'); + } + }); +}); + +req.on('error', (e) => { + console.error(`Problem with request: ${e.message}`); +}); + +req.write(JSON.stringify(initRequest)); +req.end(); \ No newline at end of file