Skip to content

Commit 372c548

Browse files
authored
discovery functions as tools (#37)
making pulsar functions be able to exported as MCP tools
1 parent 7a8ba68 commit 372c548

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+5533
-559
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ tmp/
77
*.log
88

99
# Go
10-
vendor
10+
vendor
11+
.cursor/

.licenserc.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright 2024 StreamNative
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
header:
17+
license:
18+
spdx-id: Apache-2.0
19+
copyright-owner: StreamNative
20+
21+
paths-ignore:
22+
- 'dist'
23+
- 'licenses'
24+
- '**/*.md'
25+
- 'LICENSE'
26+
- 'NOTICE'
27+
- '.github/**'
28+
- 'PROJECT'
29+
- '**/go.mod'
30+
- '**/go.work'
31+
- '**/go.work.sum'
32+
- '**/go.sum'
33+
- '**/*.json'
34+
- 'sdk/**'
35+
- '**/*.yaml'
36+
- '**/*.yml'
37+
- 'Makefile'
38+
- '.gitignore'
39+
40+
comment: on-failure

Makefile

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ build:
1919
-X ${VERSION_PATH}.date=${BUILD_DATE}" \
2020
-o bin/snmcp cmd/streamnative-mcp-server/main.go
2121

22-
# go install github.com/elastic/go-licenser@latest
23-
.PHONY: fix-license
24-
fix-license:
25-
go-licenser -license ASL2 -exclude sdk
22+
.PHONY: license-check
23+
license-check:
24+
license-eye header check
25+
26+
# go install github.com/apache/skywalking-eyes/cmd/license-eye@latest
27+
.PHONY: license-fix
28+
license-fix:
29+
license-eye header fix

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ The StreamNative MCP Server allows you to enable or disable specific groups of f
233233
| Feature | Description | Docs |
234234
|---------------------|------------------------------------------------------------------|------|
235235
| `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) |
236+
| `functions-as-tools` | Dynamically exposes deployed Pulsar Functions as invokable MCP tools, with automatic input/output schema handling. | [functions_as_tools.md](docs/tools/functions_as_tools.md) |
236237

237238
You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features:
238239
```bash

docs/tools/functions_as_tools.md

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Functions as Tools
2+
3+
The "Functions as Tools" feature allows the StreamNative MCP Server to dynamically discover Apache Pulsar Functions deployed in your cluster and expose them as invokable MCP tools for AI agents. This significantly enhances the capabilities of AI agents by allowing them to interact with custom business logic encapsulated in Pulsar Functions without manual tool registration for each function.
4+
5+
## How it Works
6+
7+
### 1. Function Discovery
8+
The MCP Server automatically discovers Pulsar Functions available in the connected Pulsar cluster. It periodically polls for functions and identifies those suitable for exposure as tools.
9+
10+
By default, if no custom name is provided (see Customizing Tool Properties), the MCP tool name might be derived from the Function's Fully Qualified Name (FQN), such as `pulsar_function_$tenant_$namespace_$name`.
11+
12+
### 2. Schema Conversion
13+
For each discovered function, the MCP Server attempts to extract its input and output schema definitions. Pulsar Functions can be defined with various schema types for their inputs and outputs (e.g., primitive types, AVRO, JSON).
14+
15+
The server then converts these native Pulsar schemas into a format compatible with MCP tools. This allows the AI agent to understand the expected input parameters and the structure of the output.
16+
17+
Supported Pulsar schema types for automatic conversion include:
18+
* Primitive types (String, Boolean, Numbers like INT8, INT16, INT32, INT64, FLOAT, DOUBLE)
19+
* AVRO
20+
* JSON
21+
22+
If a function uses an unsupported schema type for its input or output, or if schemas are not clearly defined, it might not be exposed as an MCP tool.
23+
24+
## Enabling the Feature
25+
To enable this functionality, you need to specific the default `--pulsar-instance` and `--pulsar-cluster`, and include `functions-as-tools` in the `--features` flag when starting the StreamNative MCP Server.
26+
27+
Example:
28+
```bash
29+
snmcp sse --organization my-org --key-file /path/to/key-file.json --features pulsar-admin,pulsar-client,functions-as-tools --pulsar-instance instance --pulsar-cluster cluster
30+
```
31+
If `functions-as-tools` is part of a broader feature set like `all` and `streamnative-cloud`, enabling `all` or `streamnative-cloud` would also activate this feature.
32+
33+
## Customizing Tool Properties
34+
You can customize how your Pulsar Functions appear as MCP tools (their name and description) by providing specific runtime options when deploying or updating your functions. This is done using the `--custom-runtime-options` flag with `pulsar-admin functions create` or `pulsar-admin functions update`.
35+
36+
The MCP Server looks for the following environment variables within the custom runtime options:
37+
* `MCP_TOOL_NAME`: Specifies the desired name for the MCP tool.
38+
* `MCP_TOOL_DESCRIPTION`: Provides a description for the MCP tool, which helps the AI agent understand its purpose.
39+
40+
**Format for `--custom-runtime-options`**:
41+
The options should be a JSON string where you define an `env` map containing `MCP_TOOL_NAME` and `MCP_TOOL_DESCRIPTION`.
42+
43+
**Example**:
44+
When deploying a Pulsar Function, you can set these properties as follows:
45+
```bash
46+
pulsar-admin functions create \
47+
--tenant public \
48+
--namespace default \
49+
--name my-custom-logic-function \
50+
--inputs "persistent://public/default/input-topic" \
51+
--output "persistent://public/default/output-topic" \
52+
--py my_function.py \
53+
--classname my_function.MyFunction \
54+
--custom-runtime-options \
55+
'''
56+
{
57+
"env": {
58+
"MCP_TOOL_NAME": "CustomObjectFunction",
59+
"MCP_TOOL_DESCRIPTION": "Takes an input number and returns the value incremented by 100."
60+
}
61+
}
62+
'''
63+
```
64+
In this example:
65+
- The MCP tool derived from `my-custom-logic-function` will be named `CustomObjectFunction`.
66+
- Its description will be "Takes an input number and returns the value incremented by 100."
67+
68+
If these custom options are not provided, the MCP tool name might default to a derivative of the function's FQN, and the description might be generic and cannot help AI Agent to understand the purpose of the MCP tool.
69+
70+
## Server-Side Configuration via Environment Variables
71+
72+
Beyond customizing individual tool properties at the function deployment level, you can also configure the overall behavior of the "Functions as Tools" feature on the StreamNative MCP Server side using the following environment variables. These variables are typically set when starting the MCP server.
73+
74+
* `FUNCTIONS_AS_TOOLS_POLL_INTERVAL`
75+
* **Description**: Controls how frequently the MCP Server polls the Pulsar cluster to discover or update available Pulsar Functions. Setting a lower value means functions are discovered faster, but it may increase the load on the Pulsar cluster.
76+
* **Unit**: Seconds
77+
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()`. Refer to the `pkg/pftools` package for the precise default (e.g., if the internal default is 60 seconds, it will be `60`).
78+
* `FUNCTIONS_AS_TOOLS_TIMEOUT`
79+
* **Description**: Sets the default timeout for invoking a Pulsar Function as an MCP tool. If a function execution exceeds this duration, the call will be considered timed out.
80+
* **Unit**: Seconds
81+
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()` (e.g., if the internal default is 30 seconds, it will be `30`).
82+
* `FUNCTIONS_AS_TOOLS_FAILURE_THRESHOLD`
83+
* **Description**: Defines the number of consecutive failures for a specific Pulsar Function tool before it is temporarily moved to a "circuit breaker open" state. In this state, further calls to this specific function tool will be immediately rejected without attempting to execute the function, until the `FUNCTIONS_AS_TOOLS_RESET_TIMEOUT` is reached.
84+
* **Unit**: Integer (number of failures)
85+
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()` (e.g., if the internal default is 5, it will be `5`).
86+
* `FUNCTIONS_AS_TOOLS_RESET_TIMEOUT`
87+
* **Description**: Specifies the duration for which a Pulsar Function tool remains in the "circuit breaker open" state (due to exceeding the failure threshold) before the MCP server attempts to reset the circuit and allow calls again.
88+
* **Unit**: Seconds
89+
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()` (e.g., if the internal default is 60 seconds, it will be `60`).
90+
* `FUNCTIONS_AS_TOOLS_TENANT_NAMESPACES`
91+
* **Description**: A comma-separated list of Pulsar `tenant/namespace` strings that the MCP Server should scan for Pulsar Functions. This allows you to restrict function discovery to specific namespaces. If not set, the server might attempt to discover functions from all namespaces it has access to, as permitted by its Pulsar client configuration.
92+
* **Format**: `tenant1/namespace1,tenant2/namespace2`
93+
* **Example**: `public/default,my-tenant/app-functions`
94+
* **Default**: Empty (meaning discover from all accessible namespaces (only on StreamNative Cloud)).
95+
* `FUNCTIONS_AS_TOOLS_STRICT_EXPORT`
96+
* **Description**: Only export functions with `MCP_TOOL_NAME` and `MCP_TOOL_DESCRIPTION` defined.
97+
* **Format**: `true` or `false`
98+
* **Example**: `false`
99+
* **Default**: `true`
100+
101+
## Considerations and Limitations
102+
103+
* **Schema Definition**: For reliable schema conversion, ensure your Pulsar Functions have clearly defined input and output schemas using Pulsar's schema registry capabilities. Functions with ambiguous or `BYTES` schemas might not be converted effectively or might default to generic byte array inputs/outputs.
104+
* **Function State**: This feature primarily focuses on the stateless request/response invocation pattern of functions.
105+
* **Discovery Latency**: There might be a slight delay between deploying/updating a function and it appearing as an MCP tool, due to the server's polling interval for function discovery.
106+
* **Error Handling**: The MCP Server will attempt to relay errors from function executions, but the specifics might vary.
107+
* **Security**: Ensure that only intended functions are exposed by managing permissions within your Pulsar cluster. The MCP Server will operate with the permissions of its Pulsar client.

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ require (
66
github.com/99designs/keyring v1.2.2
77
github.com/apache/pulsar-client-go v0.13.1
88
github.com/dgrijalva/jwt-go v3.2.0+incompatible
9+
github.com/google/go-cmp v0.7.0
910
github.com/hamba/avro/v2 v2.28.0
10-
github.com/mark3labs/mcp-go v0.27.0
11+
github.com/mark3labs/mcp-go v0.28.0
1112
github.com/mitchellh/go-homedir v1.1.0
1213
github.com/pkg/errors v0.9.1
1314
github.com/sirupsen/logrus v1.9.3
@@ -16,6 +17,7 @@ require (
1617
github.com/streamnative/pulsarctl v0.4.3-0.20250312214758-e472faec284b
1718
github.com/streamnative/streamnative-mcp-server/sdk/sdk-apiserver v0.0.0-20250506174209-b67ea08ddd82
1819
github.com/streamnative/streamnative-mcp-server/sdk/sdk-kafkaconnect v0.0.0-00010101000000-000000000000
20+
github.com/stretchr/testify v1.10.0
1921
github.com/twmb/franz-go v1.18.1
2022
github.com/twmb/franz-go/pkg/kadm v1.16.0
2123
github.com/twmb/franz-go/pkg/sr v1.3.0

go.sum

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
21
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
32
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
43
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
@@ -91,9 +90,8 @@ github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
9190
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
9291
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
9392
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
94-
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
95-
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
96-
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
93+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
94+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
9795
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
9896
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
9997
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -131,8 +129,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
131129
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
132130
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
133131
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
134-
github.com/mark3labs/mcp-go v0.27.0 h1:iok9kU4DUIU2/XVLgFS2Q9biIDqstC0jY4EQTK2Erzc=
135-
github.com/mark3labs/mcp-go v0.27.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
132+
github.com/mark3labs/mcp-go v0.28.0 h1:7yl4y5D1KYU2f/9Uxp7xfLIggfunHoESCRbrjcytcLM=
133+
github.com/mark3labs/mcp-go v0.28.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
136134
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
137135
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
138136
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=

go.work.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
138138
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
139139
github.com/mark3labs/mcp-go v0.23.1 h1:RzTzZ5kJ+HxwnutKA4rll8N/pKV6Wh5dhCmiJUu5S9I=
140140
github.com/mark3labs/mcp-go v0.23.1/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
141+
github.com/mark3labs/mcp-go v0.28.0 h1:7yl4y5D1KYU2f/9Uxp7xfLIggfunHoESCRbrjcytcLM=
142+
github.com/mark3labs/mcp-go v0.28.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
141143
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
142144
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
143145
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=

pkg/cmd/mcp/sse.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/mark3labs/mcp-go/server"
3232
"github.com/pkg/errors"
3333
"github.com/spf13/cobra"
34+
"github.com/streamnative/streamnative-mcp-server/pkg/common"
3435
"github.com/streamnative/streamnative-mcp-server/pkg/mcp"
3536
)
3637

@@ -68,24 +69,29 @@ func runSseServer(configOpts *ServerOptions) error {
6869
}
6970

7071
// 3. Create a new MCP server
71-
ctx = context.WithValue(ctx, mcp.OptionsKey, configOpts.Options)
72-
mcpServer := server.NewSSEServer(
73-
newMcpServer(configOpts, logger),
72+
ctx = context.WithValue(ctx, common.OptionsKey, configOpts.Options)
73+
mcpServer := newMcpServer(configOpts, logger)
74+
75+
// add Pulsar Functions as MCP tools
76+
mcp.PulsarFunctionManagedMcpTools(mcpServer, false, configOpts.Features)
77+
78+
sseServer := server.NewSSEServer(
79+
mcpServer,
7480
server.WithStaticBasePath(configOpts.HTTPPath),
7581
server.WithHTTPContextFunc(func(ctx context.Context, _ *http.Request) context.Context {
76-
return context.WithValue(ctx, mcp.OptionsKey, configOpts.Options)
82+
return context.WithValue(ctx, common.OptionsKey, configOpts.Options)
7783
}),
7884
)
7985

8086
// 4. Expose the full SSE URL to the user
81-
ssePath := mcpServer.CompleteSsePath()
87+
ssePath := sseServer.CompleteSsePath()
8288
fmt.Fprintf(os.Stderr, "StreamNative Cloud MCP Server listening on http://%s%s\n",
8389
configOpts.HTTPAddr, ssePath)
8490

8591
// 5. Run the HTTP listener in a goroutine
8692
errCh := make(chan error, 1)
8793
go func() {
88-
if err := mcpServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
94+
if err := sseServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
8995
errCh <- err // bubble up real crashes
9096
}
9197
}()
@@ -108,7 +114,7 @@ func runSseServer(configOpts *ServerOptions) error {
108114
defer cancel()
109115

110116
// First try to shut down the SSE server
111-
if err := mcpServer.Shutdown(shCtx); err != nil {
117+
if err := sseServer.Shutdown(shCtx); err != nil {
112118
if !errors.Is(err, http.ErrServerClosed) {
113119
logger.Errorf("Error shutting down SSE server: %v", err)
114120
}

pkg/cmd/mcp/stdio.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import (
3030
"github.com/mark3labs/mcp-go/server"
3131
"github.com/sirupsen/logrus"
3232
"github.com/spf13/cobra"
33+
"github.com/streamnative/streamnative-mcp-server/pkg/common"
3334
"github.com/streamnative/streamnative-mcp-server/pkg/log"
34-
"github.com/streamnative/streamnative-mcp-server/pkg/mcp"
3535
)
3636

3737
func NewCmdMcpStdioServer(configOpts *ServerOptions) *cobra.Command {
@@ -63,7 +63,7 @@ func runStdioServer(configOpts *ServerOptions) error {
6363
}
6464

6565
// Create a new MCP server
66-
ctx = context.WithValue(ctx, mcp.OptionsKey, configOpts.Options)
66+
ctx = context.WithValue(ctx, common.OptionsKey, configOpts.Options)
6767
stdLogger := stdlog.New(logger.Writer(), "snmcp-server", 0)
6868
stdioServer := server.NewStdioServer(newMcpServer(configOpts, logger))
6969

0 commit comments

Comments
 (0)