Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ tmp/
*.log

# Go
vendor
vendor
.cursor/
40 changes: 40 additions & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2024 StreamNative
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

header:
license:
spdx-id: Apache-2.0
copyright-owner: StreamNative

paths-ignore:
- 'dist'
- 'licenses'
- '**/*.md'
- 'LICENSE'
- 'NOTICE'
- '.github/**'
- 'PROJECT'
- '**/go.mod'
- '**/go.work'
- '**/go.work.sum'
- '**/go.sum'
- '**/*.json'
- 'sdk/**'
- '**/*.yaml'
- '**/*.yml'
- 'Makefile'
- '.gitignore'

comment: on-failure
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ build:
-X ${VERSION_PATH}.date=${BUILD_DATE}" \
-o bin/snmcp cmd/streamnative-mcp-server/main.go

# go install github.com/elastic/go-licenser@latest
.PHONY: fix-license
fix-license:
go-licenser -license ASL2 -exclude sdk
.PHONY: license-check
license-check:
license-eye header check

# go install github.com/apache/skywalking-eyes/cmd/license-eye@latest
.PHONY: license-fix
license-fix:
license-eye header fix
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ The StreamNative MCP Server allows you to enable or disable specific groups of f
| Feature | Description | Docs |
|---------------------|------------------------------------------------------------------|------|
| `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) |
| `functions-as-tools` | Dynamically exposes deployed Pulsar Functions as invokable MCP tools, with automatic input/output schema handling. | [functions_as_tools.md](docs/tools/functions_as_tools.md) |

You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features:
```bash
Expand Down
107 changes: 107 additions & 0 deletions docs/tools/functions_as_tools.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Functions as Tools

The "Functions as Tools" feature allows the StreamNative MCP Server to dynamically discover Apache Pulsar Functions deployed in your cluster and expose them as invokable MCP tools for AI agents. This significantly enhances the capabilities of AI agents by allowing them to interact with custom business logic encapsulated in Pulsar Functions without manual tool registration for each function.

## How it Works

### 1. Function Discovery
The MCP Server automatically discovers Pulsar Functions available in the connected Pulsar cluster. It periodically polls for functions and identifies those suitable for exposure as tools.

By default, if no custom name is provided (see Customizing Tool Properties), the MCP tool name might be derived from the Function's Fully Qualified Name (FQN), such as `pulsar_function_$tenant_$namespace_$name`.

### 2. Schema Conversion
For each discovered function, the MCP Server attempts to extract its input and output schema definitions. Pulsar Functions can be defined with various schema types for their inputs and outputs (e.g., primitive types, AVRO, JSON).

The server then converts these native Pulsar schemas into a format compatible with MCP tools. This allows the AI agent to understand the expected input parameters and the structure of the output.

Supported Pulsar schema types for automatic conversion include:
* Primitive types (String, Boolean, Numbers like INT8, INT16, INT32, INT64, FLOAT, DOUBLE)
* AVRO
* JSON

If a function uses an unsupported schema type for its input or output, or if schemas are not clearly defined, it might not be exposed as an MCP tool.

## Enabling the Feature
To enable this functionality, you need to specific the default `--pulsar-instance` and `--pulsar-cluster`, and include `functions-as-tools` in the `--features` flag when starting the StreamNative MCP Server.

Example:
```bash
snmcp sse --organization my-org --key-file /path/to/key-file.json --features pulsar-admin,pulsar-client,functions-as-tools --pulsar-instance instance --pulsar-cluster cluster
```
If `functions-as-tools` is part of a broader feature set like `all` and `streamnative-cloud`, enabling `all` or `streamnative-cloud` would also activate this feature.

## Customizing Tool Properties
You can customize how your Pulsar Functions appear as MCP tools (their name and description) by providing specific runtime options when deploying or updating your functions. This is done using the `--custom-runtime-options` flag with `pulsar-admin functions create` or `pulsar-admin functions update`.

The MCP Server looks for the following environment variables within the custom runtime options:
* `MCP_TOOL_NAME`: Specifies the desired name for the MCP tool.
* `MCP_TOOL_DESCRIPTION`: Provides a description for the MCP tool, which helps the AI agent understand its purpose.

**Format for `--custom-runtime-options`**:
The options should be a JSON string where you define an `env` map containing `MCP_TOOL_NAME` and `MCP_TOOL_DESCRIPTION`.

