Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ If you want to access to your StreamNative Cloud, you will need to have followin

### Start the MCP Server

#### Using stdio Server

```bash
# Start MCP server with StreamNative Cloud authentication
bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json
Expand All @@ -66,6 +68,19 @@ snmcp stdio --use-external-pulsar --pulsar-web-service-url http://pulsar.example
bin/snmcp stdio --use-external-pulsar --pulsar-web-service-url http://pulsar.example.com:8080 --pulsar-token "xxx"
```

#### Using SSE (Server-Sent Events) Server

```bash
# Start MCP server with SSE and StreamNative Cloud authentication
snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json

# Start MCP server with SSE and external Kafka
snmcp sse --http-addr :9090 --http-path /mcp --use-external-kafka --kafka-bootstrap-servers localhost:9092

# Start MCP server with SSE and external Pulsar
snmcp sse --http-addr :9090 --http-path /mcp --use-external-pulsar --pulsar-web-service-url http://pulsar.example.com:8080
```

### Command-line Options

```
Expand Down Expand Up @@ -115,6 +130,8 @@ Flags:
--server string The server to connect to (default "https://api.streamnative.cloud")
--use-external-kafka Use external Kafka
--use-external-pulsar Use external Pulsar
--http-addr string HTTP server address (default ":9090")
--http-path string HTTP server path for SSE endpoint (default "/mcp")
-v, --version version for snmcp
```

Expand Down Expand Up @@ -168,6 +185,32 @@ To enable only specific feature sets:
bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json --features pulsar-client
```

## Inspecting the MCP Server

You can use the [@modelcontextprotocol/inspector](https://www.npmjs.com/package/@modelcontextprotocol/inspector) tool to inspect and test your MCP server. This is particularly useful for debugging and verifying your server's configuration.

### Installation

```bash
npm install -g @modelcontextprotocol/inspector
```

### Usage

```bash
# Inspect a stdio server
mcp-inspector stdio --command "snmcp stdio --organization my-org --key-file /path/to/key-file.json"

# Inspect an SSE server
mcp-inspector sse --url "http://localhost:9090/mcp"
```

The inspector provides a web interface where you can:
- View available tools and their schemas
- Test tool invocations
- Monitor server responses
- Debug connection issues

## Integration with MCP Clients

This server can be used with any MCP-compatible client, such as:
Expand All @@ -182,10 +225,12 @@ Without it, you may encounter the error: `message will exceed the length limit f

### Usage with Claude Desktop

#### Using stdio Server

```json
{
"mcpServers": {
"snmcp": {
"mcp-streamnative": {
"command": "${PATH_TO_SNMCP}/bin/snmcp",
"args": [
"stdio",
Expand All @@ -201,6 +246,36 @@ Without it, you may encounter the error: `message will exceed the length limit f

Please remember to replace `${PATH_TO_SNMCP}` with the actual path to the `snmcp` binary and `${STREAMNATIVE_CLOUD_ORGANIZATION_ID}` and `${STREAMNATIVE_CLOUD_KEY_FILE}` with your StreamNative Cloud organization ID and key file path, respectively.

#### Using SSE Server

First, install the mcp-proxy tool:

```bash
pip install mcp-proxy
```

Then configure Claude Desktop to use the SSE server:

```json
{
"mcpServers": {
"mcp-streamnative-proxy": {
"command": "mcp-proxy",
"args": [
"http://localhost:9090/mcp/sse"
]
}
}
}
```

Note: If mcp-proxy is not in your system PATH, you'll need to provide the full path to the executable. For example:
- On macOS: `/Library/Frameworks/Python.framework/Versions/3.11/bin/mcp-proxy`
- On Linux: `/usr/local/bin/mcp-proxy`
- On Windows: `C:\Python311\Scripts\mcp-proxy.exe`

Please remember to replace `http://localhost:9090/mcp/sse` with the right URL.

## About Model Context Protocol (MCP)

