@@ -69,11 +69,16 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
6969
7070 @ Override
7171 public Mono <Void > connect (Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >> handler ) {
72- if (this .openConnections .isDisposed ()) {
73- return Mono .error (new RuntimeException ("Transport already disposed" ));
74- }
75- this .handler .set (handler );
76- return openConnectionOnStartup ? startOrResumeSession (null ) : Mono .empty ();
72+ return Mono .deferContextual (ctx -> {
73+ if (this .openConnections .isDisposed ()) {
74+ return Mono .error (new RuntimeException ("Transport already disposed" ));
75+ }
76+ this .handler .set (handler );
77+ if (openConnectionOnStartup ) {
78+ this .reconnect (null , ctx );
79+ }
80+ return Mono .empty ();
81+ });
7782 }
7883
7984 @ Override
@@ -82,63 +87,58 @@ public Mono<Void> closeGracefully() {
8287 }
8388
8489 private void reconnect (McpStream stream , ContextView ctx ) {
85- Disposable connection = this .startOrResumeSession (stream ).contextWrite (ctx ).subscribe ();
86- this .openConnections .add (connection );
87- }
88-
89- private Mono <Void > startOrResumeSession (McpStream stream ) {
90- return Mono .create (sink -> {
91- // Here we attempt to initialize the client.
92- // In case the server supports SSE, we will establish a long-running session
93- // here and
94- // listen for messages.
95- // If it doesn't, nothing actually happens here, that's just the way it is...
96-
97- Disposable connection = webClient .get ()
98- .uri (this .endpoint )
99- .accept (MediaType .TEXT_EVENT_STREAM )
100- .headers (httpHeaders -> {
101- if (sessionId .get () != null ) {
102- httpHeaders .add ("mcp-session-id" , sessionId .get ());
103- }
104- if (stream != null && stream .lastId () != null ) {
105- httpHeaders .add ("last-event-id" , stream .lastId ());
106- }
107- })
108- .exchangeToFlux (response -> {
109- // Per spec, we are not checking whether it's 2xx, but only if the
110- // Accept header is proper.
111- if (response .headers ().contentType ().isPresent ()
112- && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
113-
114- sink .success ();
90+ // Here we attempt to initialize the client.
91+ // In case the server supports SSE, we will establish a long-running session
92+ // here and
93+ // listen for messages.
94+ // If it doesn't, nothing actually happens here, that's just the way it is...
95+ final AtomicReference <Disposable > disposableRef = new AtomicReference <>();
96+ Disposable connection = webClient .get ()
97+ .uri (this .endpoint )
98+ .accept (MediaType .TEXT_EVENT_STREAM )
99+ .headers (httpHeaders -> {
100+ if (sessionId .get () != null ) {
101+ httpHeaders .add ("mcp-session-id" , sessionId .get ());
102+ }
103+ if (stream != null && stream .lastId () != null ) {
104+ httpHeaders .add ("last-event-id" , stream .lastId ());
105+ }
106+ })
107+ .exchangeToFlux (response -> {
108+ // Per spec, we are not checking whether it's 2xx, but only if the
109+ // Accept header is proper.
110+ if (response .headers ().contentType ().isPresent ()
111+ && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
115112
116- McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
113+ McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
117114
118- Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
119- .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
120- })
121- .map (this ::parse );
115+ Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
116+ .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
117+ })
118+ .map (this ::parse );
122119
123- return sessionStream .consumeSseStream (idWithMessages );
124- }
125- else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
126- sink .success ();
127- logger .info ("The server does not support SSE streams, using request-response mode." );
128- return Flux .empty ();
129- }
130- else {
131- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
132- sink .error (new RuntimeException ("Connection on client startup failed" , e ));
133- }).flux ();
134- }
135- })
136- // TODO: Consider retries - examine cause to decide whether a retry is
137- // needed.
138- .contextWrite (sink .contextView ())
139- .subscribe ();
140- this .openConnections .add (connection );
141- });
120+ return sessionStream .consumeSseStream (idWithMessages );
121+ }
122+ else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
123+ logger .info ("The server does not support SSE streams, using request-response mode." );
124+ return Flux .empty ();
125+ }
126+ else {
127+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
128+ logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
129+ }).flux ();
130+ }
131+ })
132+ .doFinally (s -> {
133+ Disposable ref = disposableRef .getAndSet (null );
134+ if (ref != null ) {
135+ this .openConnections .remove (ref );
136+ }
137+ })
138+ .contextWrite (ctx )
139+ .subscribe ();
140+ disposableRef .set (connection );
141+ this .openConnections .add (connection );
142142 }
143143
144144 @ Override
@@ -150,6 +150,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
150150 // here and
151151 // listen for messages.
152152 // If it doesn't, nothing actually happens here, that's just the way it is...
153+ final AtomicReference <Disposable > disposableRef = new AtomicReference <>();
153154 Disposable connection = webClient .post ()
154155 .uri (this .endpoint )
155156 .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
@@ -166,7 +167,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
166167 sessionId .set (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ));
167168 // Once we have a session, we try to open an async stream for
168169 // the server to send notifications and requests out-of-band.
169- startOrResumeSession (null ). contextWrite ( sink .contextView ()). subscribe ( );
170+ reconnect (null , sink .contextView ());
170171 }
171172 }
172173
@@ -242,10 +243,15 @@ else if (contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
242243 })
243244 .map (Mono ::just )
244245 .flatMap (this .handler .get ())
245- // TODO: Consider retries - examine cause to decide whether a retry is
246- // needed.
246+ .doFinally (s -> {
247+ Disposable ref = disposableRef .getAndSet (null );
248+ if (ref != null ) {
249+ this .openConnections .remove (ref );
250+ }
251+ })
247252 .contextWrite (sink .contextView ())
248253 .subscribe ();
254+ disposableRef .set (connection );
249255 this .openConnections .add (connection );
250256 });
251257 }
@@ -295,10 +301,7 @@ Flux<McpSchema.JSONRPCMessage> consumeSseStream(
295301 return Flux .deferContextual (ctx -> Flux .from (eventStream ).doOnError (e -> {
296302 // TODO: examine which error :)
297303 if (resumable ) {
298- Disposable connection = WebClientStreamableHttpTransport .this .startOrResumeSession (this )
299- .contextWrite (ctx )
300- .subscribe ();
301- WebClientStreamableHttpTransport .this .openConnections .add (connection );
304+ reconnect (this , ctx );
302305 }
303306 })
304307 .doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
0 commit comments