1
1
import express from "express" ;
2
2
import http from "http" ;
3
3
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js" ;
4
-
4
+ import { Runner } from "./base.js" ;
5
5
import { config } from "../common/config.js" ;
6
6
import logger , { LogId } from "../common/logger.js" ;
7
7
8
8
const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = - 32000 ;
9
+ const JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED = - 32601 ;
9
10
10
- export async function createHttpTransport ( ) : Promise < StreamableHTTPServerTransport > {
11
- const app = express ( ) ;
12
- app . enable ( "trust proxy" ) ; // needed for reverse proxy support
13
- app . use ( express . urlencoded ( { extended : true } ) ) ;
14
- app . use ( express . json ( ) ) ;
11
+ function promiseHandler (
12
+ fn : ( req : express . Request , res : express . Response , next : express . NextFunction ) => Promise < void >
13
+ ) {
14
+ return ( req : express . Request , res : express . Response , next : express . NextFunction ) => {
15
+ fn ( req , res , next ) . catch ( next ) ;
16
+ } ;
17
+ }
15
18
16
- const transport = new StreamableHTTPServerTransport ( {
17
- sessionIdGenerator : undefined ,
18
- } ) ;
19
+ export class StreamableHttpRunner extends Runner {
20
+ private httpServer : http . Server | undefined ;
19
21
20
- app . post ( "/mcp" , async ( req : express . Request , res : express . Response ) => {
21
- try {
22
- await transport . handleRequest ( req , res , req . body ) ;
23
- } catch ( error ) {
24
- logger . error (
25
- LogId . streamableHttpTransportRequestFailure ,
26
- "streamableHttpTransport" ,
27
- `Error handling request: ${ error instanceof Error ? error . message : String ( error ) } `
28
- ) ;
29
- res . status ( 400 ) . json ( {
30
- jsonrpc : "2.0" ,
31
- error : {
32
- code : JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED ,
33
- message : `failed to handle request` ,
34
- data : error instanceof Error ? error . message : String ( error ) ,
35
- } ,
36
- } ) ;
37
- }
38
- } ) ;
22
+ async run ( ) {
23
+ const app = express ( ) ;
24
+ app . enable ( "trust proxy" ) ; // needed for reverse proxy support
25
+ app . use ( express . urlencoded ( { extended : true } ) ) ;
26
+ app . use ( express . json ( ) ) ;
39
27
40
- app . get ( "/mcp" , async ( req : express . Request , res : express . Response ) => {
41
- try {
42
- await transport . handleRequest ( req , res , req . body ) ;
43
- } catch ( error ) {
44
- logger . error (
45
- LogId . streamableHttpTransportRequestFailure ,
46
- "streamableHttpTransport" ,
47
- `Error handling request: ${ error instanceof Error ? error . message : String ( error ) } `
48
- ) ;
49
- res . status ( 400 ) . json ( {
50
- jsonrpc : "2.0" ,
51
- error : {
52
- code : JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED ,
53
- message : `failed to handle request` ,
54
- data : error instanceof Error ? error . message : String ( error ) ,
55
- } ,
56
- } ) ;
57
- }
58
- } ) ;
28
+ app . post (
29
+ "/mcp" ,
30
+ promiseHandler ( async ( req : express . Request , res : express . Response ) => {
31
+ const transport = new StreamableHTTPServerTransport ( {
32
+ sessionIdGenerator : undefined ,
33
+ } ) ;
59
34
60
- app . delete ( "/mcp" , async ( req : express . Request , res : express . Response ) => {
61
- try {
62
- await transport . handleRequest ( req , res , req . body ) ;
63
- } catch ( error ) {
64
- logger . error (
65
- LogId . streamableHttpTransportRequestFailure ,
66
- "streamableHttpTransport" ,
67
- `Error handling request: ${ error instanceof Error ? error . message : String ( error ) } `
68
- ) ;
69
- res . status ( 400 ) . json ( {
70
- jsonrpc : "2.0" ,
71
- error : {
72
- code : JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED ,
73
- message : `failed to handle request` ,
74
- data : error instanceof Error ? error . message : String ( error ) ,
75
- } ,
76
- } ) ;
77
- }
78
- } ) ;
35
+ const server = this . setupServer ( ) ;
36
+
37
+ await server . connect ( transport ) ;
38
+
39
+ res . on ( "close" , async ( ) => {
40
+ try {
41
+ await transport . close ( ) ;
42
+ } catch ( error : unknown ) {
43
+ logger . error (
44
+ LogId . streamableHttpTransportCloseFailure ,
45
+ "streamableHttpTransport" ,
46
+ `Error closing transport: ${ error instanceof Error ? error . message : String ( error ) } `
47
+ ) ;
48
+ }
49
+ try {
50
+ await server . close ( ) ;
51
+ } catch ( error : unknown ) {
52
+ logger . error (
53
+ LogId . streamableHttpTransportCloseFailure ,
54
+ "streamableHttpTransport" ,
55
+ `Error closing server: ${ error instanceof Error ? error . message : String ( error ) } `
56
+ ) ;
57
+ }
58
+ } ) ;
59
+
60
+ try {
61
+ await transport . handleRequest ( req , res , req . body ) ;
62
+ } catch ( error ) {
63
+ logger . error (
64
+ LogId . streamableHttpTransportRequestFailure ,
65
+ "streamableHttpTransport" ,
66
+ `Error handling request: ${ error instanceof Error ? error . message : String ( error ) } `
67
+ ) ;
68
+ res . status ( 400 ) . json ( {
69
+ jsonrpc : "2.0" ,
70
+ error : {
71
+ code : JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED ,
72
+ message : `failed to handle request` ,
73
+ data : error instanceof Error ? error . message : String ( error ) ,
74
+ } ,
75
+ } ) ;
76
+ }
77
+ } )
78
+ ) ;
79
79
80
- try {
81
- const server = await new Promise < http . Server > ( ( resolve , reject ) => {
80
+ app . get (
81
+ "/mcp" ,
82
+ promiseHandler ( async ( req : express . Request , res : express . Response ) => {
83
+ res . status ( 405 ) . json ( {
84
+ jsonrpc : "2.0" ,
85
+ error : {
86
+ code : JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED ,
87
+ message : `method not allowed` ,
88
+ } ,
89
+ } ) ;
90
+ } )
91
+ ) ;
92
+
93
+ app . delete (
94
+ "/mcp" ,
95
+ promiseHandler ( async ( req : express . Request , res : express . Response ) => {
96
+ res . status ( 405 ) . json ( {
97
+ jsonrpc : "2.0" ,
98
+ error : {
99
+ code : JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED ,
100
+ message : `method not allowed` ,
101
+ } ,
102
+ } ) ;
103
+ } )
104
+ ) ;
105
+
106
+ this . httpServer = await new Promise < http . Server > ( ( resolve , reject ) => {
82
107
const result = app . listen ( config . httpPort , config . httpHost , ( err ?: Error ) => {
83
108
if ( err ) {
84
109
reject ( err ) ;
@@ -93,31 +118,16 @@ export async function createHttpTransport(): Promise<StreamableHTTPServerTranspo
93
118
"streamableHttpTransport" ,
94
119
`Server started on http://${ config . httpHost } :${ config . httpPort } `
95
120
) ;
121
+ }
96
122
97
- transport . onclose = ( ) => {
98
- logger . info ( LogId . streamableHttpTransportCloseRequested , "streamableHttpTransport" , `Closing server` ) ;
99
- server . close ( ( err ?: Error ) => {
100
- if ( err ) {
101
- logger . error (
102
- LogId . streamableHttpTransportCloseFailure ,
103
- "streamableHttpTransport" ,
104
- `Error closing server: ${ err . message } `
105
- ) ;
106
- return ;
107
- }
108
- logger . info ( LogId . streamableHttpTransportCloseSuccess , "streamableHttpTransport" , `Server closed` ) ;
109
- } ) ;
110
- } ;
111
-
112
- return transport ;
113
- } catch ( error : unknown ) {
114
- const err = error instanceof Error ? error : new Error ( String ( error ) ) ;
115
- logger . info (
116
- LogId . streamableHttpTransportStartFailure ,
117
- "streamableHttpTransport" ,
118
- `Error starting server: ${ err . message } `
119
- ) ;
120
-
121
- throw err ;
123
+ async close ( ) : Promise < number > {
124
+ try {
125
+ await this . httpServer ?. close ( ) ;
126
+ return 0 ;
127
+ } catch ( error : unknown ) {
128
+ const err = error instanceof Error ? error : new Error ( String ( error ) ) ;
129
+ logger . error ( LogId . serverCloseFailure , "server" , `Error closing server: ${ err . message } ` ) ;
130
+ return 1 ;
131
+ }
122
132
}
123
133
}
0 commit comments