1515import org .springframework .http .MediaType ;
1616import org .springframework .http .codec .ServerSentEvent ;
1717import org .springframework .web .reactive .function .client .WebClient ;
18+ import org .springframework .web .reactive .function .client .WebClientResponseException ;
1819import reactor .core .Disposable ;
19- import reactor .core .Disposables ;
2020import reactor .core .publisher .Flux ;
2121import reactor .core .publisher .Mono ;
2222import reactor .util .context .ContextView ;
2626import java .io .IOException ;
2727import java .util .List ;
2828import java .util .Optional ;
29- import java .util .concurrent .atomic .AtomicBoolean ;
3029import java .util .concurrent .atomic .AtomicLong ;
3130import java .util .concurrent .atomic .AtomicReference ;
3231import java .util .function .Consumer ;
@@ -52,10 +51,10 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
5251
5352 private final boolean resumableStreams ;
5453
55- private AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
56-
5754 private final AtomicReference <McpTransportSession > activeSession = new AtomicReference <>();
5855
56+ private final AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
57+
5958 private final AtomicReference <Consumer <Throwable >> exceptionHandler = new AtomicReference <>();
6059
6160 public WebClientStreamableHttpTransport (ObjectMapper objectMapper , WebClient .Builder webClientBuilder ,
@@ -88,6 +87,11 @@ public void registerExceptionHandler(Consumer<Throwable> handler) {
8887
8988 private void handleException (Throwable t ) {
9089 logger .debug ("Handling exception for session {}" , activeSession .get ().sessionId (), t );
90+ if (t instanceof McpSessionNotFoundException ) {
91+ McpTransportSession invalidSession = this .activeSession .getAndSet (new McpTransportSession ());
92+ logger .warn ("Server does not recognize session {}. Invalidating." , invalidSession .sessionId ());
93+ invalidSession .close ();
94+ }
9195 Consumer <Throwable > handler = this .exceptionHandler .get ();
9296 if (handler != null ) {
9397 handler .accept (t );
@@ -106,6 +110,8 @@ public Mono<Void> closeGracefully() {
106110 });
107111 }
108112
113+ // FIXME: Avoid passing the ContextView - add hook allowing the Reactor Context to be
114+ // attached to the chain?
109115 private void reconnect (McpStream stream , ContextView ctx ) {
110116 if (stream != null ) {
111117 logger .debug ("Reconnecting stream {} with lastId {}" , stream .streamId (), stream .lastId ());
@@ -273,8 +279,7 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
273279 else {
274280 logger .warn ("Unknown media type {} returned for POST in session {}" , contentType ,
275281 transportSession .sessionId ());
276- sink .error (new RuntimeException ("Unknown media type returned: " + contentType ));
277- return Flux .empty ();
282+ return Flux .error (new RuntimeException ("Unknown media type returned: " + contentType ));
278283 }
279284 }
280285 else {
@@ -283,20 +288,45 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
283288
284289 McpSessionNotFoundException notFoundException = new McpSessionNotFoundException (
285290 transportSession .sessionId ());
286- // inform the caller of sendMessage
287- sink .error (notFoundException );
288291 // inform the stream/connection subscriber
289292 return Flux .error (notFoundException );
290293 }
291- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
292- sink .error (new RuntimeException ("Sending request failed" , e ));
294+ return response .<McpSchema .JSONRPCMessage >createError ().onErrorResume (e -> {
295+ WebClientResponseException responseException = (WebClientResponseException ) e ;
296+ byte [] body = responseException .getResponseBodyAsByteArray ();
297+ McpSchema .JSONRPCResponse .JSONRPCError jsonRpcError = null ;
298+ Exception toPropagate ;
299+ try {
300+ McpSchema .JSONRPCResponse jsonRpcResponse = objectMapper .readValue (body ,
301+ McpSchema .JSONRPCResponse .class );
302+ jsonRpcError = jsonRpcResponse .error ();
303+ toPropagate = new McpError (jsonRpcError );
304+ }
305+ catch (IOException ex ) {
306+ toPropagate = new RuntimeException ("Sending request failed" , e );
307+ logger .debug ("Received content together with {} HTTP code response: {}" ,
308+ response .statusCode (), body );
309+ }
310+
311+ // Some implementations can return 400 when presented with a
312+ // session id that it doesn't know about, so we will
313+ // invalidate the session
314+ // https://github.com/modelcontextprotocol/typescript-sdk/issues/389
315+ if (responseException .getStatusCode ().isSameCodeAs (HttpStatus .BAD_REQUEST )) {
316+ return Mono .error (new McpSessionNotFoundException (this .activeSession .get ().sessionId (),
317+ toPropagate ));
318+ }
319+ return Mono .empty ();
293320 }).flux ();
294321 }
295322 })
296323 .map (Mono ::just )
297324 .flatMap (this .handler .get ())
298325 .onErrorResume (t -> {
326+ // handle the error first
299327 this .handleException (t );
328+
329+ // inform the caller of sendMessage
300330 sink .error (t );
301331 return Flux .empty ();
302332 })
@@ -321,7 +351,8 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
321351 private Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> parse (ServerSentEvent <String > event ) {
322352 if (MESSAGE_EVENT_TYPE .equals (event .event ())) {
323353 try {
324- // TODO: support batching?
354+ // We don't support batching ATM and probably won't since the next version
355+ // considers removing it.
325356 McpSchema .JSONRPCMessage message = McpSchema .deserializeJsonRpcMessage (this .objectMapper , event .data ());
326357 return Tuples .of (Optional .ofNullable (event .id ()), List .of (message ));
327358 }
@@ -340,6 +371,7 @@ private class McpStream {
340371
341372 private final AtomicReference <String > lastId = new AtomicReference <>();
342373
374+ // Used only for internal accounting
343375 private final long streamId ;
344376
345377 private final boolean resumable ;
@@ -360,8 +392,7 @@ long streamId() {
360392 Flux <McpSchema .JSONRPCMessage > consumeSseStream (
361393 Publisher <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> eventStream ) {
362394 return Flux .deferContextual (ctx -> Flux .from (eventStream ).doOnError (e -> {
363- // TODO: examine which error :)
364- if (resumable ) {
395+ if (resumable && !(e instanceof McpSessionNotFoundException )) {
365396 reconnect (this , ctx );
366397 }
367398 })
0 commit comments