Skip to content

Commit d3760b8

Browse files
committed
修改transportProvider的handlePOST方法
1 parent 5f56153 commit d3760b8

File tree

2 files changed

+48
-32
lines changed

2 files changed

+48
-32
lines changed

framework/fel/java/plugins/tool-mcp-server/README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,15 @@
6565

6666
#### 根据请求类型调用处理逻辑的方法
6767

68-
| 方法名 | 处理的请求类型 | 说明 |
69-
|--------------------------------|---------|--------------------------------------|
70-
| `handleReplaySseRequest()` | GET | 处理 SSE 消息重放请求,用于断线重连后恢复错过的消息 |
71-
| `handleEstablishSseRequest()` | GET | 处理 SSE 连接建立请求,创建新的持久化 SSE 监听流 |
72-
| `handleInitializeRequest()` | POST | 处理客户端初始化连接请求,创建新的 MCP 会话 |
73-
| `handleJsonRpcResponse()` | POST | 处理 JSON-RPC 响应消息(如 Elicitation 中的客户端响应) |
74-
| `handleJsonRpcNotification()` | POST | 处理 JSON-RPC 通知消息(客户端单向通知) |
75-
| `handleJsonRpcRequest()` | POST | 处理 JSON-RPC 请求消息,返回 SSE 流式响应 |
68+
| 方法名 | 处理的请求类型 | 说明 |
69+
|---------------------------------|---------|------------------------------------------|
70+
| `handleReplaySseRequest()` | GET | 处理 SSE 消息重放请求,用于断线重连后恢复错过的消息 |
71+
| `handleEstablishSseRequest()` | GET | 处理 SSE 连接建立请求,创建新的持久化 SSE 监听流 |
72+
| `handleInitializeRequest()` | POST | 处理客户端初始化连接请求,创建新的 MCP 会话 |
73+
| `handleJsonRpcMessage()` | POST | 把非Initialize的客户端消息分流给下面三个方法,包含Session验证。 |
74+
| `handleJsonRpcResponse()` | POST | 处理 JSON-RPC 响应消息(如 Elicitation 中的客户端响应) |
75+
| `handleJsonRpcNotification()` | POST | 处理 JSON-RPC 通知消息(客户端单向通知) |
76+
| `handleJsonRpcRequest()` | POST | 处理 JSON-RPC 请求消息,返回 SSE 流式响应 |
7677

7778
### 内部类
7879

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -249,35 +249,13 @@ public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResp
249249
String requestBody = new String(request.entityBytes(), StandardCharsets.UTF_8);
250250
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, requestBody);
251251

252-
// Handle initialization request
252+
// Handle JSONRPCMessage
253253
if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest && jsonrpcRequest.method()
254254
.equals(McpSchema.METHOD_INITIALIZE)) {
255255
logger.info("[POST] Handling initialize method, with receiving message: {}", requestBody);
256256
return handleInitializeRequest(request, response, jsonrpcRequest);
257-
}
258-
259-
// Get session ID and session
260-
Object sessionError = validateRequestSessionId(request, response);
261-
if (sessionError != null) {
262-
return sessionError;
263-
}
264-
String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse("");
265-
McpStreamableServerSession session = this.sessions.get(sessionId);
266-
logger.info("[POST] Receiving message from session {}: {}", sessionId, requestBody);
267-
268-
// Handle JSONRPCMessage
269-
if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) {
270-
handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response);
271-
return null;
272-
} else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
273-
handleJsonRpcNotification(jsonrpcNotification, session, transportContext, response);
274-
return null;
275-
} else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
276-
return handleJsonRpcRequest(jsonrpcRequest, session, sessionId, transportContext, response);
277257
} else {
278-
response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode());
279-
return Entity.createObject(response,
280-
McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message("Unknown message type").build());
258+
return handleJsonRpcMessage(message, request, requestBody, transportContext, response);
281259
}
282260
} catch (IllegalArgumentException | IOException e) {
283261
logger.error("Failed to deserialize message: {}", e.getMessage(), e);
@@ -517,6 +495,43 @@ private Object handleInitializeRequest(HttpClassicServerRequest request, HttpCla
517495
}
518496
}
519497

498+
/**
499+
* Handles different types of JSON-RPC messages (Response, Notification, Request).
500+
* Routes the message to the appropriate handler method based on its type.
501+
*
502+
* @param message The {@link McpSchema.JSONRPCMessage} to handle
503+
* @param request The incoming {@link HttpClassicServerRequest}
504+
* @param requestBody The {@link String} of request body.
505+
* @param transportContext The {@link McpTransportContext} for request context propagation
506+
* @param response The {@link HttpClassicServerResponse} to set status code and return data
507+
* @return An {@link Entity} or {@link Choir} containing the response data, or {@code null} for accepted messages
508+
*/
509+
private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassicServerRequest request,
510+
String requestBody, McpTransportContext transportContext, HttpClassicServerResponse response) {
511+
// Get session ID and session
512+
Object sessionError = validateRequestSessionId(request, response);
513+
if (sessionError != null) {
514+
return sessionError;
515+
}
516+
String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse("");
517+
McpStreamableServerSession session = this.sessions.get(sessionId);
518+
logger.info("[POST] Receiving message from session {}: {}", sessionId, requestBody);
519+
520+
if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) {
521+
handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response);
522+
return null;
523+
} else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
524+
handleJsonRpcNotification(jsonrpcNotification, session, transportContext, response);
525+
return null;
526+
} else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
527+
return handleJsonRpcRequest(jsonrpcRequest, session, sessionId, transportContext, response);
528+
} else {
529+
response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode());
530+
return Entity.createObject(response,
531+
McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message("Unknown message type").build());
532+
}
533+
}
534+
520535
/**
521536
* Handles incoming JSON-RPC response messages from clients.
522537
* Accepts the response and delivers it to the corresponding pending request within the session.

0 commit comments

Comments
 (0)