Skip to content

Commit 0ada88c

Browse files
committed
add docs
1 parent b7889fe commit 0ada88c

File tree

3 files changed

+77
-11
lines changed

3 files changed

+77
-11
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ The StreamNative MCP Server allows you to enable or disable specific groups of f
233233
| Feature | Description | Docs |
234234
|---------------------|------------------------------------------------------------------|------|
235235
| `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) |
236+
| `functions-as-tools` | Dynamically exposes deployed Pulsar Functions as invokable MCP tools, with automatic input/output schema handling. | [functions_as_tools.md](docs/tools/functions_as_tools.md) |
236237

237238
You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features:
238239
```bash

docs/tools/functions_as_tools.md

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Functions as Tools
2+
3+
The "Functions as Tools" feature allows the StreamNative MCP Server to dynamically discover Apache Pulsar Functions deployed in your cluster and expose them as invokable MCP tools for AI agents. This significantly enhances the capabilities of AI agents by allowing them to interact with custom business logic encapsulated in Pulsar Functions without manual tool registration for each function.
4+
5+
## How it Works
6+
7+
### 1. Function Discovery
8+
The MCP Server automatically discovers Pulsar Functions available in the connected Pulsar cluster. It periodically polls for functions and identifies those suitable for exposure as tools.
9+
10+
By default, if no custom name is provided (see Customizing Tool Properties), the MCP tool name might be derived from the Function's Fully Qualified Name (FQN), such as `pulsar_function_$tenant_$namespace_$name`.
11+
12+
### 2. Schema Conversion
13+
For each discovered function, the MCP Server attempts to extract its input and output schema definitions. Pulsar Functions can be defined with various schema types for their inputs and outputs (e.g., primitive types, AVRO, JSON).
14+
15+
The server then converts these native Pulsar schemas into a format compatible with MCP tools. This allows the AI agent to understand the expected input parameters and the structure of the output.
16+
17+
Supported Pulsar schema types for automatic conversion include:
18+
* Primitive types (String, Boolean, Numbers like INT8, INT16, INT32, INT64, FLOAT, DOUBLE)
19+
* AVRO
20+
* JSON
21+
22+
If a function uses an unsupported schema type for its input or output, or if schemas are not clearly defined, it might not be exposed as an MCP tool.
23+
24+
## Enabling the Feature
25+
To enable this functionality, you need to specific the default `--pulsar-instance` and `--pulsar-cluster`, and include `functions-as-tools` in the `--features` flag when starting the StreamNative MCP Server.
26+
27+
Example:
28+
```bash
29+
snmcp stdio --organization my-org --key-file /path/to/key-file.json --features pulsar-admin,pulsar-client,functions-as-tools --pulsar-instance instance --pulsar-cluster cluster
30+
```
31+
If `functions-as-tools` is part of a broader feature set like `all` and `streamnative-cloud`, enabling `all` or `streamnative-cloud` would also activate this feature.
32+
33+
## Customizing Tool Properties
34+
You can customize how your Pulsar Functions appear as MCP tools (their name and description) by providing specific runtime options when deploying or updating your functions. This is done using the `--custom-runtime-options` flag with `pulsar-admin functions create` or `pulsar-admin functions update`.
35+
36+
The MCP Server looks for the following environment variables within the custom runtime options:
37+
* `MCP_TOOL_NAME`: Specifies the desired name for the MCP tool.
38+
* `MCP_TOOL_DESCRIPTION`: Provides a description for the MCP tool, which helps the AI agent understand its purpose.
39+
40+
**Format for `--custom-runtime-options`**:
41+
The options should be a JSON string where you define an `env` map containing `MCP_TOOL_NAME` and `MCP_TOOL_DESCRIPTION`.
42+
43+
**Example**:
44+
When deploying a Pulsar Function, you can set these properties as follows:
45+
```bash
46+
pulsar-admin functions create \
47+
--tenant public \
48+
--namespace default \
49+
--name my-custom-logic-function \
50+
--inputs "persistent://public/default/input-topic" \
51+
--output "persistent://public/default/output-topic" \
52+
--py my_function.py \
53+
--classname my_function.MyFunction \
54+
--custom-runtime-options \
55+
'''
56+
{
57+
"env": {
58+
"MCP_TOOL_NAME": "CustomObjectFunction",
59+
"MCP_TOOL_DESCRIPTION": "Takes an input number and returns the value incremented by 100."
60+
}
61+
}
62+
'''
63+
```
64+
In this example:
65+
- The MCP tool derived from `my-custom-logic-function` will be named `CustomObjectFunction`.
66+
- Its description will be "Takes an input number and returns the value incremented by 100."
67+
68+
If these custom options are not provided, the MCP tool name might default to a derivative of the function's FQN, and the description might be generic and cannot help AI Agent to understand the purpose of the MCP tool.
69+
70+
## Considerations and Limitations
71+
72+
* **Schema Definition**: For reliable schema conversion, ensure your Pulsar Functions have clearly defined input and output schemas using Pulsar's schema registry capabilities. Functions with ambiguous or `BYTES` schemas might not be converted effectively or might default to generic byte array inputs/outputs.
73+
* **Function State**: This feature primarily focuses on the stateless request/response invocation pattern of functions.
74+
* **Discovery Latency**: There might be a slight delay between deploying/updating a function and it appearing as an MCP tool, due to the server's polling interval for function discovery.
75+
* **Error Handling**: The MCP Server will attempt to relay errors from function executions, but the specifics might vary.
76+
* **Security**: Ensure that only intended functions are exposed by managing permissions within your Pulsar cluster. The MCP Server will operate with the permissions of its Pulsar client.

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,11 @@ import (
3030
"github.com/streamnative/streamnative-mcp-server/pkg/pftools"
3131
)
3232

