1616import java .util .function .Consumer ;
1717import java .util .function .Function ;
1818
19- import org .slf4j .Logger ;
20- import org .slf4j .LoggerFactory ;
2119import io .modelcontextprotocol .client .transport .ResponseSubscribers .ResponseEvent ;
2220import io .modelcontextprotocol .client .transport .customizer .McpAsyncHttpClientRequestCustomizer ;
2321import io .modelcontextprotocol .client .transport .customizer .McpSyncHttpClientRequestCustomizer ;
3331import io .modelcontextprotocol .spec .ProtocolVersions ;
3432import io .modelcontextprotocol .util .Assert ;
3533import io .modelcontextprotocol .util .Utils ;
34+ import org .slf4j .Logger ;
35+ import org .slf4j .LoggerFactory ;
3636import reactor .core .Disposable ;
3737import reactor .core .publisher .Flux ;
3838import reactor .core .publisher .Mono ;
@@ -117,6 +117,11 @@ public class HttpClientSseClientTransport implements McpClientTransport {
117117 */
118118 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
119119
120+ /**
121+ * Validator for the message endpoint;
122+ */
123+ private final SseMessageEndpointValidator messageEndpointValidator ;
124+
120125 /**
121126 * Creates a new transport instance with custom HTTP client builder, object mapper,
122127 * and headers.
@@ -127,22 +132,26 @@ public class HttpClientSseClientTransport implements McpClientTransport {
127132 * @param jsonMapper the object mapper for JSON serialization/deserialization
128133 * @param httpRequestCustomizer customizer for the requestBuilder before executing
129134 * requests
135+ * @param messageEndpointValidator validator for the message endpoint
130136 * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
131137 */
132138 HttpClientSseClientTransport (HttpClient httpClient , HttpRequest .Builder requestBuilder , String baseUri ,
133- String sseEndpoint , McpJsonMapper jsonMapper , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ) {
139+ String sseEndpoint , McpJsonMapper jsonMapper , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
140+ SseMessageEndpointValidator messageEndpointValidator ) {
134141 Assert .notNull (jsonMapper , "jsonMapper must not be null" );
135142 Assert .hasText (baseUri , "baseUri must not be empty" );
136143 Assert .hasText (sseEndpoint , "sseEndpoint must not be empty" );
137144 Assert .notNull (httpClient , "httpClient must not be null" );
138145 Assert .notNull (requestBuilder , "requestBuilder must not be null" );
139146 Assert .notNull (httpRequestCustomizer , "httpRequestCustomizer must not be null" );
147+ Assert .notNull (messageEndpointValidator , "messageEndpointValidator must not be null" );
140148 this .baseUri = URI .create (baseUri );
141149 this .sseEndpoint = sseEndpoint ;
142150 this .jsonMapper = jsonMapper ;
143151 this .httpClient = httpClient ;
144152 this .requestBuilder = requestBuilder ;
145153 this .httpRequestCustomizer = httpRequestCustomizer ;
154+ this .messageEndpointValidator = messageEndpointValidator ;
146155 }
147156
148157 @ Override
@@ -178,6 +187,8 @@ public static class Builder {
178187
179188 private Duration connectTimeout = Duration .ofSeconds (10 );
180189
190+ private SseMessageEndpointValidator messageEndpointValidator = new DefaultSseMessageEndpointValidator ();
191+
181192 /**
182193 * Creates a new builder instance.
183194 */
@@ -308,14 +319,27 @@ public Builder connectTimeout(Duration connectTimeout) {
308319 return this ;
309320 }
310321
322+ /**
323+ * Sets the validator that ensure the message endpoint returned over the SSE
324+ * connection is valid.
325+ * @param messageEndpointValidator the validator
326+ * @return this builder
327+ */
328+ public Builder messageEndpointValidator (SseMessageEndpointValidator messageEndpointValidator ) {
329+ Assert .notNull (messageEndpointValidator , "messageEndpointValidator must not be null" );
330+ this .messageEndpointValidator = messageEndpointValidator ;
331+ return this ;
332+ }
333+
311334 /**
312335 * Builds a new {@link HttpClientSseClientTransport} instance.
313336 * @return a new transport instance
314337 */
315338 public HttpClientSseClientTransport build () {
316339 HttpClient httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
317340 return new HttpClientSseClientTransport (httpClient , requestBuilder , baseUri , sseEndpoint ,
318- jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper , httpRequestCustomizer );
341+ jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper , httpRequestCustomizer ,
342+ messageEndpointValidator );
319343 }
320344
321345 }
@@ -353,6 +377,14 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
353377 try {
354378 if (ENDPOINT_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
355379 String messageEndpointUri = responseEvent .sseEvent ().data ();
380+ try {
381+ messageEndpointValidator .validate (uri , messageEndpointUri );
382+ }
383+ catch (InvalidSseMessageEndpointException e ) {
384+ sink .error (e );
385+ this .messageEndpointSink .tryEmitError (e );
386+ return Flux .error (e );
387+ }
356388 if (this .messageEndpointSink .tryEmitValue (messageEndpointUri ).isSuccess ()) {
357389 sink .success ();
358390 return Flux .empty (); // No further processing needed
0 commit comments