Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class WebFluxSseClientTransport implements McpClientTransport {
* Default SSE endpoint path as specified by the MCP transport specification. This
* endpoint is used to establish the SSE connection with the server.
*/
private static final String SSE_ENDPOINT = "/sse";
private static final String DEFAULT_SSE_ENDPOINT = "/sse";

/**
* Type reference for parsing SSE events containing string data.
Expand All @@ -92,6 +92,10 @@ public class WebFluxSseClientTransport implements McpClientTransport {
* for establishing the SSE connection and sending outbound messages.
*/
private final WebClient webClient;
/**
* The SSE endpoint URI provided by the server.
*/
private final String sseEndpoint;

/**
* ObjectMapper for serializing outbound messages and deserializing inbound messages.
Expand Down Expand Up @@ -137,11 +141,26 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
* @throws IllegalArgumentException if either parameter is null
*/
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT);
}

/**
* Constructs a new SseClientTransport with the specified WebClient builder and
* ObjectMapper. Initializes both inbound and outbound message processing pipelines.
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
* instance
* @param objectMapper the ObjectMapper to use for JSON processing
* @param sseEndpoint the SSE endpoint URI to use for establishing the connection
* @throws IllegalArgumentException if either parameter is null
*/
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper, String sseEndpoint) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");

this.objectMapper = objectMapper;
this.webClient = webClientBuilder.build();
this.sseEndpoint = sseEndpoint;
}

/**
Expand Down Expand Up @@ -254,7 +273,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
return this.webClient
.get()
.uri(SSE_ENDPOINT)
.uri(sseEndpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(SSE_TYPE)
Expand Down