33-
// 管理器跟踪
3433
var (
3534
functionManagers = make(map[string]*pftools.PulsarFunctionManager)
3635
functionManagersLock sync.RWMutex
3736
)
3837

39-
// StopAllPulsarFunctionManagers 停止所有注册的Pulsar Function管理器
40-
// 可在程序退出前调用
4138
func StopAllPulsarFunctionManagers() {
4239
functionManagersLock.Lock()
4340
defer functionManagersLock.Unlock()
@@ -48,26 +45,22 @@ func StopAllPulsarFunctionManagers() {
4845
delete(functionManagers, id)
4946
}
5047

51-
// 给一些时间让管理器清理资源
5248
if len(functionManagers) > 0 {
5349
time.Sleep(500 * time.Millisecond)
5450
}
5551

5652
log.Println("All Pulsar Function managers stopped")
5753
}
5854

59-
// PulsarFunctionManagedMcpTools 将运行中的Pulsar Functions集成为MCP工具
6055
func PulsarFunctionManagedMcpTools(s *server.MCPServer, readOnly bool, features []string) {
6156
if !slices.Contains(features, string(FeatureAll)) &&
6257
!slices.Contains(features, string(FeatureFunctionsAsTools)) &&
6358
!slices.Contains(features, string(FeatureStreamNativeCloud)) {
6459
return
6560
}
6661

67-
// 创建新的管理器选项
6862
options := pftools.DefaultManagerOptions()
6963

70-
// 从环境变量读取配置
7164
if pollIntervalStr := os.Getenv("PULSAR_FUNCTIONS_POLL_INTERVAL"); pollIntervalStr != "" {
7265
if seconds, err := strconv.Atoi(pollIntervalStr); err == nil && seconds > 0 {
7366
options.PollInterval = time.Duration(seconds) * time.Second
@@ -96,23 +89,19 @@ func PulsarFunctionManagedMcpTools(s *server.MCPServer, readOnly bool, features
9689
}
9790
}
9891

99-
// 设置要监听的租户和命名空间
10092
if tenantNamespacesStr := os.Getenv("PULSAR_FUNCTIONS_TENANT_NAMESPACES"); tenantNamespacesStr != "" {
10193
options.TenantNamespaces = strings.Split(tenantNamespacesStr, ",")
10294
log.Printf("Setting Pulsar Functions tenant namespaces to %v", options.TenantNamespaces)
10395
}
10496

105-
// 创建管理器
10697
manager, err := pftools.NewPulsarFunctionManager(s, readOnly, options)
10798
if err != nil {
10899
log.Printf("Failed to create Pulsar Function manager: %v", err)
109100
return
110101
}
111102

112-
// 启动管理器
113103
manager.Start()
114104

115-
// 将管理器添加到全局跟踪中
116105
managerID := "pulsar_functions_manager_" + strconv.FormatInt(time.Now().UnixNano(), 10)
117106
functionManagersLock.Lock()
118107
functionManagers[managerID] = manager

0 commit comments

Comments
 (0)