Skip to content

Commit d10b0aa

Browse files
committed
transport类文档
1 parent d8e52be commit d10b0aa

File tree

1 file changed

+394
-0
lines changed

1 file changed

+394
-0
lines changed
Lines changed: 394 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,394 @@
1+
# FitMcpStreamableServerTransportProvider类维护文档
2+
3+
## 文档概述
4+
5+
本文档用于记录 `FitMcpStreamableServerTransportProvider` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的 `WebMvcStreamableServerTransportProvider` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。
6+
7+
**原始参考类**: MCP SDK 中的 `WebMvcStreamableServerTransportProvider` (或 `HttpServletStreamableServerTransportProvider`)
8+
9+
**作者**: 黄可欣
10+
**创建时间**: 2025-11-04
11+
12+
---
13+
14+
## 类的作用和职责
15+
16+
`FitMcpStreamableServerTransportProvider` 是 MCP 服务端传输层的核心实现类,负责:
17+
18+
1. **HTTP 端点处理**: 处理 GET、POST、DELETE 请求,实现 MCP 协议的 HTTP 传输层
19+
2. **会话管理**: 管理客户端会话的生命周期(创建、维护、销毁)
20+
3. **SSE 通信**: 通过 Server-Sent Events (SSE) 实现服务端到客户端的实时消息推送
21+
4. **消息序列化**: 处理 JSON-RPC 消息的序列化和反序列化
22+
5. **连接保活**: 支持可选的 Keep-Alive 机制
23+
6. **优雅关闭**: 支持服务的优雅关闭和资源清理
24+
25+
---
26+
27+
## 类结构概览
28+
29+
### 主要成员变量
30+
31+
| 变量名 | 类型 | 来源 | 说明 |
32+
|--------|------|------|------|
33+
| `MESSAGE_ENDPOINT` | `String` | SDK 原始 | 消息端点路径 `/mcp/streamable` |
34+
| `disallowDelete` | `boolean` | SDK 原始 | 是否禁用 DELETE 请求 |
35+
| `jsonMapper` | `McpJsonMapper` | SDK 原始 | JSON 序列化器 |
36+
| `contextExtractor` | `McpTransportContextExtractor<HttpClassicServerRequest>` | **FIT 改造** | 上下文提取器(泛型参数改为 FIT 的 Request 类型) |
37+
| `keepAliveScheduler` | `KeepAliveScheduler` | SDK 原始 | Keep-Alive 调度器 |
38+
| `sessionFactory` | `McpStreamableServerSession.Factory` | SDK 原始 | 会话工厂 |
39+
| `sessions` | `Map<String, McpStreamableServerSession>` | SDK 原始 | 活跃会话映射表 |
40+
| `isClosing` | `volatile boolean` | SDK 原始 | 关闭标志 |
41+
42+
### 主要方法
43+
44+
| 方法名 | 来源 | 说明 |
45+
|--------|------|------|
46+
| `protocolVersions()` | SDK 原始 | 返回支持的 MCP 协议版本 |
47+
| `setSessionFactory()` | SDK 原始 | 设置会话工厂 |
48+
| `notifyClients()` | SDK 原始 | 广播通知到所有客户端 |
49+
| `closeGracefully()` | SDK 原始 | 优雅关闭传输层 |
50+
| `handleGet()` | **FIT 改造** | 处理 GET 请求(SSE 连接) |
51+
| `handlePost()` | **FIT 改造** | 处理 POST 请求(JSON-RPC 消息) |
52+
| `handleDelete()` | **FIT 改造** | 处理 DELETE 请求(会话删除) |
53+
| `deserializeJsonRpcMessage()` | **FIT 创建** | 反序列化 JSON-RPC 消息 |
54+
55+
### 内部类
56+
57+
| 类名 | 来源 | 说明 |
58+
|------|------|------|
59+
| `FitStreamableMcpSessionTransport` | **FIT 改造** | 用于SSE 会话`sendMessage()`传输实现 |
60+
| `Builder` | SDK 原始 | 构建器模式 |
61+
62+
---
63+
64+
## SDK 原始逻辑
65+
66+
以下是从 MCP SDK 的 `WebMvcStreamableServerTransportProvider` 类保留的原始逻辑:
67+
68+
### 1. 会话管理核心逻辑
69+
```java
70+
private final Map<String, McpStreamableServerSession> sessions = new ConcurrentHashMap<>();
71+
```
72+
- 使用 `ConcurrentHashMap` 存储活跃会话
73+
- 会话以 `mcp-session-id` 作为键
74+
75+
### 2. 会话工厂设置
76+
```java
77+
public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) {
78+
this.sessionFactory = sessionFactory;
79+
}
80+
```
81+
- 由外部设置会话工厂,用于创建新会话
82+
83+
### 3. 客户端通知
84+
```java
85+
public Mono<Void> notifyClients(String method, Object params) {
86+
// ... 广播逻辑
87+
}
88+
```
89+
- 向所有活跃会话并行发送通知
90+
- 使用 `parallelStream()` 提高效率
91+
- 单个会话失败不影响其他会话
92+
93+
### 4. HTTP 端点处理核心流程
94+
95+
#### a. GET 请求处理流程(原始逻辑)
96+
97+
1. 检查 Accept 头是否包含 `text/event-stream`
98+
2. 验证 `mcp-session-id` 头是否存在
99+
3. 查找对应的会话
100+
4. 检查是否是重放请求(`Last-Event-ID` 头)
101+
5. 建立 SSE 连接或重放消息
102+
103+
#### b. POST 请求处理流程(原始逻辑)
104+
105+
1. 检查 Accept 头
106+
2. 反序列化 JSON-RPC 消息
107+
3. 特殊处理 `initialize` 请求(创建新会话)
108+
4. 处理其他请求(需要已存在的会话)
109+
5. 根据消息类型(Response/Notification/Request)分别处理
110+
111+
#### c. DELETE 请求处理流程(原始逻辑)
112+
113+
1. 检查是否禁用 DELETE
114+
2. 验证 `mcp-session-id`
115+
3. 查找并删除会话
116+
117+
### 5. 关闭逻辑
118+
```java
119+
public Mono<Void> closeGracefully() {
120+
this.isClosing = true;
121+
// ... 关闭所有会话
122+
// ... 关闭 keep-alive 调度器
123+
}
124+
```
125+
- 设置关闭标志
126+
- 关闭所有活跃会话
127+
- 清理资源
128+
129+
### 6. Keep-Alive 机制
130+
```java
131+
if (keepAliveInterval != null) {
132+
this.keepAliveScheduler = KeepAliveScheduler.builder(...)
133+
.initialDelay(keepAliveInterval)
134+
.interval(keepAliveInterval)
135+
.build();
136+
this.keepAliveScheduler.start();
137+
}
138+
```
139+
- 支持可选的 Keep-Alive 调度
140+
141+
142+
143+
## FIT 框架新增/改造逻辑
144+
145+
以下是为适配 FIT 框架而新增或改造的部分:
146+
147+
### 1. HTTP 类替换(重要改造)
148+
149+
**原始 SDK(Spring MVC)**:
150+
151+
```java
152+
@GetMapping("/mcp/streamable")
153+
public ResponseEntity<SseEmitter> handleGet(HttpServletRequest request, HttpServletResponse response)
154+
155+
@PostMapping("/mcp/streamable")
156+
public ResponseEntity<?> handlePost(HttpServletRequest request, @RequestBody Map<String, Object> body)
157+
158+
@DeleteMapping("/mcp/streamable")
159+
public ResponseEntity<Void> handleDelete(HttpServletRequest request)
160+
```
161+
162+
**FIT 框架改造后**:
163+
```java
164+
@GetMapping(path = MESSAGE_ENDPOINT)
165+
public Object handleGet(HttpClassicServerRequest request, HttpClassicServerResponse response)
166+
167+
@PostMapping(path = MESSAGE_ENDPOINT)
168+
public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResponse response,
169+
@RequestBody Map<String, Object> requestBody)
170+
171+
@DeleteMapping(path = MESSAGE_ENDPOINT)
172+
public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerResponse response)
173+
```
174+
175+
**关键变化**:
176+
- 使用 FIT 的注解:`@GetMapping`, `@PostMapping`, `@DeleteMapping`
177+
- 请求/响应对象类型变更:
178+
- `HttpServletRequest``HttpClassicServerRequest`
179+
- `HttpServletResponse``HttpClassicServerResponse`
180+
- 返回类型改为通用的 `Object`,支持多种返回形式
181+
182+
### 2. SSE 实现改造(核心改造)
183+
184+
**原始 SDK (Spring MVC)**:
185+
```java
186+
SseEmitter sseEmitter = new SseEmitter();
187+
sseEmitter.send(SseEmitter.event()
188+
.id(messageId)
189+
.name("message")
190+
.data(jsonText));
191+
sseEmitter.complete();
192+
```
193+
194+
**FIT 框架改造**:
195+
```java
196+
// 使用 Choir 和 Emitter 实现 SSE
197+
Choir.<TextEvent>create(emitter -> {
198+
// 创建 TextEvent 并发送
199+
TextEvent textEvent = TextEvent.custom()
200+
.id(sessionId)
201+
.event(Event.MESSAGE.code())
202+
.data(jsonText)
203+
.build();
204+
emitter.emit(textEvent);
205+
206+
// 监听 Emitter 的生命周期
207+
emitter.observe(new Emitter.Observer<TextEvent>() {
208+
@Override
209+
public void onEmittedData(TextEvent data) {
210+
// 数据发送完成
211+
}
212+
213+
@Override
214+
public void onCompleted() {
215+
// SSE 流正常结束
216+
listeningStream.close();
217+
}
218+
219+
@Override
220+
public void onFailed(Exception cause) {
221+
// SSE 流异常结束
222+
listeningStream.close();
223+
}
224+
});
225+
});
226+
```
227+
228+
**关键变化**:
229+
- 使用 `Choir<TextEvent>` 返回事件流
230+
- 使用 `Emitter<TextEvent>` 替代 `SseEmitter` 的发送方法
231+
- 使用 `Emitter.Observer` 监听 SSE 生命周期事件
232+
233+
### 3. HTTP 响应处理改造
234+
235+
**FIT 特有的响应方式**:
236+
237+
#### 返回纯文本
238+
239+
```java
240+
response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode());
241+
return Entity.createText(response, "Session ID required in mcp-session-id header");
242+
```
243+
244+
#### 返回 JSON 对象
245+
246+
```java
247+
response.statusCode(HttpResponseStatus.NOT_FOUND.statusCode());
248+
return Entity.createObject(response,
249+
McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS)
250+
.message("Session not found: " + sessionId)
251+
.build());
252+
```
253+
254+
#### 返回 SSE 流(重要改造)
255+
256+
```java
257+
return Choir.<TextEvent>create(emitter -> {
258+
// 使用 FIT 的 Emitter 发送 SSE 事件
259+
emitter.emit(textEvent);
260+
emitter.complete();
261+
emitter.fail(exception);
262+
});
263+
```
264+
265+
### 4. HTTP 头处理改造
266+
267+
**FIT 框架的 Headers API**:
268+
```java
269+
// 获取 Header
270+
String acceptHeaders = request.headers().first(MessageHeaderNames.ACCEPT).orElse("");
271+
boolean hasSessionId = request.headers().contains(HttpHeaders.MCP_SESSION_ID);
272+
String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse("");
273+
274+
// 设置 Header
275+
response.headers().set("Content-Type", MimeType.APPLICATION_JSON.value());
276+
response.headers().set(HttpHeaders.MCP_SESSION_ID, sessionId);
277+
278+
// 设置状态码
279+
response.statusCode(HttpResponseStatus.OK.statusCode());
280+
```
281+
282+
**变化**:
283+
284+
- 使用 `request.headers().first(name).orElse(default)` 获取单个 Header
285+
- 使用 `request.headers().contains(name)` 检查 Header 是否存在
286+
- 使用 FIT 的 `MessageHeaderNames``MimeType` 常量
287+
- 使用 `HttpResponseStatus` 枚举设置状态码
288+
289+
### 5. 内部类 Transport 实现
290+
291+
`FitStreamableMcpSessionTransport` 类的核心职责是发送SSE事件:
292+
293+
- `sendmessage()`方法通过`Emitter<TextEvent>` 发送SSE消息到客户端
294+
- 保存了当前会话的事件的`Emitter<TextEvent>`,负责close时关闭`Emitter<TextEvent>`
295+
296+
- SSE的`Emitter<TextEvent>`感知不到GET连接是否断开,因此在`sendmessage()`发送前检查GET连接是否活跃
297+
298+
```java
299+
// 在发送消息前检查连接是否仍然活跃
300+
if (!this.response.isActive()) {
301+
logger.warn("[SSE] Connection inactive detected while sending message for session: {}",
302+
this.sessionId);
303+
this.close();
304+
return;
305+
}
306+
```
307+
308+
### 6. JSON-RPC 消息反序列化
309+
310+
```java
311+
public McpSchema.JSONRPCMessage deserializeJsonRpcMessage(Map<String, Object> map) {
312+
// 根据字段判断消息类型
313+
if (map.containsKey("method") && map.containsKey("id")) {
314+
return jsonMapper.convertValue(map, McpSchema.JSONRPCRequest.class);
315+
} else if (map.containsKey("method") && !map.containsKey("id")) {
316+
return jsonMapper.convertValue(map, McpSchema.JSONRPCNotification.class);
317+
} else if (map.containsKey("result") || map.containsKey("error")) {
318+
return jsonMapper.convertValue(map, McpSchema.JSONRPCResponse.class);
319+
}
320+
throw new IllegalArgumentException(...);
321+
}
322+
```
323+
324+
- 智能识别 JSON-RPC 消息类型
325+
326+
327+
328+
## 代码结构对照表
329+
330+
| 功能模块 | 改造程度 | SDK 原始实现 | FIT 框架实现 |
331+
|---------|---------|-------------|-------------|
332+
| SSE 实现 | **重大改造** | `SseEmitter` | `Choir<TextEvent>` + `Emitter` |
333+
| HTTP 请求对象 | **重大改造** | `HttpServletRequest` | `HttpClassicServerRequest` |
334+
| HTTP 响应对象 | **重大改造** | `HttpServletResponse` | `HttpClassicServerResponse` |
335+
| HTTP返回类型 | **重大改造** | `ResponseEntity<?>` | `Object` (`Entity`或者`Choir`) |
336+
| Get连接检测 | 新增 || `response.isActive()` |
337+
| 验证工具 | 新增 | 无或其他 | FIT Validation |
338+
| 日志系统 | 轻微改造 | SLF4J | FIT Logger |
339+
| Builder 模式 | 轻微改造 | 原始逻辑 | 类型参数调整 |
340+
| HTTP 注解 | 无变化 | `@GetMapping` (Spring) | `@GetMapping` (FIT) |
341+
| 接口实现 | 无变化 | `McpStreamableServerTransportProvider` | 相同 |
342+
| 会话管理 | 无变化 | 原始逻辑 | 相同 |
343+
| 消息序列化 | 无变化 | 原始逻辑 | 相同 |
344+
| Keep-Alive | 无变化 | 原始逻辑 | 相同 |
345+
346+
347+
348+
## 参考资源
349+
350+
### MCP 协议文档
351+
- MCP 协议规范:[https://spec.modelcontextprotocol.io/](https://spec.modelcontextprotocol.io/)
352+
- MCP SDK GitHub: [https://github.com/modelcontextprotocol/](https://github.com/modelcontextprotocol/)
353+
354+
### FIT 框架文档
355+
- FIT HTTP 模块文档:`docs/framework/fit/java/user-guide-book/04. Web MVC 能力.md`
356+
- FIT 流式功能文档:`docs/framework/fit/java/user-guide-book/10. 流式功能.md`
357+
- FIT 日志文档:`docs/framework/fit/java/user-guide-book/08. 日志.md`
358+
359+
### 相关类文档
360+
- `Event` 枚举定义:`modelengine.fel.tool.mcp.entity.Event`
361+
- MCP Server 工具其他实现:`framework/fel/java/plugins/tool-mcp-server/`
362+
363+
---
364+
365+
## 附录:快速定位指南
366+
367+
### 查找某个功能的实现位置
368+
369+
| 功能 | 方法/类 | 行号范围 |
370+
|------|--------|---------|
371+
| 协议版本声明 | `protocolVersions()` | 112-116 |
372+
| 客户端广播 | `notifyClients()` | 133-150 |
373+
| 优雅关闭 | `closeGracefully()` | 158-178 |
374+
| GET 请求处理 | `handleGet()` | 188-289 |
375+
| POST 请求处理 | `handlePost()` | 300-430 |
376+
| DELETE 请求处理 | `handleDelete()` | 440-481 |
377+
| 消息反序列化 | `deserializeJsonRpcMessage()` | 490-500 |
378+
| SSE 传输实现 | `FitStreamableMcpSessionTransport` | 511-644 |
379+
| 构建器 | `Builder` | 653-729 |
380+
381+
### 查找某个 FIT 改造点
382+
383+
| 改造内容 | 位置 |
384+
|---------|------|
385+
| HTTP 注解 | 187, 299, 439 行 |
386+
| Entity 响应 | 191, 197, 212, 304, 311, 等 |
387+
| Choir SSE | 221, 383 行 |
388+
| Emitter 观察者 | 256-281, 385-400 行 |
389+
| 连接状态检测 | 570-575 行 |
390+
| FIT Logger | 55, 全文多处 |
391+
| FIT Validation | 92-93, 668, 696, 722 行 |
392+
393+
394+

0 commit comments

Comments
 (0)