The Model Context Protocol (MCP) is an open protocol that standardizes how applications provide context to LLMs. MCP helps build agents and complex workflows on top of LLMs by providing:
Expand Down
1 change: 1 addition & 0 deletions cmd/streamnative-mcp-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ to interact with StreamNative Cloud resources and services.`,
o.AddFlags(rootCmd)
// Add subcommands
rootCmd.AddCommand(mcp.NewCmdMcpStdioServer(o))
rootCmd.AddCommand(mcp.NewCmdMcpSseServer(o))

rootCmd.SetVersionTemplate("{{.Short}}\n{{.Version}}\n")

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/apache/pulsar-client-go v0.13.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/hamba/avro/v2 v2.28.0
github.com/mark3labs/mcp-go v0.26.0
github.com/mark3labs/mcp-go v0.27.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mark3labs/mcp-go v0.26.0 h1:xz/Kv1cHLYovF8txv6btBM39/88q3YOjnxqhi51jB0w=
github.com/mark3labs/mcp-go v0.26.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.27.0 h1:iok9kU4DUIU2/XVLgFS2Q9biIDqstC0jY4EQTK2Erzc=
github.com/mark3labs/mcp-go v0.27.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/mcp/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type ServerOptions struct {
LogFile string
LogCommands bool
Features []string
HTTPAddr string
HTTPPath string
*config.Options
}

Expand Down Expand Up @@ -160,6 +162,8 @@ func (o *ServerOptions) AddFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&o.LogFile, "log-file", "", "Path to log file")
cmd.PersistentFlags().BoolVar(&o.LogCommands, "enable-command-logging", false, "When enabled, the server will log all command requests and responses to the log file")
cmd.PersistentFlags().StringSliceVar(&o.Features, "features", []string{}, "Features to enable, defaults to `all`")
cmd.PersistentFlags().StringVar(&o.HTTPAddr, "http-addr", "", "HTTP address")
cmd.PersistentFlags().StringVar(&o.HTTPPath, "http-path", "", "HTTP path")
}

func (o *ServerOptions) newClientCredentialsFlow(
Expand Down
107 changes: 107 additions & 0 deletions pkg/cmd/mcp/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package mcp

import (
stdlog "log"
"os"

"github.com/mark3labs/mcp-go/server"
"github.com/sirupsen/logrus"
"github.com/streamnative/streamnative-mcp-server/pkg/mcp"
)

func newMcpServer(configOpts *ServerOptions, logrusLogger *logrus.Logger) *server.MCPServer {
snConfig := configOpts.Options.LoadConfigOrDie()
var s *server.MCPServer
switch {
case snConfig.KeyFile != "":
{
issuer := snConfig.Auth.Issuer()
userName, err := configOpts.Options.WhoAmI(issuer.Audience)
if err != nil {
stdlog.Fatalf("failed to get user name: %v", err)
os.Exit(1)
}
// Create a new MCP server
s = server.NewMCPServer(
"streamnative-mcp-server",
"0.0.1",
server.WithResourceCapabilities(true, true),
server.WithInstructions(mcp.GetStreamNativeCloudServerInstructions(userName, snConfig)),
server.WithLogging())

mcp.RegisterPrompts(s)
mcp.RegisterContextTools(s, configOpts.Features)
mcp.StreamNativeAddLogTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.StreamNativeAddResourceTools(s, configOpts.ReadOnly, configOpts.Features)
}
case snConfig.ExternalKafka != nil:
{
s = server.NewMCPServer(
"streamnative-mcp-server/kafka",
"0.0.1",
server.WithResourceCapabilities(true, true),
server.WithInstructions(mcp.GetExternalKafkaServerInstructions(snConfig.ExternalKafka.BootstrapServers)),
server.WithLogging())
}
case snConfig.ExternalPulsar != nil:
{
s = server.NewMCPServer(
"streamnative-mcp-server/pulsar",
"0.0.1",
server.WithResourceCapabilities(true, true),
server.WithInstructions(mcp.GetExternalPulsarServerInstructions(snConfig.ExternalPulsar.WebServiceURL)),
server.WithLogging())
}
default:
{
stdlog.Fatalf("no valid configuration found")
os.Exit(1)
}
}

mcp.PulsarAdminAddBrokersTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddBrokerStatsTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddClusterTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddFunctionsWorkerTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddNamespaceTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddNamespacePolicyTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddNsIsolationPolicyTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddPackagesTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddResourceQuotasTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddSchemasTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddSubscriptionTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddTenantTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddTopicTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddSinksTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddFunctionsTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddSourcesTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddTopicPolicyTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarClientAddConsumerTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarClientAddProducerTools(s, configOpts.ReadOnly, configOpts.Features)

mcp.KafkaAdminAddTopicTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.KafkaAdminAddPartitionsTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.KafkaAdminAddGroupsTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.KafkaAdminAddSchemaRegistryTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.KafkaAdminAddKafkaConnectTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.KafkaClientAddConsumeTools(s, configOpts.ReadOnly, logrusLogger, configOpts.Features)
mcp.KafkaClientAddProduceTools(s, configOpts.ReadOnly, configOpts.Features)
return s
}
Loading