Skip to content

Commit f45ee46

Browse files
authored
Add Pulsar Admin MCP server implementation with HTTP/STDIO transport and 70 tool commands (apache#17)
Fixes #PCIP-6 Implement the MCP for Pulsar Admin Tool Main Issue: #PCIP-6 Implement the MCP for Pulsar Admin Tool ### Motivation This PR introduces the **MCP (Model Context Protocol) server implementation** for Pulsar Admin operations. The motivation is to enable **AI assistants** (such as Claude Desktop) to interact with Apache Pulsar clusters using natural language. Through MCP, Pulsar administration capabilities are exposed via a standardized protocol, improving automation and intelligent integration. ### Modifications - Added **MCP server layer**: `StdioMCPServer` and `HttpMCPServer`, supporting **STDIO** and **HTTP Streaming** transports - Implemented **PulsarClientManager** to manage the lifecycle of `PulsarAdmin` and `PulsarClient`, with lazy initialization and thread safety - Developed **70 MCP tool commands**, covering: - Cluster / Tenant / Namespace / Topic / Subscription management - Schema operations - Monitoring and backlog analysis - Message send and receive - Introduced **BasePulsarTools** base class for parameter validation, error handling, and response encapsulation - Enhanced documentation with **Quick Start, FAQ, and natural language interaction demos** (README) ### Verifying this change - Verified in local Standalone Pulsar environment (`pulsar://localhost:6650`, `http://localhost:8080`) - Tested both transport modes: - **STDIO mode**: Successfully integrated with Claude Desktop - **HTTP mode**: Server responded correctly at `/mcp` - Example validation: - Successfully executed `list-tenants`, `create-namespace`, `get-topic-stats`, etc. - Verified schema upload and compatibility checks - Validated backlog analysis and health check tools
1 parent 78da165 commit f45ee46

38 files changed

+9231
-0
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<module>pulsar-metrics-contrib</module>
6464
<module>pulsar-auth-contrib</module>
6565
<module>pulsar-rpc-contrib</module>
66+
<module>pulsar-admin-mcp-contrib</module>
6667
</modules>
6768

6869
<dependencyManagement>
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
# Pulsar Admin MCP Contrib
2+
3+
基于 Model Context Protocol (MCP) 的 Apache Pulsar 管理服务端,支持 AI 助手通过统一接口管理 Pulsar 集群(支持 HTTP Streaming 和 STDIO 两种传输模式)。
4+
5+
## 快速开始
6+
7+
### 依赖
8+
9+
- Java 17+
10+
- Maven 3.6+
11+
- Pulsar 2.10+(3.x 优先)
12+
- MCP Java SDK 0.12.0
13+
- Jetty 11.0.20
14+
15+
## 0. 启动 Pulsar
16+
17+
### 方式 A: Docker
18+
```bash
19+
docker run -it --name pulsar -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:3.2.4 bin/pulsar standalone
20+
```
21+
22+
- **Service URL**: `pulsar://localhost:6650`
23+
- **Admin URL**: `http://localhost:8080`
24+
25+
### 方式 B: 本地二进制
26+
```bash
27+
bin/pulsar standalone
28+
```
29+
30+
## 1. 编译 Pulsar MCP
31+
32+
```bash
33+
mvn clean install -DskipTests -am -pl pulsar-admin-mcp-contrib
34+
```
35+
36+
**输出**`target/mcp-contrib-1.0.0-SNAPSHOT.jar`
37+
38+
## 2. 启动 MCP Server
39+
40+
### HTTP 模式(推荐:Web/远程)
41+
```bash
42+
java -jar pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar --transport http --port 8889
43+
```
44+
45+
**日志示例**
46+
```
47+
HTTP Streamable transport ready at http://localhost:8889/mcp/stream
48+
```
49+
50+
**健康检查**
51+
```bash
52+
curl -i http://localhost:8889/mcp/stream
53+
```
54+
55+
### STDIO 模式(推荐:Claude Desktop / 本地 IDE)
56+
```bash
57+
java -jar pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar --transport stdio
58+
```
59+
60+
## 3. 客户端配置
61+
62+
### Claude Desktop
63+
64+
#### Windows 配置
65+
66+
**配置文件位置**`%APPDATA%\Claude\claude_desktop_config.json`
67+
68+
**配置步骤**
69+
1. 打开 Claude Desktop 配置文件
70+
2. 添加以下配置到 `mcpServers` 部分:
71+
72+
```json
73+
{
74+
"mcpServers": {
75+
"pulsar-admin": {
76+
"command": "java",
77+
"args": ["-jar", "编译后的包所在的目录,例如 E:\\projects\\pulsar-admin-mcp-contrib\\target\\mcp-contrib-1.0.0-SNAPSHOT.jar", "--transport", "stdio"],
78+
"cwd": "项目所在目录,例如E:\\projects\\pulsar-admin-mcp-contrib",
79+
"env": {
80+
"PULSAR_SERVICE_URL": "pulsar://localhost:6650",
81+
"PULSAR_ADMIN_URL": "http://localhost:8080"
82+
}
83+
}
84+
}
85+
}
86+
```
87+
88+
#### macOS 配置
89+
90+
**配置文件位置**`~/Library/Application Support/Claude/claude_desktop_config.json`
91+
92+
**配置步骤**
93+
1. 打开 Claude Desktop 配置文件
94+
2. 添加以下配置到 `mcpServers` 部分:
95+
96+
```json
97+
{
98+
"mcpServers": {
99+
"pulsar-admin": {
100+
"command": "java",
101+
"args": ["-jar", "编译后的包所在的目录,例如 /Users/username/projects/pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar", "--transport", "stdio"],
102+
"cwd": "项目所在目录,例如/Users/username/projects/pulsar-admin-mcp-contrib",
103+
"env": {
104+
"PULSAR_SERVICE_URL": "pulsar://localhost:6650",
105+
"PULSAR_ADMIN_URL": "http://localhost:8080"
106+
}
107+
}
108+
}
109+
}
110+
```
111+
112+
**注意事项**
113+
- 请将 `编译后的包所在的目录` 替换为实际的 JAR 文件路径
114+
- 请将 `项目所在目录` 替换为实际的项目根目录路径
115+
- 确保 Java 环境变量已正确配置
116+
- Windows 使用反斜杠 `\`,macOS 使用正斜杠 `/`
117+
118+
### Cherry Studio
119+
120+
#### STDIO 模式配置
121+
同上
122+
123+
#### HTTP 模式配置
124+
125+
**配置步骤**
126+
1. 确保 MCP 服务器已启动(HTTP 模式)
127+
2. 在 Cherry Studio 中添加 HTTP 类型配置:
128+
129+
```json
130+
{
131+
"mcpServers": {
132+
"pulsar-admin-http": {
133+
"type": "http",
134+
"url": "http://localhost:8889/mcp"
135+
}
136+
}
137+
}
138+
```
139+
140+
**配置说明**
141+
- **STDIO 模式**:适合本地开发,需要指定完整的 JAR 文件路径
142+
- **HTTP 模式**:适合远程访问,需要先启动 HTTP 服务器
143+
- 两种模式的环境变量配置相同,用于指定 Pulsar 集群连接信息
144+
- Windows 使用反斜杠 `\`,macOS 使用正斜杠 `/`
145+
146+
## 4. 配置项
147+
148+
### 环境变量
149+
- `PULSAR_SERVICE_URL`(默认 `pulsar://localhost:6650`
150+
- `PULSAR_ADMIN_URL`(默认 `http://localhost:8080`
151+
152+
### 命令行参数
153+
- `--transport`:http / stdio
154+
- `--port`:HTTP 端口(默认 8889)
155+
156+
## 5. 工具清单
157+
158+
覆盖 **集群** / **租户** / **命名空间** / **主题** / **订阅** / **消息** / **Schema** / **监控** 8 大类,共 71 个工具:
159+
160+
### 集群管理(10 个工具)
161+
- `list-clusters` - 列出所有 Pulsar 集群及其状态
162+
- `get-cluster-info` - 获取特定集群的详细信息
163+
- `create-cluster` - 创建新的 Pulsar 集群
164+
- `update-cluster-config` - 更新现有集群的配置
165+
- `delete-cluster` - 按名称删除 Pulsar 集群
166+
- `get-cluster-stats` - 获取指定集群的统计信息
167+
- `list-brokers` - 列出集群中的所有活跃代理
168+
- `get-broker-stats` - 获取特定代理的统计信息
169+
- `get-cluster-failure-domain` - 获取集群的故障域
170+
- `set-cluster-failure-domain` - 设置或更新故障域配置
171+
172+
### 租户管理(6 个工具)
173+
- `list-tenants` - 列出所有 Pulsar 租户
174+
- `get-tenant-info` - 获取特定租户的信息
175+
- `create-tenant` - 创建新的 Pulsar 租户
176+
- `update-tenant` - 更新租户配置
177+
- `delete-tenant` - 删除特定租户
178+
- `get-tenant-stats` - 获取租户的统计信息
179+
180+
### 命名空间管理(10 个工具)
181+
- `list-namespaces` - 列出所有命名空间
182+
- `get-namespace-info` - 获取命名空间信息
183+
- `create-namespace` - 创建新的命名空间
184+
- `delete-namespace` - 删除命名空间
185+
- `set-retention-policy` - 为命名空间设置保留策略
186+
- `get-retention-policy` - 获取命名空间的保留策略
187+
- `set-backlog-quota` - 为命名空间设置积压配额
188+
- `get-backlog-quota` - 获取命名空间的积压配额
189+
- `clear-namespace-backlog` - 清除命名空间的积压
190+
- `get-namespace-stats` - 获取命名空间统计信息
191+
192+
### 主题管理(15 个工具)
193+
- `list-topics` - 列出所有主题
194+
- `create-topic` - 创建新主题
195+
- `delete-topic` - 删除主题
196+
- `get-topic-stats` - 获取主题统计信息
197+
- `get-topic-metadata` - 获取主题元数据
198+
- `update-topic-partitions` - 更新主题分区数量
199+
- `compact-topic` - 压缩主题
200+
- `unload-topic` - 卸载主题
201+
- `get-topic-backlog` - 获取主题积压信息
202+
- `expire-topic-messages` - 使主题中的消息过期
203+
- `peek-messages` - 从主题中查看消息
204+
- `peek-topic-messages` - 从主题中查看消息而不消费
205+
- `reset-topic-cursor` - 重置主题游标
206+
- `get-topic-internal-stats` - 获取主题内部统计信息
207+
- `get-partitioned-metadata` - 获取分区主题元数据
208+
209+
### 订阅管理(10 个工具)
210+
- `list-subscriptions` - 列出所有订阅
211+
- `create-subscription` - 创建新订阅
212+
- `delete-subscription` - 删除订阅
213+
- `get-subscription-stats` - 获取订阅统计信息
214+
- `reset-subscription-cursor` - 重置订阅游标
215+
- `skip-messages` - 跳过订阅中的消息
216+
- `expire-subscription-messages` - 使订阅中的消息过期
217+
- `pause-subscription` - 暂停订阅
218+
- `resume-subscription` - 恢复订阅
219+
- `unsubscribe` - 取消订阅主题
220+
221+
### 消息操作(8 个工具)
222+
- `peek-message` - 从订阅中查看消息而不确认
223+
- `examine-messages` - 检查主题中的消息而不消费
224+
- `skip-all-messages` - 跳过订阅中的所有消息
225+
- `expire-all-messages` - 使订阅中的所有消息过期
226+
- `get-message-backlog` - 获取订阅的消息积压数量
227+
- `send-message` - 向主题发送消息
228+
- `get-message-stats` - 获取主题或订阅的消息统计信息
229+
- `receive-messages` - 从主题接收消息
230+
231+
### Schema 管理(6 个工具)
232+
- `get-schema-info` - 获取主题的 Schema 信息
233+
- `get-schema-version` - 获取主题的 Schema 版本
234+
- `get-all-schema-versions` - 获取主题的所有 Schema 版本
235+
- `upload-schema` - 向主题上传新的 Schema
236+
- `delete-schema` - 删除主题的 Schema
237+
- `test-schema-compatibility` - 测试 Schema 兼容性
238+
239+
### 监控与诊断(6 个工具)
240+
- `monitor-cluster-performance` - 监控集群性能指标
241+
- `monitor-topic-performance` - 监控主题性能指标
242+
- `monitor-subscription-performance` - 监控订阅性能
243+
- `health-check` - 检查集群、主题和订阅的健康状态
244+
- `connection-diagnostics` - 运行不同测试深度的连接诊断
245+
- `backlog-analysis` - 分析命名空间内的消息积压
246+
247+
> **说明**:仅初始化 PulsarAdmin 时,消息发送/消费相关会返回 `not_implemented`;要启用需初始化 PulsarClient 并创建 producer/consumer。
248+
249+
## 自然语言交互 Demo(Use Cases)
250+
251+
下列示例展示在 MCP 客户端里,用自然语言触发工具调用的典型流程。实际返回字段随集群而异。
252+
253+
### 1. 租户与命名空间管理
254+
255+
**Prompt:**
256+
> 帮我看看集群里有哪些租户;在 tenant1 下创建命名空间 ns-orders,然后把这个命名空间的统计给我看看。
257+
258+
**触发:**
259+
`list-tenants``create-namespace(tenant=tenant1, namespace=ns-orders)``get-namespace-stats(...)`
260+
261+
### 2. 创建分区主题并扩容
262+
263+
**Prompt:**
264+
> 在 public/default 下建个主题 orders,分区数 8;然后把分区数扩到 16,给我分区元数据。
265+
266+
**触发:**
267+
`create-topic(partitions=8)``update-topic-partitions(16)``get-partitioned-metadata`
268+
269+
### 3. 清理积压并做 compaction
270+
271+
**Prompt:**
272+
> public/default/orders backlog 很大,清一下;再做一次 compaction。
273+
274+
**触发:**
275+
`get-topic-backlog``expire-topic-messages/clear-namespace-backlog``compact-topic`
276+
277+
### 4. Schema 上载与兼容性测试
278+
279+
**Prompt:**
280+
> 给 persistent://public/default/orders 设置 Avro schema:orderId:string, amount:double,并检查兼容性。
281+
282+
**触发:**
283+
`upload-schema(type=AVRO, schemaJson=...)``test-schema-compatibility` / `get-schema-info`
284+
285+
### 5. 订阅管理与游标重置
286+
287+
**Prompt:**
288+
> 在 orders 上创建 failover 订阅 sub-a,再把游标回拨到 1 小时前。
289+
290+
**触发:**
291+
`create-subscription(type=failover)``reset-subscription-cursor(timestamp=now-1h)`
292+
293+
## 最佳实践
294+
295+
- **Topic 命名**:完整名形如 `persistent://tenant/namespace/topic`。允许短名输入,服务端会规范化。
296+
297+
- **失败域**:为 Broker/Bookie 设置 Failure Domain,提升机架/可用区级容灾。
298+
299+
## 许可证
300+
301+
Apache License 2.0(详见 [LICENSE](../../LICENSE))。

0 commit comments

Comments
 (0)