-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathstreamableHttp.ts
More file actions
208 lines (187 loc) · 8.03 KB
/
streamableHttp.ts
File metadata and controls
208 lines (187 loc) · 8.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
* Node.js HTTP Streamable HTTP Server Transport
*
* This is a thin wrapper around `WebStandardStreamableHTTPServerTransport` that provides
* compatibility with Node.js HTTP server (IncomingMessage/ServerResponse).
*
* For web-standard environments (Cloudflare Workers, Deno, Bun), use `WebStandardStreamableHTTPServerTransport` directly.
*/
import { IncomingMessage, ServerResponse } from 'node:http';
import { getRequestListener } from '@hono/node-server';
import { Transport } from '../shared/transport.js';
import { AuthInfo } from './auth/types.js';
import { MessageExtraInfo, JSONRPCMessage, RequestId } from '../types.js';
import {
WebStandardStreamableHTTPServerTransport,
WebStandardStreamableHTTPServerTransportOptions,
EventStore,
StreamId,
EventId
} from './webStandardStreamableHttp.js';
// Re-export types from the core transport for backward compatibility
export type { EventStore, StreamId, EventId };
/**
* Configuration options for StreamableHTTPServerTransport
*
* This is an alias for WebStandardStreamableHTTPServerTransportOptions for backward compatibility.
*/
export type StreamableHTTPServerTransportOptions = WebStandardStreamableHTTPServerTransportOptions;
/**
* Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification.
* It supports both SSE streaming and direct HTTP responses.
*
* This is a wrapper around `WebStandardStreamableHTTPServerTransport` that provides Node.js HTTP compatibility.
* It uses the `@hono/node-server` library to convert between Node.js HTTP and Web Standard APIs.
*
* Usage example:
*
* ```typescript
* // Stateful mode - server sets the session ID
* const statefulTransport = new StreamableHTTPServerTransport({
* sessionIdGenerator: () => randomUUID(),
* });
*
* // Stateless mode - explicitly set session ID to undefined
* const statelessTransport = new StreamableHTTPServerTransport({
* sessionIdGenerator: undefined,
* });
*
* // Using with pre-parsed request body
* app.post('/mcp', (req, res) => {
* transport.handleRequest(req, res, req.body);
* });
* ```
*
* In stateful mode:
* - Session ID is generated and included in response headers
* - Session ID is always included in initialization responses
* - Requests with invalid session IDs are rejected with 404 Not Found
* - Non-initialization requests without a session ID are rejected with 400 Bad Request
* - State is maintained in-memory (connections, message history)
*
* In stateless mode:
* - No Session ID is included in any responses
* - No session validation is performed
* - Each transport instance handles one request; create a fresh transport for each request
*/
export class StreamableHTTPServerTransport implements Transport {
private _webStandardTransport: WebStandardStreamableHTTPServerTransport;
private _requestListener: ReturnType<typeof getRequestListener>;
// Store auth and parsedBody per request for passing through to handleRequest
private _requestContext: WeakMap<Request, { authInfo?: AuthInfo; parsedBody?: unknown }> = new WeakMap();
constructor(options: StreamableHTTPServerTransportOptions = {}) {
this._webStandardTransport = new WebStandardStreamableHTTPServerTransport(options);
// Create a request listener that wraps the web standard transport
// getRequestListener converts Node.js HTTP to Web Standard and properly handles SSE streaming
// overrideGlobalObjects: false prevents Hono from overwriting global Response, which would
// break frameworks like Next.js whose response classes extend the native Response
this._requestListener = getRequestListener(
async (webRequest: Request) => {
// Get context if available (set during handleRequest)
const context = this._requestContext.get(webRequest);
return this._webStandardTransport.handleRequest(webRequest, {
authInfo: context?.authInfo,
parsedBody: context?.parsedBody
});
},
{ overrideGlobalObjects: false }
);
}
/**
* Gets the session ID for this transport instance.
*/
get sessionId(): string | undefined {
return this._webStandardTransport.sessionId;
}
/**
* Sets callback for when the transport is closed.
*/
set onclose(handler: (() => void) | undefined) {
this._webStandardTransport.onclose = handler;
}
get onclose(): (() => void) | undefined {
return this._webStandardTransport.onclose;
}
/**
* Sets callback for transport errors.
*/
set onerror(handler: ((error: Error) => void) | undefined) {
this._webStandardTransport.onerror = handler;
}
get onerror(): ((error: Error) => void) | undefined {
return this._webStandardTransport.onerror;
}
/**
* Sets callback for incoming messages.
*/
set onmessage(handler: ((message: JSONRPCMessage, extra?: MessageExtraInfo) => void) | undefined) {
this._webStandardTransport.onmessage = handler;
}
get onmessage(): ((message: JSONRPCMessage, extra?: MessageExtraInfo) => void) | undefined {
return this._webStandardTransport.onmessage;
}
/**
* Starts the transport. This is required by the Transport interface but is a no-op
* for the Streamable HTTP transport as connections are managed per-request.
*/
async start(): Promise<void> {
return this._webStandardTransport.start();
}
/**
* Closes the transport and all active connections.
*/
async close(): Promise<void> {
return this._webStandardTransport.close();
}
/**
* Sends a JSON-RPC message through the transport.
*/
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
return this._webStandardTransport.send(message, options);
}
/**
* Handles an incoming HTTP request, whether GET or POST.
*
* This method converts Node.js HTTP objects to Web Standard Request/Response
* and delegates to the underlying WebStandardStreamableHTTPServerTransport.
*
* @param req - Node.js IncomingMessage, optionally with auth property from middleware
* @param res - Node.js ServerResponse
* @param parsedBody - Optional pre-parsed body from body-parser middleware
*/
async handleRequest(req: IncomingMessage & { auth?: AuthInfo }, res: ServerResponse, parsedBody?: unknown): Promise<void> {
// Store context for this request to pass through auth and parsedBody
// We need to intercept the request creation to attach this context
const authInfo = req.auth;
// Create a custom handler that includes our context
// overrideGlobalObjects: false prevents Hono from overwriting global Response, which would
// break frameworks like Next.js whose response classes extend the native Response
const handler = getRequestListener(
async (webRequest: Request) => {
return this._webStandardTransport.handleRequest(webRequest, {
authInfo,
parsedBody
});
},
{ overrideGlobalObjects: false }
);
// Delegate to the request listener which handles all the Node.js <-> Web Standard conversion
// including proper SSE streaming support
await handler(req, res);
}
/**
* Close an SSE stream for a specific request, triggering client reconnection.
* Use this to implement polling behavior during long-running operations -
* client will reconnect after the retry interval specified in the priming event.
*/
closeSSEStream(requestId: RequestId): void {
this._webStandardTransport.closeSSEStream(requestId);
}
/**
* Close the standalone GET SSE stream, triggering client reconnection.
* Use this to implement polling behavior for server-initiated notifications.
*/
closeStandaloneSSEStream(): void {
this._webStandardTransport.closeStandaloneSSEStream();
}
}