22
33import com .fasterxml .jackson .core .type .TypeReference ;
44import com .fasterxml .jackson .databind .ObjectMapper ;
5+ import io .modelcontextprotocol .spec .DefaultMcpTransportSession ;
56import io .modelcontextprotocol .spec .McpClientTransport ;
67import io .modelcontextprotocol .spec .McpError ;
78import io .modelcontextprotocol .spec .McpSchema ;
@@ -51,20 +52,21 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
5152
5253 private final boolean resumableStreams ;
5354
54- private final AtomicReference <McpTransportSession > activeSession = new AtomicReference <>();
55+ private final AtomicReference <DefaultMcpTransportSession > activeSession = new AtomicReference <>();
5556
5657 private final AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
5758
5859 private final AtomicReference <Consumer <Throwable >> exceptionHandler = new AtomicReference <>();
5960
61+ // TODO: builder
6062 public WebClientStreamableHttpTransport (ObjectMapper objectMapper , WebClient .Builder webClientBuilder ,
6163 String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
6264 this .objectMapper = objectMapper ;
6365 this .webClient = webClientBuilder .build ();
6466 this .endpoint = endpoint ;
6567 this .resumableStreams = resumableStreams ;
6668 this .openConnectionOnStartup = openConnectionOnStartup ;
67- this .activeSession .set (new McpTransportSession ());
69+ this .activeSession .set (new DefaultMcpTransportSession ());
6870 }
6971
7072 @ Override
@@ -80,15 +82,15 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
8082 }
8183
8284 @ Override
83- public void registerExceptionHandler (Consumer <Throwable > handler ) {
85+ public void setExceptionHandler (Consumer <Throwable > handler ) {
8486 logger .debug ("Exception handler registered" );
8587 this .exceptionHandler .set (handler );
8688 }
8789
8890 private void handleException (Throwable t ) {
89- logger .debug ("Handling exception for session {}" , activeSession .get (). sessionId ( ), t );
91+ logger .debug ("Handling exception for session {}" , sessionIdRepresentation ( this . activeSession .get ()), t );
9092 if (t instanceof McpSessionNotFoundException ) {
91- McpTransportSession invalidSession = this .activeSession .getAndSet (new McpTransportSession ());
93+ McpTransportSession <?> invalidSession = this .activeSession .getAndSet (new DefaultMcpTransportSession ());
9294 logger .warn ("Server does not recognize session {}. Invalidating." , invalidSession .sessionId ());
9395 invalidSession .close ();
9496 }
@@ -102,7 +104,7 @@ private void handleException(Throwable t) {
102104 public Mono <Void > closeGracefully () {
103105 return Mono .defer (() -> {
104106 logger .debug ("Graceful close triggered" );
105- McpTransportSession currentSession = this .activeSession .get ();
107+ DefaultMcpTransportSession currentSession = this .activeSession .get ();
106108 if (currentSession != null ) {
107109 return currentSession .closeGracefully ();
108110 }
@@ -125,16 +127,14 @@ private void reconnect(McpStream stream, ContextView ctx) {
125127 // listen for messages.
126128 // If it doesn't, nothing actually happens here, that's just the way it is...
127129 final AtomicReference <Disposable > disposableRef = new AtomicReference <>();
128- final McpTransportSession transportSession = this .activeSession .get ();
130+ final McpTransportSession < Disposable > transportSession = this .activeSession .get ();
129131 Disposable connection = webClient .get ()
130132 .uri (this .endpoint )
131133 .accept (MediaType .TEXT_EVENT_STREAM )
132134 .headers (httpHeaders -> {
133- if (transportSession .sessionId () != null ) {
134- httpHeaders .add ("mcp-session-id" , transportSession .sessionId ());
135- }
136- if (stream != null && stream .lastId () != null ) {
137- httpHeaders .add ("last-event-id" , stream .lastId ());
135+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add ("mcp-session-id" , id ));
136+ if (stream != null ) {
137+ stream .lastId ().ifPresent (id -> httpHeaders .add ("last-event-id" , id ));
138138 }
139139 })
140140 .exchangeToFlux (response -> {
@@ -161,7 +161,7 @@ else if (response.statusCode().isSameCodeAs(HttpStatus.NOT_FOUND)) {
161161 logger .warn ("Session {} was not found on the MCP server" , transportSession .sessionId ());
162162
163163 McpSessionNotFoundException notFoundException = new McpSessionNotFoundException (
164- transportSession . sessionId ( ));
164+ sessionIdRepresentation ( transportSession ));
165165 // inform the stream/connection subscriber
166166 return Flux .error (notFoundException );
167167 }
@@ -187,6 +187,10 @@ else if (response.statusCode().isSameCodeAs(HttpStatus.NOT_FOUND)) {
187187 transportSession .addConnection (connection );
188188 }
189189
190+ private static String sessionIdRepresentation (McpTransportSession <?> transportSession ) {
191+ return transportSession .sessionId ().orElse ("[missing_session_id]" );
192+ }
193+
190194 @ Override
191195 public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message ) {
192196 return Mono .create (sink -> {
@@ -197,15 +201,13 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
197201 // listen for messages.
198202 // If it doesn't, nothing actually happens here, that's just the way it is...
199203 final AtomicReference <Disposable > disposableRef = new AtomicReference <>();
200- final McpTransportSession transportSession = this .activeSession .get ();
204+ final McpTransportSession < Disposable > transportSession = this .activeSession .get ();
201205
202206 Disposable connection = webClient .post ()
203207 .uri (this .endpoint )
204208 .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
205209 .headers (httpHeaders -> {
206- if (transportSession .sessionId () != null ) {
207- httpHeaders .add ("mcp-session-id" , transportSession .sessionId ());
208- }
210+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add ("mcp-session-id" , id ));
209211 })
210212 .bodyValue (message )
211213 .exchangeToFlux (response -> {
@@ -287,7 +289,7 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
287289 logger .warn ("Session {} was not found on the MCP server" , transportSession .sessionId ());
288290
289291 McpSessionNotFoundException notFoundException = new McpSessionNotFoundException (
290- transportSession . sessionId ( ));
292+ sessionIdRepresentation ( transportSession ));
291293 // inform the stream/connection subscriber
292294 return Flux .error (notFoundException );
293295 }
@@ -313,8 +315,8 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
313315 // invalidate the session
314316 // https://github.com/modelcontextprotocol/typescript-sdk/issues/389
315317 if (responseException .getStatusCode ().isSameCodeAs (HttpStatus .BAD_REQUEST )) {
316- return Mono .error (new McpSessionNotFoundException (this . activeSession . get (). sessionId (),
317- toPropagate ));
318+ return Mono .error (new McpSessionNotFoundException (
319+ sessionIdRepresentation ( this . activeSession . get ()), toPropagate ));
318320 }
319321 return Mono .empty ();
320322 }).flux ();
@@ -381,8 +383,8 @@ private class McpStream {
381383 this .resumable = resumable ;
382384 }
383385
384- String lastId () {
385- return this .lastId .get ();
386+ Optional < String > lastId () {
387+ return Optional . ofNullable ( this .lastId .get () );
386388 }
387389
388390 long streamId () {
@@ -395,9 +397,10 @@ Flux<McpSchema.JSONRPCMessage> consumeSseStream(
395397 if (resumable && !(e instanceof McpSessionNotFoundException )) {
396398 reconnect (this , ctx );
397399 }
398- })
399- .doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
400- .flatMapIterable (Tuple2 ::getT2 ));
400+ }).doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (id -> {
401+ String previousId = this .lastId .getAndSet (id );
402+ logger .debug ("Updating last id {} -> {} for stream {}" , previousId , id , this .streamId );
403+ })).flatMapIterable (Tuple2 ::getT2 ));
401404 }
402405
403406 }
0 commit comments