Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit 299eee9

Browse files
committed
docs: improve transport layer documentation and code cleanup
- Add comprehensive Javadoc for SSE transport implementations - Document key features and implementation details for WebFlux/WebMVC transports - Refactor FlowSseClient to use records for better immutability - Fix typo in prompt property name - Remove unused code and improve readability
1 parent 2a61236 commit 299eee9

File tree

7 files changed

+511
-111
lines changed

7 files changed

+511
-111
lines changed

mcp-transport/mcp-webflux-sse-transport/src/main/java/org/springframework/ai/mcp/client/transport/WebFluxSseClientTransport.java

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,9 @@
6161
* <li>The client establishes an SSE connection to the server's /sse endpoint</li>
6262
* <li>The server sends an 'endpoint' event containing the URI for sending messages</li>
6363
* </ol>
64-
*
65-
* This implementation handles automatic reconnection for transient failures and provides
66-
* graceful shutdown capabilities. It uses {@link WebClient} for HTTP communications and
67-
* supports JSON serialization/deserialization of messages.
64+
*
65+
* This implementation uses {@link WebClient} for HTTP communications and supports JSON
66+
* serialization/deserialization of messages.
6867
*
6968
* @author Christian Tzolov
7069
* @see <a href=
@@ -157,6 +156,28 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
157156
this.webClient = webClientBuilder.build();
158157
}
159158

159+
/**
160+
* Establishes a connection to the MCP server using Server-Sent Events (SSE). This
161+
* method initiates the SSE connection and sets up the message processing pipeline.
162+
*
163+
* <p>
164+
* The connection process follows these steps:
165+
* <ol>
166+
* <li>Establishes an SSE connection to the server's /sse endpoint</li>
167+
* <li>Waits for the server to send an 'endpoint' event with the message posting
168+
* URI</li>
169+
* <li>Sets up message handling for incoming JSON-RPC messages</li>
170+
* </ol>
171+
*
172+
* <p>
173+
* The connection is considered established only after receiving the endpoint event
174+
* from the server.
175+
* @param handler a function that processes incoming JSON-RPC messages and returns
176+
* responses
177+
* @return a Mono that completes when the connection is fully established
178+
* @throws McpError if there's an error processing SSE events or if an unrecognized
179+
* event type is received
180+
*/
160181
@Override
161182
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
162183
Flux<ServerSentEvent<String>> events = eventStream();
@@ -190,6 +211,18 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
190211
return messageEndpointSink.asMono().then();
191212
}
192213

