Skip to content

add streamableHttp server support for everything server #1496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion src/everything/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ The server sends random-leveled log messages every 15 seconds, e.g.:
}
```

## Usage with Claude Desktop
## Usage with Claude Desktop (uses [stdio Transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#stdio))

Add to your `claude_desktop_config.json`:

Expand Down Expand Up @@ -172,3 +172,19 @@ Optionally, you can add it to a file called `.vscode/mcp.json` in your workspace
}
}
```

## Run with [HTTP+SSE Transport](https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse) (deprecated as of [2025-03-26](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports))

```shell
cd src/everything
npm install
npm run start:sse
```

## Run with [Streamable HTTP Transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)

```shell
cd src/everything
npm install
npm run start:streamableHttp
```
5 changes: 3 additions & 2 deletions src/everything/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
"prepare": "npm run build",
"watch": "tsc --watch",
"start": "node dist/index.js",
"start:sse": "node dist/sse.js"
"start:sse": "node dist/sse.js",
"start:streamableHttp": "node dist/streamableHttp.js"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.9.0",
"@modelcontextprotocol/sdk": "^1.10.1",
"express": "^4.21.1",
"zod": "^3.23.8",
"zod-to-json-schema": "^3.23.5"
Expand Down
172 changes: 172 additions & 0 deletions src/everything/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js';
import express, { Request, Response } from "express";
import { createServer } from "./everything.js";
import { randomUUID } from 'node:crypto';

const app = express();

const { server, cleanup } = createServer();

const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

app.post('/mcp', async (req: Request, res: Response) => {
console.log('Received MCP POST request');
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;

if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId];
} else if (!sessionId) {
// New initialization request
const eventStore = new InMemoryEventStore();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore, // Enable resumability
onsessioninitialized: (sessionId) => {
// Store the transport by session ID when session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});

// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
}
};

// Connect the transport to the MCP server BEFORE handling the request
// so responses can flow back through the same transport
await server.connect(transport);

await transport.handleRequest(req, res);
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: req?.body?.id,
});
return;
}

// Handle the request with existing transport - no need to reconnect
// The existing transport is already connected to the server
await transport.handleRequest(req, res);
} catch (error) {
console.error('Error handling MCP request:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: req?.body?.id,
});
return;
}
}
});

// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
app.get('/mcp', async (req: Request, res: Response) => {
console.log('Received MCP GET request');
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: req?.body?.id,
});
return;
}

// Check for Last-Event-ID header for resumability
const lastEventId = req.headers['last-event-id'] as string | undefined;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% certain how to test this part. I think we need to beef up the logic in the Inspector to make the Reconnect button send the last-event-id header.

But this endpoint is working correctly for initiating and maintaining the SSE stream, and the only thing we're doing here is logging the fact that this proxy saw the header. The SDK deals with recognizing that header and doing a replay from it, so I think this server is just fine.

if (lastEventId) {
console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
} else {
console.log(`Establishing new SSE stream for session ${sessionId}`);
}

const transport = transports[sessionId];
await transport.handleRequest(req, res);
});

// Handle DELETE requests for session termination (according to MCP spec)
app.delete('/mcp', async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: req?.body?.id,
});
return;
}

console.log(`Received session termination request for session ${sessionId}`);

try {
const transport = transports[sessionId];
await transport.handleRequest(req, res);
} catch (error) {
console.error('Error handling session termination:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Error handling session termination',
},
id: req?.body?.id,
});
return;
}
}
});

// Start the server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`MCP Streamable HTTP Server listening on port ${PORT}`);
});

// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
await cleanup();
await server.close();
console.log('Server shutdown complete');
process.exit(0);
});