@@ -44,6 +44,8 @@ public class StreamableHttpClientTransport implements McpClientTransport {
4444
4545 private static final Logger LOGGER = LoggerFactory .getLogger (StreamableHttpClientTransport .class );
4646
47+ private static final String MCP_SESSION_ID = "Mcp-Session-Id" ;
48+
4749 private final HttpClientSseClientTransport sseClientTransport ;
4850
4951 private final HttpClient httpClient ;
@@ -58,6 +60,8 @@ public class StreamableHttpClientTransport implements McpClientTransport {
5860
5961 private final AtomicReference <String > lastEventId = new AtomicReference <>();
6062
63+ private final AtomicReference <String > mcpSessionId = new AtomicReference <>();
64+
6165 private final AtomicBoolean fallbackToSse = new AtomicBoolean (false );
6266
6367 StreamableHttpClientTransport (final HttpClient httpClient , final HttpRequest .Builder requestBuilder ,
@@ -173,15 +177,19 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
173177 }
174178
175179 return Mono .defer (() -> Mono .fromFuture (() -> {
176- final HttpRequest .Builder builder = requestBuilder .copy ()
180+ final HttpRequest .Builder request = requestBuilder .copy ()
177181 .GET ()
178182 .header ("Accept" , "text/event-stream" )
179183 .uri (uri );
180184 final String lastId = lastEventId .get ();
181185 if (lastId != null ) {
182- builder .header ("Last-Event-ID" , lastId );
186+ request .header ("Last-Event-ID" , lastId );
187+ }
188+ if (mcpSessionId .get () != null ) {
189+ request .header (MCP_SESSION_ID , mcpSessionId .get ());
183190 }
184- return httpClient .sendAsync (builder .build (), HttpResponse .BodyHandlers .ofInputStream ());
191+
192+ return httpClient .sendAsync (request .build (), HttpResponse .BodyHandlers .ofInputStream ());
185193 }).flatMap (response -> {
186194 if (response .statusCode () == 405 || response .statusCode () == 404 ) {
187195 LOGGER .warn ("Operation not allowed, falling back to SSE" );
@@ -216,20 +224,34 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
216224 }
217225
218226 return serializeJson (message ).flatMap (json -> {
219- final HttpRequest request = requestBuilder .copy ()
227+ final HttpRequest . Builder request = requestBuilder .copy ()
220228 .POST (HttpRequest .BodyPublishers .ofString (json ))
221229 .header ("Accept" , "application/json, text/event-stream" )
222230 .header ("Content-Type" , "application/json" )
223- .uri (uri )
224- .build ();
225- return Mono .fromFuture (httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ()))
231+ .uri (uri );
232+ if (mcpSessionId .get () != null ) {
233+ request .header (MCP_SESSION_ID , mcpSessionId .get ());
234+ }
235+
236+ return Mono .fromFuture (httpClient .sendAsync (request .build (), HttpResponse .BodyHandlers .ofInputStream ()))
226237 .flatMap (response -> {
227238
239+ // server may assign a session ID at initialization time, if yes we
240+ // have to use it for any subsequent requests
241+ response .headers ().firstValue (MCP_SESSION_ID ).map (String ::trim ).ifPresent (this .mcpSessionId ::set );
242+
228243 // If the response is 202 Accepted, there's no body to process
229244 if (response .statusCode () == 202 ) {
230245 return Mono .empty ();
231246 }
232247
248+ // must like server terminate session and the client need to start a
249+ // new session by sending a new `InitializeRequest` without a session
250+ // ID attached.
251+ if (mcpSessionId .get () != null && response .statusCode () == 404 ) {
252+ mcpSessionId .set (null );
253+ }
254+
233255 if (response .statusCode () == 405 || response .statusCode () == 404 ) {
234256 LOGGER .warn ("Operation not allowed, falling back to SSE" );
235257 fallbackToSse .set (true );
0 commit comments