Skip to content

Commit c32b6cc

Browse files
authored
[FEL] MCP streamable HTTP server implemented with MCP SDK (#344)
* 添加sdk写法相关类 * 接入Fit Http逻辑 * 使用Choir响应流实现sse发送 * 清除废弃文件 * request.headers().all()使用有bug * 接入MCP SDK的streamable服务器 * 增加注释 * 格式修正 * 日志修正 * 更新MCP SDK版本0.14.0 * 工具添加 * fit工具链路添加 * DefaultMcpServer * DefaultMcpServerTest修改 * DefaultMcpServerTest修改 * input schema判断逻辑修改 * 添加Server Bean * 修复Get结束Emitter不close的问题 * 接口解耦 * test变动 * 使用logback-classic提供给SLF4J * 优化连接监控机制 * SLF4J依赖修正 * Optimize imports * 修正类名 * 删除本地Tools保存 * ServerSchema旧逻辑删除 * 根据0.14.1版本,ObjectMapper更新为McpJsonMapper * 修改onToolAdded()抛出异常 * 格式化加onToolAdded()逻辑优化 * transport类文档 * 修改md文档 * 更新MD文档 * transportProvider优化 * 更新README * 修改transport类null返回值 * 修改transportProvider的handlePOST方法
1 parent 5bf6859 commit c32b6cc

21 files changed

+1484
-1028
lines changed
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
# FitMcpStreamableServerTransportProvider类维护文档
2+
3+
## 文档概述
4+
5+
本文档用于记录 `FitMcpStreamableServerTransportProvider` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的
6+
`HttpServletStreamableServerTransportProvider` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。
7+
8+
**原始参考类**: MCP SDK 中的 `HttpServletStreamableServerTransportProvider`
9+
10+
**创建时间**: 2025-11-04
11+
12+
---
13+
14+
## 类的作用和职责
15+
16+
`FitMcpStreamableServerTransportProvider` 是 MCP 服务端传输层的核心实现类,负责:
17+
18+
1. **HTTP 端点处理**: 处理 GET、POST、DELETE 请求,实现 MCP 协议的 HTTP 传输层
19+
2. **会话管理**: 管理客户端会话的生命周期(创建、维护、销毁)
20+
3. **SSE 通信**: 通过 Server-Sent Events (SSE) 实现服务端到客户端的实时消息推送
21+
4. **消息序列化**: 处理 JSON-RPC 消息的序列化和反序列化
22+
5. **连接保活**: 支持可选的 Keep-Alive 机制
23+
6. **优雅关闭**: 支持服务的优雅关闭和资源清理
24+
25+
---
26+
27+
## 类结构概览
28+
29+
### 主要成员变量
30+
31+
| 变量名 | 类型 | 来源 | 说明 |
32+
|----------------------|----------------------------------------------------------|------------|---------------------------------|
33+
| `MESSAGE_ENDPOINT` | `String` | SDK 原始 | 消息端点路径 `/mcp/streamable` |
34+
| `disallowDelete` | `boolean` | SDK 原始 | 是否禁用 DELETE 请求 |
35+
| `jsonMapper` | `McpJsonMapper` | SDK 原始 | JSON 序列化器 |
36+
| `contextExtractor` | `McpTransportContextExtractor<HttpClassicServerRequest>` | **FIT 改造** | 上下文提取器(泛型参数改为 FIT 的 Request 类型) |
37+
| `keepAliveScheduler` | `KeepAliveScheduler` | SDK 原始 | Keep-Alive 调度器 |
38+
| `sessionFactory` | `McpStreamableServerSession.Factory` | SDK 原始 | 会话工厂 |
39+
| `sessions` | `Map<String, McpStreamableServerSession>` | SDK 原始 | 活跃会话映射表 |
40+
| `isClosing` | `volatile boolean` | SDK 原始 | 关闭标志 |
41+
42+
### 主要方法
43+
44+
| 方法名 | 来源 | 说明 |
45+
| --------------------- | ------------ | ------------------------------- |
46+
| `protocolVersions()` | SDK 原始 | 返回支持的 MCP 协议版本 |
47+
| `setSessionFactory()` | SDK 原始 | 设置会话工厂 |
48+
| `notifyClients()` | SDK 原始 | 广播通知到所有客户端 |
49+
| `closeGracefully()` | SDK 原始 | 优雅关闭传输层 |
50+
| `handleGet()` | **FIT 改造** | 处理 GET 请求(SSE 连接) |
51+
| `handlePost()` | **FIT 改造** | 处理 POST 请求(JSON-RPC 消息) |
52+
| `handleDelete()` | **FIT 改造** | 处理 DELETE 请求(会话删除) |
53+
54+
### 重构后的辅助方法
55+
56+
为提高代码可读性和可维护性,从原本的 `handleGet()``handlePost()``handleDelete()` 方法中抽取了以下辅助方法:
57+
58+
#### 验证请求合法性的方法
59+
60+
| 方法名 | 说明 |
61+
|-------------------------------|----------------------------------------------------------|
62+
| `validateGetAcceptHeaders()` | 验证 GET 请求的 Accept 头,确保包含 `text/event-stream` |
63+
| `validatePostAcceptHeaders()` | 验证 POST 请求的 Accept 头,确保包含 `text/event-stream``application/json` |
64+
| `validateRequestSessionId()` | 验证请求的 `mcp-session-id` 头是否存在,以及对应的会话是否存在 |
65+
66+
#### 根据请求类型调用处理逻辑的方法
67+
68+
| 方法名 | 处理的请求类型 | 说明 |
69+
|---------------------------------|---------|------------------------------------------|
70+
| `handleReplaySseRequest()` | GET | 处理 SSE 消息重放请求,用于断线重连后恢复错过的消息 |
71+
| `handleEstablishSseRequest()` | GET | 处理 SSE 连接建立请求,创建新的持久化 SSE 监听流 |
72+
| `handleInitializeRequest()` | POST | 处理客户端初始化连接请求,创建新的 MCP 会话 |
73+
| `handleJsonRpcMessage()` | POST | 把非Initialize的客户端消息分流给下面三个方法,包含Session验证。 |
74+
| `handleJsonRpcResponse()` | POST | 处理 JSON-RPC 响应消息(如 Elicitation 中的客户端响应) |
75+
| `handleJsonRpcNotification()` | POST | 处理 JSON-RPC 通知消息(客户端单向通知) |
76+
| `handleJsonRpcRequest()` | POST | 处理 JSON-RPC 请求消息,返回 SSE 流式响应 |
77+
78+
### 内部类
79+
80+
| 类名 | 来源 | 说明 |
81+
|------------------------------------|------------|-----------------------------|
82+
| `FitStreamableMcpSessionTransport` | **FIT 改造** | 用于SSE 会话`sendMessage()`传输实现 |
83+
| `Builder` | SDK 原始 | 构建器模式 |
84+
85+
---
86+
87+
## SDK 原始逻辑
88+
89+
以下是从 MCP SDK 的 `HttpServletStreamableServerTransportProvider` 类保留的原始逻辑:
90+
91+
### 1. 会话管理核心逻辑
92+
93+
```java
94+
private final Map<String, McpStreamableServerSession> sessions = new ConcurrentHashMap<>();
95+
```
96+
97+
- 使用 `ConcurrentHashMap` 存储活跃会话
98+
- 会话以 `mcp-session-id` 作为键
99+
100+
### 2. 会话工厂设置
101+
102+
```java
103+
public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) {
104+
this.sessionFactory = sessionFactory;
105+
}
106+
```
107+
108+
- 由外部设置会话工厂,用于创建新会话
109+
110+
### 3. 客户端通知
111+
112+
```java
113+
public Mono<Void> notifyClients(String method, Object params) {
114+
// ... 广播逻辑
115+
}
116+
```
117+
118+
- 向所有活跃会话并行发送通知
119+
- 使用 `parallelStream()` 提高效率
120+
- 单个会话失败不影响其他会话
121+
122+
### 4. 关闭逻辑
123+
124+
```java
125+
public Mono<Void> closeGracefully() {
126+
this.isClosing = true;
127+
// ... 关闭所有会话
128+
// ... 关闭 keep-alive 调度器
129+
}
130+
```
131+
132+
- 设置关闭标志
133+
- 关闭所有活跃会话
134+
- 清理资源
135+
136+
## FIT 框架改造核心逻辑
137+
138+
以下是为适配 FIT 框架而新增或改造的部分:
139+
140+
### 1. HTTP 端点处理核心流程(核心改造)
141+
142+
- 请求/响应对象类型变更:
143+
- `HttpServletRequest``HttpClassicServerRequest`
144+
- `HttpServletResponse``HttpClassicServerResponse`
145+
- 返回类型改为通用的 `Object`,支持多种返回形式
146+
147+
#### a. GET 请求处理流程
148+
149+
1. 检查服务器是否正在关闭
150+
2. **调用 `validateGetAcceptHeaders()`** - 验证 Accept 头是否包含 `text/event-stream`
151+
3. **调用 `validateRequestSessionId()`** - 验证 `mcp-session-id` 头是否存在及对应会话是否存在
152+
4. 提取 `transportContext` 上下文
153+
5. 获取会话 ID 和会话对象
154+
6. 检查是否是重放请求(`Last-Event-ID` 头):
155+
- 如果是,**调用 `handleReplaySseRequest()`** - 重放错过的消息
156+
- 如果否,**调用 `handleEstablishSseRequest()`** - 建立新的 SSE 监听流
157+
158+
#### b. POST 请求处理流程
159+
160+
1. 检查服务器是否正在关闭
161+
2. **调用 `validatePostAcceptHeaders()`** - 验证 Accept 头包含 `text/event-stream``application/json`
162+
3. 提取 `transportContext` 上下文
163+
4. 反序列化 JSON-RPC 消息
164+
5. 判断是否为初始化请求(`initialize` 方法):
165+
- 如果是,**调用 `handleInitializeRequest()`** - 创建新会话并返回初始化结果
166+
6. **调用 `validateRequestSessionId()`** - 验证会话(仅非初始化请求)
167+
7. 获取会话 ID 和会话对象
168+
8. 根据消息类型分发处理:
169+
- `JSONRPCResponse`**调用 `handleJsonRpcResponse()`**
170+
- `JSONRPCNotification`**调用 `handleJsonRpcNotification()`**
171+
- `JSONRPCRequest`**调用 `handleJsonRpcRequest()`**
172+
173+
#### c. DELETE 请求处理流程
174+
175+
1. 检查服务器是否正在关闭
176+
2. 检查是否禁用 DELETE 操作
177+
3. **调用 `validateRequestSessionId()`** - 验证 `mcp-session-id` 头及会话存在性
178+
4. 提取 `transportContext` 上下文
179+
5. 获取会话 ID 和会话对象
180+
6. 删除会话并从会话映射表中移除
181+
182+
### 2. SSE 实现改造(核心改造)
183+
184+
**原始 SDK**:
185+
186+
```java
187+
SseEmitter sseEmitter = new SseEmitter();
188+
sseEmitter.send(SseEmitter.event()
189+
.id(messageId)
190+
.name("message")
191+
.data(jsonText));
192+
sseEmitter.complete();
193+
```
194+
195+
**FIT 框架改造**:
196+
197+
```java
198+
// 使用 Choir 和 Emitter 实现 SSE
199+
Choir.<TextEvent>create(emitter -> {
200+
// 创建sessionTransport类,用于调用emitter发送消息
201+
FitStreamableMcpSessionTransport sessionTransport =
202+
new FitStreamableMcpSessionTransport(sessionId, emitter, response);
203+
204+
// session的逻辑是SDK原有的,里面会调用sessionTransport发送事件流
205+
session.responseStream(jsonrpcRequest, sessionTransport)
206+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext))
207+
.block();
208+
209+
// 监听 Emitter 的生命周期
210+
emitter.observe(new Emitter.Observer<TextEvent>() {
211+
@Override
212+
public void onEmittedData(TextEvent data) {
213+
// 数据发送完成
214+
}
215+
216+
@Override
217+
public void onCompleted() {
218+
// SSE 流正常结束
219+
listeningStream.close();
220+
}
221+
222+
@Override
223+
public void onFailed(Exception cause) {
224+
// SSE 流异常结束
225+
listeningStream.close();
226+
}
227+
});
228+
});
229+
```
230+
231+
**关键变化**:
232+
233+
- 使用 `Choir<TextEvent>` 返回事件流
234+
- 使用 `Emitter<TextEvent>` 替代 `SseEmitter` 的发送方法
235+
- 使用 `Emitter.Observer` 监听 SSE 生命周期事件
236+
237+
### 3. HTTP 响应处理改造
238+
239+
**FIT 特有的响应方式**:
240+
241+
#### 返回纯文本
242+
243+
```java
244+
response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode());
245+
return Entity.createText(response, "Session ID required in mcp-session-id header");
246+
```
247+
248+
#### 返回 JSON 对象
249+
250+
```java
251+
response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode());
252+
return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS)
253+
.message("Session not found: "+sessionId)
254+
.build());
255+
```
256+
257+
#### 返回 SSE 流(重要改造)
258+
259+
```java
260+
return Choir.<TextEvent> create(emitter ->{
261+
// emitter封装在sessionTransport中,被session调用
262+
emitter.emit(textEvent);
263+
});
264+
```
265+
266+
### 4. HTTP 头处理改造
267+
268+
**FIT 框架的 Headers API**:
269+
270+
```java
271+
// 获取 Header
272+
String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse("");
273+
boolean hasSessionId = request.headers().contains(HttpHeaders.MCP_SESSION_ID);
274+
String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse("");
275+
276+
// 设置 Header
277+
response.headers().set("Content-Type",MimeType.APPLICATION_JSON.value());
278+
response.headers().set(HttpHeaders.MCP_SESSION_ID, sessionId);
279+
280+
// 设置状态码
281+
response.statusCode(HttpResponseStatus.OK.statusCode());
282+
```
283+
284+
**变化**:
285+
286+
- 使用 `request.headers().first(name).orElse(default)` 获取单个 Header
287+
- 使用 `request.headers().contains(name)` 检查 Header 是否存在
288+
- 使用 FIT 的 `MessageHeaderNames``MimeType` 常量
289+
- 使用 `HttpResponseStatus` 枚举设置状态码
290+
291+
### 5. 内部类 Transport 实现
292+
293+
`FitStreamableMcpSessionTransport` 类的核心职责是发送SSE事件:
294+
295+
- `sendmessage()`方法通过`Emitter<TextEvent>` 发送SSE消息到客户端
296+
- 保存了当前会话的事件的`Emitter<TextEvent>`,负责close时关闭`Emitter<TextEvent>`
297+
298+
- SSE的`Emitter<TextEvent>`感知不到GET连接是否断开,因此在`sendmessage()`发送前检查GET连接是否活跃
299+
300+
```java
301+
// 在发送消息前检查连接是否仍然活跃
302+
if(!this.response.isActive()){
303+
logger.warn("[SSE] Connection inactive detected while sending message for session: {}",
304+
this.sessionId);
305+
this.close();
306+
return;
307+
}
308+
```
309+
310+
## 参考资源
311+
312+
### MCP 协议文档
313+
314+
- MCP 协议规范:[https://spec.modelcontextprotocol.io/](https://spec.modelcontextprotocol.io/)
315+
- MCP SDK GitHub: [https://github.com/modelcontextprotocol/](https://github.com/modelcontextprotocol/)
316+
317+
### 更新记录
318+
319+
| 日期 | 更新内容 | 负责人 |
320+
|----------|---------------------------------|-----|
321+
| 2025-11-04 | 初始版本,从 SDK 改造为 FIT 框架实现 | 黄可欣 |
322+
| 2025-11-05 | 代码重构,提取9个辅助方法提高可读性和可维护性 | 黄可欣 |

framework/fel/java/plugins/tool-mcp-server/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@
4141
<groupId>org.fitframework.fel</groupId>
4242
<artifactId>tool-mcp-common</artifactId>
4343
</dependency>
44+
<dependency>
45+
<groupId>io.modelcontextprotocol.sdk</groupId>
46+
<artifactId>mcp</artifactId>
47+
<version>0.14.1</version>
48+
</dependency>
4449

4550
<!-- Test -->
4651
<dependency>

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@
66

77
package modelengine.fel.tool.mcp.server;
88

9-
import modelengine.fel.tool.mcp.entity.ServerSchema;
109
import modelengine.fel.tool.mcp.entity.Tool;
1110

1211
import java.util.List;
13-
import java.util.Map;
1412

1513
/**
1614
* Represents the MCP Server.
@@ -19,29 +17,13 @@
1917
* @since 2025-05-15
2018
*/
2119
public interface McpServer {
22-
/**
23-
* Gets MCP server schema.
24-
*
25-
* @return The MCP server schema as a {@link ServerSchema}.
26-
*/
27-
ServerSchema getSchema();
28-
2920
/**
3021
* Gets MCP server tools.
3122
*
3223
* @return The MCP server tools as a {@link List}{@code <}{@link Tool}{@code >}.
3324
*/
3425
List<Tool> getTools();
3526

36-
/**
37-
* Calls MCP server tool.
38-
*
39-
* @param name The tool name as a {@link String}.
40-
* @param arguments The tool arguments as a {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}.
41-
* @return The tool result as a {@link Object}.
42-
*/
43-
Object callTool(String name, Map<String, Object> arguments);
44-
4527
/**
4628
* Registers MCP server tools changed observer.
4729
*

0 commit comments

Comments
 (0)