@@ -82,8 +82,16 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
8282 */
8383 public static final String DEFAULT_SSE_ENDPOINT = "/sse" ;
8484
85+ public static final String DEFAULT_BASE_URL = "" ;
86+
8587 private final ObjectMapper objectMapper ;
8688
89+ /**
90+ * Base URL for the message endpoint. This is used to construct the full URL for
91+ * clients to send their JSON-RPC messages.
92+ */
93+ private final String baseUrl ;
94+
8795 private final String messageEndpoint ;
8896
8997 private final String sseEndpoint ;
@@ -102,6 +110,20 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
102110 */
103111 private volatile boolean isClosing = false ;
104112
113+ /**
114+ * Constructs a new WebFlux SSE server transport provider instance with the default
115+ * SSE endpoint.
116+ * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
117+ * of MCP messages. Must not be null.
118+ * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
119+ * messages. This endpoint will be communicated to clients during SSE connection
120+ * setup. Must not be null.
121+ * @throws IllegalArgumentException if either parameter is null
122+ */
123+ public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String messageEndpoint ) {
124+ this (objectMapper , messageEndpoint , DEFAULT_SSE_ENDPOINT );
125+ }
126+
105127 /**
106128 * Constructs a new WebFlux SSE server transport provider instance.
107129 * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
@@ -112,11 +134,28 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
112134 * @throws IllegalArgumentException if either parameter is null
113135 */
114136 public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String messageEndpoint , String sseEndpoint ) {
137+ this (objectMapper , DEFAULT_BASE_URL , messageEndpoint , sseEndpoint );
138+ }
139+
140+ /**
141+ * Constructs a new WebFlux SSE server transport provider instance.
142+ * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
143+ * of MCP messages. Must not be null.
144+ * @param baseUrl webflux messag base path
145+ * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
146+ * messages. This endpoint will be communicated to clients during SSE connection
147+ * setup. Must not be null.
148+ * @throws IllegalArgumentException if either parameter is null
149+ */
150+ public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String baseUrl , String messageEndpoint ,
151+ String sseEndpoint ) {
115152 Assert .notNull (objectMapper , "ObjectMapper must not be null" );
153+ Assert .notNull (baseUrl , "Message base path must not be null" );
116154 Assert .notNull (messageEndpoint , "Message endpoint must not be null" );
117155 Assert .notNull (sseEndpoint , "SSE endpoint must not be null" );
118156
119157 this .objectMapper = objectMapper ;
158+ this .baseUrl = baseUrl ;
120159 this .messageEndpoint = messageEndpoint ;
121160 this .sseEndpoint = sseEndpoint ;
122161 this .routerFunction = RouterFunctions .route ()
@@ -125,20 +164,6 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
125164 .build ();
126165 }
127166
128- /**
129- * Constructs a new WebFlux SSE server transport provider instance with the default
130- * SSE endpoint.
131- * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
132- * of MCP messages. Must not be null.
133- * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
134- * messages. This endpoint will be communicated to clients during SSE connection
135- * setup. Must not be null.
136- * @throws IllegalArgumentException if either parameter is null
137- */
138- public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String messageEndpoint ) {
139- this (objectMapper , messageEndpoint , DEFAULT_SSE_ENDPOINT );
140- }
141-
142167 @ Override
143168 public void setSessionFactory (McpServerSession .Factory sessionFactory ) {
144169 this .sessionFactory = sessionFactory ;
@@ -163,23 +188,24 @@ public void setSessionFactory(McpServerSession.Factory sessionFactory) {
163188 * errors if any session fails to receive the message
164189 */
165190 @ Override
166- public Mono <Void > notifyClients (String method , Map < String , Object > params ) {
191+ public Mono <Void > notifyClients (String method , Object params ) {
167192 if (sessions .isEmpty ()) {
168193 logger .debug ("No active sessions to broadcast message to" );
169194 return Mono .empty ();
170195 }
171196
172197 logger .debug ("Attempting to broadcast message to {} active sessions" , sessions .size ());
173198
174- return Flux .fromStream (sessions .values (). stream ())
199+ return Flux .fromIterable (sessions .values ())
175200 .flatMap (session -> session .sendNotification (method , params )
176- .doOnError (e -> logger . error ( "Failed to " + "send message to session " + "{}: {}" , session . getId (),
177- e .getMessage ()))
201+ .doOnError (
202+ e -> logger . error ( "Failed to send message to session {}: {}" , session . getId (), e .getMessage ()))
178203 .onErrorComplete ())
179204 .then ();
180205 }
181206
182- // FIXME: This javadoc makes claims about using isClosing flag but it's not actually
207+ // FIXME: This javadoc makes claims about using isClosing flag but it's not
208+ // actually
183209 // doing that.
184210 /**
185211 * Initiates a graceful shutdown of all the sessions. This method ensures all active
@@ -245,7 +271,7 @@ private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
245271 logger .debug ("Sending initial endpoint event to session: {}" , sessionId );
246272 sink .next (ServerSentEvent .builder ()
247273 .event (ENDPOINT_EVENT_TYPE )
248- .data (messageEndpoint + "?sessionId=" + sessionId )
274+ .data (this . baseUrl + this . messageEndpoint + "?sessionId=" + sessionId )
249275 .build ());
250276 sink .onCancel (() -> {
251277 logger .debug ("Session {} cancelled" , sessionId );
@@ -280,6 +306,11 @@ private Mono<ServerResponse> handleMessage(ServerRequest request) {
280306
281307 McpServerSession session = sessions .get (request .queryParam ("sessionId" ).get ());
282308
309+ if (session == null ) {
310+ return ServerResponse .status (HttpStatus .NOT_FOUND )
311+ .bodyValue (new McpError ("Session not found: " + request .queryParam ("sessionId" ).get ()));
312+ }
313+
283314 return request .bodyToMono (String .class ).flatMap (body -> {
284315 try {
285316 McpSchema .JSONRPCMessage message = McpSchema .deserializeJsonRpcMessage (objectMapper , body );
@@ -360,6 +391,8 @@ public static class Builder {
360391
361392 private ObjectMapper objectMapper ;
362393
394+ private String baseUrl = DEFAULT_BASE_URL ;
395+
363396 private String messageEndpoint ;
364397
365398 private String sseEndpoint = DEFAULT_SSE_ENDPOINT ;
@@ -377,6 +410,19 @@ public Builder objectMapper(ObjectMapper objectMapper) {
377410 return this ;
378411 }
379412
413+ /**
414+ * Sets the project basePath as endpoint prefix where clients should send their
415+ * JSON-RPC messages
416+ * @param baseUrl the message basePath . Must not be null.
417+ * @return this builder instance
418+ * @throws IllegalArgumentException if basePath is null
419+ */
420+ public Builder basePath (String baseUrl ) {
421+ Assert .notNull (baseUrl , "basePath must not be null" );
422+ this .baseUrl = baseUrl ;
423+ return this ;
424+ }
425+
380426 /**
381427 * Sets the endpoint URI where clients should send their JSON-RPC messages.
382428 * @param messageEndpoint The message endpoint URI. Must not be null.
@@ -411,7 +457,7 @@ public WebFluxSseServerTransportProvider build() {
411457 Assert .notNull (objectMapper , "ObjectMapper must be set" );
412458 Assert .notNull (messageEndpoint , "Message endpoint must be set" );
413459
414- return new WebFluxSseServerTransportProvider (objectMapper , messageEndpoint , sseEndpoint );
460+ return new WebFluxSseServerTransportProvider (objectMapper , baseUrl , messageEndpoint , sseEndpoint );
415461 }
416462
417463 }
0 commit comments