@@ -44,8 +44,24 @@ public class StreamableHttpClientTransport implements McpClientTransport {
4444
4545 private static final Logger LOGGER = LoggerFactory .getLogger (StreamableHttpClientTransport .class );
4646
47+ private static final String DEFAULT_MCP_ENDPOINT = "/mcp" ;
48+
4749 private static final String MCP_SESSION_ID = "Mcp-Session-Id" ;
4850
51+ private static final String LAST_EVENT_ID = "Last-Event-ID" ;
52+
53+ private static final String ACCEPT = "Accept" ;
54+
55+ private static final String CONTENT_TYPE = "Content-Type" ;
56+
57+ private static final String APPLICATION_JSON = "application/json" ;
58+
59+ private static final String TEXT_EVENT_STREAM = "text/event-stream" ;
60+
61+ private static final String APPLICATION_JSON_SEQ = "application/json-seq" ;
62+
63+ private static final String DEFAULT_ACCEPT_VALUES = "%s, %s" .formatted (APPLICATION_JSON , TEXT_EVENT_STREAM );
64+
4965 private final HttpClientSseClientTransport sseClientTransport ;
5066
5167 private final HttpClient httpClient ;
@@ -107,7 +123,7 @@ public static class Builder {
107123
108124 private String baseUri ;
109125
110- private String endpoint = "/mcp" ;
126+ private String endpoint = DEFAULT_MCP_ENDPOINT ;
111127
112128 private Consumer <HttpClient .Builder > clientCustomizer ;
113129
@@ -156,7 +172,7 @@ public StreamableHttpClientTransport build() {
156172 builder .customizeRequest (requestCustomizer );
157173 }
158174
159- if (!endpoint .equals ("/mcp" )) {
175+ if (!endpoint .equals (DEFAULT_MCP_ENDPOINT )) {
160176 builder .sseEndpoint (endpoint );
161177 }
162178
@@ -177,20 +193,24 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
177193 }
178194
179195 return Mono .defer (() -> Mono .fromFuture (() -> {
180- final HttpRequest .Builder request = requestBuilder .copy ()
181- .GET ()
182- .header ("Accept" , "text/event-stream" )
183- .uri (uri );
196+ final HttpRequest .Builder request = requestBuilder .copy ().GET ().header (ACCEPT , TEXT_EVENT_STREAM ).uri (uri );
184197 final String lastId = lastEventId .get ();
185198 if (lastId != null ) {
186- request .header ("Last-Event-ID" , lastId );
199+ request .header (LAST_EVENT_ID , lastId );
187200 }
188201 if (mcpSessionId .get () != null ) {
189202 request .header (MCP_SESSION_ID , mcpSessionId .get ());
190203 }
191204
192205 return httpClient .sendAsync (request .build (), HttpResponse .BodyHandlers .ofInputStream ());
193206 }).flatMap (response -> {
207+ // must like server terminate session and the client need to start a
208+ // new session by sending a new `InitializeRequest` without a session
209+ // ID attached.
210+ if (mcpSessionId .get () != null && response .statusCode () == 404 ) {
211+ mcpSessionId .set (null );
212+ }
213+
194214 if (response .statusCode () == 405 || response .statusCode () == 404 ) {
195215 LOGGER .warn ("Operation not allowed, falling back to SSE" );
196216 fallbackToSse .set (true );
@@ -226,8 +246,8 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
226246 return serializeJson (message ).flatMap (json -> {
227247 final HttpRequest .Builder request = requestBuilder .copy ()
228248 .POST (HttpRequest .BodyPublishers .ofString (json ))
229- .header ("Accept" , "application/json, text/event-stream" )
230- .header ("Content-Type" , "application/json" )
249+ .header (ACCEPT , DEFAULT_ACCEPT_VALUES )
250+ .header (CONTENT_TYPE , APPLICATION_JSON )
231251 .uri (uri );
232252 if (mcpSessionId .get () != null ) {
233253 request .header (MCP_SESSION_ID , mcpSessionId .get ());
@@ -238,7 +258,13 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
238258
239259 // server may assign a session ID at initialization time, if yes we
240260 // have to use it for any subsequent requests
241- response .headers ().firstValue (MCP_SESSION_ID ).map (String ::trim ).ifPresent (this .mcpSessionId ::set );
261+ if (message instanceof McpSchema .JSONRPCRequest
262+ && ((McpSchema .JSONRPCRequest ) message ).method ().equals (McpSchema .METHOD_INITIALIZE )) {
263+ response .headers ()
264+ .firstValue (MCP_SESSION_ID )
265+ .map (String ::trim )
266+ .ifPresent (this .mcpSessionId ::set );
267+ }
242268
243269 // If the response is 202 Accepted, there's no body to process
244270 if (response .statusCode () == 202 ) {
@@ -296,19 +322,17 @@ private Mono<String> serializeJson(final McpSchema.JSONRPCMessage msg) {
296322
297323 private Mono <Void > handleStreamingResponse (final HttpResponse <InputStream > response ,
298324 final Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >> handler ) {
299- final String contentType = response .headers ().firstValue ("Content-Type" ).orElse ("" );
300- if (contentType .contains ("application/json-seq" )) {
325+ final String contentType = response .headers ().firstValue (CONTENT_TYPE ).orElse ("" );
326+ if (contentType .contains (APPLICATION_JSON_SEQ )) {
301327 return handleJsonStream (response , handler );
302328 }
303- else if (contentType .contains ("text/event-stream" )) {
329+ else if (contentType .contains (TEXT_EVENT_STREAM )) {
304330 return handleSseStream (response , handler );
305331 }
306- else if (contentType .contains ("application/json" )) {
332+ else if (contentType .contains (APPLICATION_JSON )) {
307333 return handleSingleJson (response , handler );
308334 }
309- else {
310- return Mono .error (new UnsupportedOperationException ("Unsupported Content-Type: " + contentType ));
311- }
335+ return Mono .error (new UnsupportedOperationException ("Unsupported Content-Type: " + contentType ));
312336 }
313337
314338 private Mono <Void > handleSingleJson (final HttpResponse <InputStream > response ,
@@ -381,7 +405,7 @@ else if (node.isObject()) {
381405 }
382406 else {
383407 String warning = "Unexpected JSON in SSE data: " + rawData ;
384- LOGGER .warn ("Unexpected JSON in SSE data: {}" , rawData );
408+ LOGGER .warn (warning );
385409 return Mono .error (new IllegalArgumentException (warning ));
386410 }
387411
0 commit comments