**Example**:
When deploying a Pulsar Function, you can set these properties as follows:
```bash
pulsar-admin functions create \
--tenant public \
--namespace default \
--name my-custom-logic-function \
--inputs "persistent://public/default/input-topic" \
--output "persistent://public/default/output-topic" \
--py my_function.py \
--classname my_function.MyFunction \
--custom-runtime-options \
'''
{
"env": {
"MCP_TOOL_NAME": "CustomObjectFunction",
"MCP_TOOL_DESCRIPTION": "Takes an input number and returns the value incremented by 100."
}
}
'''
```
In this example:
- The MCP tool derived from `my-custom-logic-function` will be named `CustomObjectFunction`.
- Its description will be "Takes an input number and returns the value incremented by 100."

If these custom options are not provided, the MCP tool name might default to a derivative of the function's FQN, and the description might be generic and cannot help AI Agent to understand the purpose of the MCP tool.

## Server-Side Configuration via Environment Variables

Beyond customizing individual tool properties at the function deployment level, you can also configure the overall behavior of the "Functions as Tools" feature on the StreamNative MCP Server side using the following environment variables. These variables are typically set when starting the MCP server.

* `FUNCTIONS_AS_TOOLS_POLL_INTERVAL`
* **Description**: Controls how frequently the MCP Server polls the Pulsar cluster to discover or update available Pulsar Functions. Setting a lower value means functions are discovered faster, but it may increase the load on the Pulsar cluster.
* **Unit**: Seconds
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()`. Refer to the `pkg/pftools` package for the precise default (e.g., if the internal default is 60 seconds, it will be `60`).
* `FUNCTIONS_AS_TOOLS_TIMEOUT`
* **Description**: Sets the default timeout for invoking a Pulsar Function as an MCP tool. If a function execution exceeds this duration, the call will be considered timed out.
* **Unit**: Seconds
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()` (e.g., if the internal default is 30 seconds, it will be `30`).
* `FUNCTIONS_AS_TOOLS_FAILURE_THRESHOLD`
* **Description**: Defines the number of consecutive failures for a specific Pulsar Function tool before it is temporarily moved to a "circuit breaker open" state. In this state, further calls to this specific function tool will be immediately rejected without attempting to execute the function, until the `FUNCTIONS_AS_TOOLS_RESET_TIMEOUT` is reached.
* **Unit**: Integer (number of failures)
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()` (e.g., if the internal default is 5, it will be `5`).
* `FUNCTIONS_AS_TOOLS_RESET_TIMEOUT`
* **Description**: Specifies the duration for which a Pulsar Function tool remains in the "circuit breaker open" state (due to exceeding the failure threshold) before the MCP server attempts to reset the circuit and allow calls again.
* **Unit**: Seconds
* **Default**: Defaults to the value specified in `pftools.DefaultManagerOptions()` (e.g., if the internal default is 60 seconds, it will be `60`).
* `FUNCTIONS_AS_TOOLS_TENANT_NAMESPACES`
* **Description**: A comma-separated list of Pulsar `tenant/namespace` strings that the MCP Server should scan for Pulsar Functions. This allows you to restrict function discovery to specific namespaces. If not set, the server might attempt to discover functions from all namespaces it has access to, as permitted by its Pulsar client configuration.
* **Format**: `tenant1/namespace1,tenant2/namespace2`
* **Example**: `public/default,my-tenant/app-functions`
* **Default**: Empty (meaning discover from all accessible namespaces (only on StreamNative Cloud)).
* `FUNCTIONS_AS_TOOLS_STRICT_EXPORT`
* **Description**: Only export functions with `MCP_TOOL_NAME` and `MCP_TOOL_DESCRIPTION` defined.
* **Format**: `true` or `false`
* **Example**: `false`
* **Default**: `true`

## Considerations and Limitations

* **Schema Definition**: For reliable schema conversion, ensure your Pulsar Functions have clearly defined input and output schemas using Pulsar's schema registry capabilities. Functions with ambiguous or `BYTES` schemas might not be converted effectively or might default to generic byte array inputs/outputs.
* **Function State**: This feature primarily focuses on the stateless request/response invocation pattern of functions.
* **Discovery Latency**: There might be a slight delay between deploying/updating a function and it appearing as an MCP tool, due to the server's polling interval for function discovery.
* **Error Handling**: The MCP Server will attempt to relay errors from function executions, but the specifics might vary.
* **Security**: Ensure that only intended functions are exposed by managing permissions within your Pulsar cluster. The MCP Server will operate with the permissions of its Pulsar client.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ require (
github.com/99designs/keyring v1.2.2
github.com/apache/pulsar-client-go v0.13.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/google/go-cmp v0.7.0
github.com/hamba/avro/v2 v2.28.0
github.com/mark3labs/mcp-go v0.27.0
github.com/mark3labs/mcp-go v0.28.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
Expand All @@ -16,6 +17,7 @@ require (
github.com/streamnative/pulsarctl v0.4.3-0.20250312214758-e472faec284b
github.com/streamnative/streamnative-mcp-server/sdk/sdk-apiserver v0.0.0-20250506174209-b67ea08ddd82
github.com/streamnative/streamnative-mcp-server/sdk/sdk-kafkaconnect v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.10.0
github.com/twmb/franz-go v1.18.1
github.com/twmb/franz-go/pkg/kadm v1.16.0
github.com/twmb/franz-go/pkg/sr v1.3.0
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down Expand Up @@ -91,9 +90,8 @@ github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -131,8 +129,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mark3labs/mcp-go v0.27.0 h1:iok9kU4DUIU2/XVLgFS2Q9biIDqstC0jY4EQTK2Erzc=
github.com/mark3labs/mcp-go v0.27.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.28.0 h1:7yl4y5D1KYU2f/9Uxp7xfLIggfunHoESCRbrjcytcLM=
github.com/mark3labs/mcp-go v0.28.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/mark3labs/mcp-go v0.23.1 h1:RzTzZ5kJ+HxwnutKA4rll8N/pKV6Wh5dhCmiJUu5S9I=
github.com/mark3labs/mcp-go v0.23.1/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.28.0 h1:7yl4y5D1KYU2f/9Uxp7xfLIggfunHoESCRbrjcytcLM=
github.com/mark3labs/mcp-go v0.28.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down
20 changes: 13 additions & 7 deletions pkg/cmd/mcp/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/mark3labs/mcp-go/server"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/streamnative/streamnative-mcp-server/pkg/common"
"github.com/streamnative/streamnative-mcp-server/pkg/mcp"
)

Expand Down Expand Up @@ -68,24 +69,29 @@ func runSseServer(configOpts *ServerOptions) error {
}

// 3. Create a new MCP server
ctx = context.WithValue(ctx, mcp.OptionsKey, configOpts.Options)
mcpServer := server.NewSSEServer(
newMcpServer(configOpts, logger),
ctx = context.WithValue(ctx, common.OptionsKey, configOpts.Options)
mcpServer := newMcpServer(configOpts, logger)

// add Pulsar Functions as MCP tools
mcp.PulsarFunctionManagedMcpTools(mcpServer, false, configOpts.Features)

sseServer := server.NewSSEServer(
mcpServer,
server.WithStaticBasePath(configOpts.HTTPPath),
server.WithHTTPContextFunc(func(ctx context.Context, _ *http.Request) context.Context {
return context.WithValue(ctx, mcp.OptionsKey, configOpts.Options)
return context.WithValue(ctx, common.OptionsKey, configOpts.Options)
}),
)

// 4. Expose the full SSE URL to the user
ssePath := mcpServer.CompleteSsePath()
ssePath := sseServer.CompleteSsePath()
fmt.Fprintf(os.Stderr, "StreamNative Cloud MCP Server listening on http://%s%s\n",
configOpts.HTTPAddr, ssePath)

// 5. Run the HTTP listener in a goroutine
errCh := make(chan error, 1)
go func() {
if err := mcpServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := sseServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err // bubble up real crashes
}
}()
Expand All @@ -108,7 +114,7 @@ func runSseServer(configOpts *ServerOptions) error {
defer cancel()

// First try to shut down the SSE server
if err := mcpServer.Shutdown(shCtx); err != nil {
if err := sseServer.Shutdown(shCtx); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
logger.Errorf("Error shutting down SSE server: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/mcp/stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"github.com/mark3labs/mcp-go/server"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/streamnative/streamnative-mcp-server/pkg/common"
"github.com/streamnative/streamnative-mcp-server/pkg/log"
"github.com/streamnative/streamnative-mcp-server/pkg/mcp"
)

func NewCmdMcpStdioServer(configOpts *ServerOptions) *cobra.Command {
Expand Down Expand Up @@ -63,7 +63,7 @@ func runStdioServer(configOpts *ServerOptions) error {
}

// Create a new MCP server
ctx = context.WithValue(ctx, mcp.OptionsKey, configOpts.Options)
ctx = context.WithValue(ctx, common.OptionsKey, configOpts.Options)
stdLogger := stdlog.New(logger.Writer(), "snmcp-server", 0)
stdioServer := server.NewStdioServer(newMcpServer(configOpts, logger))

Expand Down
Loading