2
2
3
3
import com .fasterxml .jackson .databind .ObjectMapper ;
4
4
import io .modelcontextprotocol .server .McpStatelessServerHandler ;
5
- import io .modelcontextprotocol .spec .DefaultMcpTransportContext ;
5
+ import io .modelcontextprotocol .server .DefaultMcpTransportContext ;
6
+ import io .modelcontextprotocol .server .McpTransportContextExtractor ;
6
7
import io .modelcontextprotocol .spec .McpError ;
7
8
import io .modelcontextprotocol .spec .McpSchema ;
8
9
import io .modelcontextprotocol .spec .McpStatelessServerTransport ;
9
- import io .modelcontextprotocol .spec .McpTransportContext ;
10
+ import io .modelcontextprotocol .server .McpTransportContext ;
10
11
import io .modelcontextprotocol .util .Assert ;
11
12
import org .slf4j .Logger ;
12
13
import org .slf4j .LoggerFactory ;
19
20
import reactor .core .publisher .Mono ;
20
21
21
22
import java .io .IOException ;
23
+ import java .util .List ;
22
24
import java .util .function .Function ;
23
25
26
+ /**
27
+ * Implementation of a WebFlux based {@link McpStatelessServerTransport}.
28
+ *
29
+ * @author Dariusz Jędrzejczyk
30
+ */
24
31
public class WebFluxStatelessServerTransport implements McpStatelessServerTransport {
25
32
26
33
private static final Logger logger = LoggerFactory .getLogger (WebFluxStatelessServerTransport .class );
27
34
28
- public static final String DEFAULT_BASE_URL = "" ;
29
-
30
35
private final ObjectMapper objectMapper ;
31
36
32
- private final String baseUrl ;
33
-
34
37
private final String mcpEndpoint ;
35
38
36
39
private final RouterFunction <?> routerFunction ;
37
40
38
41
private McpStatelessServerHandler mcpHandler ;
39
42
40
- // TODO: add means to specify this
41
- private Function <ServerRequest , McpTransportContext > contextExtractor = req -> new DefaultMcpTransportContext ();
43
+ private McpTransportContextExtractor <ServerRequest > contextExtractor ;
42
44
43
- /**
44
- * Flag indicating if the transport is shutting down.
45
- */
46
45
private volatile boolean isClosing = false ;
47
46
48
- /**
49
- * Constructs a new WebFlux SSE server transport provider instance.
50
- * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
51
- * of MCP messages. Must not be null.
52
- * @param baseUrl webflux message base path
53
- * @param mcpEndpoint The endpoint URI where clients should send their JSON-RPC
54
- * messages. This endpoint will be communicated to clients during SSE connection
55
- * setup. Must not be null.
56
- * @throws IllegalArgumentException if either parameter is null
57
- */
58
- public WebFluxStatelessServerTransport (ObjectMapper objectMapper , String baseUrl , String mcpEndpoint ) {
59
- Assert .notNull (objectMapper , "ObjectMapper must not be null" );
60
- Assert .notNull (baseUrl , "Message base path must not be null" );
61
- Assert .notNull (mcpEndpoint , "Message endpoint must not be null" );
47
+ private WebFluxStatelessServerTransport (ObjectMapper objectMapper , String mcpEndpoint ,
48
+ McpTransportContextExtractor <ServerRequest > contextExtractor ) {
49
+ Assert .notNull (objectMapper , "objectMapper must not be null" );
50
+ Assert .notNull (mcpEndpoint , "mcpEndpoint must not be null" );
51
+ Assert .notNull (contextExtractor , "contextExtractor must not be null" );
62
52
63
53
this .objectMapper = objectMapper ;
64
- this .baseUrl = baseUrl ;
65
54
this .mcpEndpoint = mcpEndpoint ;
55
+ this .contextExtractor = contextExtractor ;
66
56
this .routerFunction = RouterFunctions .route ()
67
57
.GET (this .mcpEndpoint , this ::handleGet )
68
58
.POST (this .mcpEndpoint , this ::handlePost )
@@ -74,75 +64,43 @@ public void setMcpHandler(McpStatelessServerHandler mcpHandler) {
74
64
this .mcpHandler = mcpHandler ;
75
65
}
76
66
77
- // FIXME: This javadoc makes claims about using isClosing flag but it's not
78
- // actually
79
- // doing that.
80
- /**
81
- * Initiates a graceful shutdown of all the sessions. This method ensures all active
82
- * sessions are properly closed and cleaned up.
83
- *
84
- * <p>
85
- * The shutdown process:
86
- * <ul>
87
- * <li>Marks the transport as closing to prevent new connections</li>
88
- * <li>Closes each active session</li>
89
- * <li>Removes closed sessions from the sessions map</li>
90
- * <li>Times out after 5 seconds if shutdown takes too long</li>
91
- * </ul>
92
- * @return A Mono that completes when all sessions have been closed
93
- */
94
67
@ Override
95
68
public Mono <Void > closeGracefully () {
96
- return Mono .empty ( );
69
+ return Mono .fromRunnable (() -> this . isClosing = true );
97
70
}
98
71
99
72
/**
100
73
* Returns the WebFlux router function that defines the transport's HTTP endpoints.
101
74
* This router function should be integrated into the application's web configuration.
102
75
*
103
76
* <p>
104
- * The router function defines two endpoints :
77
+ * The router function defines one endpoint handling two HTTP methods :
105
78
* <ul>
106
- * <li>GET {sseEndpoint } - For establishing SSE connections </li>
107
- * <li>POST {messageEndpoint} - For receiving client messages </li>
79
+ * <li>GET {messageEndpoint } - Unsupported, returns 405 METHOD NOT ALLOWED </li>
80
+ * <li>POST {messageEndpoint} - For handling client requests and notifications </li>
108
81
* </ul>
109
82
* @return The configured {@link RouterFunction} for handling HTTP requests
110
83
*/
111
84
public RouterFunction <?> getRouterFunction () {
112
85
return this .routerFunction ;
113
86
}
114
87
115
- /**
116
- * Handles GET requests from clients.
117
- * @param request The incoming server request
118
- * @return A Mono which emits a response informing the client that listening stream is
119
- * unavailable
120
- */
121
88
private Mono <ServerResponse > handleGet (ServerRequest request ) {
122
89
return ServerResponse .status (HttpStatus .METHOD_NOT_ALLOWED ).build ();
123
90
}
124
91
125
- /**
126
- * Handles incoming JSON-RPC messages from clients. Deserializes the message and
127
- * processes it through the configured message handler.
128
- *
129
- * <p>
130
- * The handler:
131
- * <ul>
132
- * <li>Deserializes the incoming JSON-RPC message</li>
133
- * <li>Passes it through the message handler chain</li>
134
- * <li>Returns appropriate HTTP responses based on processing results</li>
135
- * <li>Handles various error conditions with appropriate error responses</li>
136
- * </ul>
137
- * @param request The incoming server request containing the JSON-RPC message
138
- * @return A Mono emitting the response indicating the message processing result
139
- */
140
92
private Mono <ServerResponse > handlePost (ServerRequest request ) {
141
93
if (isClosing ) {
142
94
return ServerResponse .status (HttpStatus .SERVICE_UNAVAILABLE ).bodyValue ("Server is shutting down" );
143
95
}
144
96
145
- McpTransportContext transportContext = this .contextExtractor .apply (request );
97
+ McpTransportContext transportContext = this .contextExtractor .extract (request , new DefaultMcpTransportContext ());
98
+
99
+ List <MediaType > acceptHeaders = request .headers ().asHttpHeaders ().getAccept ();
100
+ if (!(acceptHeaders .contains (MediaType .APPLICATION_JSON )
101
+ && acceptHeaders .contains (MediaType .TEXT_EVENT_STREAM ))) {
102
+ return ServerResponse .badRequest ().build ();
103
+ }
146
104
147
105
return request .bodyToMono (String .class ).<ServerResponse >flatMap (body -> {
148
106
try {
@@ -170,6 +128,10 @@ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
170
128
}).contextWrite (ctx -> ctx .put (McpTransportContext .KEY , transportContext ));
171
129
}
172
130
131
+ /**
132
+ * Create a builder for the server.
133
+ * @return a fresh {@link Builder} instance.
134
+ */
173
135
public static Builder builder () {
174
136
return new Builder ();
175
137
}
@@ -184,10 +146,14 @@ public static class Builder {
184
146
185
147
private ObjectMapper objectMapper ;
186
148
187
- private String baseUrl = DEFAULT_BASE_URL ;
188
-
189
149
private String mcpEndpoint = "/mcp" ;
190
150
151
+ private McpTransportContextExtractor <ServerRequest > contextExtractor = (serverRequest , context ) -> context ;
152
+
153
+ private Builder () {
154
+ // used by a static method
155
+ }
156
+
191
157
/**
192
158
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
193
159
* messages.
@@ -201,19 +167,6 @@ public Builder objectMapper(ObjectMapper objectMapper) {
201
167
return this ;
202
168
}
203
169
204
- /**
205
- * Sets the project basePath as endpoint prefix where clients should send their
206
- * JSON-RPC messages
207
- * @param baseUrl the message basePath . Must not be null.
208
- * @return this builder instance
209
- * @throws IllegalArgumentException if basePath is null
210
- */
211
- public Builder basePath (String baseUrl ) {
212
- Assert .notNull (baseUrl , "basePath must not be null" );
213
- this .baseUrl = baseUrl ;
214
- return this ;
215
- }
216
-
217
170
/**
218
171
* Sets the endpoint URI where clients should send their JSON-RPC messages.
219
172
* @param messageEndpoint The message endpoint URI. Must not be null.
@@ -226,6 +179,22 @@ public Builder messageEndpoint(String messageEndpoint) {
226
179
return this ;
227
180
}
228
181
182
+ /**
183
+ * Sets the context extractor that allows providing the MCP feature
184
+ * implementations to inspect HTTP transport level metadata that was present at
185
+ * HTTP request processing time. This allows to extract custom headers and other
186
+ * useful data for use during execution later on in the process.
187
+ * @param contextExtractor The contextExtractor to fill in a
188
+ * {@link McpTransportContext}.
189
+ * @return this builder instance
190
+ * @throws IllegalArgumentException if contextExtractor is null
191
+ */
192
+ public Builder contextExtractor (McpTransportContextExtractor <ServerRequest > contextExtractor ) {
193
+ Assert .notNull (contextExtractor , "Context extractor must not be null" );
194
+ this .contextExtractor = contextExtractor ;
195
+ return this ;
196
+ }
197
+
229
198
/**
230
199
* Builds a new instance of {@link WebFluxStatelessServerTransport} with the
231
200
* configured settings.
@@ -236,7 +205,7 @@ public WebFluxStatelessServerTransport build() {
236
205
Assert .notNull (objectMapper , "ObjectMapper must be set" );
237
206
Assert .notNull (mcpEndpoint , "Message endpoint must be set" );
238
207
239
- return new WebFluxStatelessServerTransport (objectMapper , baseUrl , mcpEndpoint );
208
+ return new WebFluxStatelessServerTransport (objectMapper , mcpEndpoint , contextExtractor );
240
209
}
241
210
242
211
}
0 commit comments