Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit bfd07b4

Browse files
committed
refactor(transport): Simplify WebMvc SSE transport with native Spring SseBuilder
Replaces custom BlockingQueue-based SSE implementation with Spring's native SseBuilder for simpler and more reliable event handling. This change: - Removes custom SSEEvent and queue implementation - Uses Spring's SseBuilder directly for SSE event management - Improves error handling with direct error reporting - Reduces complexity in session management
1 parent fa27872 commit bfd07b4

File tree

1 file changed

+18
-29
lines changed

1 file changed

+18
-29
lines changed

mcp-transport/mcp-webmvc-sse-transport/src/main/java/org/springframework/ai/mcp/server/transport/WebMvcSseServerTransport.java

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.util.UUID;
21-
import java.util.concurrent.BlockingQueue;
2221
import java.util.concurrent.ConcurrentHashMap;
23-
import java.util.concurrent.LinkedBlockingQueue;
24-
import java.util.concurrent.TimeUnit;
2522
import java.util.function.Function;
2623

2724
import com.fasterxml.jackson.core.type.TypeReference;
@@ -39,6 +36,7 @@
3936
import org.springframework.web.servlet.function.RouterFunctions;
4037
import org.springframework.web.servlet.function.ServerRequest;
4138
import org.springframework.web.servlet.function.ServerResponse;
39+
import org.springframework.web.servlet.function.ServerResponse.SseBuilder;
4240

4341
/**
4442
* Server-side implementation of the MCP HTTP with SSE transport specification using
@@ -122,11 +120,11 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
122120

123121
sessions.values().forEach(session -> {
124122
try {
125-
var event = new SSEEvent(session.id, MESSAGE_EVENT_TYPE, jsonText);
126-
session.queue.put(event);
123+
session.sseBuilder.id(session.id).event(MESSAGE_EVENT_TYPE).data(jsonText);
127124
}
128125
catch (Exception e) {
129126
logger.error("Failed to send message to session {}: {}", session.id, e.getMessage());
127+
session.sseBuilder.error(e);
130128
}
131129
});
132130
}
@@ -147,27 +145,20 @@ private ServerResponse handleSseConnection(ServerRequest request) {
147145
String sessionId = UUID.randomUUID().toString();
148146
logger.debug("Creating new SSE connection for session: {}", sessionId);
149147

150-
ClientSession session = new ClientSession(sessionId);
151-
this.sessions.put(sessionId, session);
152-
153148
// Send initial endpoint event
154149
try {
155-
session.queue.put(new SSEEvent(session.id, ENDPOINT_EVENT_TYPE, messageEndpoint));
156150
return ServerResponse.sse(sseBuilder -> {
157-
// new Thread(() -> {
158-
while (!this.isClosing) {
159-
try {
160-
SSEEvent sseEvent = session.queue.poll(100, TimeUnit.MILLISECONDS);
161-
if (sseEvent != null) {
162-
sseBuilder.id(sseEvent.id).event(sseEvent.type()).data(sseEvent.data());
163-
}
164-
}
165-
catch (Exception e) {
166-
logger.error("Failed to poll event from session queue: {}", e.getMessage());
167-
sseBuilder.error(e);
168-
}
151+
152+
ClientSession session = new ClientSession(sessionId, sseBuilder);
153+
this.sessions.put(sessionId, session);
154+
155+
try {
156+
session.sseBuilder.id(session.id).event(ENDPOINT_EVENT_TYPE).data(messageEndpoint);
157+
}
158+
catch (Exception e) {
159+
logger.error("Failed to poll event from session queue: {}", e.getMessage());
160+
sseBuilder.error(e);
169161
}
170-
// }).start();
171162
});
172163
}
173164
catch (Exception e) {
@@ -205,32 +196,30 @@ private ServerResponse handleMessage(ServerRequest request) {
205196
}
206197
}
207198

208-
record SSEEvent(String id, String type, String data) {
209-
}
210-
211199
/**
212200
* Represents an active client session.
213201
*/
214202
private static class ClientSession {
215203

216204
private final String id;
217205

218-
private final BlockingQueue<SSEEvent> queue;
206+
private final SseBuilder sseBuilder;
219207

220-
ClientSession(String id) {
208+
ClientSession(String id, SseBuilder sseBuilder) {
221209
this.id = id;
222-
this.queue = new LinkedBlockingQueue<>();
210+
this.sseBuilder = sseBuilder;
223211
logger.debug("Session {} initialized with SSE emitter", id);
224212
}
225213

226214
void close() {
227215
logger.debug("Closing session: {}", id);
228216
try {
229-
queue.remove();
217+
sseBuilder.complete();
230218
logger.debug("Successfully completed SSE emitter for session {}", id);
231219
}
232220
catch (Exception e) {
233221
logger.warn("Failed to complete SSE emitter for session {}: {}", id, e.getMessage());
222+
// sseBuilder.error(e);
234223
}
235224
}
236225

0 commit comments

Comments
 (0)