Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 35 additions & 100 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,124 +1,61 @@
import { randomUUID } from "node:crypto";
import express, { type Request, type Response } from "express";
import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { Server } from "http";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Logger } from "./utils/logger.js";

let httpServer: Server | null = null;
const transports = {
streamable: {} as Record<string, StreamableHTTPServerTransport>,
sse: {} as Record<string, SSEServerTransport>,
};

// Create a single transport for stateless mode
const streamableHttpTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined, // Stateless mode: no session ID
});

export async function startHttpServer(port: number, mcpServer: McpServer): Promise<void> {
const app = express();

mcpServer.connect(streamableHttpTransport);

// Parse JSON requests for the Streamable HTTP endpoint only, will break SSE endpoint
app.use("/mcp", express.json());

// Modern Streamable HTTP endpoint
// Modern Streamable HTTP endpoint (Stateless mode)
app.post("/mcp", async (req, res) => {
Logger.log("Received StreamableHTTP request");
const sessionId = req.headers["mcp-session-id"] as string | undefined;
// Logger.log("Session ID:", sessionId);
// Logger.log("Headers:", req.headers);
// Logger.log("Body:", req.body);
// Logger.log("Is Initialize Request:", isInitializeRequest(req.body));
let transport: StreamableHTTPServerTransport;

if (sessionId && transports.streamable[sessionId]) {
// Reuse existing transport
Logger.log("Reusing existing StreamableHTTP transport for sessionId", sessionId);
transport = transports.streamable[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
Logger.log("New initialization request for StreamableHTTP sessionId", sessionId);
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// Store the transport by session ID
transports.streamable[sessionId] = transport;
},
});
transport.onclose = () => {
if (transport.sessionId) {
delete transports.streamable[transport.sessionId];
}
};
// TODO? There semes to be an issue—at least in Cursor—where after a connection is made to an HTTP Streamable endpoint, SSE connections to the same Express server fail with "Received a response for an unknown message ID"
await mcpServer.connect(transport);
} else {
// Invalid request
Logger.log("Invalid request:", req.body);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: No valid session ID provided",
},
id: null,
});
return;
}

let progressInterval: NodeJS.Timeout | null = null;
const progressToken = req.body.params?._meta?.progressToken;
// Logger.log("Progress token:", progressToken);
let progress = 0;
if (progressToken) {
Logger.log(
`Setting up progress notifications for token ${progressToken} on session ${sessionId}`,
);
progressInterval = setInterval(async () => {
Logger.log("Sending progress notification", progress);
await mcpServer.server.notification({
method: "notifications/progress",
params: {
progress,
progressToken,
},
});
progress++;
}, 1000);
}

Logger.log("Handling StreamableHTTP request");
await transport.handleRequest(req, res, req.body);
await streamableHttpTransport.handleRequest(req, res, req.body);

if (progressInterval) {
clearInterval(progressInterval);
}
Logger.log("StreamableHTTP request handled");
});

// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
const handleSessionRequest = async (req: Request, res: Response) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports.streamable[sessionId]) {
res.status(400).send("Invalid or missing session ID");
return;
}

console.log(`Received session termination request for session ${sessionId}`);

try {
const transport = transports.streamable[sessionId];
await transport.handleRequest(req, res);
} catch (error) {
console.error("Error handling session termination:", error);
if (!res.headersSent) {
res.status(500).send("Error processing session termination");
}
}
};

// Handle GET requests for server-to-client notifications via SSE
app.get("/mcp", handleSessionRequest);
// In stateless mode, GET and DELETE endpoints return 405 Method Not Allowed
app.get("/mcp", async (req, res) => {
Logger.log("Received GET /mcp request - not allowed in stateless mode");
res.status(405).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed in stateless mode",
},
id: null,
});
});

// Handle DELETE requests for session termination
app.delete("/mcp", handleSessionRequest);
app.delete("/mcp", async (req, res) => {
Logger.log("Received DELETE /mcp request - not allowed in stateless mode");
res.status(405).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed in stateless mode",
},
id: null,
});
});

app.get("/sse", async (req, res) => {
Logger.log("Establishing new SSE connection");
Expand Down Expand Up @@ -161,16 +98,14 @@ export async function startHttpServer(port: number, mcpServer: McpServer): Promi

// Close all active transports to properly clean up resources
await closeTransports(transports.sse);
await closeTransports(transports.streamable);
await streamableHttpTransport.close();

Logger.log("Server shutdown complete");
process.exit(0);
});
}

async function closeTransports(
transports: Record<string, SSEServerTransport | StreamableHTTPServerTransport>,
) {
async function closeTransports(transports: Record<string, SSEServerTransport>) {
for (const sessionId in transports) {
try {
await transports[sessionId]?.close();
Expand Down