214+
/**
215+
* Sends a JSON-RPC message to the server using the endpoint provided during
216+
* connection.
217+
*
218+
* <p>
219+
* Messages are sent via HTTP POST requests to the server-provided endpoint URI. The
220+
* message is serialized to JSON before transmission. If the transport is in the
221+
* process of closing, the message send operation is skipped gracefully.
222+
* @param message the JSON-RPC message to send
223+
* @return a Mono that completes when the message has been sent successfully
224+
* @throws RuntimeException if message serialization fails
225+
*/
193226
@Override
194227
public Mono<Void> sendMessage(JSONRPCMessage message) {
195228
// The messageEndpoint is the endpoint URI to send the messages
@@ -281,6 +314,20 @@ public Mono<Void> closeGracefully() { // @formatter:off
281314
.subscribeOn(Schedulers.boundedElastic());
282315
} // @formatter:on
283316

317+
/**
318+
* Unmarshals data from a generic Object into the specified type using the configured
319+
* ObjectMapper.
320+
*
321+
* <p>
322+
* This method is particularly useful when working with JSON-RPC parameters or result
323+
* objects that need to be converted to specific Java types. It leverages Jackson's
324+
* type conversion capabilities to handle complex object structures.
325+
* @param <T> the target type to convert the data into
326+
* @param data the source object to convert
327+
* @param typeRef the TypeReference describing the target type
328+
* @return the unmarshaled object of type T
329+
* @throws IllegalArgumentException if the conversion cannot be performed
330+
*/
284331
@Override
285332
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
286333
return this.objectMapper.convertValue(data, typeRef);

mcp-transport/mcp-webflux-sse-transport/src/main/java/org/springframework/ai/mcp/server/transport/WebFluxSseServerTransport.java

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,37 @@
2828
import org.springframework.web.reactive.function.server.ServerResponse;
2929

3030
/**
31-
* Server-side implementation of the MCP HTTP with SSE transport specification. Provides
32-
* endpoints for SSE connections and message handling.
31+
* Server-side implementation of the MCP (Model Context Protocol) HTTP transport using
32+
* Server-Sent Events (SSE). This implementation provides a bidirectional communication
33+
* channel between MCP clients and servers using HTTP POST for client-to-server messages
34+
* and SSE for server-to-client messages.
35+
*
36+
* <p>
37+
* Key features:
38+
* <ul>
39+
* <li>Implements the {@link ServerMcpTransport} interface for MCP server transport
40+
* functionality</li>
41+
* <li>Uses WebFlux for non-blocking request handling and SSE support</li>
42+
* <li>Maintains client sessions for reliable message delivery</li>
43+
* <li>Supports graceful shutdown with session cleanup</li>
44+
* <li>Thread-safe message broadcasting to multiple clients</li>
45+
* </ul>
46+
*
47+
* <p>
48+
* The transport sets up two main endpoints:
49+
* <ul>
50+
* <li>SSE endpoint (/sse) - For establishing SSE connections with clients</li>
51+
* <li>Message endpoint (configurable) - For receiving JSON-RPC messages from clients</li>
52+
* </ul>
53+
*
54+
* <p>
55+
* This implementation is thread-safe and can handle multiple concurrent client
56+
* connections. It uses {@link ConcurrentHashMap} for session management and Reactor's
57+
* {@link Sinks} for thread-safe message broadcasting.
3358
*
3459
* @author Christian Tzolov
60+
* @see ServerMcpTransport
61+
* @see ServerSentEvent
3562
*/
3663
public class WebFluxSseServerTransport implements ServerMcpTransport {
3764

@@ -71,9 +98,13 @@ public class WebFluxSseServerTransport implements ServerMcpTransport {
7198
private Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> connectHandler;
7299

73100
/**
74-
* Constructs a new SseServerTransport.
101+
* Constructs a new WebFlux SSE server transport instance.
75102
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
76-
* @param messageEndpoint The endpoint URI where clients should send messages
103+
* of MCP messages. Must not be null.
104+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
105+
* messages. This endpoint will be communicated to clients during SSE connection
106+
* setup. Must not be null.
107+
* @throws IllegalArgumentException if either parameter is null
77108
*/
78109
public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoint) {
79110
Assert.notNull(objectMapper, "ObjectMapper must not be null");
@@ -87,13 +118,40 @@ public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoi
87118
.build();
88119
}
89120

121+
/**
122+
* Configures the message handler for this transport. In the WebFlux SSE
123+
* implementation, this method stores the handler for processing incoming messages but
124+
* doesn't establish any connections since the server accepts connections rather than
125+
* initiating them.
126+
* @param handler A function that processes incoming JSON-RPC messages and returns
127+
* responses. This handler will be called for each message received through the
128+
* message endpoint.
129+
* @return An empty Mono since the server doesn't initiate connections
130+
*/
90131
@Override
91132
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
92133
this.connectHandler = handler;
93134
// Server-side transport doesn't initiate connections
94135
return Mono.empty().then();
95136
}
96137

138+
/**
139+
* Broadcasts a JSON-RPC message to all connected clients through their SSE
140+
* connections. The message is serialized to JSON and sent as a server-sent event to
141+
* each active session.
142+
*
143+
* <p>
144+
* The method:
145+
* <ul>
146+
* <li>Serializes the message to JSON</li>
147+
* <li>Creates a server-sent event with the message data</li>
148+
* <li>Attempts to send the event to all active sessions</li>
149+
* <li>Tracks and reports any delivery failures</li>
150+
* </ul>
151+
* @param message The JSON-RPC message to broadcast
152+
* @return A Mono that completes when the message has been sent to all sessions, or
153+
* errors if any session fails to receive the message
154+
*/
97155
@Override
98156
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
99157
if (sessions.isEmpty()) {
@@ -133,11 +191,35 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
133191
});
134192
}
135193

