22
33## 文档概述
44
5- 本文档用于记录 ` FitMcpStreamableServerTransportProvider ` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的 ` WebMvcStreamableServerTransportProvider ` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。
5+ 本文档用于记录 ` FitMcpStreamableServerTransportProvider ` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的
6+ ` WebMvcStreamableServerTransportProvider ` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。
67
7- ** 原始参考类** : MCP SDK 中的 ` WebMvcStreamableServerTransportProvider ` (或 ` HttpServletStreamableServerTransportProvider ` )
8+ ** 原始参考类** : MCP SDK 中的 ` WebMvcStreamableServerTransportProvider ` (或
9+ ` HttpServletStreamableServerTransportProvider ` )
810
9- ** 作者** : 黄可欣
1011** 创建时间** : 2025-11-04
1112
1213---
2829
2930### 主要成员变量
3031
31- | 变量名 | 类型 | 来源 | 说明 |
32- | --------| ------| ------| ------|
33- | ` MESSAGE_ENDPOINT ` | ` String ` | SDK 原始 | 消息端点路径 ` /mcp/streamable ` |
34- | ` disallowDelete ` | ` boolean ` | SDK 原始 | 是否禁用 DELETE 请求 |
35- | ` jsonMapper ` | ` McpJsonMapper ` | SDK 原始 | JSON 序列化器 |
36- | ` contextExtractor ` | ` McpTransportContextExtractor<HttpClassicServerRequest> ` | ** FIT 改造** | 上下文提取器(泛型参数改为 FIT 的 Request 类型) |
37- | ` keepAliveScheduler ` | ` KeepAliveScheduler ` | SDK 原始 | Keep-Alive 调度器 |
38- | ` sessionFactory ` | ` McpStreamableServerSession.Factory ` | SDK 原始 | 会话工厂 |
39- | ` sessions ` | ` Map<String, McpStreamableServerSession> ` | SDK 原始 | 活跃会话映射表 |
40- | ` isClosing ` | ` volatile boolean ` | SDK 原始 | 关闭标志 |
32+ | 变量名 | 类型 | 来源 | 说明 |
33+ | ---------------------- | ---------------------------------------------------------- | ------------ | --------------------------- ------|
34+ | ` MESSAGE_ENDPOINT ` | ` String ` | SDK 原始 | 消息端点路径 ` /mcp/streamable ` |
35+ | ` disallowDelete ` | ` boolean ` | SDK 原始 | 是否禁用 DELETE 请求 |
36+ | ` jsonMapper ` | ` McpJsonMapper ` | SDK 原始 | JSON 序列化器 |
37+ | ` contextExtractor ` | ` McpTransportContextExtractor<HttpClassicServerRequest> ` | ** FIT 改造** | 上下文提取器(泛型参数改为 FIT 的 Request 类型) |
38+ | ` keepAliveScheduler ` | ` KeepAliveScheduler ` | SDK 原始 | Keep-Alive 调度器 |
39+ | ` sessionFactory ` | ` McpStreamableServerSession.Factory ` | SDK 原始 | 会话工厂 |
40+ | ` sessions ` | ` Map<String, McpStreamableServerSession> ` | SDK 原始 | 活跃会话映射表 |
41+ | ` isClosing ` | ` volatile boolean ` | SDK 原始 | 关闭标志 |
4142
4243### 主要方法
4344
44- | 方法名 | 来源 | 说明 |
45- | --------| ------| ------|
46- | ` protocolVersions() ` | SDK 原始 | 返回支持的 MCP 协议版本 |
47- | ` setSessionFactory() ` | SDK 原始 | 设置会话工厂 |
48- | ` notifyClients() ` | SDK 原始 | 广播通知到所有客户端 |
49- | ` closeGracefully() ` | SDK 原始 | 优雅关闭传输层 |
50- | ` handleGet() ` | ** FIT 改造** | 处理 GET 请求(SSE 连接) |
51- | ` handlePost() ` | ** FIT 改造** | 处理 POST 请求(JSON-RPC 消息) |
52- | ` handleDelete() ` | ** FIT 改造** | 处理 DELETE 请求(会话删除) |
53- | ` deserializeJsonRpcMessage() ` | ** FIT 创建** | 反序列化 JSON-RPC 消息 |
45+ | 方法名 | 来源 | 说明 |
46+ | ------------------------------- | ------------ | ------------------- ------|
47+ | ` protocolVersions() ` | SDK 原始 | 返回支持的 MCP 协议版本 |
48+ | ` setSessionFactory() ` | SDK 原始 | 设置会话工厂 |
49+ | ` notifyClients() ` | SDK 原始 | 广播通知到所有客户端 |
50+ | ` closeGracefully() ` | SDK 原始 | 优雅关闭传输层 |
51+ | ` handleGet() ` | ** FIT 改造** | 处理 GET 请求(SSE 连接) |
52+ | ` handlePost() ` | ** FIT 改造** | 处理 POST 请求(JSON-RPC 消息) |
53+ | ` handleDelete() ` | ** FIT 改造** | 处理 DELETE 请求(会话删除) |
54+ | ` deserializeJsonRpcMessage() ` | ** FIT 创建** | 反序列化 JSON-RPC 消息 |
5455
5556### 内部类
5657
57- | 类名 | 来源 | 说明 |
58- | ------| ------| ------|
58+ | 类名 | 来源 | 说明 |
59+ | ------------------------------------ | ------------ | ----------------------- ------|
5960| ` FitStreamableMcpSessionTransport ` | ** FIT 改造** | 用于SSE 会话` sendMessage() ` 传输实现 |
60- | ` Builder ` | SDK 原始 | 构建器模式 |
61+ | ` Builder ` | SDK 原始 | 构建器模式 |
6162
6263---
6364
6667以下是从 MCP SDK 的 ` WebMvcStreamableServerTransportProvider ` 类保留的原始逻辑:
6768
6869### 1. 会话管理核心逻辑
70+
6971``` java
7072private final Map<String , McpStreamableServerSession > sessions = new ConcurrentHashMap<> ();
7173```
74+
7275- 使用 ` ConcurrentHashMap ` 存储活跃会话
7376- 会话以 ` mcp-session-id ` 作为键
7477
7578### 2. 会话工厂设置
79+
7680``` java
7781public void setSessionFactory(McpStreamableServerSession . Factory sessionFactory) {
7882 this . sessionFactory = sessionFactory;
7983}
8084```
85+
8186- 由外部设置会话工厂,用于创建新会话
8287
8388### 3. 客户端通知
89+
8490``` java
8591public Mono<Void > notifyClients(String method, Object params) {
8692 // ... 广播逻辑
8793}
8894```
95+
8996- 向所有活跃会话并行发送通知
9097- 使用 ` parallelStream() ` 提高效率
9198- 单个会话失败不影响其他会话
@@ -115,30 +122,32 @@ public Mono<Void> notifyClients(String method, Object params) {
1151223 . 查找并删除会话
116123
117124### 5. 关闭逻辑
125+
118126``` java
119127public Mono<Void > closeGracefully() {
120128 this . isClosing = true ;
121129 // ... 关闭所有会话
122130 // ... 关闭 keep-alive 调度器
123131}
124132```
133+
125134- 设置关闭标志
126135- 关闭所有活跃会话
127136- 清理资源
128137
129138### 6. Keep-Alive 机制
139+
130140``` java
131- if (keepAliveInterval != null ) {
132- this . keepAliveScheduler = KeepAliveScheduler . builder(... )
141+ if (keepAliveInterval != null ){
142+ this . keepAliveScheduler = KeepAliveScheduler . builder(... )
133143 .initialDelay(keepAliveInterval)
134144 .interval(keepAliveInterval)
135145 .build();
136146 this . keepAliveScheduler. start();
137147}
138148```
139- - 支持可选的 Keep-Alive 调度
140-
141149
150+ - 支持可选的 Keep-Alive 调度
142151
143152## FIT 框架新增/改造逻辑
144153
@@ -149,6 +158,7 @@ if (keepAliveInterval != null) {
149158** 原始 SDK(Spring MVC)** :
150159
151160``` java
161+
152162@GetMapping (" /mcp/streamable" )
153163public ResponseEntity<SseEmitter > handleGet(HttpServletRequest request, HttpServletResponse response)
154164
@@ -160,7 +170,9 @@ public ResponseEntity<Void> handleDelete(HttpServletRequest request)
160170```
161171
162172** FIT 框架改造后** :
173+
163174``` java
175+
164176@GetMapping (path = MESSAGE_ENDPOINT )
165177public Object handleGet(HttpClassicServerRequest request, HttpClassicServerResponse response)
166178
@@ -173,6 +185,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe
173185```
174186
175187** 关键变化** :
188+
176189- 使用 FIT 的注解:` @GetMapping ` , ` @PostMapping ` , ` @DeleteMapping `
177190- 请求/响应对象类型变更:
178191 - ` HttpServletRequest ` → ` HttpClassicServerRequest `
@@ -182,6 +195,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe
182195### 2. SSE 实现改造(核心改造)
183196
184197** 原始 SDK (Spring MVC)** :
198+
185199``` java
186200SseEmitter sseEmitter = new SseEmitter ();
187201sseEmitter. send(SseEmitter . event()
@@ -192,41 +206,43 @@ sseEmitter.complete();
192206```
193207
194208** FIT 框架改造** :
209+
195210``` java
196211// 使用 Choir 和 Emitter 实现 SSE
197212Choir . < TextEvent > create(emitter - > {
198213 // 创建sessionTransport类,用于调用emitter发送消息
199214 FitStreamableMcpSessionTransport sessionTransport =
200215 new FitStreamableMcpSessionTransport (sessionId, emitter, response);
201-
216+
202217 // session的逻辑是SDK原有的,里面会调用sessionTransport发送事件流
203218 session. responseStream(jsonrpcRequest, sessionTransport)
204- .contextWrite(ctx - > ctx. put(McpTransportContext . KEY , transportContext))
205- .block();
206-
219+ .contextWrite(ctx - > ctx. put(McpTransportContext . KEY , transportContext))
220+ .block();
221+
207222 // 监听 Emitter 的生命周期
208223 emitter. observe(new Emitter .Observer<TextEvent > () {
209- @Override
210- public void onEmittedData (TextEvent data ) {
211- // 数据发送完成
212- }
224+ @Override
225+ public void onEmittedData (TextEvent data ) {
226+ // 数据发送完成
227+ }
213228
214- @Override
215- public void onCompleted () {
216- // SSE 流正常结束
217- listeningStream. close();
218- }
229+ @Override
230+ public void onCompleted () {
231+ // SSE 流正常结束
232+ listeningStream. close();
233+ }
219234
220- @Override
221- public void onFailed (Exception cause ) {
222- // SSE 流异常结束
223- listeningStream. close();
224- }
235+ @Override
236+ public void onFailed (Exception cause ) {
237+ // SSE 流异常结束
238+ listeningStream. close();
239+ }
225240 });
226241});
227242```
228243
229244** 关键变化** :
245+
230246- 使用 ` Choir<TextEvent> ` 返回事件流
231247- 使用 ` Emitter<TextEvent> ` 替代 ` SseEmitter ` 的发送方法
232248- 使用 ` Emitter.Observer ` 监听 SSE 生命周期事件
@@ -246,34 +262,32 @@ return Entity.createText(response, "Session ID required in mcp-session-id header
246262
247263``` java
248264response. statusCode(HttpResponseStatus . NOT_FOUND. statusCode());
249- return Entity . createObject(response,
250- McpError . builder(McpSchema . ErrorCodes . INVALID_PARAMS )
251- .message(" Session not found: " + sessionId)
265+ return Entity . createObject(response, McpError . builder(McpSchema . ErrorCodes . INVALID_PARAMS )
266+ .message(" Session not found: " + sessionId)
252267 .build());
253268```
254269
255270#### 返回 SSE 流(重要改造)
256271
257272``` java
258- return Choir . < TextEvent > create(emitter - > {
259- // 使用 FIT 的 Emitter 发送 SSE 事件
273+ return Choir . < TextEvent > create(emitter - > {
274+ // emitter封装在sessionTransport中,被session调用
260275 emitter. emit(textEvent);
261- emitter. complete();
262- emitter. fail(exception);
263276});
264277```
265278
266279### 4. HTTP 头处理改造
267280
268281** FIT 框架的 Headers API** :
282+
269283``` java
270284// 获取 Header
271285String acceptHeaders = request. headers(). first(MessageHeaderNames . ACCEPT ). orElse(" " );
272286boolean hasSessionId = request. headers(). contains(HttpHeaders . MCP_SESSION_ID );
273287String sessionId = request. headers(). first(HttpHeaders . MCP_SESSION_ID ). orElse(" " );
274288
275289// 设置 Header
276- response. headers(). set(" Content-Type" , MimeType . APPLICATION_JSON. value());
290+ response. headers(). set(" Content-Type" ,MimeType . APPLICATION_JSON. value());
277291response. headers(). set(HttpHeaders . MCP_SESSION_ID , sessionId);
278292
279293// 设置状态码
@@ -298,9 +312,9 @@ response.statusCode(HttpResponseStatus.OK.statusCode());
298312
299313``` java
300314// 在发送消息前检查连接是否仍然活跃
301- if (! this . response. isActive()) {
315+ if (! this . response. isActive()){
302316 logger. warn(" [SSE] Connection inactive detected while sending message for session: {}" ,
303- this . sessionId);
317+ this . sessionId);
304318 this . close();
305319 return ;
306320}
@@ -324,72 +338,37 @@ public McpSchema.JSONRPCMessage deserializeJsonRpcMessage(Map<String, Object> ma
324338
325339- 智能识别 JSON-RPC 消息类型
326340
327-
328-
329341## 代码结构对照表
330342
331- | 功能模块 | 改造程度 | SDK 原始实现 | FIT 框架实现 |
332- | ---------| ---------| -------------| -------------|
333- | SSE 实现 | ** 重大改造** | ` SseEmitter ` | ` Choir<TextEvent> ` + ` Emitter ` |
334- | HTTP 请求对象 | ** 重大改造** | ` HttpServletRequest ` | ` HttpClassicServerRequest ` |
335- | HTTP 响应对象 | ** 重大改造** | ` HttpServletResponse ` | ` HttpClassicServerResponse ` |
336- | HTTP返回类型 | ** 重大改造** | ` ResponseEntity<?> ` | ` Object ` (` Entity ` 或者` Choir ` ) |
337- | Get连接检测 | 新增 | 无 | ` response.isActive() ` |
338- | 验证工具 | 新增 | 无或其他 | FIT Validation |
339- | 日志系统 | 轻微改造 | SLF4J | FIT Logger |
340- | Builder 模式 | 轻微改造 | 原始逻辑 | 类型参数调整 |
341- | HTTP 注解 | 无变化 | ` @GetMapping ` (Spring) | ` @GetMapping ` (FIT) |
342- | 接口实现 | 无变化 | ` McpStreamableServerTransportProvider ` | 相同 |
343- | 会话管理 | 无变化 | 原始逻辑 | 相同 |
344- | 消息序列化 | 无变化 | 原始逻辑 | 相同 |
345- | Keep-Alive | 无变化 | 原始逻辑 | 相同 |
346-
347-
343+ | 功能模块 | 改造程度 | SDK 原始实现 | FIT 框架实现 |
344+ | ------------| ----------| ----------------------------------------| --------------------------------|
345+ | SSE 实现 | ** 重大改造** | ` SseEmitter ` | ` Choir<TextEvent> ` + ` Emitter ` |
346+ | HTTP 请求对象 | ** 重大改造** | ` HttpServletRequest ` | ` HttpClassicServerRequest ` |
347+ | HTTP 响应对象 | ** 重大改造** | ` HttpServletResponse ` | ` HttpClassicServerResponse ` |
348+ | HTTP返回类型 | ** 重大改造** | ` ResponseEntity<?> ` | ` Object ` (` Entity ` 或者` Choir ` ) |
349+ | Get连接检测 | 新增 | 无 | ` response.isActive() ` |
350+ | 验证工具 | 新增 | 无或其他 | FIT Validation |
351+ | 日志系统 | 轻微改造 | SLF4J | FIT Logger |
352+ | Builder 模式 | 轻微改造 | 原始逻辑 | 类型参数调整 |
353+ | HTTP 注解 | 无变化 | ` @GetMapping ` (Spring) | ` @GetMapping ` (FIT) |
354+ | 接口实现 | 无变化 | ` McpStreamableServerTransportProvider ` | 相同 |
355+ | 会话管理 | 无变化 | 原始逻辑 | 相同 |
356+ | 消息序列化 | 无变化 | 原始逻辑 | 相同 |
357+ | Keep-Alive | 无变化 | 原始逻辑 | 相同 |
348358
349359## 参考资源
350360
351361### MCP 协议文档
362+
352363- MCP 协议规范:[ https://spec.modelcontextprotocol.io/ ] ( https://spec.modelcontextprotocol.io/ )
353364- MCP SDK GitHub: [ https://github.com/modelcontextprotocol/ ] ( https://github.com/modelcontextprotocol/ )
354365
355366### FIT 框架文档
367+
356368- FIT HTTP 模块文档:` docs/framework/fit/java/user-guide-book/04. Web MVC 能力.md `
357369- FIT 流式功能文档:` docs/framework/fit/java/user-guide-book/10. 流式功能.md `
358370- FIT 日志文档:` docs/framework/fit/java/user-guide-book/08. 日志.md `
359371
360372### 相关类文档
361373- ` Event ` 枚举定义:` modelengine.fel.tool.mcp.entity.Event `
362- - MCP Server 工具其他实现:` framework/fel/java/plugins/tool-mcp-server/ `
363-
364- ---
365-
366- ## 附录:快速定位指南
367-
368- ### 查找某个功能的实现位置
369-
370- | 功能 | 方法/类 | 行号范围 |
371- | ------| --------| ---------|
372- | 协议版本声明 | ` protocolVersions() ` | 112-116 |
373- | 客户端广播 | ` notifyClients() ` | 133-150 |
374- | 优雅关闭 | ` closeGracefully() ` | 158-178 |
375- | GET 请求处理 | ` handleGet() ` | 188-289 |
376- | POST 请求处理 | ` handlePost() ` | 300-430 |
377- | DELETE 请求处理 | ` handleDelete() ` | 440-481 |
378- | 消息反序列化 | ` deserializeJsonRpcMessage() ` | 490-500 |
379- | SSE 传输实现 | ` FitStreamableMcpSessionTransport ` | 511-644 |
380- | 构建器 | ` Builder ` | 653-729 |
381-
382- ### 查找某个 FIT 改造点
383-
384- | 改造内容 | 位置 |
385- | ---------| ------|
386- | HTTP 注解 | 187, 299, 439 行 |
387- | Entity 响应 | 191, 197, 212, 304, 311, 等 |
388- | Choir SSE | 221, 383 行 |
389- | Emitter 观察者 | 256-281, 385-400 行 |
390- | 连接状态检测 | 570-575 行 |
391- | FIT Logger | 55, 全文多处 |
392- | FIT Validation | 92-93, 668, 696, 722 行 |
393-
394-
395-
374+ - MCP Server 工具其他实现:` framework/fel/java/plugins/tool-mcp-server/ `
0 commit comments