Skip to content

Commit 6e4dcd6

Browse files
committed
WIP: Attempting to proxy streamable-http connections. Inspector still works fine with STDIO and SSE servers.
* In index.ts, - refactor transport webAppTransports to be a map with the session id as key and transport as value. * Implement /mcp GET and POST endpoints using StreamableHTTPServerTransport and doing the new session in the POST (opposite from SSE) handler. * In package.json - update the SDK to 1.10.2 * In useConnection.ts - import StreamableHTTPClientTransport - NOTE: while we NEED to do this, it causes useConnection.test.ts to fail with " ReferenceError: TransformStream is not defined" - in connect method - instantiate the appropriate transport
1 parent 3a2e248 commit 6e4dcd6

File tree

4 files changed

+106
-73
lines changed

4 files changed

+106
-73
lines changed

client/src/lib/hooks/useConnection.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
SSEClientTransport,
44
SseError,
55
} from "@modelcontextprotocol/sdk/client/sse.js";
6+
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
67
import {
78
ClientNotification,
89
ClientRequest,
@@ -286,6 +287,7 @@ export function useConnection({
286287
mcpProxyServerUrl.searchParams.append("args", args);
287288
mcpProxyServerUrl.searchParams.append("env", JSON.stringify(env));
288289
break;
290+
289291
case "sse":
290292
mcpProxyServerUrl = new URL(`${getMCPProxyAddress(config)}/sse`);
291293
mcpProxyServerUrl.searchParams.append("url", sseUrl);
@@ -317,14 +319,24 @@ export function useConnection({
317319
headers[authHeaderName] = `Bearer ${token}`;
318320
}
319321

320-
const clientTransport = new SSEClientTransport(mcpProxyServerUrl as URL, {
322+
// Create appropriate transport
323+
const transportOptions = {
321324
eventSourceInit: {
322-
fetch: (url, init) => fetch(url, { ...init, headers }),
325+
fetch: (
326+
url: string | URL | globalThis.Request,
327+
init: RequestInit | undefined,
328+
) => fetch(url, { ...init, headers }),
323329
},
324330
requestInit: {
325331
headers,
326332
},
327-
});
333+
};
334+
const clientTransport =
335+
transportType === "streamable-http"
336+
? new StreamableHTTPClientTransport(mcpProxyServerUrl as URL, {
337+
sessionId: undefined,
338+
})
339+
: new SSEClientTransport(mcpProxyServerUrl as URL, transportOptions);
328340

329341
if (onNotification) {
330342
[

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"@modelcontextprotocol/inspector-cli": "^0.10.2",
4242
"@modelcontextprotocol/inspector-client": "^0.10.2",
4343
"@modelcontextprotocol/inspector-server": "^0.10.2",
44-
"@modelcontextprotocol/sdk": "^1.10.0",
44+
"@modelcontextprotocol/sdk": "^1.10.2",
4545
"concurrently": "^9.0.1",
4646
"shell-quote": "^1.8.2",
4747
"spawn-rx": "^5.1.2",

server/src/index.ts

Lines changed: 83 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ import {
1212
StdioClientTransport,
1313
getDefaultEnvironment,
1414
} from "@modelcontextprotocol/sdk/client/stdio.js";
15-
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
16-
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
1715
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
16+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
17+
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
18+
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
1819
import express from "express";
1920
import { findActualExecutable } from "spawn-rx";
2021
import mcpProxy from "./mcpProxy.js";
22+
import { randomUUID } from "node:crypto";
2123

2224
const SSE_HEADERS_PASSTHROUGH = ["authorization"];
23-
const STREAMABLE_HTTP_HEADERS_PASSTHROUGH = ["authorization"];
25+
const STREAMABLE_HTTP_HEADERS_PASSTHROUGH = ["authorization", "mcp-session-id"];
2426

2527
const defaultEnvironment = {
2628
...getDefaultEnvironment(),
@@ -38,7 +40,7 @@ const { values } = parseArgs({
3840
const app = express();
3941
app.use(cors());
4042

41-
let webAppTransports: SSEServerTransport[] = [];
43+
const webAppTransports: Map<string, Transport> = new Map<string, Transport>(); // Transports by sessionId
4244

4345
const createTransport = async (req: express.Request): Promise<Transport> => {
4446
const query = req.query;
@@ -130,71 +132,89 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
130132
let backingServerTransport: Transport | undefined;
131133

132134
app.get("/mcp", async (req, res) => {
135+
const sessionId = req.headers["mcp-session-id"] as string;
136+
console.log(`Received GET message for sessionId ${sessionId}`);
133137
try {
134-
console.log("New streamable-http connection");
138+
const transport = webAppTransports.get(
139+
sessionId,
140+
) as StreamableHTTPServerTransport;
141+
if (!transport) {
142+
res.status(404).end("Session not found");
143+
return;
144+
} else {
145+
await transport.handleRequest(req, res);
146+
}
147+
} catch (error) {
148+
console.error("Error in /mcp route:", error);
149+
res.status(500).json(error);
150+
}
151+
});
135152

153+
app.post("/mcp", async (req, res) => {
154+
const sessionId = req.headers["mcp-session-id"] as string | undefined;
155+
console.log(`Received POST message for sessionId ${sessionId}`);
156+
if (!sessionId) {
136157
try {
137-
await backingServerTransport?.close();
138-
backingServerTransport = await createTransport(req);
139-
} catch (error) {
140-
if (error instanceof SseError && error.code === 401) {
141-
console.error(
142-
"Received 401 Unauthorized from MCP server:",
143-
error.message,
144-
);
145-
res.status(401).json(error);
146-
return;
158+
console.log("New streamable-http connection");
159+
try {
160+
await backingServerTransport?.close();
161+
backingServerTransport = await createTransport(req);
162+
} catch (error) {
163+
if (error instanceof SseError && error.code === 401) {
164+
console.error(
165+
"Received 401 Unauthorized from MCP server:",
166+
error.message,
167+
);
168+
res.status(401).json(error);
169+
return;
170+
}
171+
172+
throw error;
147173
}
148174

149-
throw error;
150-
}
151-
152-
console.log("Connected MCP client to backing server transport");
153-
154-
const webAppTransport = new SSEServerTransport("/mcp", res);
155-
webAppTransports.push(webAppTransport);
156-
console.log("Created web app transport");
157-
158-
await webAppTransport.start();
175+
console.log("Connected MCP client to backing server transport");
159176

160-
if (backingServerTransport instanceof StdioClientTransport) {
161-
backingServerTransport.stderr!.on("data", (chunk) => {
162-
webAppTransport.send({
163-
jsonrpc: "2.0",
164-
method: "notifications/stderr",
165-
params: {
166-
content: chunk.toString(),
167-
},
168-
});
177+
const webAppTransport = new StreamableHTTPServerTransport({
178+
sessionIdGenerator: randomUUID,
179+
onsessioninitialized: (sessionId) => {
180+
webAppTransports.set(sessionId, webAppTransport);
181+
console.log("Created streamable web app transport " + sessionId);
182+
},
169183
});
170-
}
171-
172-
mcpProxy({
173-
transportToClient: webAppTransport,
174-
transportToServer: backingServerTransport,
175-
});
176184

177-
console.log("Set up MCP proxy");
178-
} catch (error) {
179-
console.error("Error in /sse route:", error);
180-
res.status(500).json(error);
181-
}
182-
});
185+
await webAppTransport.start();
183186

184-
app.post("/mcp", async (req, res) => {
185-
try {
186-
const sessionId = req.query.sessionId;
187-
console.log(`Received message for sessionId ${sessionId}`);
187+
mcpProxy({
188+
transportToClient: webAppTransport,
189+
transportToServer: backingServerTransport,
190+
});
188191

189-
const transport = webAppTransports.find((t) => t.sessionId === sessionId);
190-
if (!transport) {
191-
res.status(404).end("Session not found");
192-
return;
192+
await (webAppTransport as StreamableHTTPServerTransport).handleRequest(
193+
req,
194+
res,
195+
req.body,
196+
);
197+
} catch (error) {
198+
console.error("Error in /mcp POST route:", error);
199+
res.status(500).json(error);
200+
}
201+
} else {
202+
try {
203+
const transport = webAppTransports.get(
204+
sessionId,
205+
) as StreamableHTTPServerTransport;
206+
if (!transport) {
207+
res.status(404).end("Transport not found for sessionId " + sessionId);
208+
} else {
209+
await (transport as StreamableHTTPServerTransport).handleRequest(
210+
req,
211+
res,
212+
);
213+
}
214+
} catch (error) {
215+
console.error("Error in /mcp route:", error);
216+
res.status(500).json(error);
193217
}
194-
await transport.handlePostMessage(req, res);
195-
} catch (error) {
196-
console.error("Error in /mcp route:", error);
197-
res.status(500).json(error);
198218
}
199219
});
200220

@@ -221,7 +241,7 @@ app.get("/stdio", async (req, res) => {
221241
console.log("Connected MCP client to backing server transport");
222242

223243
const webAppTransport = new SSEServerTransport("/message", res);
224-
webAppTransports.push(webAppTransport);
244+
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
225245

226246
console.log("Created web app transport");
227247

@@ -276,8 +296,7 @@ app.get("/sse", async (req, res) => {
276296
console.log("Connected MCP client to backing server transport");
277297

278298
const webAppTransport = new SSEServerTransport("/message", res);
279-
webAppTransports.push(webAppTransport);
280-
299+
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
281300
console.log("Created web app transport");
282301

283302
await webAppTransport.start();
@@ -299,7 +318,9 @@ app.post("/message", async (req, res) => {
299318
const sessionId = req.query.sessionId;
300319
console.log(`Received message for sessionId ${sessionId}`);
301320

302-
const transport = webAppTransports.find((t) => t.sessionId === sessionId);
321+
const transport = webAppTransports.get(
322+
sessionId as string,
323+
) as SSEServerTransport;
303324
if (!transport) {
304325
res.status(404).end("Session not found");
305326
return;

0 commit comments

Comments
 (0)