@@ -41,24 +41,25 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
4141 private static final String MESSAGE_EVENT_TYPE = "message" ;
4242
4343 private final ObjectMapper objectMapper ;
44+
4445 private final WebClient webClient ;
46+
4547 private final String endpoint ;
48+
4649 private final boolean openConnectionOnStartup ;
50+
4751 private final boolean resumableStreams ;
4852
49- private AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >,
50- Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
53+ private AtomicReference <Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >>> handler = new AtomicReference <>();
5154
5255 private final Disposable .Composite openConnections = Disposables .composite ();
56+
5357 private final AtomicBoolean initialized = new AtomicBoolean ();
58+
5459 private final AtomicReference <String > sessionId = new AtomicReference <>();
5560
56- public WebClientStreamableHttpTransport (
57- ObjectMapper objectMapper ,
58- WebClient .Builder webClientBuilder ,
59- String endpoint ,
60- boolean resumableStreams ,
61- boolean openConnectionOnStartup ) {
61+ public WebClientStreamableHttpTransport (ObjectMapper objectMapper , WebClient .Builder webClientBuilder ,
62+ String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
6263 this .objectMapper = objectMapper ;
6364 this .webClient = webClientBuilder .build ();
6465 this .endpoint = endpoint ;
@@ -81,57 +82,61 @@ public Mono<Void> closeGracefully() {
8182 }
8283
8384 private void reconnect (McpStream stream , ContextView ctx ) {
84- Disposable connection = this .startOrResumeSession (stream )
85- .contextWrite (ctx )
86- .subscribe ();
85+ Disposable connection = this .startOrResumeSession (stream ).contextWrite (ctx ).subscribe ();
8786 this .openConnections .add (connection );
8887 }
8988
9089 private Mono <Void > startOrResumeSession (McpStream stream ) {
9190 return Mono .create (sink -> {
9291 // Here we attempt to initialize the client.
93- // In case the server supports SSE, we will establish a long-running session here and
92+ // In case the server supports SSE, we will establish a long-running session
93+ // here and
9494 // listen for messages.
9595 // If it doesn't, nothing actually happens here, that's just the way it is...
9696
9797 Disposable connection = webClient .get ()
98- .uri (this .endpoint )
99- .accept (MediaType .TEXT_EVENT_STREAM )
100- .headers (httpHeaders -> {
101- if (sessionId .get () != null ) {
102- httpHeaders .add ("mcp-session-id" , sessionId .get ());
103- }
104- if (stream != null && stream .lastId () != null ) {
105- httpHeaders .add ("last-event-id" , stream .lastId ());
106- }
107- })
108- .exchangeToFlux (response -> {
109- // Per spec, we are not checking whether it's 2xx, but only if the Accept header is proper.
110- if (response .headers ().contentType ().isPresent ()
111- && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
112-
113- sink .success ();
114-
115- McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
116-
117- Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages =
118- response .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
119- }).map (this ::parse );
120-
121- return sessionStream .consumeSseStream (idWithMessages );
122- } else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
123- sink .success ();
124- logger .info ("The server does not support SSE streams, using request-response mode." );
125- return Flux .empty ();
126- } else {
127- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
128- sink .error (new RuntimeException ("Connection on client startup failed" , e ));
129- }).flux ();
130- }
131- })
132- // TODO: Consider retries - examine cause to decide whether a retry is needed.
133- .contextWrite (sink .contextView ())
134- .subscribe ();
98+ .uri (this .endpoint )
99+ .accept (MediaType .TEXT_EVENT_STREAM )
100+ .headers (httpHeaders -> {
101+ if (sessionId .get () != null ) {
102+ httpHeaders .add ("mcp-session-id" , sessionId .get ());
103+ }
104+ if (stream != null && stream .lastId () != null ) {
105+ httpHeaders .add ("last-event-id" , stream .lastId ());
106+ }
107+ })
108+ .exchangeToFlux (response -> {
109+ // Per spec, we are not checking whether it's 2xx, but only if the
110+ // Accept header is proper.
111+ if (response .headers ().contentType ().isPresent ()
112+ && response .headers ().contentType ().get ().isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
113+
114+ sink .success ();
115+
116+ McpStream sessionStream = stream != null ? stream : new McpStream (this .resumableStreams );
117+
118+ Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
119+ .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
120+ })
121+ .map (this ::parse );
122+
123+ return sessionStream .consumeSseStream (idWithMessages );
124+ }
125+ else if (response .statusCode ().isSameCodeAs (HttpStatus .METHOD_NOT_ALLOWED )) {
126+ sink .success ();
127+ logger .info ("The server does not support SSE streams, using request-response mode." );
128+ return Flux .empty ();
129+ }
130+ else {
131+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
132+ sink .error (new RuntimeException ("Connection on client startup failed" , e ));
133+ }).flux ();
134+ }
135+ })
136+ // TODO: Consider retries - examine cause to decide whether a retry is
137+ // needed.
138+ .contextWrite (sink .contextView ())
139+ .subscribe ();
135140 this .openConnections .add (connection );
136141 });
137142 }
@@ -141,92 +146,106 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
141146 return Mono .create (sink -> {
142147 System .out .println ("Sending message " + message );
143148 // Here we attempt to initialize the client.
144- // In case the server supports SSE, we will establish a long-running session here and
149+ // In case the server supports SSE, we will establish a long-running session
150+ // here and
145151 // listen for messages.
146152 // If it doesn't, nothing actually happens here, that's just the way it is...
147153 Disposable connection = webClient .post ()
148- .uri (this .endpoint )
149- .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
150- .headers (httpHeaders -> {
151- if (sessionId .get () != null ) {
152- httpHeaders .add ("mcp-session-id" , sessionId .get ());
153- }
154- })
155- .bodyValue (message )
156- .exchangeToFlux (response -> {
157- // TODO: this goes into the request phase
158- if (!initialized .compareAndExchange (false , true )) {
159- if (!response .headers ().header ("mcp-session-id" ).isEmpty ()) {
160- sessionId .set (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ));
161- // Once we have a session, we try to open an async stream for the server to send notifications and requests out-of-band.
162- startOrResumeSession (null )
163- .contextWrite (sink .contextView ())
164- .subscribe ();
165- }
166- }
167-
168- // The spec mentions only ACCEPTED, but the existing SDKs can return 200 OK for notifications
169- // if (!response.statusCode().isSameCodeAs(HttpStatus.ACCEPTED)) {
170- if (!response .statusCode ().is2xxSuccessful ()) {
171- if (response .statusCode ().isSameCodeAs (HttpStatus .NOT_FOUND )) {
172- logger .info ("Session {} was not found on the MCP server" , sessionId .get ());
173-
174- McpSessionNotFoundException notFoundException = new McpSessionNotFoundException ("Session " + sessionId .get () + " not found" );
175- // inform the caller of sendMessage
176- sink .error (notFoundException );
177- // inform the stream/connection subscriber
178- return Flux .error (notFoundException );
179- }
180- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
181- sink .error (new RuntimeException ("Sending request failed" , e ));
182- }).flux ();
154+ .uri (this .endpoint )
155+ .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
156+ .headers (httpHeaders -> {
157+ if (sessionId .get () != null ) {
158+ httpHeaders .add ("mcp-session-id" , sessionId .get ());
159+ }
160+ })
161+ .bodyValue (message )
162+ .exchangeToFlux (response -> {
163+ // TODO: this goes into the request phase
164+ if (!initialized .compareAndExchange (false , true )) {
165+ if (!response .headers ().header ("mcp-session-id" ).isEmpty ()) {
166+ sessionId .set (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ));
167+ // Once we have a session, we try to open an async stream for
168+ // the server to send notifications and requests out-of-band.
169+ startOrResumeSession (null ).contextWrite (sink .contextView ()).subscribe ();
183170 }
184-
185- // Existing SDKs consume notifications with no response body nor content type
186- if (response .headers ().contentType ().isEmpty ()) {
187- sink .success ();
188- return Flux .empty ();
189- // return response.<McpSchema.JSONRPCMessage>createError().doOnError(e -> {
190- //// sink.error(new RuntimeException("Response has no content type"));
191- // }).flux();
171+ }
172+
173+ // The spec mentions only ACCEPTED, but the existing SDKs can return
174+ // 200 OK for notifications
175+ // if (!response.statusCode().isSameCodeAs(HttpStatus.ACCEPTED)) {
176+ if (!response .statusCode ().is2xxSuccessful ()) {
177+ if (response .statusCode ().isSameCodeAs (HttpStatus .NOT_FOUND )) {
178+ logger .info ("Session {} was not found on the MCP server" , sessionId .get ());
179+
180+ McpSessionNotFoundException notFoundException = new McpSessionNotFoundException (
181+ "Session " + sessionId .get () + " not found" );
182+ // inform the caller of sendMessage
183+ sink .error (notFoundException );
184+ // inform the stream/connection subscriber
185+ return Flux .error (notFoundException );
192186 }
193-
194- MediaType contentType = response .headers ().contentType ().get ();
195-
196- if (contentType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
197- sink .success ();
198- McpStream sessionStream = new McpStream (this .resumableStreams );
199-
200- Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages =
201- response .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
202- }).map (this ::parse );
203-
204- return sessionStream .consumeSseStream (idWithMessages );
205- } else if (contentType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
206- sink .success ();
207- // return response.bodyToMono(new ParameterizedTypeReference<Iterable<McpSchema.JSONRPCMessage>>() {});
208- return response .bodyToMono (String .class )
209- .<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
210- try {
211- McpSchema .JSONRPCMessage jsonRpcResponse = McpSchema .deserializeJsonRpcMessage (objectMapper , responseMessage );
212- s .next (List .of (jsonRpcResponse ));
213- } catch (IOException e ) {
214- s .error (e );
215- }
187+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
188+ sink .error (new RuntimeException ("Sending request failed" , e ));
189+ }).flux ();
190+ }
191+
192+ // Existing SDKs consume notifications with no response body nor
193+ // content type
194+ if (response .headers ().contentType ().isEmpty ()) {
195+ sink .success ();
196+ return Flux .empty ();
197+ // return
198+ // response.<McpSchema.JSONRPCMessage>createError().doOnError(e ->
199+ // {
200+ //// sink.error(new RuntimeException("Response has no content
201+ // type"));
202+ // }).flux();
203+ }
204+
205+ MediaType contentType = response .headers ().contentType ().get ();
206+
207+ if (contentType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
208+ sink .success ();
209+ McpStream sessionStream = new McpStream (this .resumableStreams );
210+
211+ Flux <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> idWithMessages = response
212+ .bodyToFlux (new ParameterizedTypeReference <ServerSentEvent <String >>() {
216213 })
217- .flatMapIterable (Function .identity ());
218- // .map(Mono::just)
219- // .flatMap(this.handler.get());
220- } else {
221- sink .error (new RuntimeException ("Unknown media type" ));
222- return Flux .empty ();
223- }
224- })
225- .map (Mono ::just )
226- .flatMap (this .handler .get ())
227- // TODO: Consider retries - examine cause to decide whether a retry is needed.
228- .contextWrite (sink .contextView ())
229- .subscribe ();
214+ .map (this ::parse );
215+
216+ return sessionStream .consumeSseStream (idWithMessages );
217+ }
218+ else if (contentType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
219+ sink .success ();
220+ // return response.bodyToMono(new
221+ // ParameterizedTypeReference<Iterable<McpSchema.JSONRPCMessage>>()
222+ // {});
223+ return response .bodyToMono (
224+ String .class ).<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
225+ try {
226+ McpSchema .JSONRPCMessage jsonRpcResponse = McpSchema
227+ .deserializeJsonRpcMessage (objectMapper , responseMessage );
228+ s .next (List .of (jsonRpcResponse ));
229+ }
230+ catch (IOException e ) {
231+ s .error (e );
232+ }
233+ })
234+ .flatMapIterable (Function .identity ());
235+ // .map(Mono::just)
236+ // .flatMap(this.handler.get());
237+ }
238+ else {
239+ sink .error (new RuntimeException ("Unknown media type" ));
240+ return Flux .empty ();
241+ }
242+ })
243+ .map (Mono ::just )
244+ .flatMap (this .handler .get ())
245+ // TODO: Consider retries - examine cause to decide whether a retry is
246+ // needed.
247+ .contextWrite (sink .contextView ())
248+ .subscribe ();
230249 this .openConnections .add (connection );
231250 });
232251 }
@@ -259,33 +278,33 @@ private class McpStream {
259278 private final AtomicReference <String > lastId = new AtomicReference <>();
260279
261280 private final long streamId ;
281+
262282 private final boolean resumable ;
263283
264- McpStream (boolean resumable ) {
265- this .streamId = counter .getAndIncrement ();
284+ McpStream (boolean resumable ) {
285+ this .streamId = counter .getAndIncrement ();
266286 this .resumable = resumable ;
267287 }
268288
269289 String lastId () {
270290 return this .lastId .get ();
271291 }
272292
273- Flux <McpSchema .JSONRPCMessage > consumeSseStream (Publisher <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> eventStream ) {
274- return Flux .deferContextual (ctx ->
275- Flux .from (eventStream )
276- .doOnError (e -> {
277- // TODO: examine which error :)
278- if (resumable ) {
279- Disposable connection = WebClientStreamableHttpTransport .this .startOrResumeSession (this )
280- .contextWrite (ctx )
281- .subscribe ();
282- WebClientStreamableHttpTransport .this .openConnections .add (connection );
283- }
284- })
285- .doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
286- .flatMapIterable (Tuple2 ::getT2 )
287- );
293+ Flux <McpSchema .JSONRPCMessage > consumeSseStream (
294+ Publisher <Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >>> eventStream ) {
295+ return Flux .deferContextual (ctx -> Flux .from (eventStream ).doOnError (e -> {
296+ // TODO: examine which error :)
297+ if (resumable ) {
298+ Disposable connection = WebClientStreamableHttpTransport .this .startOrResumeSession (this )
299+ .contextWrite (ctx )
300+ .subscribe ();
301+ WebClientStreamableHttpTransport .this .openConnections .add (connection );
302+ }
303+ })
304+ .doOnNext (idAndMessage -> idAndMessage .getT1 ().ifPresent (this .lastId ::set ))
305+ .flatMapIterable (Tuple2 ::getT2 ));
288306 }
289307
290308 }
309+
291310}
0 commit comments