Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/google/go-cmp v0.7.0
github.com/hamba/avro/v2 v2.28.0
github.com/mark3labs/mcp-go v0.28.0
github.com/mark3labs/mcp-go v0.31.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mark3labs/mcp-go v0.28.0 h1:7yl4y5D1KYU2f/9Uxp7xfLIggfunHoESCRbrjcytcLM=
github.com/mark3labs/mcp-go v0.28.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.31.0 h1:4UxSV8aM770OPmTvaVe/b1rA2oZAjBMhGBfUgOGut+4=
github.com/mark3labs/mcp-go v0.31.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
Expand Down
4 changes: 2 additions & 2 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/mark3labs/mcp-go v0.23.1 h1:RzTzZ5kJ+HxwnutKA4rll8N/pKV6Wh5dhCmiJUu5S9I=
github.com/mark3labs/mcp-go v0.23.1/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.28.0 h1:7yl4y5D1KYU2f/9Uxp7xfLIggfunHoESCRbrjcytcLM=
github.com/mark3labs/mcp-go v0.28.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mark3labs/mcp-go v0.31.0 h1:4UxSV8aM770OPmTvaVe/b1rA2oZAjBMhGBfUgOGut+4=
github.com/mark3labs/mcp-go v0.31.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/mcp/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func runSseServer(configOpts *ServerOptions) error {
sseServer := server.NewSSEServer(
mcpServer,
server.WithStaticBasePath(configOpts.HTTPPath),
server.WithHTTPContextFunc(func(ctx context.Context, _ *http.Request) context.Context {
server.WithSSEContextFunc(func(ctx context.Context, _ *http.Request) context.Context {
return context.WithValue(ctx, common.OptionsKey, configOpts.Options)
}),
)
Expand Down
13 changes: 13 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,19 @@ func RequiredParamObject(arguments map[string]interface{}, name string) (map[str
return nil, fmt.Errorf("%s parameter must be an object", name)
}

func OptionalParamObject(arguments map[string]interface{}, name string) (map[string]interface{}, bool) {
paramValue, found := arguments[name]
if !found || paramValue == nil {
return nil, false
}

if mapVal, ok := paramValue.(map[string]interface{}); ok {
return mapVal, true
}

return nil, false
}

func GetOptions(ctx context.Context) *config.Options {
return ctx.Value(OptionsKey).(*config.Options)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcp/context_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func handleWhoami(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResu
func handleSetContext(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
options := ctx.Value(common.OptionsKey).(*config.Options)

instanceName, err := common.RequiredParam[string](request.Params.Arguments, "instanceName")
instanceName, err := request.RequireString("instanceName")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get instance name: %v", err)), nil
}

clusterName, err := common.RequiredParam[string](request.Params.Arguments, "clusterName")
clusterName, err := request.RequireString("clusterName")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get cluster name: %v", err)), nil
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/mcp/kafka_admin_connect_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ func KafkaAdminAddKafkaConnectTools(s *server.MCPServer, readOnly bool, features
func handleKafkaConnectTool(readOnly bool) func(context.Context, mcp.CallToolRequest) (*mcp.CallToolResult, error) {
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get required parameters
resource, err := common.RequiredParam[string](request.Params.Arguments, "resource")
resource, err := request.RequireString("resource")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get resource: %v", err)), nil
}

operation, err := common.RequiredParam[string](request.Params.Arguments, "operation")
operation, err := request.RequireString("operation")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get operation: %v", err)), nil
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func handleKafkaConnectorsList(ctx context.Context, admin kafka.Connect, _ mcp.C

func handleKafkaConnectorGet(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get a specific connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}
Expand All @@ -276,12 +276,12 @@ func handleKafkaConnectorGet(ctx context.Context, admin kafka.Connect, request m

func handleKafkaConnectorCreate(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Create a new connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}

configMap, err := common.RequiredParamObject(request.Params.Arguments, "config")
configMap, err := common.RequiredParamObject(request.GetArguments(), "config")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get config: %v", err)), nil
}
Expand All @@ -305,12 +305,12 @@ func handleKafkaConnectorCreate(ctx context.Context, admin kafka.Connect, reques

func handleKafkaConnectorUpdate(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Update a connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}

configMap, err := common.RequiredParamObject(request.Params.Arguments, "config")
configMap, err := common.RequiredParamObject(request.GetArguments(), "config")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get config: %v", err)), nil
}
Expand All @@ -334,7 +334,7 @@ func handleKafkaConnectorUpdate(ctx context.Context, admin kafka.Connect, reques

func handleKafkaConnectorDelete(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Delete a connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}
Expand All @@ -349,7 +349,7 @@ func handleKafkaConnectorDelete(ctx context.Context, admin kafka.Connect, reques

func handleKafkaConnectorRestart(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Restart a connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}
Expand All @@ -364,7 +364,7 @@ func handleKafkaConnectorRestart(ctx context.Context, admin kafka.Connect, reque

func handleKafkaConnectorPause(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Pause a connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}
Expand All @@ -379,7 +379,7 @@ func handleKafkaConnectorPause(ctx context.Context, admin kafka.Connect, request

func handleKafkaConnectorResume(ctx context.Context, admin kafka.Connect, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Resume a connector
name, err := common.RequiredParam[string](request.Params.Arguments, "name")
name, err := request.RequireString("name")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get connector name: %v", err)), nil
}
Expand Down
25 changes: 12 additions & 13 deletions pkg/mcp/kafka_admin_groups_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/streamnative/streamnative-mcp-server/pkg/common"
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
"github.com/twmb/franz-go/pkg/kadm"
)
Expand Down Expand Up @@ -125,12 +124,12 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
func handleKafkaGroupsTool(readOnly bool) func(context.Context, mcp.CallToolRequest) (*mcp.CallToolResult, error) {
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get required parameters
resource, err := common.RequiredParam[string](request.Params.Arguments, "resource")
resource, err := request.RequireString("resource")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get resource: %v", err)), nil
}

operation, err := common.RequiredParam[string](request.Params.Arguments, "operation")
operation, err := request.RequireString("operation")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get operation: %v", err)), nil
}
Expand Down Expand Up @@ -181,7 +180,7 @@ func handleKafkaGroupsTool(readOnly bool) func(context.Context, mcp.CallToolRequ
}

func handleKafkaGroupDescribe(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
groupName, err := common.RequiredParam[string](request.Params.Arguments, "group")
groupName, err := request.RequireString("group")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
}
Expand Down Expand Up @@ -210,12 +209,12 @@ func handleKafkaGroupDescribe(ctx context.Context, admin *kadm.Client, request m
}

func handleKafkaGroupRemoveMembers(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
groupName, err := common.RequiredParam[string](request.Params.Arguments, "group")
groupName, err := request.RequireString("group")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
}

members, err := common.RequiredParam[string](request.Params.Arguments, "members")
members, err := request.RequireString("members")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get members: %v", err)), nil
}
Expand Down Expand Up @@ -248,7 +247,7 @@ func handleKafkaGroupsList(ctx context.Context, admin *kadm.Client, _ mcp.CallTo
}

func handleKafkaGroupOffsets(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
groupName, err := common.RequiredParam[string](request.Params.Arguments, "group")
groupName, err := request.RequireString("group")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
}
Expand All @@ -267,12 +266,12 @@ func handleKafkaGroupOffsets(ctx context.Context, admin *kadm.Client, request mc
}

func handleKafkaGroupDeleteOffset(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
groupName, err := common.RequiredParam[string](request.Params.Arguments, "group")
groupName, err := request.RequireString("group")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
}

topic, err := common.RequiredParam[string](request.Params.Arguments, "topic")
topic, err := request.RequireString("topic")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
}
Expand Down Expand Up @@ -303,23 +302,23 @@ func handleKafkaGroupDeleteOffset(ctx context.Context, admin *kadm.Client, reque

func handleKafkaGroupSetOffset(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get required parameters
groupName, err := common.RequiredParam[string](request.Params.Arguments, "group")
groupName, err := request.RequireString("group")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
}

topic, err := common.RequiredParam[string](request.Params.Arguments, "topic")
topic, err := request.RequireString("topic")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
}

partition, err := common.RequiredParam[float64](request.Params.Arguments, "partition")
partition, err := request.RequireFloat("partition")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get partition number: %v", err)), nil
}
partitionInt := int32(partition)

offset, err := common.RequiredParam[float64](request.Params.Arguments, "offset")
offset, err := request.RequireFloat("offset")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get offset value: %v", err)), nil
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/mcp/kafka_admin_partitions_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/streamnative/streamnative-mcp-server/pkg/common"
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
"github.com/twmb/franz-go/pkg/kadm"
)
Expand Down Expand Up @@ -88,12 +87,12 @@ func KafkaAdminAddPartitionsTools(s *server.MCPServer, readOnly bool, features [
func handleKafkaPartitionsTool(readOnly bool) func(context.Context, mcp.CallToolRequest) (*mcp.CallToolResult, error) {
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get required parameters
resource, err := common.RequiredParam[string](request.Params.Arguments, "resource")
resource, err := request.RequireString("resource")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get resource: %v", err)), nil
}

operation, err := common.RequiredParam[string](request.Params.Arguments, "operation")
operation, err := request.RequireString("operation")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get operation: %v", err)), nil
}
Expand Down Expand Up @@ -129,12 +128,12 @@ func handleKafkaPartitionsTool(readOnly bool) func(context.Context, mcp.CallTool
}

func handleKafkaPartitionUpdate(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
topicName, err := common.RequiredParam[string](request.Params.Arguments, "topic")
topicName, err := request.RequireString("topic")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
}

newTotal, err := common.RequiredParam[int](request.Params.Arguments, "new-total")
newTotal, err := request.RequireInt("new-total")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get new total: %v", err)), nil
}
Expand Down
Loading