diff --git a/pcip/picp-6.md b/pcip/picp-6.md
new file mode 100644
index 0000000..ed24360
--- /dev/null
+++ b/pcip/picp-6.md
@@ -0,0 +1,106 @@
+# PCIP-6: Implement the MCP for Pulsar Admin Tool
+
+# Background knowledge
+
+Apache Pulsar is a cloud-native distributed messaging and streaming platform, offering unified messaging, storage, and stream processing. It features multi-tenancy, persistent storage, and seamless scalability. Pulsar provides a command-line tool `pulsar-admin` to manage clusters, tenants, namespaces, topics, and subscriptions. However, `pulsar-admin` requires users to learn a large number of CLI commands and options, which is not user-friendly for non-experts.
+
+Model Context Protocol (MCP) is an open standard that defines how external tools expose capabilities to large language models (LLMs) through structured tool schemas. MCP allows LLMs like GPT or Claude to interact with systems such as Pulsar through well-defined interfaces. This is analogous to USB-C as a standardized interface for hardware devices.
+
+# Motivation
+
+Although `pulsar-admin` provides comprehensive CLI access, it's not user-friendly for non-experts. Users often struggle to express complex management needs, such as "list all subscriptions with message backlog > 1000", using raw CLI syntax.
+
+This proposal aims to introduce a new module that exposes Pulsar Admin functionalities as MCP-compatible tools. This allows users to manage Pulsar clusters using natural language prompts, interpreted and executed by LLMs.
+
+Benefits include:
+
+- Simplified cluster operations using plain English or other natural languages
+- Multi-turn conversational interactions with context awareness
+- A clean, extensible tool layer that conforms to the `pulsar-java-contrib` architecture
+
+
+# Goals
+
+## In Scope
+
+- Implement a new module `pulsar-admin-mcp-contrib` to expose Pulsar admin functionalities via MCP protocol.
+- Support 70+ commonly used admin operations via structured MCP tools.
+- Provide built-in transport support for HTTP, STDIO, and SSE.
+- Implement robust context tracking and parameter validation.
+- Provide complete developer/user documentation and a demo use case.
+
+## Out of Scope
+
+- UI interfaces or web frontends (though it can be used as backend).
+- Pulsar core changes (only builds on `pulsar-java-contrib`).
+- Full support for all 120+ CLI commands (70+ are targeted for now).
+
+# High Level Design
+
+The system architecture consists of four main layers:
+
+1. **LLM Interface Layer**: Accepts natural language inputs from an LLM and generates MCP-compatible tool calls
+2. **Protocol Layer**: Central MCP interface that dispatches structured requests to tool handlers
+3. **Tool Execution Layer**: Looks up registered tools and invokes appropriate Pulsar Admin API operations
+4. **Context Management Layer**: Maintains session memory, allowing parameter inheritance across steps
+
+Example interaction:
+
+User input:
+> "Create a topic named `user-events` with 3 partitions"
+
+LLMs send structured tool calls (as per MCP schema) such as:
+```json
+{
+ "tool": "create-topic",
+ "parameters": {
+ "name": "user-events",
+ "partitions": 3
+ }
+}
+```
+
+MCP executes the tool and returns a structured result, which the LLM then summarizes in natural language.
+# Detailed Design
+
+## Design & Implementation Details
+
+### Package structure:
+```java
+pulsar-java-contrib/
+ ├── MCPProtocol.java # MCP protocol interface
+ ├── MCPFactory.java # Factory class for dynamically loading protocol instances
+ ├── tools/ # Tool registration and concrete implementations
+ ├── client/ # PulsarAdmin client management
+ ├── context/ # Session state management
+ ├── validation/ # Parameter validation mechanism
+ ├── transport/ # Support for HTTP, STDIO,SSE
+ ├── model/ # Data structures like ToolSchema, ToolResult, etc
+```
+
+### Key components:
+- `PulsarAdminTool`: Abstract base class for all tools (e.g. list-topics, create-tenant)
+- `ToolExecutor`: Handles concurrency, thread pools, and context updates
+- `ToolRegistry`: Registers all tools via Java SPI
+- `SessionManager`: Tracks ongoing sessions and enhances parameters
+- `ParameterValidator`: Validates tool input against ToolSchema metadata
+
+### Supported tools
+
+**Total tools**: 70+
+Grouped by category:
+- **Cluster**: list-clusters, create-cluster, get-cluster-stats
+- **Tenant**: list-tenants, create-tenant, delete-tenant
+- **Namespace**: create-namespace, set-retention-policy, list-namespaces
+- **Topic**: create-topic, delete-topic, list-topics, compact-topic, get-topic-stats
+- **Subscription**: reset-subscription, get-subscription-stats
+- **Message**: produce-message, peek-messages
+- **Schema**: upload-schema, check-schema-compatibility
+- **Monitoring**: get-cluster-performance, diagnose-topic
+
+Each tool includes:
+
+- A ToolSchema (for LLM prompt templates and validation)
+- A handler (e.g. `ListTopicsHandler`)
+- Parameter schema, default values, error messages
+
diff --git a/pom.xml b/pom.xml
index e57a6e2..bd929a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
pulsar-metrics-contrib
pulsar-auth-contrib
pulsar-rpc-contrib
+ pulsar-admin-mcp-contrib
diff --git a/pulsar-admin-mcp-contrib/README-zh.md b/pulsar-admin-mcp-contrib/README-zh.md
new file mode 100644
index 0000000..b894888
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/README-zh.md
@@ -0,0 +1,301 @@
+# Pulsar Admin MCP Contrib
+
+基于 Model Context Protocol (MCP) 的 Apache Pulsar 管理服务端,支持 AI 助手通过统一接口管理 Pulsar 集群(支持 HTTP Streaming 和 STDIO 两种传输模式)。
+
+## 快速开始
+
+### 依赖
+
+- Java 17+
+- Maven 3.6+
+- Pulsar 2.10+(3.x 优先)
+- MCP Java SDK 0.12.0
+- Jetty 11.0.20
+
+## 0. 启动 Pulsar
+
+### 方式 A: Docker
+```bash
+docker run -it --name pulsar -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:3.2.4 bin/pulsar standalone
+```
+
+- **Service URL**: `pulsar://localhost:6650`
+- **Admin URL**: `http://localhost:8080`
+
+### 方式 B: 本地二进制
+```bash
+bin/pulsar standalone
+```
+
+## 1. 编译 Pulsar MCP
+
+```bash
+mvn clean install -DskipTests -am -pl pulsar-admin-mcp-contrib
+```
+
+**输出**:`target/mcp-contrib-1.0.0-SNAPSHOT.jar`
+
+## 2. 启动 MCP Server
+
+### HTTP 模式(推荐:Web/远程)
+```bash
+java -jar pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar --transport http --port 8889
+```
+
+**日志示例**:
+```
+HTTP Streamable transport ready at http://localhost:8889/mcp/stream
+```
+
+**健康检查**:
+```bash
+curl -i http://localhost:8889/mcp/stream
+```
+
+### STDIO 模式(推荐:Claude Desktop / 本地 IDE)
+```bash
+java -jar pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar --transport stdio
+```
+
+## 3. 客户端配置
+
+### Claude Desktop
+
+#### Windows 配置
+
+**配置文件位置**:`%APPDATA%\Claude\claude_desktop_config.json`
+
+**配置步骤**:
+1. 打开 Claude Desktop 配置文件
+2. 添加以下配置到 `mcpServers` 部分:
+
+```json
+{
+ "mcpServers": {
+ "pulsar-admin": {
+ "command": "java",
+ "args": ["-jar", "编译后的包所在的目录,例如 E:\\projects\\pulsar-admin-mcp-contrib\\target\\mcp-contrib-1.0.0-SNAPSHOT.jar", "--transport", "stdio"],
+ "cwd": "项目所在目录,例如E:\\projects\\pulsar-admin-mcp-contrib",
+ "env": {
+ "PULSAR_SERVICE_URL": "pulsar://localhost:6650",
+ "PULSAR_ADMIN_URL": "http://localhost:8080"
+ }
+ }
+ }
+}
+```
+
+#### macOS 配置
+
+**配置文件位置**:`~/Library/Application Support/Claude/claude_desktop_config.json`
+
+**配置步骤**:
+1. 打开 Claude Desktop 配置文件
+2. 添加以下配置到 `mcpServers` 部分:
+
+```json
+{
+ "mcpServers": {
+ "pulsar-admin": {
+ "command": "java",
+ "args": ["-jar", "编译后的包所在的目录,例如 /Users/username/projects/pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar", "--transport", "stdio"],
+ "cwd": "项目所在目录,例如/Users/username/projects/pulsar-admin-mcp-contrib",
+ "env": {
+ "PULSAR_SERVICE_URL": "pulsar://localhost:6650",
+ "PULSAR_ADMIN_URL": "http://localhost:8080"
+ }
+ }
+ }
+}
+```
+
+**注意事项**:
+- 请将 `编译后的包所在的目录` 替换为实际的 JAR 文件路径
+- 请将 `项目所在目录` 替换为实际的项目根目录路径
+- 确保 Java 环境变量已正确配置
+- Windows 使用反斜杠 `\`,macOS 使用正斜杠 `/`
+
+### Cherry Studio
+
+#### STDIO 模式配置
+同上
+
+#### HTTP 模式配置
+
+**配置步骤**:
+1. 确保 MCP 服务器已启动(HTTP 模式)
+2. 在 Cherry Studio 中添加 HTTP 类型配置:
+
+```json
+{
+ "mcpServers": {
+ "pulsar-admin-http": {
+ "type": "http",
+ "url": "http://localhost:8889/mcp"
+ }
+ }
+}
+```
+
+**配置说明**:
+- **STDIO 模式**:适合本地开发,需要指定完整的 JAR 文件路径
+- **HTTP 模式**:适合远程访问,需要先启动 HTTP 服务器
+- 两种模式的环境变量配置相同,用于指定 Pulsar 集群连接信息
+- Windows 使用反斜杠 `\`,macOS 使用正斜杠 `/`
+
+## 4. 配置项
+
+### 环境变量
+- `PULSAR_SERVICE_URL`(默认 `pulsar://localhost:6650`)
+- `PULSAR_ADMIN_URL`(默认 `http://localhost:8080`)
+
+### 命令行参数
+- `--transport`:http / stdio
+- `--port`:HTTP 端口(默认 8889)
+
+## 5. 工具清单
+
+覆盖 **集群** / **租户** / **命名空间** / **主题** / **订阅** / **消息** / **Schema** / **监控** 8 大类,共 71 个工具:
+
+### 集群管理(10 个工具)
+- `list-clusters` - 列出所有 Pulsar 集群及其状态
+- `get-cluster-info` - 获取特定集群的详细信息
+- `create-cluster` - 创建新的 Pulsar 集群
+- `update-cluster-config` - 更新现有集群的配置
+- `delete-cluster` - 按名称删除 Pulsar 集群
+- `get-cluster-stats` - 获取指定集群的统计信息
+- `list-brokers` - 列出集群中的所有活跃代理
+- `get-broker-stats` - 获取特定代理的统计信息
+- `get-cluster-failure-domain` - 获取集群的故障域
+- `set-cluster-failure-domain` - 设置或更新故障域配置
+
+### 租户管理(6 个工具)
+- `list-tenants` - 列出所有 Pulsar 租户
+- `get-tenant-info` - 获取特定租户的信息
+- `create-tenant` - 创建新的 Pulsar 租户
+- `update-tenant` - 更新租户配置
+- `delete-tenant` - 删除特定租户
+- `get-tenant-stats` - 获取租户的统计信息
+
+### 命名空间管理(10 个工具)
+- `list-namespaces` - 列出所有命名空间
+- `get-namespace-info` - 获取命名空间信息
+- `create-namespace` - 创建新的命名空间
+- `delete-namespace` - 删除命名空间
+- `set-retention-policy` - 为命名空间设置保留策略
+- `get-retention-policy` - 获取命名空间的保留策略
+- `set-backlog-quota` - 为命名空间设置积压配额
+- `get-backlog-quota` - 获取命名空间的积压配额
+- `clear-namespace-backlog` - 清除命名空间的积压
+- `get-namespace-stats` - 获取命名空间统计信息
+
+### 主题管理(15 个工具)
+- `list-topics` - 列出所有主题
+- `create-topic` - 创建新主题
+- `delete-topic` - 删除主题
+- `get-topic-stats` - 获取主题统计信息
+- `get-topic-metadata` - 获取主题元数据
+- `update-topic-partitions` - 更新主题分区数量
+- `compact-topic` - 压缩主题
+- `unload-topic` - 卸载主题
+- `get-topic-backlog` - 获取主题积压信息
+- `expire-topic-messages` - 使主题中的消息过期
+- `peek-messages` - 从主题中查看消息
+- `peek-topic-messages` - 从主题中查看消息而不消费
+- `reset-topic-cursor` - 重置主题游标
+- `get-topic-internal-stats` - 获取主题内部统计信息
+- `get-partitioned-metadata` - 获取分区主题元数据
+
+### 订阅管理(10 个工具)
+- `list-subscriptions` - 列出所有订阅
+- `create-subscription` - 创建新订阅
+- `delete-subscription` - 删除订阅
+- `get-subscription-stats` - 获取订阅统计信息
+- `reset-subscription-cursor` - 重置订阅游标
+- `skip-messages` - 跳过订阅中的消息
+- `expire-subscription-messages` - 使订阅中的消息过期
+- `pause-subscription` - 暂停订阅
+- `resume-subscription` - 恢复订阅
+- `unsubscribe` - 取消订阅主题
+
+### 消息操作(8 个工具)
+- `peek-message` - 从订阅中查看消息而不确认
+- `examine-messages` - 检查主题中的消息而不消费
+- `skip-all-messages` - 跳过订阅中的所有消息
+- `expire-all-messages` - 使订阅中的所有消息过期
+- `get-message-backlog` - 获取订阅的消息积压数量
+- `send-message` - 向主题发送消息
+- `get-message-stats` - 获取主题或订阅的消息统计信息
+- `receive-messages` - 从主题接收消息
+
+### Schema 管理(6 个工具)
+- `get-schema-info` - 获取主题的 Schema 信息
+- `get-schema-version` - 获取主题的 Schema 版本
+- `get-all-schema-versions` - 获取主题的所有 Schema 版本
+- `upload-schema` - 向主题上传新的 Schema
+- `delete-schema` - 删除主题的 Schema
+- `test-schema-compatibility` - 测试 Schema 兼容性
+
+### 监控与诊断(6 个工具)
+- `monitor-cluster-performance` - 监控集群性能指标
+- `monitor-topic-performance` - 监控主题性能指标
+- `monitor-subscription-performance` - 监控订阅性能
+- `health-check` - 检查集群、主题和订阅的健康状态
+- `connection-diagnostics` - 运行不同测试深度的连接诊断
+- `backlog-analysis` - 分析命名空间内的消息积压
+
+> **说明**:仅初始化 PulsarAdmin 时,消息发送/消费相关会返回 `not_implemented`;要启用需初始化 PulsarClient 并创建 producer/consumer。
+
+## 自然语言交互 Demo(Use Cases)
+
+下列示例展示在 MCP 客户端里,用自然语言触发工具调用的典型流程。实际返回字段随集群而异。
+
+### 1. 租户与命名空间管理
+
+**Prompt:**
+> 帮我看看集群里有哪些租户;在 tenant1 下创建命名空间 ns-orders,然后把这个命名空间的统计给我看看。
+
+**触发:**
+`list-tenants` → `create-namespace(tenant=tenant1, namespace=ns-orders)` → `get-namespace-stats(...)`
+
+### 2. 创建分区主题并扩容
+
+**Prompt:**
+> 在 public/default 下建个主题 orders,分区数 8;然后把分区数扩到 16,给我分区元数据。
+
+**触发:**
+`create-topic(partitions=8)` → `update-topic-partitions(16)` → `get-partitioned-metadata`
+
+### 3. 清理积压并做 compaction
+
+**Prompt:**
+> public/default/orders backlog 很大,清一下;再做一次 compaction。
+
+**触发:**
+`get-topic-backlog` → `expire-topic-messages/clear-namespace-backlog` → `compact-topic`
+
+### 4. Schema 上载与兼容性测试
+
+**Prompt:**
+> 给 persistent://public/default/orders 设置 Avro schema:orderId:string, amount:double,并检查兼容性。
+
+**触发:**
+`upload-schema(type=AVRO, schemaJson=...)` → `test-schema-compatibility` / `get-schema-info`
+
+### 5. 订阅管理与游标重置
+
+**Prompt:**
+> 在 orders 上创建 failover 订阅 sub-a,再把游标回拨到 1 小时前。
+
+**触发:**
+`create-subscription(type=failover)` → `reset-subscription-cursor(timestamp=now-1h)`
+
+## 最佳实践
+
+- **Topic 命名**:完整名形如 `persistent://tenant/namespace/topic`。允许短名输入,服务端会规范化。
+
+- **失败域**:为 Broker/Bookie 设置 Failure Domain,提升机架/可用区级容灾。
+
+## 许可证
+
+Apache License 2.0(详见 [LICENSE](../../LICENSE))。
diff --git a/pulsar-admin-mcp-contrib/README.md b/pulsar-admin-mcp-contrib/README.md
new file mode 100644
index 0000000..7184440
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/README.md
@@ -0,0 +1,308 @@
+# Pulsar Admin MCP Contrib
+
+Apache Pulsar management server based on Model Context Protocol (MCP), enabling AI assistants to manage Pulsar clusters through a unified interface (supports both HTTP Streaming and STDIO transport modes).
+
+## Demo
+### Claude Desktop
+
+
+### Cherry Studio
+
+
+## Quick Start
+
+### Dependencies
+
+- Java 17+
+- Maven 3.6+
+- Pulsar 2.10+ (3.x preferred)
+- MCP Java SDK 0.12.0
+- Jetty 11.0.20
+
+## 0. Start Pulsar
+
+### Method A: Docker
+```bash
+docker run -it --name pulsar -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:3.2.4 bin/pulsar standalone
+```
+
+- **Service URL**: `pulsar://localhost:6650`
+- **Admin URL**: `http://localhost:8080`
+
+### Method B: Local Binary
+```bash
+bin/pulsar standalone
+```
+
+## 1. Build Pulsar-MCP
+
+```bash
+mvn clean install -DskipTests -am -pl pulsar-admin-mcp-contrib
+```
+
+**Output**: `target/mcp-contrib-1.0.0-SNAPSHOT.jar`
+
+## 2. Start MCP Server
+
+### HTTP Mode (Recommended: Web/Remote)
+```bash
+java -jar pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar --transport http --port 8889
+```
+
+**Log Example**:
+```
+HTTP Streamable transport ready at http://localhost:8889/mcp/stream
+```
+
+**Health Check**:
+```bash
+curl -i http://localhost:8889/mcp/stream
+```
+
+### STDIO Mode (Recommended: Claude Desktop / Local IDE)
+```bash
+java -jar pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar --transport stdio
+```
+
+## 3. Client Configuration
+
+### Claude Desktop
+
+#### Windows Configuration
+
+**Config File Location**: `%APPDATA%\Claude\claude_desktop_config.json`
+
+**Configuration Steps**:
+1. Open Claude Desktop config file
+2. Add the following configuration to the `mcpServers` section:
+
+```json
+{
+ "mcpServers": {
+ "pulsar-admin": {
+ "command": "java",
+ "args": ["-jar", "Path to compiled JAR, e.g., E:\\projects\\pulsar-admin-mcp-contrib\\target\\mcp-contrib-1.0.0-SNAPSHOT.jar", "--transport", "stdio"],
+ "cwd": "Project directory, e.g., E:\\projects\\pulsar-admin-mcp-contrib",
+ "env": {
+ "PULSAR_SERVICE_URL": "pulsar://localhost:6650",
+ "PULSAR_ADMIN_URL": "http://localhost:8080"
+ }
+ }
+ }
+}
+```
+
+#### macOS Configuration
+
+**Config File Location**: `~/Library/Application Support/Claude/claude_desktop_config.json`
+
+**Configuration Steps**:
+1. Open Claude Desktop config file
+2. Add the following configuration to the `mcpServers` section:
+
+```json
+{
+ "mcpServers": {
+ "pulsar-admin": {
+ "command": "java",
+ "args": ["-jar", "Path to compiled JAR, e.g., /Users/username/projects/pulsar-admin-mcp-contrib/target/mcp-contrib-1.0.0-SNAPSHOT.jar", "--transport", "stdio"],
+ "cwd": "Project directory, e.g., /Users/username/projects/pulsar-admin-mcp-contrib",
+ "env": {
+ "PULSAR_SERVICE_URL": "pulsar://localhost:6650",
+ "PULSAR_ADMIN_URL": "http://localhost:8080"
+ }
+ }
+ }
+}
+```
+
+**Notes**:
+- Replace `Path to compiled JAR` with the actual JAR file path
+- Replace `Project directory` with the actual project root directory path
+- Ensure Java environment variables are properly configured
+- Windows uses backslash `\`, macOS uses forward slash `/`
+
+### Cherry Studio
+
+#### STDIO Mode Configuration
+Same as above
+
+#### HTTP Mode Configuration
+
+**Configuration Steps**:
+1. Ensure MCP server is started (HTTP mode)
+2. Add HTTP type configuration in Cherry Studio:
+
+```json
+{
+ "mcpServers": {
+ "pulsar-admin-http": {
+ "type": "http",
+ "url": "http://localhost:8889/mcp"
+ }
+ }
+}
+```
+
+**Configuration Notes**:
+- **STDIO Mode**: Suitable for local development, requires full JAR file path
+- **HTTP Mode**: Suitable for remote access, requires HTTP server to be started first
+- Both modes use the same environment variable configuration for Pulsar cluster connection
+- Windows uses backslash `\`, macOS uses forward slash `/`
+
+## 4. Configuration Options
+
+### Environment Variables
+- `PULSAR_SERVICE_URL` (default `pulsar://localhost:6650`)
+- `PULSAR_ADMIN_URL` (default `http://localhost:8080`)
+
+### Command Line Parameters
+- `--transport`: http / stdio
+- `--port`: HTTP port (default 8889)
+
+## 5. Tool Inventory
+
+Covers **Cluster** / **Tenant** / **Namespace** / **Topic** / **Subscription** / **Message** / **Schema** / **Monitoring** 8 categories, totaling 71 tools:
+
+### Cluster Management (10 tools)
+- `list-clusters` - List all Pulsar clusters and their status
+- `get-cluster-info` - Get detailed information about a specific cluster
+- `create-cluster` - Create a new Pulsar cluster
+- `update-cluster-config` - Update configuration of an existing cluster
+- `delete-cluster` - Delete a Pulsar cluster by name
+- `get-cluster-stats` - Get statistics for a given cluster
+- `list-brokers` - List all active brokers in a cluster
+- `get-broker-stats` - Get statistics for a specific broker
+- `get-cluster-failure-domain` - Get failure domains for a cluster
+- `set-cluster-failure-domain` - Set or update failure domain configuration
+
+### Tenant Management (6 tools)
+- `list-tenants` - List all Pulsar tenants
+- `get-tenant-info` - Get information about a specific tenant
+- `create-tenant` - Create a new Pulsar tenant
+- `update-tenant` - Update tenant configuration
+- `delete-tenant` - Delete a specific tenant
+- `get-tenant-stats` - Get statistics for a tenant
+
+### Namespace Management (10 tools)
+- `list-namespaces` - List all namespaces
+- `get-namespace-info` - Get namespace information
+- `create-namespace` - Create a new namespace
+- `delete-namespace` - Delete a namespace
+- `set-retention-policy` - Set retention policy for a namespace
+- `get-retention-policy` - Get retention policy for a namespace
+- `set-backlog-quota` - Set backlog quota for a namespace
+- `get-backlog-quota` - Get backlog quota for a namespace
+- `clear-namespace-backlog` - Clear backlog for a namespace
+- `get-namespace-stats` - Get namespace statistics
+
+### Topic Management (15 tools)
+- `list-topics` - List all topics
+- `create-topic` - Create a new topic
+- `delete-topic` - Delete a topic
+- `get-topic-stats` - Get topic statistics
+- `get-topic-metadata` - Get topic metadata
+- `update-topic-partitions` - Update topic partition count
+- `compact-topic` - Compact a topic
+- `unload-topic` - Unload a topic
+- `get-topic-backlog` - Get topic backlog information
+- `expire-topic-messages` - Expire messages in a topic
+- `peek-messages` - Peek messages from a topic
+- `peek-topic-messages` - Peek messages from a topic without consuming
+- `reset-topic-cursor` - Reset topic cursor
+- `get-topic-internal-stats` - Get internal topic statistics
+- `get-partitioned-metadata` - Get partitioned topic metadata
+
+### Subscription Management (10 tools)
+- `list-subscriptions` - List all subscriptions
+- `create-subscription` - Create a new subscription
+- `delete-subscription` - Delete a subscription
+- `get-subscription-stats` - Get subscription statistics
+- `reset-subscription-cursor` - Reset subscription cursor
+- `skip-messages` - Skip messages in a subscription
+- `expire-subscription-messages` - Expire messages in a subscription
+- `pause-subscription` - Pause a subscription
+- `resume-subscription` - Resume a subscription
+- `unsubscribe` - Unsubscribe from a topic
+
+### Message Operations (8 tools)
+- `peek-message` - Peek messages from a subscription without acknowledging
+- `examine-messages` - Examine messages from a topic without consuming
+- `skip-all-messages` - Skip all messages in a subscription
+- `expire-all-messages` - Expire all messages in a subscription
+- `get-message-backlog` - Get message backlog count for a subscription
+- `send-message` - Send a message to a topic
+- `get-message-stats` - Get message statistics for a topic or subscription
+- `receive-messages` - Receive messages from a topic
+
+### Schema Management (6 tools)
+- `get-schema-info` - Get schema information for a topic
+- `get-schema-version` - Get schema version for a topic
+- `get-all-schema-versions` - Get all schema versions for a topic
+- `upload-schema` - Upload a new schema to a topic
+- `delete-schema` - Delete schema for a topic
+- `test-schema-compatibility` - Test schema compatibility
+
+### Monitoring & Diagnostics (6 tools)
+- `monitor-cluster-performance` - Monitor cluster performance metrics
+- `monitor-topic-performance` - Monitor topic performance metrics
+- `monitor-subscription-performance` - Monitor subscription performance
+- `health-check` - Check cluster, topic, and subscription health
+- `connection-diagnostics` - Run connection diagnostics with different test depths
+- `backlog-analysis` - Analyze message backlog within a namespace
+
+> **Note**: When only PulsarAdmin is initialized, message send/consume related tools will return `not_implemented`; to enable them, initialize PulsarClient and create producer/consumer.
+
+## Natural Language Interaction Demo (Use Cases)
+
+The following examples demonstrate typical workflows of triggering tool calls with natural language in MCP clients. Actual returned fields vary by cluster.
+
+### 1. Tenant and Namespace Management
+
+**Prompt:**
+> Show me all tenants in the cluster; create namespace ns-orders under tenant1, then show me the statistics for this namespace.
+
+**Triggered:**
+`list-tenants` → `create-namespace(tenant=tenant1, namespace=ns-orders)` → `get-namespace-stats(...)`
+
+### 2. Create Partitioned Topic and Scale
+
+**Prompt:**
+> Create topic orders under public/default with 8 partitions; then scale to 16 partitions and show me the partition metadata.
+
+**Triggered:**
+`create-topic(partitions=8)` → `update-topic-partitions(16)` → `get-partitioned-metadata`
+
+### 3. Clear Backlog and Compact
+
+**Prompt:**
+> public/default/orders has a large backlog, clear it; then do a compaction.
+
+**Triggered:**
+`get-topic-backlog` → `expire-topic-messages/clear-namespace-backlog` → `compact-topic`
+
+### 4. Schema Upload and Compatibility Testing
+
+**Prompt:**
+> Set Avro schema for persistent://public/default/orders: orderId:string, amount:double, and check compatibility.
+
+**Triggered:**
+`upload-schema(type=AVRO, schemaJson=...)` → `test-schema-compatibility` / `get-schema-info`
+
+### 5. Subscription Management and Cursor Reset
+
+**Prompt:**
+> Create failover subscription sub-a on orders, then reset cursor to 1 hour ago.
+
+**Triggered:**
+`create-subscription(type=failover)` → `reset-subscription-cursor(timestamp=now-1h)`
+
+## Best Practices
+
+- **Topic Naming**: Full name format is `persistent://tenant/namespace/topic`. Short names are allowed, server will normalize.
+
+- **Failure Domain**: Set Failure Domain for Broker/Bookie to improve rack/availability zone level disaster recovery.
+
+## License
+
+Apache License 2.0 (see [LICENSE](../../LICENSE) for details).
\ No newline at end of file
diff --git a/pulsar-admin-mcp-contrib/claude-desktop-config.json b/pulsar-admin-mcp-contrib/claude-desktop-config.json
new file mode 100644
index 0000000..7221b33
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/claude-desktop-config.json
@@ -0,0 +1,18 @@
+{
+ "mcpServers": {
+ "pulsar-admin": {
+ "command": "java",
+ "args": [
+ "-jar",
+ "E:\\projects2\\pulsar-admin-mcp-contrib\\target\\mcp-contrib-1.0.0-SNAPSHOT.jar",
+ "--transport",
+ "stdio"
+ ],
+ "cwd": "E:\\projects2\\pulsar-admin-mcp-contrib",
+ "env": {
+ "PULSAR_SERVICE_URL": "pulsar://localhost:6650",
+ "PULSAR_ADMIN_URL": "http://localhost:8080"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-admin-mcp-contrib/demo1.gif b/pulsar-admin-mcp-contrib/demo1.gif
new file mode 100644
index 0000000..064fe0e
Binary files /dev/null and b/pulsar-admin-mcp-contrib/demo1.gif differ
diff --git a/pulsar-admin-mcp-contrib/demo2.gif b/pulsar-admin-mcp-contrib/demo2.gif
new file mode 100644
index 0000000..ee8cffa
Binary files /dev/null and b/pulsar-admin-mcp-contrib/demo2.gif differ
diff --git a/pulsar-admin-mcp-contrib/pom.xml b/pulsar-admin-mcp-contrib/pom.xml
new file mode 100644
index 0000000..d0ff264
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/pom.xml
@@ -0,0 +1,207 @@
+
+
+
+ 4.0.0
+
+
+ org.apache
+ pulsar-java-contrib
+ 1.0.0-SNAPSHOT
+
+
+ mcp-contrib
+ Pulsar Admin MCP Contrib
+ Apache Pulsar Admin MCP Server for AI integration
+
+
+ 3.5.3
+ 2.17.2
+ 17
+ 17
+ 11.0.8
+ 5.0.0
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring-boot.version}
+ pom
+ import
+
+
+ org.eclipse.jetty
+ jetty-bom
+ 11.0.20
+ pom
+ import
+
+
+
+
+
+
+ org.apache.pulsar
+ pulsar-client-admin
+ test
+
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-servlet
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-security
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-webapp
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-io
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-util
+ ${jetty.version}
+
+
+ org.eclipse.jetty
+ jetty-http
+ ${jetty.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ io.modelcontextprotocol.sdk
+ mcp
+ 0.12.0
+
+
+ org.apache.pulsar
+ pulsar-client
+
+
+ org.apache.pulsar
+ pulsar-client-admin
+
+
+ org.apache.commons
+ commons-lang3
+ 3.18.0
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ org.testcontainers
+ pulsar
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+
+ org.apache.pulsar.admin.mcp.Main
+
+
+
+ repackage
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ ${maven.compiler.source}
+ ${maven.compiler.target}
+
+
+
+
+
+ 2024
+
+
+
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java
new file mode 100644
index 0000000..f9775d3
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp;
+
+import org.apache.pulsar.admin.mcp.config.PulsarMCPCliOptions;
+import org.apache.pulsar.admin.mcp.transport.TransportLauncher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Main {
+ private static final Logger logger = LoggerFactory.getLogger(Main.class);
+
+ public static void main(String[] args) {
+ try {
+ PulsarMCPCliOptions options = PulsarMCPCliOptions.parseArgs(args);
+ logger.info("Starting Pulsar Admin MCP Server with options: {}", options);
+ TransportLauncher.start(options);
+ } catch (Exception e) {
+ logger.error("Fatal error starting Pulsar Admin MCP Server: {}", e.getMessage(), e);
+ System.err.println("Fatal error: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+}
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java
new file mode 100644
index 0000000..84e464a
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp.client;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.springframework.stereotype.Component;
+
+@Component
+public class PulsarClientManager implements AutoCloseable {
+
+ private PulsarAdmin pulsarAdmin;
+ private PulsarClient pulsarClient;
+
+ private final AtomicBoolean adminInitialized = new AtomicBoolean();
+ private final AtomicBoolean clientInitialized = new AtomicBoolean();
+
+ public void initialize() {
+ getAdmin();
+ getClient();
+ }
+
+ public synchronized PulsarAdmin getAdmin() {
+ if (!adminInitialized.get()) {
+ initializePulsarAdmin();
+ }
+ return pulsarAdmin;
+ }
+
+ public synchronized PulsarClient getClient() {
+ if (!clientInitialized.get()) {
+ initializePulsarClient();
+ }
+ return pulsarClient;
+ }
+
+ private void initializePulsarAdmin() {
+
+ if (!adminInitialized.compareAndSet(false, true)) {
+ return;
+ }
+
+ boolean success = false;
+ try {
+ String adminUrl = System.getenv().getOrDefault("PULSAR_ADMIN_URL", "http://localhost:8080");
+
+ PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
+ .serviceHttpUrl(adminUrl)
+ .connectionTimeout(30, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS);
+
+ pulsarAdmin = adminBuilder.build();
+
+ pulsarAdmin.clusters().getClusters();
+ success = true;
+
+ } catch (Exception e) {
+ if (pulsarAdmin != null) {
+ try {
+ pulsarAdmin.close();
+ } catch (Exception ignore) {
+
+ }
+ pulsarAdmin = null;
+ }
+ adminInitialized.set(false);
+ throw new RuntimeException("Failed to initialize PulsarAdmin", e);
+ } finally {
+ if (!success) {
+ adminInitialized.set(false);
+ }
+ }
+ }
+
+ private void initializePulsarClient() {
+ if (!clientInitialized.compareAndSet(false, true)) {
+ return;
+ }
+ boolean success = false;
+ try {
+ String serviceUrl = System.getenv().getOrDefault("PULSAR_SERVICE_URL", "pulsar://localhost:6650");
+
+ var clientBuilder = PulsarClient.builder()
+ .serviceUrl(serviceUrl)
+ .operationTimeout(30, TimeUnit.SECONDS)
+ .connectionTimeout(30, TimeUnit.SECONDS)
+ .keepAliveInterval(30, TimeUnit.SECONDS);
+
+ this.pulsarClient = clientBuilder.build();
+ success = true;
+
+ } catch (Exception e) {
+ if (pulsarClient != null) {
+ try {
+ pulsarClient.close();
+ } catch (Exception ignore) {
+ }
+ pulsarClient = null;
+ }
+ clientInitialized.set(false);
+ throw new RuntimeException("Failed to initialize PulsarClient", e);
+ } finally {
+ if (!success) {
+ clientInitialized.set(false);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (pulsarClient != null) {
+ pulsarClient.close();
+ }
+ if (pulsarAdmin != null) {
+ pulsarAdmin.close();
+ }
+ adminInitialized.set(false);
+ clientInitialized.set(false);
+ }
+
+}
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/package-info.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/package-info.java
new file mode 100644
index 0000000..0adeeac
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp.client;
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java
new file mode 100644
index 0000000..163f4e6
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp.config;
+
+import lombok.Getter;
+
+@Getter
+public class PulsarMCPCliOptions {
+
+ @Getter
+ public enum TransportType {
+
+ STDIO("stdio", "Standard input/output (Claude Desktop)"),
+ HTTP("http", "HTTP Streaming Events (Web application)");
+ private final String value;
+ private final String description;
+
+ TransportType(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ public static TransportType fromString(String value) {
+ for (TransportType t : values()) {
+ if (t.value.equalsIgnoreCase(value)) {
+ return t;
+ }
+ }
+ throw new IllegalArgumentException(
+ value + " is not a valid TransportType. Valid Options: stdio,http");
+ }
+ }
+
+ private TransportType transport = TransportType.STDIO;
+ private int httpPort = 8889;
+
+ public static PulsarMCPCliOptions parseArgs(String[] args) {
+ PulsarMCPCliOptions opts = new PulsarMCPCliOptions();
+
+ for (int i = 0; i < args.length; i++) {
+ String arg = args[i];
+ switch (arg) {
+ case "-t", "--transport" -> {
+ if (i + 1 >= args.length) {
+ throw new IllegalArgumentException("Missing value for --transport");
+ }
+ opts.transport = TransportType.fromString(args[++i]);
+ }
+ case "--port" -> {
+ if (i + 1 >= args.length) {
+ throw new IllegalArgumentException("Missing value for --port");
+ }
+ try {
+ opts.httpPort = Integer.parseInt(args[++i]);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid port number for --port");
+ }
+ }
+ default -> {
+ throw new IllegalArgumentException("Unknown argument: " + arg);
+ }
+ }
+ }
+ return opts;
+ }
+
+ @Override
+ public String toString() {
+ return "PulsarMCPCliOptions{transport=" + transport
+ + ",httpPort=" + httpPort + '}';
+ }
+}
+
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/package-info.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/package-info.java
new file mode 100644
index 0000000..ea73ee4
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp.config;
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/package-info.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/package-info.java
new file mode 100644
index 0000000..13e1780
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp;
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java
new file mode 100644
index 0000000..d09911b
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.admin.mcp.tools;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.spec.McpSchema;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BasePulsarTools {
+
+ protected static final Logger LOGGER = LoggerFactory.getLogger(BasePulsarTools.class);
+ protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ protected final PulsarAdmin pulsarAdmin;
+
+ public BasePulsarTools(PulsarAdmin pulsarAdmin) {
+ if (pulsarAdmin == null) {
+ throw new IllegalArgumentException("pulsarAdmin cannot be null");
+ }
+ this.pulsarAdmin = pulsarAdmin;
+ }
+
+ protected McpSchema.CallToolResult createSuccessResult(String message, Object data){
+ StringBuilder result = new StringBuilder();
+ result.append(message).append("\n");
+
+ if (data != null){
+ try {
+ String jsonData = OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+ .writeValueAsString(data);
+ result.append(jsonData)
+ .append("\n");
+ } catch (Exception e) {
+ result.append("Result").append(data.toString()).append("\n");
+ }
+ }
+
+ return new McpSchema.CallToolResult(
+ List.of(new McpSchema.TextContent(result.toString())),
+ false
+ );
+ }
+
+ protected McpSchema.CallToolResult createErrorResult(String message){
+ String errorText = "Error: " + message;
+ return new McpSchema.CallToolResult(
+ List.of(new McpSchema.TextContent(errorText)),
+ true
+ );
+ }
+
+ protected McpSchema.CallToolResult createErrorResult(String message, List suggestions){
+ StringBuilder result = new StringBuilder();
+ result.append(message).append("\n");
+
+ if (suggestions != null && !suggestions.isEmpty()) {
+ suggestions.forEach(s -> result.append(s).append("\n"));
+ }
+ return new McpSchema.CallToolResult(
+ List.of(new McpSchema.TextContent(result.toString())),
+ true
+ );
+ }
+
+ protected String getStringParam(Map map, String key){
+ Object value = map.get(key);
+ return value == null ? "" : value.toString();
+ }
+
+ protected String getRequiredStringParam(Map map, String key) throws IllegalArgumentException{
+ String value = getStringParam(map, key);
+ if (value == null || value.trim().isEmpty()) {
+ throw new IllegalArgumentException("Required parameter '" + key + "' is missing");
+ }
+ return value.trim();
+ }
+
+ protected Integer getIntParam(Map map, String key, Integer defaultValue) {
+ Object value = map.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+
+ try {
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ } else {
+ return Integer.parseInt(value.toString());
+ }
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ protected Boolean getBooleanParam(Map map, String key, Boolean defaultValue) {
+ Object value = map.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Boolean) {
+ return (Boolean) value;
+ } else {
+ return Boolean.parseBoolean(value.toString());
+ }
+ }
+
+ protected Long getLongParam(Map arguments, String timestamp, Long defaultValue) {
+ Object value = arguments.get(timestamp);
+ if (value == null) {
+ return defaultValue;
+ }
+
+ try {
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else {
+ return Long.parseLong(value.toString());
+ }
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ protected static McpSchema.Tool createTool (
+ String name,
+ String description,
+ String inputSchema) {
+ return McpSchema.Tool.builder()
+ .name(name)
+ .description(description)
+ .inputSchema(inputSchema)
+ .build();
+ }
+
+
+ protected String buildFullTopicName(Map arguments) {
+ String topicName = getStringParam(arguments, "topic");
+ if (topicName != null && !topicName.isBlank()) {
+ if (topicName.startsWith("persistent://") || topicName.startsWith("non-persistent://")) {
+ return topicName.trim();
+ }
+ }
+
+ String tenant = (String) arguments.getOrDefault("tenant", "public");
+ String namespace = (String) arguments.getOrDefault("namespace", "default");
+ Boolean persistent = (Boolean) arguments.getOrDefault("persistent", true);
+
+ String prefix = persistent ? "persistent://" : "non-persistent://";
+ return prefix + tenant + "/" + namespace + "/" + topicName;
+ }
+
+ protected String resolveNamespace(Map arguments) {
+ String tenant = getStringParam(arguments, "tenant");
+ String namespace = getStringParam(arguments, "namespace");
+
+ if (namespace != null && namespace.contains("/")) {
+ return namespace;
+ }
+
+ if (tenant == null) {
+ tenant = "public";
+ }
+
+ if (namespace == null) {
+ namespace = "default";
+ }
+
+ return tenant + "/" + namespace;
+ }
+
+ protected void addTopicBreakdown(Map result, String fullTopicName) {
+ if (fullTopicName.startsWith("persistent://")) {
+ fullTopicName = fullTopicName.substring("persistent://".length());
+ } else if (fullTopicName.startsWith("non-persistent://")) {
+ fullTopicName = fullTopicName.substring("non-persistent://".length());
+ }
+
+ String[] parts = fullTopicName.split("/", 3);
+ if (parts.length != 3) {
+ return;
+ }
+
+ result.put("tenant", parts[0]);
+ result.put("namespace", parts[1]);
+ result.put("topicName", parts[2]);
+ }
+
+}
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java
new file mode 100644
index 0000000..62c6a15
--- /dev/null
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java
@@ -0,0 +1,923 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.admin.mcp.tools;
+
+import io.modelcontextprotocol.server.McpServerFeatures;
+import io.modelcontextprotocol.server.McpSyncServer;
+import io.modelcontextprotocol.spec.McpSchema;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+
+public class ClusterTools extends BasePulsarTools {
+
+ public ClusterTools(PulsarAdmin pulsarAdmin) {
+ super(pulsarAdmin);
+ }
+
+ public void registerTools(McpSyncServer mcpServer){
+ registerListClusters(mcpServer);
+ registerGetClusterInfo(mcpServer);
+ registerCreateCluster(mcpServer);
+ registerDeleteCluster(mcpServer);
+ registerUpdateClusterConfig(mcpServer);
+ registerGetClusterStats(mcpServer);
+ registerListBrokers(mcpServer);
+ registerGetBrokerStats(mcpServer);
+ registerGetClusterFailureDomain(mcpServer);
+ registerSetClusterFailureDomain(mcpServer);
+ }
+
+ private void registerListClusters(McpSyncServer mcpServer){
+ McpSchema.Tool tool = createTool(
+ "list-clusters",
+ "List all Pulsar clusters and their status",
+ """
+ {
+ "type": "object",
+ "properties": {},
+ "required": []
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ var clusters = pulsarAdmin.clusters().getClusters();
+
+ Map clusterDetails = new HashMap<>();
+ for (String clusterName :clusters) {
+ try {
+ ClusterData clusterData = pulsarAdmin.clusters().getCluster(clusterName);
+ Map details = new HashMap<>();
+ details.put("serviceUrl", clusterData.getServiceUrl());
+ details.put("serviceUrlTls", clusterData.getServiceUrlTls());
+ details.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
+ details.put("brokerServiceUrlTls", clusterData.getBrokerServiceUrlTls());
+ details.put("status", "active");
+ clusterDetails.put(clusterName, details);
+ } catch (Exception e) {
+ Map details = new HashMap<>();
+ details.put("status", "error");
+ details.put("error", e.getMessage());
+ clusterDetails.put(clusterName, details);
+ }
+ }
+
+ Map result = new HashMap<>();
+ result.put("clusters", clusters);
+ result.put("count", clusters.size());
+ result.put("clusterDetails", clusterDetails);
+
+ return createSuccessResult("Cluster details", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to list clusters", e);
+ return createErrorResult("Failed to list clusters" + e.getMessage());
+ }
+ }).build());
+ }
+
+ private void registerGetClusterInfo(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "get-cluster-info",
+ "Get details information about a specific cluster",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "Name of the cluster to get info about"
+ }
+ },
+ "required": ["clusterName"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+
+ ClusterData clusterData = pulsarAdmin.clusters().getCluster(clusterName);
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("serviceUrl", clusterData.getServiceUrl());
+ result.put("serviceUrlTls", clusterData.getServiceUrlTls());
+ result.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
+ result.put("brokerServiceUrlTls", clusterData.getBrokerServiceUrlTls());
+ result.put("peerClusterNames", clusterData.getPeerClusterNames());
+ result.put("proxyProtocol", clusterData.getProxyProtocol());
+ result.put("authenticationPlugin", clusterData.getAuthenticationPlugin());
+ result.put("authenticationParameters", clusterData.getAuthenticationParameters());
+ result.put("proxyServiceUrl", clusterData.getProxyServiceUrl());
+
+ return createSuccessResult("Cluster info retrieved", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get cluster info", e);
+ return createErrorResult("Failed to get cluster info: " + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerCreateCluster(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "create-cluster",
+ "Create a new Pulsar cluster",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "Name of the cluster to create"
+ },
+ "serviceUrl": {
+ "type": "string",
+ "description": "Service URL for the cluster"
+ },
+ "serviceUrlTls": {
+ "type": "string",
+ "description": "TLS service URL for the cluster (optional)"
+ },
+ "brokerServiceUrl": {
+ "type": "string",
+ "description": "Broker service URL for the cluster (optional)"
+ },
+ "brokerServiceUrlTls": {
+ "type": "string",
+ "description": "TLS broker service URL for the cluster (optional)"
+ },
+ "proxyServiceUrl": {
+ "type": "string",
+ "description": "Proxy service URL (optional)"
+ },
+ "authenticationPlugin": {
+ "type": "string",
+ "description": "Authentication plugin class name (optional)"
+ },
+ "authenticationParameters": {
+ "type": "string",
+ "description": "Authentication parameters (optional)"
+ }
+ },
+ "required": ["clusterName", "serviceUrl"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+ String serviceUrl = getRequiredStringParam(request.arguments(), "serviceUrl");
+ String serviceUrlTls = getStringParam(request.arguments(), "serviceUrlTls");
+ String brokerServiceUrl = getStringParam(request.arguments(), "brokerServiceUrl");
+ String brokerServiceUrlTls = getStringParam(request.arguments(), "brokerServiceUrlTls");
+ String proxyServiceUrl = getStringParam(request.arguments(), "proxyServiceUrl");
+ String authenticationPlugin = getStringParam(request.arguments(), "authenticationPlugin");
+ String authenticationParameters = getStringParam(
+ request.arguments(),
+ "authenticationParameters"
+ );
+
+ try {
+ pulsarAdmin.clusters().getCluster(clusterName);
+ return createErrorResult("Cluster already exists: " + clusterName,
+ List.of("Choose a different cluster name"));
+ } catch (PulsarAdminException.NotFoundException ignore) {
+
+ } catch (PulsarAdminException e) {
+ return createErrorResult("Failed to verify cluster existence: " + e.getMessage());
+ }
+
+ var clusterDataBuilder = ClusterData.builder()
+ .serviceUrl(serviceUrl);
+
+ if (serviceUrlTls != null) {
+ clusterDataBuilder.serviceUrlTls(serviceUrlTls);
+ }
+ if (brokerServiceUrl != null) {
+ clusterDataBuilder.brokerServiceUrl(brokerServiceUrl);
+ }
+ if (brokerServiceUrlTls != null) {
+ clusterDataBuilder.brokerServiceUrlTls(brokerServiceUrlTls);
+ }
+ if (proxyServiceUrl != null) {
+ clusterDataBuilder.proxyServiceUrl(proxyServiceUrl);
+ }
+ if (authenticationPlugin != null) {
+ clusterDataBuilder.authenticationPlugin(authenticationPlugin);
+ }
+ if (authenticationParameters != null) {
+ clusterDataBuilder.authenticationParameters(authenticationParameters);
+ }
+
+ pulsarAdmin.clusters().createCluster(clusterName, clusterDataBuilder.build());
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("serviceUrl", serviceUrl);
+ result.put("created", true);
+
+ return createSuccessResult("Cluster created successfully", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult("Invalid parameter: " + e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to create cluster", e);
+ return createErrorResult("Failed to create cluster: " + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerUpdateClusterConfig(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "update-cluster-config",
+ "Update configuration of an existing Pulsar cluster",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "Name of the cluster to update"
+ },
+ "serviceUrl": {
+ "type": "string",
+ "description": "Service URL for the cluster (optional)"
+ },
+ "serviceUrlTls": {
+ "type": "string",
+ "description": "TLS service URL for the cluster (optional)"
+ },
+ "brokerServiceUrl": {
+ "type": "string",
+ "description": "Broker service URL for the cluster (optional)"
+ },
+ "brokerServiceUrlTls": {
+ "type": "string",
+ "description": "TLS broker service URL for the cluster (optional)"
+ },
+ "proxyServiceUrl": {
+ "type": "string",
+ "description": "Proxy service URL (optional)"
+ },
+ "authenticationPlugin": {
+ "type": "string",
+ "description": "Authentication plugin class name (optional)"
+ },
+ "authenticationParameters": {
+ "type": "string",
+ "description": "Authentication parameters (optional)"
+ }
+ },
+ "required": ["clusterName", "serviceUrl"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+ String serviceUrl = getStringParam(request.arguments(), "serviceUrl");
+ String serviceUrlTls = getStringParam(request.arguments(), "serviceUrlTls");
+ String brokerServiceUrl = getStringParam(request.arguments(), "brokerServiceUrl");
+ String brokerServiceUrlTls = getStringParam(request.arguments(), "brokerServiceUrlTls");
+ String proxyServiceUrl = getStringParam(request.arguments(), "proxyServiceUrl");
+ String authenticationPlugin = getStringParam(request.arguments(), "authenticationPlugin");
+ String authenticationParameters = getStringParam(
+ request.arguments(),
+ "authenticationParameters");
+
+ ClusterData current;
+ try {
+ current = pulsarAdmin.clusters().getCluster(clusterName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ return createErrorResult("Cluster not found: " + clusterName);
+ }
+
+ String finalServiceUrl = (serviceUrl != null && !serviceUrl.isBlank())
+ ? serviceUrl.trim()
+ : current.getServiceUrl();
+
+ var b = ClusterData.builder()
+ .serviceUrl(finalServiceUrl)
+ .serviceUrlTls((serviceUrlTls != null
+ && !serviceUrlTls.isBlank())
+ ? serviceUrlTls
+ : current.getServiceUrlTls())
+ .brokerServiceUrl((brokerServiceUrl != null
+ && !brokerServiceUrl.isBlank())
+ ? brokerServiceUrl
+ : current.getBrokerServiceUrl())
+ .brokerServiceUrlTls((brokerServiceUrlTls != null
+ && !brokerServiceUrlTls.isBlank())
+ ? brokerServiceUrlTls
+ : current.getBrokerServiceUrlTls())
+ .proxyServiceUrl((proxyServiceUrl != null
+ && !proxyServiceUrl.isBlank())
+ ? proxyServiceUrl
+ : current.getProxyServiceUrl());
+
+ if (authenticationPlugin != null && !authenticationPlugin.isBlank()) {
+ b.authenticationPlugin(authenticationPlugin);
+ } else if (current.getAuthenticationPlugin() != null) {
+ b.authenticationPlugin(current.getAuthenticationPlugin());
+ }
+ if (authenticationParameters != null && !authenticationParameters.isBlank()) {
+ b.authenticationParameters(authenticationParameters);
+ } else if (current.getAuthenticationParameters() != null) {
+ b.authenticationParameters(current.getAuthenticationParameters());
+ }
+
+ pulsarAdmin.clusters().updateCluster(clusterName, b.build());
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("serviceUrl", serviceUrl);
+ result.put("updated", true);
+
+ return createSuccessResult("Cluster configuration updated successfully", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult("Invalid input: " + e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to update cluster config", e);
+ return createErrorResult("Failed to update cluster config: " + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerDeleteCluster(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "delete-cluster",
+ "Delete a Pulsar cluster by name",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "Name of the cluster to delete"
+ },
+ "force": {
+ "type": "boolean",
+ "description": "Force cluster to delete",
+ "default": false
+ }
+ },
+ "required": ["clusterName"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName").trim();
+ boolean force = getBooleanParam(request.arguments(), "force", false);
+
+ if (!force) {
+ List tenants = pulsarAdmin.tenants().getTenants();
+ List referencingTenants = new ArrayList<>();
+ for (String tenant : tenants) {
+ var info = pulsarAdmin.tenants().getTenantInfo(tenant);
+ var allowed = info != null ? info.getAllowedClusters() : null;
+ if (allowed != null && allowed.contains(clusterName)) {
+ referencingTenants.add(tenant);
+ }
+ }
+
+ List referencingNamespaces = new ArrayList<>();
+ for (String tenant : tenants) {
+ var nss = pulsarAdmin.namespaces().getNamespaces(tenant);
+ for (String ns : nss) {
+ var repl = pulsarAdmin.namespaces().getNamespaceReplicationClusters(ns);
+ if (repl != null && repl.contains(clusterName)) {
+ referencingNamespaces.add(ns);
+ }
+ }
+ }
+
+ if (!referencingTenants.isEmpty() || !referencingNamespaces.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Cluster '").append(clusterName)
+ .append("' is still referenced. Use 'force: true' to delete anyway.");
+ if (!referencingTenants.isEmpty()) {
+ sb.append(" Referenced by tenants: ").append(referencingTenants);
+ }
+ if (!referencingNamespaces.isEmpty()) {
+ sb.append(" Referenced by namespaces: ").append(referencingNamespaces);
+ }
+ return createErrorResult(sb.toString());
+ }
+ }
+
+ pulsarAdmin.clusters().deleteCluster(clusterName);
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("deleted", true);
+ result.put("forced", force);
+
+ return createSuccessResult("Cluster deleted successfully", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to delete cluster", e);
+ return createErrorResult("Failed to delete cluster: " + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerGetClusterStats(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "get-cluster-stats",
+ "Get statistics for a given Pulsar cluster",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "The name of the cluster to retrieve stats for"
+ }
+ },
+ "required": ["clusterName"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+ if (clusterName == null || clusterName.isBlank()) {
+ return createErrorResult("Missing required parameter: clusterName");
+ }
+
+ var brokers = pulsarAdmin.brokers().getActiveBrokers(clusterName);
+
+ Map stats = new HashMap<>();
+ stats.put("clusterName", clusterName);
+ stats.put("activeBrokers", brokers);
+ stats.put("brokerCount", brokers.size());
+
+ return createSuccessResult("Cluster stats retrieved successfully", stats);
+
+ } catch (IllegalArgumentException e){
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get cluster stats", e);
+ return createErrorResult("Failed to get cluster stats: " + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerListBrokers(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "list-brokers",
+ "List all active brokers in a given Pulsar cluster",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "The name of the Pulsar cluster"
+ }
+ },
+ "required": ["clusterName"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName").trim();
+ if (clusterName.isEmpty()) {
+ return createErrorResult("clusterName cannot be blank");
+ }
+
+ try {
+ pulsarAdmin.clusters().getCluster(clusterName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ return createErrorResult("Cluster '" + clusterName + "' not found");
+ }
+
+ List active = new ArrayList<>(pulsarAdmin.brokers().getActiveBrokers(clusterName));
+ active.sort(String::compareTo);
+
+ String leader = null;
+ try {
+ leader = String.valueOf(pulsarAdmin.brokers().getLeaderBroker());
+ } catch (Exception ignore) {}
+
+ var dynamicConfigNames = pulsarAdmin.brokers().getDynamicConfigurationNames();
+ List dynamicNamesSorted = dynamicConfigNames == null
+ ? List.of()
+ : dynamicConfigNames.stream().sorted().toList();
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("activeBrokers", active);
+ result.put("brokerCount", active.size());
+ result.put("leaderBroker", leader);
+ result.put("dynamicConfigNames", dynamicNamesSorted);
+ result.put("available", !active.isEmpty());
+ result.put("timestamp", System.currentTimeMillis());
+
+ String msg = "List of active brokers retrieved successfully"
+ + (leader != null ? "" : " (leader not available)");
+ return createSuccessResult(msg, result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to list brokers", e);
+ return createErrorResult("Failed to list brokers: " + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerGetBrokerStats(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "get-broker-stats",
+ "Get statistics for a specific Pulsar broker",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "brokerUrl": {
+ "type": "string",
+ "description": "The URL of the broker (e.g., 'localhost:8080')"
+ }
+ },
+ "required": ["brokerUrl"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String brokerUrl = getRequiredStringParam(request.arguments(), "brokerUrl");
+
+ if (brokerUrl == null || brokerUrl.isBlank()) {
+ return createErrorResult("Missing required parameter: brokerUrl");
+ }
+
+ var brokerStats = pulsarAdmin.brokerStats().getTopics();
+
+ Map result = new HashMap<>();
+ result.put("brokerUrl", brokerUrl);
+ result.put("stats", brokerStats);
+
+ return createSuccessResult("Broker stats retrieved successfully", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get broker stats", e);
+ return createErrorResult("Failed to get broker stats: " + e.getMessage());
+ }
+ })
+ .build()
+ );
+ }
+
+ private void registerGetClusterFailureDomain(McpSyncServer mcpServer) {
+ McpSchema.Tool tool = createTool(
+ "get-cluster-failure-domain",
+ "Get failure domain(s) for a specific Pulsar cluster",
+ """
+ {
+ "type": "object",
+ "properties": {
+ "clusterName": {
+ "type": "string",
+ "description": "The name of the Pulsar cluster"
+ },
+ "domainName": {
+ "type": "string",
+ "description": "Optional. If set, only this failure domain will be returned"
+ },
+ "includeEmpty": {
+ "type": "boolean",
+ "description": "Include domains with empty broker list",
+ "default": true
+ }
+ },
+ "required": ["clusterName"]
+ }
+ """
+ );
+
+ mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler((exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName").trim();
+ String domainName = getStringParam(request.arguments(), "domainName");
+ boolean includeEmpty = getBooleanParam(request.arguments(), "includeEmpty", true);
+
+ if (clusterName.isEmpty()) {
+ return createErrorResult("clusterName cannot be blank");
+ }
+ if (domainName != null) {
+ domainName = domainName.trim();
+ }
+
+ try {
+ pulsarAdmin.clusters().getCluster(clusterName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ return createErrorResult("Cluster '" + clusterName + "' not found");
+ }
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+
+ if (domainName != null && !domainName.isEmpty()) {
+ try {
+ FailureDomain fd =
+ pulsarAdmin.clusters().getFailureDomain(clusterName, domainName);
+
+ Set brokers = (fd != null && fd.getBrokers() != null)
+ ? new HashSet<>(fd.getBrokers())
+ : new HashSet<>();
+
+ if (!includeEmpty && brokers.isEmpty()) {
+ result.put("domains", List.of());
+ result.put("domainCount", 0);
+ result.put("available", false);
+ return createSuccessResult(
+ "Domain exists but filtered by includeEmpty=false", result);
+ }
+
+ Map item = new HashMap<>();
+ item.put("domainName", domainName);
+ item.put("brokers", brokers.stream().sorted().toList());
+ item.put("brokerCount", brokers.size());
+
+ result.put("domains", List.of(item));
+ result.put("domainCount", 1);
+ result.put("available", true);
+
+ return createSuccessResult(
+ "Cluster failure domain retrieved successfully", result);
+ } catch (PulsarAdminException.NotFoundException e) {
+ return createErrorResult(
+ "Domain '"
+ + domainName
+ + "' not found in cluster '"
+ + clusterName + "'");
+ }
+ } else {
+ Map raw =
+ pulsarAdmin.clusters().getFailureDomains(clusterName);
+ if (raw == null) {
+ raw = Map.of();
+ }
+
+ List