Skip to content

Commit c7cbe98

Browse files
committed
Listening stream already exists for this session and will be closed to make way for the new listening SSE stream
1 parent 3dac307 commit c7cbe98

File tree

3 files changed

+21
-20
lines changed

3 files changed

+21
-20
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,16 +190,17 @@ private Mono<ServerResponse> handleGet(ServerRequest request) {
190190
}
191191

192192
McpLoggableSession listenedStream = session.getListeningStream();
193-
boolean replayRequest = request.headers().asHttpHeaders().containsKey(HttpHeaders.LAST_EVENT_ID);
194-
if (replayRequest) {
193+
if (request.headers().asHttpHeaders().containsKey(HttpHeaders.LAST_EVENT_ID)) {
195194
String lastId = request.headers().asHttpHeaders().getFirst(HttpHeaders.LAST_EVENT_ID);
196195
return ServerResponse.ok()
197196
.contentType(MediaType.TEXT_EVENT_STREAM)
198197
.body(session.replay(lastId), ServerSentEvent.class);
199198
}
200199
if (listenedStream instanceof McpStreamableServerSessionStream) {
201-
logger.debug("Listening stream for session: {} exists.", sessionId);
202-
return ServerResponse.ok().build();
200+
logger.debug(
201+
"Listening stream already exists for this session:{} and will be closed to make way for the new listening SSE stream",
202+
sessionId);
203+
listenedStream.close();
203204
}
204205

205206
return ServerResponse.ok()

mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,6 @@ private ServerResponse handleGet(ServerRequest request) {
254254
}
255255

256256
logger.debug("Handling GET request for session: {}", sessionId);
257-
McpLoggableSession listenedStream = session.getListeningStream();
258-
boolean replayRequest = request.headers().asHttpHeaders().containsKey(HttpHeaders.LAST_EVENT_ID);
259-
if (!replayRequest && listenedStream instanceof McpStreamableServerSessionStream) {
260-
logger.debug("Listening stream for session: {} exists.", sessionId);
261-
return ServerResponse.ok().build();
262-
}
263257
try {
264258
return ServerResponse.sse(sseBuilder -> {
265259
sseBuilder.onTimeout(() -> {
@@ -270,7 +264,7 @@ private ServerResponse handleGet(ServerRequest request) {
270264
sessionId, sseBuilder);
271265

272266
// Check if this is a replay request
273-
if (replayRequest) {
267+
if (request.headers().asHttpHeaders().containsKey(HttpHeaders.LAST_EVENT_ID)) {
274268
String lastId = request.headers().asHttpHeaders().getFirst(HttpHeaders.LAST_EVENT_ID);
275269
try {
276270
session.replay(lastId)
@@ -294,6 +288,13 @@ private ServerResponse handleGet(ServerRequest request) {
294288
}
295289
}
296290
else {
291+
McpLoggableSession listenedStream = session.getListeningStream();
292+
if (listenedStream instanceof McpStreamableServerSessionStream) {
293+
logger.debug(
294+
"Listening stream already exists for this session:{} and will be closed to make way for the new listening SSE stream",
295+
sessionId);
296+
listenedStream.close();
297+
}
297298
// Establish new listening stream
298299
McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session
299300
.listeningStream(sessionTransport);

mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -275,14 +275,6 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
275275
}
276276

277277
logger.debug("Handling GET request for session: {}", sessionId);
278-
McpLoggableSession listenedStream = session.getListeningStream();
279-
boolean replayRequest = request.getHeader(HttpHeaders.LAST_EVENT_ID) != null;
280-
if (!replayRequest && listenedStream instanceof McpStreamableServerSessionStream) {
281-
logger.debug("Listening stream for session: {} exists.", sessionId);
282-
response.setStatus(HttpServletResponse.SC_OK);
283-
return;
284-
}
285-
286278
McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
287279

288280
try {
@@ -299,7 +291,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
299291
sessionId, asyncContext, response.getWriter());
300292

301293
// Check if this is a replay request
302-
if (replayRequest) {
294+
if (request.getHeader(HttpHeaders.LAST_EVENT_ID) != null) {
303295
String lastId = request.getHeader(HttpHeaders.LAST_EVENT_ID);
304296

305297
try {
@@ -324,6 +316,13 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
324316
}
325317
}
326318
else {
319+
McpLoggableSession listenedStream = session.getListeningStream();
320+
if (listenedStream instanceof McpStreamableServerSessionStream) {
321+
logger.debug(
322+
"Listening stream already exists for this session:{} and will be closed to make way for the new listening SSE stream",
323+
sessionId);
324+
listenedStream.close();
325+
}
327326
// Establish new listening stream
328327
McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session
329328
.listeningStream(sessionTransport);

0 commit comments

Comments
 (0)