77import java .io .IOException ;
88import java .time .Duration ;
99import java .util .concurrent .ConcurrentHashMap ;
10+ import java .util .concurrent .locks .ReentrantLock ;
1011import java .util .function .Function ;
1112
1213import com .fasterxml .jackson .core .type .TypeReference ;
@@ -230,9 +231,6 @@ private ServerResponse handleGet(ServerRequest request) {
230231
231232 try {
232233 return ServerResponse .sse (sseBuilder -> {
233- sseBuilder .onComplete (() -> {
234- logger .debug ("SSE connection completed for session: {}" , sessionId );
235- });
236234 sseBuilder .onTimeout (() -> {
237235 logger .debug ("SSE connection timed out for session: {}" , sessionId );
238236 });
@@ -264,8 +262,11 @@ private ServerResponse handleGet(ServerRequest request) {
264262 // Establish new listening stream
265263 McpStreamableServerSession .McpStreamableServerSessionStream listeningStream = session
266264 .listeningStream (sessionTransport );
267- // Note: WebMVC SSE doesn't have onCancel, cleanup will happen in
268- // onComplete/onTimeout
265+
266+ sseBuilder .onComplete (() -> {
267+ logger .debug ("SSE connection completed for session: {}" , sessionId );
268+ listeningStream .close ();
269+ });
269270 }
270271 }, Duration .ZERO );
271272 }
@@ -415,13 +416,22 @@ private ServerResponse handleDelete(ServerRequest request) {
415416 /**
416417 * Implementation of McpStreamableServerTransport for WebMVC SSE sessions. This class
417418 * handles the transport-level communication for a specific client session.
419+ *
420+ * <p>
421+ * This class is thread-safe and uses a ReentrantLock to synchronize access to the
422+ * underlying SSE builder to prevent race conditions when multiple threads attempt to
423+ * send messages concurrently.
418424 */
419425 private class WebMvcStreamableMcpSessionTransport implements McpStreamableServerTransport {
420426
421427 private final String sessionId ;
422428
423429 private final SseBuilder sseBuilder ;
424430
431+ private final ReentrantLock lock = new ReentrantLock ();
432+
433+ private volatile boolean closed = false ;
434+
425435 /**
426436 * Creates a new session transport with the specified ID and SSE builder.
427437 * @param sessionId The unique identifier for this session
@@ -453,7 +463,18 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
453463 @ Override
454464 public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message , String messageId ) {
455465 return Mono .fromRunnable (() -> {
466+ if (this .closed ) {
467+ logger .debug ("Attempted to send message to closed session: {}" , this .sessionId );
468+ return ;
469+ }
470+
471+ this .lock .lock ();
456472 try {
473+ if (this .closed ) {
474+ logger .debug ("Session {} was closed during message send attempt" , this .sessionId );
475+ return ;
476+ }
477+
457478 String jsonText = objectMapper .writeValueAsString (message );
458479 this .sseBuilder .id (messageId != null ? messageId : this .sessionId )
459480 .event (MESSAGE_EVENT_TYPE )
@@ -462,7 +483,16 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId
462483 }
463484 catch (Exception e ) {
464485 logger .error ("Failed to send message to session {}: {}" , this .sessionId , e .getMessage ());
465- this .sseBuilder .error (e );
486+ try {
487+ this .sseBuilder .error (e );
488+ }
489+ catch (Exception errorException ) {
490+ logger .error ("Failed to send error to SSE builder for session {}: {}" , this .sessionId ,
491+ errorException .getMessage ());
492+ }
493+ }
494+ finally {
495+ this .lock .unlock ();
466496 }
467497 });
468498 }
@@ -486,14 +516,7 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
486516 @ Override
487517 public Mono <Void > closeGracefully () {
488518 return Mono .fromRunnable (() -> {
489- logger .debug ("Closing streamable session transport: {}" , this .sessionId );
490- try {
491- this .sseBuilder .complete ();
492- logger .debug ("Successfully completed SSE builder for session {}" , this .sessionId );
493- }
494- catch (Exception e ) {
495- logger .warn ("Failed to complete SSE builder for session {}: {}" , this .sessionId , e .getMessage ());
496- }
519+ WebMvcStreamableMcpSessionTransport .this .close ();
497520 });
498521 }
499522
@@ -502,13 +525,24 @@ public Mono<Void> closeGracefully() {
502525 */
503526 @ Override
504527 public void close () {
528+ this .lock .lock ();
505529 try {
530+ if (this .closed ) {
531+ logger .debug ("Session transport {} already closed" , this .sessionId );
532+ return ;
533+ }
534+
535+ this .closed = true ;
536+
506537 this .sseBuilder .complete ();
507538 logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
508539 }
509540 catch (Exception e ) {
510541 logger .warn ("Failed to complete SSE builder for session {}: {}" , sessionId , e .getMessage ());
511542 }
543+ finally {
544+ this .lock .unlock ();
545+ }
512546 }
513547
514548 }
0 commit comments