Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ public class WebFluxSseClientTransport implements McpClientTransport {
* endpoint is used to establish the SSE connection with the server.
*/
private static final String SSE_ENDPOINT = "/sse";

/**
* Custom sseEndpoint
*/
private final String sseEndpoint;
/**
* Type reference for parsing SSE events containing string data.
*/
Expand Down Expand Up @@ -137,13 +140,17 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
* @throws IllegalArgumentException if either parameter is null
*/
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
this(webClientBuilder,objectMapper,SSE_ENDPOINT);
}
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper,String sseEndpoint) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
Assert.notNull(sseEndpoint, "sseEndpoint must not be null");

this.objectMapper = objectMapper;
this.webClient = webClientBuilder.build();
this.sseEndpoint=sseEndpoint;
}

/**
* Establishes a connection to the MCP server using Server-Sent Events (SSE). This
* method initiates the SSE connection and sets up the message processing pipeline.
Expand Down Expand Up @@ -254,7 +261,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
return this.webClient
.get()
.uri(SSE_ENDPOINT)
.uri(this.sseEndpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(SSE_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ public class HttpClientSseClientTransport implements McpClientTransport {

/** Default SSE endpoint path */
private static final String SSE_ENDPOINT = "/sse";

/**
* Custom sseEndPoint
*/
private final String sseEndpoint;
/** Base URI for the MCP server */
private final String baseUri;

Expand Down Expand Up @@ -110,15 +113,32 @@ public HttpClientSseClientTransport(String baseUri) {
* @throws IllegalArgumentException if objectMapper or clientBuilder is null
*/
public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, String baseUri, ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.notNull(clientBuilder, "clientBuilder must not be null");
this.baseUri = baseUri;
this.objectMapper = objectMapper;
this.httpClient = clientBuilder.connectTimeout(Duration.ofSeconds(10)).build();
this.sseClient = new FlowSseClient(this.httpClient);
this(clientBuilder, baseUri, SSE_ENDPOINT, objectMapper);
}

/**
* Creates a new transport instance with custom HTTP client builder and object mapper.
*
* @param clientBuilder the HTTP client builder to use
* @param baseUri the base URI of the MCP server
* @param objectMapper the object mapper for JSON serialization/deserialization
* @param clientBuilder the HTTP client builder to use
* @param baseUri the base URI of the MCP server
* @param sseEndpoint Custom sseEndPoint
* @param objectMapper the object mapper for JSON serialization/deserialization
* @throws IllegalArgumentException if objectMapper or clientBuilder is null
*/
public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, String baseUri, String sseEndpoint, ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.notNull(clientBuilder, "clientBuilder must not be null");
Assert.notNull(sseEndpoint, "sseEndPoint must not be null");
this.baseUri = baseUri;
this.objectMapper = objectMapper;
this.httpClient = clientBuilder.connectTimeout(Duration.ofSeconds(10)).build();
this.sseClient = new FlowSseClient(this.httpClient);
this.sseEndpoint = sseEndpoint;
}
/**
* Establishes the SSE connection with the server and sets up message handling.
*
Expand All @@ -137,7 +157,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
CompletableFuture<Void> future = new CompletableFuture<>();
connectionFuture.set(future);

sseClient.subscribe(this.baseUri + SSE_ENDPOINT, new FlowSseClient.SseEventHandler() {
sseClient.subscribe(this.baseUri + this.sseEndpoint, new FlowSseClient.SseEventHandler() {
@Override
public void onEvent(SseEvent event) {
if (isClosing) {
Expand Down