Skip to content

Commit ca3555b

Browse files
committed
优化连接监控机制
1 parent a954a5e commit ca3555b

File tree

3 files changed

+44
-51
lines changed

3 files changed

+44
-51
lines changed

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/DefaultMcpServer.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,13 @@ public void onToolAdded(String name, String description, Map<String, Object> par
9090
log.warn("Tool addition is ignored: tool schema is null or empty. [toolName={}]", name);
9191
return;
9292
}
93+
Object props = parameters.get(PROPERTIES);
94+
Object reqs = parameters.get(REQUIRED);
9395
if (!(parameters.get(TYPE) instanceof String)
94-
|| parameters.get(PROPERTIES) != null && !(parameters.get(PROPERTIES) instanceof Map)
95-
|| parameters.get(REQUIRED) != null && !(parameters.get(REQUIRED) instanceof List)) {
96+
|| (props != null && (!(props instanceof Map<?, ?>)
97+
|| ((Map<?, ?>) props).keySet().stream().anyMatch(k -> !(k instanceof String))))
98+
|| (reqs != null && (!(reqs instanceof List<?>)
99+
|| ((List<?>) reqs).stream().anyMatch(v -> !(v instanceof String))))) {
96100
log.warn("Invalid parameter schema. [toolName={}]", name);
97101
return;
98102
}
@@ -112,13 +116,19 @@ public void onToolAdded(String name, String description, Map<String, Object> par
112116
return new McpSchema.CallToolResult(result, false);
113117
})
114118
.build();
115-
this.mcpSyncServer.addTool(toolSpecification);
116-
117119
Tool tool = new Tool();
118120
tool.setName(name);
119121
tool.setDescription(description);
120122
tool.setInputSchema(parameters);
121-
this.tools.put(name, tool);
123+
124+
try {
125+
this.mcpSyncServer.addTool(toolSpecification);
126+
this.tools.put(name, tool);
127+
} catch (Exception e) {
128+
log.error("Failed to add tool: {}", name, e);
129+
this.tools.remove(name);
130+
return;
131+
}
122132
log.info("Tool added to MCP server. [toolName={}, description={}, schema={}]", name, description, parameters);
123133
this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged);
124134
}

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/DefaultMcpServerBean.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
*/
2323
@Component
2424
public class DefaultMcpServerBean {
25+
private final static Duration requestTimeout = Duration.ofSeconds(10);
26+
2527
@Bean
2628
public DefaultMcpStreamableServerTransportProvider defaultMcpStreamableServerTransportProvider() {
2729
return DefaultMcpStreamableServerTransportProvider.builder()
@@ -37,7 +39,7 @@ public McpSyncServer mcpSyncServer(DefaultMcpStreamableServerTransportProvider t
3739
.tools(true) // Enable tool support
3840
.logging() // Enable logging support
3941
.build())
40-
.requestTimeout(Duration.ofSeconds(10))
42+
.requestTimeout(requestTimeout)
4143
.build();
4244
}
4345
}

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/DefaultMcpStreamableServerTransportProvider.java

Lines changed: 26 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo
235235
// TODO emitter.onTimeout() logger.info()
236236

237237
DefaultStreamableMcpSessionTransport sessionTransport = new DefaultStreamableMcpSessionTransport(
238-
sessionId, emitter);
238+
sessionId, emitter, response);
239239

240240
// Check if this is a replay request
241241
if (request.headers().contains(HttpHeaders.LAST_EVENT_ID)) {
@@ -278,17 +278,23 @@ public void onEmittedData(TextEvent data) {
278278
@Override
279279
public void onCompleted() {
280280
logger.info("[SSE] Completed SSE emitting for session: {}", sessionId);
281+
try {
282+
listeningStream.close();
283+
} catch (Exception e) {
284+
logger.warn("[SSE] Error closing listeningStream on complete: {}", e.getMessage());
285+
}
281286
}
282287

283288
@Override
284289
public void onFailed(Exception cause) {
285290
logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, cause.getMessage());
291+
try {
292+
listeningStream.close();
293+
} catch (Exception e) {
294+
logger.warn("[SSE] Error closing listeningStream on failure: {}", e.getMessage());
295+
}
286296
}
287297
});
288-
289-
// Add connection monitoring to detect client disconnection
290-
// This is a workaround to ensure listeningStream.close() is called when client disconnects
291-
startConnectionMonitoring(sessionId, listeningStream, response);
292298
}
293299
});
294300
}
@@ -299,43 +305,6 @@ public void onFailed(Exception cause) {
299305
}
300306
}
301307

