diff --git a/src/server/index.test.ts b/src/server/index.test.ts index 48b7f7340..d91b90a9c 100644 --- a/src/server/index.test.ts +++ b/src/server/index.test.ts @@ -15,7 +15,7 @@ import { ListResourcesRequestSchema, ListToolsRequestSchema, SetLevelRequestSchema, - ErrorCode, + ErrorCode } from "../types.js"; import { Transport } from "../shared/transport.js"; import { InMemoryTransport } from "../inMemory.js"; diff --git a/src/server/mcp.test.ts b/src/server/mcp.test.ts index ab728c6ac..0764ffe88 100644 --- a/src/server/mcp.test.ts +++ b/src/server/mcp.test.ts @@ -14,7 +14,7 @@ import { LoggingMessageNotificationSchema, Notification, TextContent, - ElicitRequestSchema, + ElicitRequestSchema } from "../types.js"; import { ResourceTemplate } from "./mcp.js"; import { completable } from "./completable.js"; diff --git a/src/server/sse.test.ts b/src/server/sse.test.ts index 2fd2c0424..703cc5146 100644 --- a/src/server/sse.test.ts +++ b/src/server/sse.test.ts @@ -1,20 +1,146 @@ import http from 'http'; import { jest } from '@jest/globals'; import { SSEServerTransport } from './sse.js'; +import { McpServer } from './mcp.js'; +import { createServer, type Server } from "node:http"; +import { AddressInfo } from "node:net"; +import { z } from 'zod'; +import { CallToolResult, JSONRPCMessage } from 'src/types.js'; const createMockResponse = () => { const res = { - writeHead: jest.fn(), - write: jest.fn().mockReturnValue(true), - on: jest.fn(), + writeHead: jest.fn().mockReturnThis(), + write: jest.fn().mockReturnThis(), + on: jest.fn().mockReturnThis(), + end: jest.fn().mockReturnThis(), }; - res.writeHead.mockReturnThis(); - res.on.mockReturnThis(); - return res as unknown as http.ServerResponse; + return res as unknown as jest.Mocked; }; +/** + * Helper to create and start test HTTP server with MCP setup + */ +async function createTestServerWithSse(args: { + mockRes: http.ServerResponse; +}): Promise<{ + server: Server; + transport: SSEServerTransport; + mcpServer: McpServer; + baseUrl: URL; + sessionId: string + serverPort: number; +}> { + const mcpServer = new McpServer( + { name: "test-server", version: "1.0.0" }, + { capabilities: { logging: {} } } + ); + + mcpServer.tool( + "greet", + "A simple greeting tool", + { name: z.string().describe("Name to greet") }, + async ({ name }): Promise => { + return { content: [{ type: "text", text: `Hello, ${name}!` }] }; + } + ); + + const endpoint = '/messages'; + + const transport = new SSEServerTransport(endpoint, args.mockRes); + const sessionId = transport.sessionId; + + await mcpServer.connect(transport); + + const server = createServer(async (req, res) => { + try { + await transport.handlePostMessage(req, res); + } catch (error) { + console.error("Error handling request:", error); + if (!res.headersSent) res.writeHead(500).end(); + } + }); + + const baseUrl = await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const addr = server.address() as AddressInfo; + resolve(new URL(`http://127.0.0.1:${addr.port}`)); + }); + }); + + const port = (server.address() as AddressInfo).port; + + return { server, transport, mcpServer, baseUrl, sessionId, serverPort: port }; +} + +async function readAllSSEEvents(response: Response): Promise { + const reader = response.body?.getReader(); + if (!reader) throw new Error('No readable stream'); + + const events: string[] = []; + const decoder = new TextDecoder(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + if (value) { + events.push(decoder.decode(value)); + } + } + } finally { + reader.releaseLock(); + } + + return events; +} + +/** + * Helper to send JSON-RPC request + */ +async function sendSsePostRequest(baseUrl: URL, message: JSONRPCMessage | JSONRPCMessage[], sessionId?: string, extraHeaders?: Record): Promise { + const headers: Record = { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + ...extraHeaders + }; + + if (sessionId) { + baseUrl.searchParams.set('sessionId', sessionId); + } + + return fetch(baseUrl, { + method: "POST", + headers, + body: JSON.stringify(message), + }); +} + describe('SSEServerTransport', () => { + + async function initializeServer(baseUrl: URL): Promise { + const response = await sendSsePostRequest(baseUrl, { + jsonrpc: "2.0", + method: "initialize", + params: { + clientInfo: { name: "test-client", version: "1.0" }, + protocolVersion: "2025-03-26", + capabilities: { + }, + }, + + id: "init-1", + } as JSONRPCMessage); + + expect(response.status).toBe(202); + + const text = await readAllSSEEvents(response); + + expect(text).toHaveLength(1); + expect(text[0]).toBe('Accepted'); + } + describe('start method', () => { it('should correctly append sessionId to a simple relative endpoint', async () => { const mockRes = createMockResponse(); @@ -105,5 +231,71 @@ describe('SSEServerTransport', () => { `event: endpoint\ndata: /?sessionId=${expectedSessionId}\n\n` ); }); + + /** + * Test: Tool With Request Info + */ + it("should pass request info to tool callback", async () => { + const mockRes = createMockResponse(); + const { mcpServer, baseUrl, sessionId, serverPort } = await createTestServerWithSse({ mockRes }); + await initializeServer(baseUrl); + + mcpServer.tool( + "test-request-info", + "A simple test tool with request info", + { name: z.string().describe("Name to greet") }, + async ({ name }, { requestInfo }): Promise => { + return { content: [{ type: "text", text: `Hello, ${name}!` }, { type: "text", text: `${JSON.stringify(requestInfo)}` }] }; + } + ); + + const toolCallMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/call", + params: { + name: "test-request-info", + arguments: { + name: "Test User", + }, + }, + id: "call-1", + }; + + const response = await sendSsePostRequest(baseUrl, toolCallMessage, sessionId); + + expect(response.status).toBe(202); + + expect(mockRes.write).toHaveBeenCalledWith(`event: endpoint\ndata: /messages?sessionId=${sessionId}\n\n`); + + const expectedMessage = { + result: { + content: [ + { + type: "text", + text: "Hello, Test User!", + }, + { + type: "text", + text: JSON.stringify({ + headers: { + host: `127.0.0.1:${serverPort}`, + connection: 'keep-alive', + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + 'accept-language': '*', + 'sec-fetch-mode': 'cors', + 'user-agent': 'node', + 'accept-encoding': 'gzip, deflate', + 'content-length': '124' + }, + }) + }, + ], + }, + jsonrpc: "2.0", + id: "call-1", + }; + expect(mockRes.write).toHaveBeenCalledWith(`event: message\ndata: ${JSON.stringify(expectedMessage)}\n\n`); + }); }); }); diff --git a/src/server/sse.ts b/src/server/sse.ts index e9a4d53ab..de4dd60a6 100644 --- a/src/server/sse.ts +++ b/src/server/sse.ts @@ -1,7 +1,7 @@ import { randomUUID } from "node:crypto"; import { IncomingMessage, ServerResponse } from "node:http"; import { Transport } from "../shared/transport.js"; -import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; +import { JSONRPCMessage, JSONRPCMessageSchema, MessageExtraInfo, RequestInfo } from "../types.js"; import getRawBody from "raw-body"; import contentType from "content-type"; import { AuthInfo } from "./auth/types.js"; @@ -19,7 +19,7 @@ export class SSEServerTransport implements Transport { private _sessionId: string; onclose?: () => void; onerror?: (error: Error) => void; - onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void; + onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; /** * Creates a new SSE server transport, which will direct the client to POST messages to the relative or absolute URL identified by `_endpoint`. @@ -86,6 +86,7 @@ export class SSEServerTransport implements Transport { throw new Error(message); } const authInfo: AuthInfo | undefined = req.auth; + const requestInfo: RequestInfo = { headers: req.headers }; let body: string | unknown; try { @@ -105,7 +106,7 @@ export class SSEServerTransport implements Transport { } try { - await this.handleMessage(typeof body === 'string' ? JSON.parse(body) : body, { authInfo }); + await this.handleMessage(typeof body === 'string' ? JSON.parse(body) : body, { requestInfo, authInfo }); } catch { res.writeHead(400).end(`Invalid message: ${body}`); return; @@ -117,7 +118,7 @@ export class SSEServerTransport implements Transport { /** * Handle a client message, regardless of how it arrived. This can be used to inform the server of messages that arrive via a means different than HTTP POST. */ - async handleMessage(message: unknown, extra?: { authInfo?: AuthInfo }): Promise { + async handleMessage(message: unknown, extra?: MessageExtraInfo): Promise { let parsedMessage: JSONRPCMessage; try { parsedMessage = JSONRPCMessageSchema.parse(message); diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index d66083fe8..ce5c7446a 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -208,6 +208,7 @@ function expectErrorResponse(data: unknown, expectedCode: number, expectedMessag describe("StreamableHTTPServerTransport", () => { let server: Server; + let mcpServer: McpServer; let transport: StreamableHTTPServerTransport; let baseUrl: URL; let sessionId: string; @@ -216,6 +217,7 @@ describe("StreamableHTTPServerTransport", () => { const result = await createTestServer(); server = result.server; transport = result.transport; + mcpServer = result.mcpServer; baseUrl = result.baseUrl; }); @@ -347,6 +349,69 @@ describe("StreamableHTTPServerTransport", () => { }); }); + /*** + * Test: Tool With Request Info + */ + it("should pass request info to tool callback", async () => { + sessionId = await initializeServer(); + + mcpServer.tool( + "test-request-info", + "A simple test tool with request info", + { name: z.string().describe("Name to greet") }, + async ({ name }, { requestInfo }): Promise => { + return { content: [{ type: "text", text: `Hello, ${name}!` }, { type: "text", text: `${JSON.stringify(requestInfo)}` }] }; + } + ); + + const toolCallMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/call", + params: { + name: "test-request-info", + arguments: { + name: "Test User", + }, + }, + id: "call-1", + }; + + const response = await sendPostRequest(baseUrl, toolCallMessage, sessionId); + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + const eventLines = text.split("\n"); + const dataLine = eventLines.find(line => line.startsWith("data:")); + expect(dataLine).toBeDefined(); + + const eventData = JSON.parse(dataLine!.substring(5)); + + expect(eventData).toMatchObject({ + jsonrpc: "2.0", + result: { + content: [ + { type: "text", text: "Hello, Test User!" }, + { type: "text", text: expect.any(String) } + ], + }, + id: "call-1", + }); + + const requestInfo = JSON.parse(eventData.result.content[1].text); + expect(requestInfo).toMatchObject({ + headers: { + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + connection: 'keep-alive', + 'mcp-session-id': sessionId, + 'accept-language': '*', + 'user-agent': expect.any(String), + 'accept-encoding': expect.any(String), + 'content-length': expect.any(String), + }, + }); + }); + it("should reject requests without a valid session ID", async () => { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 34b2ab68a..677da45ea 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -1,6 +1,6 @@ import { IncomingMessage, ServerResponse } from "node:http"; import { Transport } from "../shared/transport.js"; -import { isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION } from "../types.js"; +import { MessageExtraInfo, RequestInfo, isInitializeRequest, isJSONRPCError, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId, SUPPORTED_PROTOCOL_VERSIONS, DEFAULT_NEGOTIATED_PROTOCOL_VERSION } from "../types.js"; import getRawBody from "raw-body"; import contentType from "content-type"; import { randomUUID } from "node:crypto"; @@ -113,7 +113,7 @@ export class StreamableHTTPServerTransport implements Transport { sessionId?: string; onclose?: () => void; onerror?: (error: Error) => void; - onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void; + onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; constructor(options: StreamableHTTPServerTransportOptions) { this.sessionIdGenerator = options.sessionIdGenerator; @@ -321,6 +321,7 @@ export class StreamableHTTPServerTransport implements Transport { } const authInfo: AuthInfo | undefined = req.auth; + const requestInfo: RequestInfo = { headers: req.headers }; let rawMessage; if (parsedBody !== undefined) { @@ -404,7 +405,7 @@ export class StreamableHTTPServerTransport implements Transport { // handle each message for (const message of messages) { - this.onmessage?.(message, { authInfo }); + this.onmessage?.(message, { authInfo, requestInfo }); } } else if (hasRequests) { // The default behavior is to use SSE streaming @@ -439,7 +440,7 @@ export class StreamableHTTPServerTransport implements Transport { // handle each message for (const message of messages) { - this.onmessage?.(message, { authInfo }); + this.onmessage?.(message, { authInfo, requestInfo }); } // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses // This will be handled by the send() method when responses are ready diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index a04f26eb2..35839a4f8 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -22,6 +22,8 @@ import { Result, ServerCapabilities, RequestMeta, + MessageExtraInfo, + RequestInfo, } from "../types.js"; import { Transport, TransportSendOptions } from "./transport.js"; import { AuthInfo } from "../server/auth/types.js"; @@ -127,6 +129,11 @@ export type RequestHandlerExtra void; + onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; /** * The session ID generated for this connection. diff --git a/src/types.ts b/src/types.ts index 3606a6be7..f66d2c4b6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,5 @@ import { z, ZodTypeAny } from "zod"; +import { AuthInfo } from "./server/auth/types.js"; export const LATEST_PROTOCOL_VERSION = "2025-06-18"; export const DEFAULT_NEGOTIATED_PROTOCOL_VERSION = "2025-03-26"; @@ -1463,6 +1464,36 @@ type Flatten = T extends Primitive type Infer = Flatten>; +/** + * Headers that are compatible with both Node.js and the browser. + */ +export type IsomorphicHeaders = Record; + +/** + * Information about the incoming request. + */ +export interface RequestInfo { + /** + * The headers of the request. + */ + headers: IsomorphicHeaders; +} + +/** + * Extra information about a message. + */ +export interface MessageExtraInfo { + /** + * The request information. + */ + requestInfo?: RequestInfo; + + /** + * The authentication information. + */ + authInfo?: AuthInfo; +} + /* JSON-RPC types */ export type ProgressToken = Infer; export type Cursor = Infer;