Skip to content

Commit 0a4ecb0

Browse files
committed
WIP: stateless server implementation
Signed-off-by: Dariusz Jędrzejczyk <[email protected]>
1 parent b382ac5 commit 0a4ecb0

File tree

8 files changed

+925
-32
lines changed

8 files changed

+925
-32
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package io.modelcontextprotocol.server.transport;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.server.McpStatelessServerHandler;
5+
import io.modelcontextprotocol.spec.DefaultMcpTransportContext;
6+
import io.modelcontextprotocol.spec.McpError;
7+
import io.modelcontextprotocol.spec.McpSchema;
8+
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
9+
import io.modelcontextprotocol.spec.McpTransportContext;
10+
import io.modelcontextprotocol.util.Assert;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import org.springframework.http.HttpStatus;
14+
import org.springframework.http.MediaType;
15+
import org.springframework.web.reactive.function.server.RouterFunction;
16+
import org.springframework.web.reactive.function.server.RouterFunctions;
17+
import org.springframework.web.reactive.function.server.ServerRequest;
18+
import org.springframework.web.reactive.function.server.ServerResponse;
19+
import reactor.core.publisher.Mono;
20+
21+
import java.io.IOException;
22+
import java.util.function.Function;
23+
24+
public class WebFluxStatelessServerTransport implements McpStatelessServerTransport {
25+
26+
private static final Logger logger = LoggerFactory.getLogger(WebFluxStatelessServerTransport.class);
27+
28+
public static final String DEFAULT_BASE_URL = "";
29+
30+
private final ObjectMapper objectMapper;
31+
32+
private final String baseUrl;
33+
34+
private final String mcpEndpoint;
35+
36+
private final RouterFunction<?> routerFunction;
37+
38+
private McpStatelessServerHandler mcpHandler;
39+
40+
// TODO: add means to specify this
41+
private Function<ServerRequest, McpTransportContext> contextExtractor = req -> new DefaultMcpTransportContext();
42+
43+
/**
44+
* Flag indicating if the transport is shutting down.
45+
*/
46+
private volatile boolean isClosing = false;
47+
48+
/**
49+
* Constructs a new WebFlux SSE server transport provider instance.
50+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
51+
* of MCP messages. Must not be null.
52+
* @param baseUrl webflux message base path
53+
* @param mcpEndpoint The endpoint URI where clients should send their JSON-RPC
54+
* messages. This endpoint will be communicated to clients during SSE connection
55+
* setup. Must not be null.
56+
* @throws IllegalArgumentException if either parameter is null
57+
*/
58+
public WebFluxStatelessServerTransport(ObjectMapper objectMapper, String baseUrl, String mcpEndpoint) {
59+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
60+
Assert.notNull(baseUrl, "Message base path must not be null");
61+
Assert.notNull(mcpEndpoint, "Message endpoint must not be null");
62+
63+
this.objectMapper = objectMapper;
64+
this.baseUrl = baseUrl;
65+
this.mcpEndpoint = mcpEndpoint;
66+
this.routerFunction = RouterFunctions.route()
67+
.GET(this.mcpEndpoint, this::handleGet)
68+
.POST(this.mcpEndpoint, this::handlePost)
69+
.build();
70+
}
71+
72+
@Override
73+
public void setMcpHandler(McpStatelessServerHandler mcpHandler) {
74+
this.mcpHandler = mcpHandler;
75+
}
76+
77+
// FIXME: This javadoc makes claims about using isClosing flag but it's not
78+
// actually
79+
// doing that.
80+
/**
81+
* Initiates a graceful shutdown of all the sessions. This method ensures all active
82+
* sessions are properly closed and cleaned up.
83+
*
84+
* <p>
85+
* The shutdown process:
86+
* <ul>
87+
* <li>Marks the transport as closing to prevent new connections</li>
88+
* <li>Closes each active session</li>
89+
* <li>Removes closed sessions from the sessions map</li>
90+
* <li>Times out after 5 seconds if shutdown takes too long</li>
91+
* </ul>
92+
* @return A Mono that completes when all sessions have been closed
93+
*/
94+
@Override
95+
public Mono<Void> closeGracefully() {
96+
return Mono.empty();
97+
}
98+
99+
/**
100+
* Returns the WebFlux router function that defines the transport's HTTP endpoints.
101+
* This router function should be integrated into the application's web configuration.
102+
*
103+
* <p>
104+
* The router function defines two endpoints:
105+
* <ul>
106+
* <li>GET {sseEndpoint} - For establishing SSE connections</li>
107+
* <li>POST {messageEndpoint} - For receiving client messages</li>
108+
* </ul>
109+
* @return The configured {@link RouterFunction} for handling HTTP requests
110+
*/
111+
public RouterFunction<?> getRouterFunction() {
112+
return this.routerFunction;
113+
}
114+
115+
/**
116+
* Handles GET requests from clients.
117+
* @param request The incoming server request
118+
* @return A Mono which emits a response informing the client that listening stream is
119+
* unavailable
120+
*/
121+
private Mono<ServerResponse> handleGet(ServerRequest request) {
122+
return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED).build();
123+
}
124+
125+
/**
126+
* Handles incoming JSON-RPC messages from clients. Deserializes the message and
127+
* processes it through the configured message handler.
128+
*
129+
* <p>
130+
* The handler:
131+
* <ul>
132+
* <li>Deserializes the incoming JSON-RPC message</li>
133+
* <li>Passes it through the message handler chain</li>
134+
* <li>Returns appropriate HTTP responses based on processing results</li>
135+
* <li>Handles various error conditions with appropriate error responses</li>
136+
* </ul>
137+
* @param request The incoming server request containing the JSON-RPC message
138+
* @return A Mono emitting the response indicating the message processing result
139+
*/
140+
private Mono<ServerResponse> handlePost(ServerRequest request) {
141+
if (isClosing) {
142+
return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
143+
}
144+
145+
McpTransportContext transportContext = this.contextExtractor.apply(request);
146+
147+
return request.bodyToMono(String.class).<ServerResponse>flatMap(body -> {
148+
try {
149+
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
150+
151+
if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
152+
return this.mcpHandler.handleRequest(transportContext, jsonrpcRequest)
153+
.flatMap(jsonrpcResponse -> ServerResponse.ok()
154+
.contentType(MediaType.APPLICATION_JSON)
155+
.bodyValue(jsonrpcResponse));
156+
}
157+
else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
158+
return this.mcpHandler.handleNotification(transportContext, jsonrpcNotification)
159+
.then(ServerResponse.accepted().build());
160+
}
161+
else {
162+
return ServerResponse.badRequest()
163+
.bodyValue(new McpError("The server accepts either requests or notifications"));
164+
}
165+
}
166+
catch (IllegalArgumentException | IOException e) {
167+
logger.error("Failed to deserialize message: {}", e.getMessage());
168+
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
169+
}
170+
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
171+
}
172+
173+
public static Builder builder() {
174+
return new Builder();
175+
}
176+
177+
/**
178+
* Builder for creating instances of {@link WebFluxStatelessServerTransport}.
179+
* <p>
180+
* This builder provides a fluent API for configuring and creating instances of
181+
* WebFluxSseServerTransportProvider with custom settings.
182+
*/
183+
public static class Builder {
184+
185+
private ObjectMapper objectMapper;
186+
187+
private String baseUrl = DEFAULT_BASE_URL;
188+
189+
private String mcpEndpoint = "/mcp";
190+
191+
/**
192+
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
193+
* messages.
194+
* @param objectMapper The ObjectMapper instance. Must not be null.
195+
* @return this builder instance
196+
* @throws IllegalArgumentException if objectMapper is null
197+
*/
198+
public Builder objectMapper(ObjectMapper objectMapper) {
199+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
200+
this.objectMapper = objectMapper;
201+
return this;
202+
}
203+
204+
/**
205+
* Sets the project basePath as endpoint prefix where clients should send their
206+
* JSON-RPC messages
207+
* @param baseUrl the message basePath . Must not be null.
208+
* @return this builder instance
209+
* @throws IllegalArgumentException if basePath is null
210+
*/
211+
public Builder basePath(String baseUrl) {
212+
Assert.notNull(baseUrl, "basePath must not be null");
213+
this.baseUrl = baseUrl;
214+
return this;
215+
}
216+
217+
/**
218+
* Sets the endpoint URI where clients should send their JSON-RPC messages.
219+
* @param messageEndpoint The message endpoint URI. Must not be null.
220+
* @return this builder instance
221+
* @throws IllegalArgumentException if messageEndpoint is null
222+
*/
223+
public Builder messageEndpoint(String messageEndpoint) {
224+
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
225+
this.mcpEndpoint = messageEndpoint;
226+
return this;
227+
}
228+
229+
/**
230+
* Builds a new instance of {@link WebFluxStatelessServerTransport} with the
231+
* configured settings.
232+
* @return A new WebFluxSseServerTransportProvider instance
233+
* @throws IllegalStateException if required parameters are not set
234+
*/
235+
public WebFluxStatelessServerTransport build() {
236+
Assert.notNull(objectMapper, "ObjectMapper must be set");
237+
Assert.notNull(mcpEndpoint, "Message endpoint must be set");
238+
239+
return new WebFluxStatelessServerTransport(objectMapper, baseUrl, mcpEndpoint);
240+
}
241+
242+
}
243+
244+
}

0 commit comments

Comments
 (0)