302-
/**
303-
* Starts connection monitoring to detect client disconnection and ensure proper cleanup.
304-
* This is a workaround to ensure listeningStream.close() is called when client disconnects.
305-
*
306-
* @param sessionId The session ID
307-
* @param listeningStream The listening stream to close when connection is lost
308-
* @param response The HTTP response to check for connection status
309-
*/
310-
private void startConnectionMonitoring(String sessionId,
311-
McpStreamableServerSession.McpStreamableServerSessionStream listeningStream,
312-
HttpClassicServerResponse response) {
313-
// Use a separate thread to periodically check connection status
314-
Thread monitoringThread = new Thread(() -> {
315-
try {
316-
while (!Thread.currentThread().isInterrupted()) {
317-
Thread.sleep(1000); // Check every second
318-
319-
// Check if the HTTP response is still active
320-
if (!response.isActive()) {
321-
logger.info("[SSE] Connection lost for session, completing emitter to trigger cleanup");
322-
listeningStream.close();
323-
break;
324-
}
325-
}
326-
} catch (InterruptedException e) {
327-
logger.debug("[SSE] Connection monitoring interrupted for session");
328-
Thread.currentThread().interrupt();
329-
} catch (Exception e) {
330-
logger.warn("[SSE] Error in connection monitoring: {}", e.getMessage());
331-
}
332-
});
333-
334-
monitoringThread.setDaemon(true);
335-
monitoringThread.setName("sse-connection-monitor-" + sessionId);
336-
monitoringThread.start();
337-
}
338-
339308
/**
340309
* Handles POST requests for incoming JSON-RPC messages from clients.
341310
*
@@ -449,11 +418,11 @@ public void onCompleted() {
449418

450419
@Override
451420
public void onFailed(Exception e) {
452-
// No action needed
421+
logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, e.getMessage());
453422
}
454423
});
455424

456-
DefaultStreamableMcpSessionTransport sessionTransport = new DefaultStreamableMcpSessionTransport(sessionId, emitter);
425+
DefaultStreamableMcpSessionTransport sessionTransport = new DefaultStreamableMcpSessionTransport(sessionId, emitter, response);
457426

458427
try {
459428
session.responseStream(jsonrpcRequest, sessionTransport)
@@ -571,15 +540,20 @@ private class DefaultStreamableMcpSessionTransport implements McpStreamableServe
571540
private final ReentrantLock lock = new ReentrantLock();
572541

573542
private volatile boolean closed = false;
543+
544+
private final HttpClassicServerResponse response;
574545

575546
/**
576547
* Creates a new session transport with the specified ID and SSE builder.
577548
*
578549
* @param sessionId The unique identifier for this session
550+
* @param emitter The emitter for sending events
551+
* @param response The HTTP response for checking connection status
579552
*/
580-
DefaultStreamableMcpSessionTransport(String sessionId, Emitter<TextEvent> emitter) {
553+
DefaultStreamableMcpSessionTransport(String sessionId, Emitter<TextEvent> emitter, HttpClassicServerResponse response) {
581554
this.sessionId = sessionId;
582555
this.emitter = emitter;
556+
this.response = response;
583557
logger.info("[SSE] Building SSE for session: {} ", sessionId);
584558
}
585559

@@ -616,6 +590,13 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId
616590
logger.info("Session {} was closed during message send attempt", this.sessionId);
617591
return;
618592
}
593+
594+
// Check if connection is still active before sending
595+
if (!this.response.isActive()) {
596+
logger.warn("[SSE] Connection inactive detected while sending message for session: {}", this.sessionId);
597+
this.close();
598+
return;
599+
}
619600

620601
String jsonText = objectMapper.writeValueAsString(message);
621602
TextEvent textEvent = TextEvent.custom()

0 commit comments

Comments
 (0)