99
1010import com .fasterxml .jackson .core .type .TypeReference ;
1111import com .fasterxml .jackson .databind .ObjectMapper ;
12- import io .modelcontextprotocol .spec .ClientMcpTransport ;
12+ import io .modelcontextprotocol .spec .McpClientTransport ;
1313import io .modelcontextprotocol .spec .McpError ;
1414import io .modelcontextprotocol .spec .McpSchema ;
1515import io .modelcontextprotocol .spec .McpSchema .JSONRPCMessage ;
5858 * "https://spec.modelcontextprotocol.io/specification/basic/transports/#http-with-sse">MCP
5959 * HTTP with SSE Transport Specification</a>
6060 */
61- public class WebFluxSseClientTransport implements ClientMcpTransport {
61+ public class WebFluxSseClientTransport implements McpClientTransport {
6262
6363 private static final Logger logger = LoggerFactory .getLogger (WebFluxSseClientTransport .class );
6464
@@ -79,7 +79,7 @@ public class WebFluxSseClientTransport implements ClientMcpTransport {
7979 * Default SSE endpoint path as specified by the MCP transport specification. This
8080 * endpoint is used to establish the SSE connection with the server.
8181 */
82- private static final String SSE_ENDPOINT = "/sse" ;
82+ private static final String DEFAULT_SSE_ENDPOINT = "/sse" ;
8383
8484 /**
8585 * Type reference for parsing SSE events containing string data.
@@ -117,6 +117,12 @@ public class WebFluxSseClientTransport implements ClientMcpTransport {
117117 */
118118 protected final Sinks .One <String > messageEndpointSink = Sinks .one ();
119119
120+ /**
121+ * The SSE endpoint URI provided by the server. Used for sending outbound messages via
122+ * HTTP POST requests.
123+ */
124+ private String sseEndpoint ;
125+
120126 /**
121127 * Constructs a new SseClientTransport with the specified WebClient builder. Uses a
122128 * default ObjectMapper instance for JSON processing.
@@ -137,11 +143,27 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
137143 * @throws IllegalArgumentException if either parameter is null
138144 */
139145 public WebFluxSseClientTransport (WebClient .Builder webClientBuilder , ObjectMapper objectMapper ) {
146+ this (webClientBuilder , objectMapper , DEFAULT_SSE_ENDPOINT );
147+ }
148+
149+ /**
150+ * Constructs a new SseClientTransport with the specified WebClient builder and
151+ * ObjectMapper. Initializes both inbound and outbound message processing pipelines.
152+ * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
153+ * instance
154+ * @param objectMapper the ObjectMapper to use for JSON processing
155+ * @param sseEndpoint the SSE endpoint URI to use for establishing the connection
156+ * @throws IllegalArgumentException if either parameter is null
157+ */
158+ public WebFluxSseClientTransport (WebClient .Builder webClientBuilder , ObjectMapper objectMapper ,
159+ String sseEndpoint ) {
140160 Assert .notNull (objectMapper , "ObjectMapper must not be null" );
141161 Assert .notNull (webClientBuilder , "WebClient.Builder must not be null" );
162+ Assert .hasText (sseEndpoint , "SSE endpoint must not be null or empty" );
142163
143164 this .objectMapper = objectMapper ;
144165 this .webClient = webClientBuilder .build ();
166+ this .sseEndpoint = sseEndpoint ;
145167 }
146168
147169 /**
@@ -254,7 +276,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
254276 protected Flux <ServerSentEvent <String >> eventStream () {// @formatter:off
255277 return this .webClient
256278 .get ()
257- .uri (SSE_ENDPOINT )
279+ .uri (this . sseEndpoint )
258280 .accept (MediaType .TEXT_EVENT_STREAM )
259281 .retrieve ()
260282 .bodyToFlux (SSE_TYPE )
@@ -321,4 +343,66 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
321343 return this .objectMapper .convertValue (data , typeRef );
322344 }
323345
346+ /**
347+ * Creates a new builder for {@link WebFluxSseClientTransport}.
348+ * @param webClientBuilder the WebClient.Builder to use for creating the WebClient
349+ * instance
350+ * @return a new builder instance
351+ */
352+ public static Builder builder (WebClient .Builder webClientBuilder ) {
353+ return new Builder (webClientBuilder );
354+ }
355+
356+ /**
357+ * Builder for {@link WebFluxSseClientTransport}.
358+ */
359+ public static class Builder {
360+
361+ private final WebClient .Builder webClientBuilder ;
362+
363+ private String sseEndpoint = DEFAULT_SSE_ENDPOINT ;
364+
365+ private ObjectMapper objectMapper = new ObjectMapper ();
366+
367+ /**
368+ * Creates a new builder with the specified WebClient.Builder.
369+ * @param webClientBuilder the WebClient.Builder to use
370+ */
371+ public Builder (WebClient .Builder webClientBuilder ) {
372+ Assert .notNull (webClientBuilder , "WebClient.Builder must not be null" );
373+ this .webClientBuilder = webClientBuilder ;
374+ }
375+
376+ /**
377+ * Sets the SSE endpoint path.
378+ * @param sseEndpoint the SSE endpoint path
379+ * @return this builder
380+ */
381+ public Builder sseEndpoint (String sseEndpoint ) {
382+ Assert .hasText (sseEndpoint , "sseEndpoint must not be empty" );
383+ this .sseEndpoint = sseEndpoint ;
384+ return this ;
385+ }
386+
387+ /**
388+ * Sets the object mapper for JSON serialization/deserialization.
389+ * @param objectMapper the object mapper
390+ * @return this builder
391+ */
392+ public Builder objectMapper (ObjectMapper objectMapper ) {
393+ Assert .notNull (objectMapper , "objectMapper must not be null" );
394+ this .objectMapper = objectMapper ;
395+ return this ;
396+ }
397+
398+ /**
399+ * Builds a new {@link WebFluxSseClientTransport} instance.
400+ * @return a new transport instance
401+ */
402+ public WebFluxSseClientTransport build () {
403+ return new WebFluxSseClientTransport (webClientBuilder , objectMapper , sseEndpoint );
404+ }
405+
406+ }
407+
324408}
0 commit comments