194+
/**
195+
* Converts data from one type to another using the configured ObjectMapper. This
196+
* method is primarily used for converting between different representations of
197+
* JSON-RPC message data.
198+
* @param <T> The target type to convert to
199+
* @param data The source data to convert
200+
* @param typeRef Type reference describing the target type
201+
* @return The converted data
202+
* @throws IllegalArgumentException if the conversion fails
203+
*/
136204
@Override
137205
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
138206
return this.objectMapper.convertValue(data, typeRef);
139207
}
140208

209+
/**
210+
* Initiates a graceful shutdown of the transport. This method ensures all active
211+
* sessions are properly closed and cleaned up.
212+
*
213+
* <p>
214+
* The shutdown process:
215+
* <ul>
216+
* <li>Marks the transport as closing to prevent new connections</li>
217+
* <li>Closes each active session</li>
218+
* <li>Removes closed sessions from the sessions map</li>
219+
* <li>Times out after 5 seconds if shutdown takes too long</li>
220+
* </ul>
221+
* @return A Mono that completes when all sessions have been closed
222+
*/
141223
@Override
142224
public Mono<Void> closeGracefully() {
143225
return Mono.fromRunnable(() -> {
@@ -155,14 +237,36 @@ public Mono<Void> closeGracefully() {
155237
}
156238

157239
/**
158-
* Get the router function for configuring the web server.
240+
* Returns the WebFlux router function that defines the transport's HTTP endpoints.
241+
* This router function should be integrated into the application's web configuration.
242+
*
243+
* <p>
244+
* The router function defines two endpoints:
245+
* <ul>
246+
* <li>GET {SSE_ENDPOINT} - For establishing SSE connections</li>
247+
* <li>POST {messageEndpoint} - For receiving client messages</li>
248+
* </ul>
249+
* @return The configured {@link RouterFunction} for handling HTTP requests
159250
*/
160251
public RouterFunction<?> getRouterFunction() {
161252
return this.routerFunction;
162253
}
163254

164255
/**
165-
* Handles new SSE connection requests from clients.
256+
* Handles new SSE connection requests from clients. Creates a new session for each
257+
* connection and sets up the SSE event stream.
258+
*
259+
* <p>
260+
* The handler performs the following steps:
261+
* <ul>
262+
* <li>Generates a unique session ID</li>
263+
* <li>Creates a new ClientSession instance</li>
264+
* <li>Sends the message endpoint URI as an initial event</li>
265+
* <li>Sets up message forwarding for the session</li>
266+
* <li>Handles connection cleanup on completion or errors</li>
267+
* </ul>
268+
* @param request The incoming server request
269+
* @return A response with the SSE event stream
166270
*/
167271
private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
168272
if (isClosing) {
@@ -208,7 +312,19 @@ private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
208312
}
209313

210314
/**
211-
* Handles incoming messages from clients.
315+
* Handles incoming JSON-RPC messages from clients. Deserializes the message and
316+
* processes it through the configured message handler.
317+
*
318+
* <p>
319+
* The handler:
320+
* <ul>
321+
* <li>Deserializes the incoming JSON-RPC message</li>
322+
* <li>Passes it through the message handler chain</li>
323+
* <li>Returns appropriate HTTP responses based on processing results</li>
324+
* <li>Handles various error conditions with appropriate error responses</li>
325+
* </ul>
326+
* @param request The incoming server request containing the JSON-RPC message
327+
* @return A response indicating the message processing result
212328
*/
213329
private Mono<ServerResponse> handleMessage(ServerRequest request) {
214330
if (isClosing) {
@@ -239,7 +355,16 @@ private Mono<ServerResponse> handleMessage(ServerRequest request) {
239355
}
240356

241357
/**
242-
* Represents an active client session.
358+
* Represents an active client SSE connection session. Manages the message sink for
359+
* sending events to the client and handles session lifecycle.
360+
*
361+
* <p>
362+
* Each session:
363+
* <ul>
364+
* <li>Has a unique identifier</li>
365+
* <li>Maintains its own message sink for event broadcasting</li>
366+
* <li>Supports clean shutdown through the close method</li>
367+
* </ul>
243368
*/
244369
private static class ClientSession {
245370

0 commit comments

Comments
 (0)