diff --git a/go.mod b/go.mod index 9849693..e9b9c7b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/99designs/keyring v1.2.2 github.com/apache/pulsar-client-go v0.13.1 github.com/dgrijalva/jwt-go v3.2.0+incompatible - github.com/mark3labs/mcp-go v0.25.0 + github.com/mark3labs/mcp-go v0.26.0 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 @@ -75,11 +75,10 @@ require ( github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/crypto v0.36.0 // indirect + golang.org/x/crypto v0.32.0 // indirect golang.org/x/mod v0.20.0 // indirect - golang.org/x/net v0.38.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/term v0.30.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/term v0.28.0 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index b578598..77d7cd0 100644 --- a/go.sum +++ b/go.sum @@ -129,8 +129,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/mark3labs/mcp-go v0.25.0 h1:UUpcMT3L5hIhuDy7aifj4Bphw4Pfx1Rf8mzMXDe8RQw= -github.com/mark3labs/mcp-go v0.25.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4= +github.com/mark3labs/mcp-go v0.26.0 h1:xz/Kv1cHLYovF8txv6btBM39/88q3YOjnxqhi51jB0w= +github.com/mark3labs/mcp-go v0.26.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= @@ -251,16 +251,16 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70= golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -272,16 +272,16 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/cmd/mcp/stdio.go b/pkg/cmd/mcp/stdio.go index b412b2c..89c2dc8 100644 --- a/pkg/cmd/mcp/stdio.go +++ b/pkg/cmd/mcp/stdio.go @@ -144,6 +144,7 @@ func newStdioServer(configOpts *ServerOptions, logrusLogger *logrus.Logger) *ser mcp.RegisterPrompts(s) mcp.RegisterContextTools(s, configOpts.Features) mcp.StreamNativeAddLogTools(s, configOpts.ReadOnly, configOpts.Features) + mcp.StreamNativeAddResourceTools(s, configOpts.ReadOnly, configOpts.Features) } case snConfig.ExternalKafka != nil: { diff --git a/pkg/mcp/context_tools.go b/pkg/mcp/context_tools.go index d772647..bf459fa 100644 --- a/pkg/mcp/context_tools.go +++ b/pkg/mcp/context_tools.go @@ -34,15 +34,15 @@ func RegisterContextTools(s *server.MCPServer, features []string) { return } // Add whoami tool - whoamiTool := mcp.NewTool("streamnative_cloud_context_whoami", + whoamiTool := mcp.NewTool("sncloud_context_whoami", mcp.WithDescription("Display the currently logged-in service account. "+ "Returns the name of the authenticated service account and the organization."), ) s.AddTool(whoamiTool, handleWhoami) // Add set-context tool - setContextTool := mcp.NewTool("streamnative_cloud_context_use_cluster", - mcp.WithDescription("Set the current context to a specific StreamNative Cloud cluster, once you set the context, you can use pulsar and kafka tools to interact with the cluster. If you encounter ContextNotSetErr, please use `streamnative_cloud_context_available_clusters` to list the available clusters and set the context to a specific cluster."), + setContextTool := mcp.NewTool("sncloud_context_use_cluster", + mcp.WithDescription("Set the current context to a specific StreamNative Cloud cluster, once you set the context, you can use pulsar and kafka tools to interact with the cluster. If you encounter ContextNotSetErr, please use `sncloud_context_available_clusters` to list the available clusters and set the context to a specific cluster."), mcp.WithString("instanceName", mcp.Required(), mcp.Description("The name of the pulsar instance to use"), ), @@ -53,8 +53,8 @@ func RegisterContextTools(s *server.MCPServer, features []string) { s.AddTool(setContextTool, handleSetContext) // Add available-contexts tool - availableContextsTool := mcp.NewTool("streamnative_cloud_available_contexts", - mcp.WithDescription("Display the available pulsar clusters for the current organization on StreamNative Cloud. You can use `streamnative_cloud_context_use_cluster` to change the context to a specific cluster. You will need to ask for the USER to confirm the target context cluster if there are multiple clusters."), + availableContextsTool := mcp.NewTool("sncloud_context_available_clusters", + mcp.WithDescription("Display the available pulsar clusters for the current organization on StreamNative Cloud. You can use `sncloud_context_use_cluster` to change the context to a specific cluster. You will need to ask for the USER to confirm the target context cluster if there are multiple clusters."), ) s.AddTool(availableContextsTool, handleAvailableContexts) } diff --git a/pkg/mcp/prompts.go b/pkg/mcp/prompts.go index e7b4438..1a12faf 100644 --- a/pkg/mcp/prompts.go +++ b/pkg/mcp/prompts.go @@ -21,21 +21,63 @@ import ( "context" "encoding/json" "fmt" + "slices" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" "github.com/streamnative/streamnative-mcp-server/pkg/config" sncloud "github.com/streamnative/streamnative-mcp-server/sdk/sdk-apiserver" + "k8s.io/utils/ptr" +) + +type ServerlessPoolMember struct { + Provider string + Namespace string + Pool string + Location string +} + +var ( + ServerlessPoolMemberList = []ServerlessPoolMember{ + { + Provider: "azure", + Namespace: "streamnative", + Pool: "shared-azure", + Location: "eastus", + }, + { + Provider: "aws", + Namespace: "streamnative", + Pool: "shared-aws", + Location: "us-east-2", + }, + // { + // Provider: "gcloud", + // Namespace: "streamnative", + // Pool: "shared-gcp", + // Location: "us-central1", + // }, + } + AvailableProviders = []string{"azure", "aws", "gcloud"} ) func RegisterPrompts(s *server.MCPServer) { - s.AddPrompt(mcp.NewPrompt("list-streamnative-cloud-pulsar-clusters", - mcp.WithPromptDescription("List all Pulsar clusters in the StreamNative Cloud"), + s.AddPrompt(mcp.NewPrompt("list-sncloud-clusters", + mcp.WithPromptDescription("List all clusters from the StreamNative Cloud"), ), handleListPulsarClusters) - s.AddPrompt(mcp.NewPrompt("read-streamnative-cloud-pulsar-cluster", - mcp.WithPromptDescription("Read a Pulsar cluster in the StreamNative Cloud"), - mcp.WithArgument("name", mcp.RequiredArgument(), mcp.ArgumentDescription("The name of the Pulsar cluster")), + s.AddPrompt(mcp.NewPrompt("read-sncloud-cluster", + mcp.WithPromptDescription("Read a cluster from the StreamNative Cloud"), + mcp.WithArgument("name", mcp.RequiredArgument(), mcp.ArgumentDescription("The name of the cluster")), ), handleReadPulsarCluster) + s.AddPrompt( + mcp.NewPrompt("build-sncloud-serverless-cluster", + mcp.WithPromptDescription("Build a Serverless cluster in the StreamNative Cloud"), + mcp.WithArgument("instance-name", mcp.RequiredArgument(), mcp.ArgumentDescription("The name of the Pulsar instance, cannot reuse the name of existing instance.")), + mcp.WithArgument("cluster-name", mcp.RequiredArgument(), mcp.ArgumentDescription("The name of the Pulsar cluster, cannot reuse the name of existing cluster.")), + mcp.WithArgument("provider", mcp.ArgumentDescription("The cloud provider, could be `aws`, `gcp`, `azure`. If the selected provider do not serve serverless cluster, the prompt will return an error. If not specified, the system will use a random provider depending on the availability.")), + ), + handleBuildServerlessPulsarCluster, + ) } func handleListPulsarClusters(ctx context.Context, _ mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { @@ -99,7 +141,7 @@ func handleListPulsarClusters(ctx context.Context, _ mcp.GetPromptRequest) (*mcp } return &mcp.GetPromptResult{ - Description: fmt.Sprintf("Pulsar clusters from StreamNative Cloud organization %s, you can use `streamnative_cloud_context_use_cluster` tool to switch to selected cluster, and use pulsar and kafka tools to interact with the cluster.", options.Organization), + Description: fmt.Sprintf("Pulsar clusters from StreamNative Cloud organization %s, you can use `sncloud_context_use_cluster` tool to switch to selected cluster, and use pulsar and kafka tools to interact with the cluster.", options.Organization), Messages: messages, }, nil } @@ -156,7 +198,152 @@ func handleReadPulsarCluster(ctx context.Context, request mcp.GetPromptRequest) } return &mcp.GetPromptResult{ - Description: fmt.Sprintf("Detailed information of Pulsar cluster %s, you can use `streamnative_cloud_context_use_cluster` tool to switch to this cluster, and use pulsar and kafka tools to interact with the cluster.", name), + Description: fmt.Sprintf("Detailed information of Pulsar cluster %s, you can use `sncloud_context_use_cluster` tool to switch to this cluster, and use pulsar and kafka tools to interact with the cluster.", name), + Messages: messages, + }, nil +} + +func handleBuildServerlessPulsarCluster(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { + options := getOptions(ctx) + apiClient, err := config.GetAPIClient() + if err != nil { + return nil, fmt.Errorf("failed to get API client: %v", err) + } + arguments := convertToMapInterface(request.Params.Arguments) + + instanceName, err := requiredParam[string](arguments, "instance-name") + if err != nil { + return nil, fmt.Errorf("failed to get instance name: %v", err) + } + + clusterName, err := requiredParam[string](arguments, "cluster-name") + if err != nil { + return nil, fmt.Errorf("failed to get cluster name: %v", err) + } + + provider, hasProvider := optionalParam[string](arguments, "provider") + if !hasProvider { + provider = "" + } + if provider != "" { + if !slices.Contains(AvailableProviders, provider) { + return nil, fmt.Errorf("invalid provider: %s, available providers: %v", provider, AvailableProviders) + } + } + + poolOptions, poolOptionsBody, err := apiClient.CloudStreamnativeIoV1alpha1Api.ListCloudStreamnativeIoV1alpha1NamespacedPoolOption(ctx, options.Organization).Execute() + if err != nil { + return nil, fmt.Errorf("failed to list pool options: %v", err) + } + defer poolOptionsBody.Body.Close() + if poolOptions == nil { + return nil, fmt.Errorf("no pool options found") + } + + var poolRef *sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PoolRef + var selectedLocation *string + + for _, poolOpt := range poolOptions.Items { + if pr, ok := poolOpt.Spec.GetPoolRefOk(); ok { + for _, poolMember := range ServerlessPoolMemberList { + if provider != "" && poolOpt.Spec.CloudType != provider { + continue + } + if pr.Name == poolMember.Pool && pr.Namespace == poolMember.Namespace { + for _, location := range poolOpt.Spec.Locations { + if location.Location == poolMember.Location { + poolRef = pr + selectedLocation = &location.Location + break + } + } + } + } + } + } + + if poolRef == nil || selectedLocation == nil { + return nil, fmt.Errorf("no available pool") + } + + inst := sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarInstance{} + clus := sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarCluster{} + + inst.ApiVersion = ptr.To("cloud.streamnative.io/v1alpha1") + inst.Kind = ptr.To("PulsarInstance") + inst.Metadata = &sncloud.V1ObjectMeta{ + Name: &instanceName, + Namespace: &options.Organization, + Labels: &map[string]string{ + "managed-by": "streamnative-mcp", + }, + } + + inst.Spec = &sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarInstanceSpec{ + AvailabilityMode: "zonal", + PoolRef: poolRef, + Type: ptr.To("serverless"), + } + + clus.ApiVersion = ptr.To("cloud.streamnative.io/v1alpha1") + clus.Kind = ptr.To("PulsarCluster") + clus.Metadata = &sncloud.V1ObjectMeta{ + Name: ptr.To(""), + Namespace: &options.Organization, + Labels: &map[string]string{ + "managed-by": "streamnative-mcp", + }, + } + + clus.Spec = &sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarClusterSpec{ + Broker: sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1Broker{ + Replicas: 2, + Resources: &sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1DefaultNodeResource{ + Cpu: "1000m", + Memory: "4294967296", + }, + }, + DisplayName: ptr.To(clusterName), + InstanceName: instanceName, + Location: *selectedLocation, + ReleaseChannel: ptr.To("rapid"), + } + + instJSON, err := json.Marshal(inst) + if err != nil { + return nil, fmt.Errorf("failed to marshal instance: %v", err) + } + clusJSON, err := json.Marshal(clus) + if err != nil { + return nil, fmt.Errorf("failed to marshal cluster: %v", err) + } + + messages := []mcp.PromptMessage{ + { + Content: mcp.TextContent{ + Type: "text", + Text: "The following is the Pulsar instance JSON definition and the Pulsar cluster JSON definition, you can use the `sncloud_resources_apply` tool to apply the resources to the StreamNative Cloud. Please directly use the JSON content and not modify the content. The PulsarCluster name is required to be empty. You will need to apply PulsarInstance first, then apply PulsarCluster.", + }, + Role: mcp.RoleUser, + }, + { + Content: mcp.TextContent{ + Type: "text", + Text: string(instJSON), + }, + Role: mcp.RoleUser, + }, + { + Content: mcp.TextContent{ + Type: "text", + Text: string(clusJSON), + }, + Role: mcp.RoleUser, + }, + } + + return &mcp.GetPromptResult{ + Description: fmt.Sprintf("Create a new Serverless Pulsar cluster %s's related resources that can be applied to the StreamNative Cloud.", clusterName), Messages: messages, }, nil } diff --git a/pkg/mcp/streamnative_resources_log_tools.go b/pkg/mcp/streamnative_resources_log_tools.go index dc70311..3146681 100644 --- a/pkg/mcp/streamnative_resources_log_tools.go +++ b/pkg/mcp/streamnative_resources_log_tools.go @@ -42,9 +42,9 @@ func StreamNativeAddLogTools(s *server.MCPServer, _ bool, features []string) { return } - logTool := mcp.NewTool("streamnative_resources_log", + logTool := mcp.NewTool("sncloud_logs", mcp.WithDescription("Display the logs of resources in StreamNative Cloud, including pulsar functions, pulsar source connectors, pulsar sink connectors, and kafka connect connectors logs running along with PulsarInstance and PulsarCluster."+ - "This tool is used to help you debug the issues of resources in StreamNative Cloud. You can use `streamnative_cloud_context_use_cluster` to change the context to a specific cluster first, then use this tool to get the logs of resources in the cluster. This tool is suggested to be used with 'pulsar_admin_functions', 'pulsar_admin_sinks', 'pulsar_admin_sources', and 'kafka_admin_connect'"), + "This tool is used to help you debug the issues of resources in StreamNative Cloud. You can use `sncloud_context_use_cluster` to change the context to a specific cluster first, then use this tool to get the logs of resources in the cluster. This tool is suggested to be used with 'pulsar_admin_functions', 'pulsar_admin_sinks', 'pulsar_admin_sources', and 'kafka_admin_connect'"), mcp.WithString("component", mcp.Required(), mcp.Description("The component to get logs from, including "+strings.Join(FunctionConnectorList, ", ")), mcp.Enum(FunctionConnectorList...), @@ -115,7 +115,7 @@ func handleStreamNativeResourcesLog(ctx context.Context, request mcp.CallToolReq snConfig := getOptions(ctx) instance, cluster, organization := GetMcpContext() if instance == "" || cluster == "" || organization == "" { - return mcp.NewToolResultError("No context is set, please use `streamnative_cloud_context_use_cluster` to set the context first."), nil + return mcp.NewToolResultError("No context is set, please use `sncloud_context_use_cluster` to set the context first."), nil } // Extract required parameters with validation diff --git a/pkg/mcp/streamnative_resources_tools.go b/pkg/mcp/streamnative_resources_tools.go new file mode 100644 index 0000000..36ed6a3 --- /dev/null +++ b/pkg/mcp/streamnative_resources_tools.go @@ -0,0 +1,361 @@ +package mcp + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "slices" + "strings" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + "github.com/streamnative/streamnative-mcp-server/pkg/config" + sncloud "github.com/streamnative/streamnative-mcp-server/sdk/sdk-apiserver" +) + +// StreamNativeAddResourceTools adds StreamNative resources tools +func StreamNativeAddResourceTools(s *server.MCPServer, readOnly bool, features []string) { + if !slices.Contains(features, string(FeatureStreamNativeCloud)) && !slices.Contains(features, string(FeatureAll)) { + return + } + + if !readOnly { + // Add Apply tool + applyTool := mcp.NewTool("sncloud_resources_apply", + mcp.WithDescription("Apply StreamNative Cloud resources from JSON definitions. This tool allows you to apply (create or update) StreamNative Cloud resources such as PulsarInstances and PulsarClusters using JSON definitions. Please give feedback to USER if the resource is applied with error, and ask USER to check the resource definition."), + mcp.WithString("json_content", mcp.Required(), + mcp.Description("The JSON content to apply."), + ), + mcp.WithBoolean("dry_run", + mcp.Description("If true, only validate the resource without applying it to the server."), + mcp.DefaultBool(false), + ), + mcp.WithToolAnnotation(mcp.ToolAnnotation{ + Title: "Apply StreamNative Cloud Resources", + }), + ) + // Add delete tool + deleteTool := mcp.NewTool("sncloud_resources_delete", + mcp.WithDescription("Delete StreamNative Cloud resources. This tool allows you to delete StreamNative Cloud resources such as PulsarInstances and PulsarClusters."), + mcp.WithString("name", mcp.Required(), + mcp.Description("The name of the resource to delete."), + ), + mcp.WithString("type", mcp.Required(), + mcp.Description("The type of the resource to delete, it can be PulsarInstance or PulsarCluster."), + mcp.Enum("PulsarInstance", "PulsarCluster"), + ), + mcp.WithToolAnnotation(mcp.ToolAnnotation{ + Title: "Delete StreamNative Cloud Resources", + DestructiveHint: true, + }), + ) + s.AddTool(applyTool, handleStreamNativeResourcesApply) + s.AddTool(deleteTool, handleStreamNativeResourcesDelete) + } +} + +// Define simple resource structure for parsing YAML documents +type Resource struct { + APIVersion string `json:"apiVersion" yaml:"apiVersion"` + Kind string `json:"kind" yaml:"kind"` + Metadata Metadata `json:"metadata" yaml:"metadata"` + Spec map[string]interface{} `json:"spec" yaml:"spec"` +} + +type Metadata struct { + Name string `json:"name" yaml:"name"` + Namespace string `json:"namespace" yaml:"namespace"` + Labels map[string]string `json:"labels" yaml:"labels"` +} + +// handleStreamNativeResourcesApply handles the streaming_cloud_resources_apply tool +func handleStreamNativeResourcesApply(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + // Get necessary parameters + snConfig := getOptions(ctx) + organization := snConfig.Organization + if organization == "" { + return mcp.NewToolResultError("No organization is set. Please set the organization using the appropriate context tool."), nil + } + + // Get YAML content + jsonContent, err := requiredParam[string](request.Params.Arguments, "json_content") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get json_content: %v", err)), nil + } + + // Get dry_run flag + dryRun, hasDryRun := optionalParam[bool](request.Params.Arguments, "dry_run") + if !hasDryRun { + dryRun = false + } + + // Get API client + apiClient, err := config.GetAPIClient() + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get API client: %v", err)), nil + } + + jsonContent = strings.TrimSpace(jsonContent) + if jsonContent == "" { + return mcp.NewToolResultError("No valid resources found in the provided JSON."), nil + } + + // Parse YAML document + var resource Resource + err = json.Unmarshal([]byte(jsonContent), &resource) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to parse JSON document: %v", err)), nil + } + + // Check if resource is valid + if resource.APIVersion == "" || resource.Kind == "" { + return mcp.NewToolResultError("Invalid resource definition."), nil + } + + // Set namespace if not specified + if resource.Metadata.Namespace == "" { + resource.Metadata.Namespace = organization + } + + // Apply resource + result, err := applyResource(ctx, apiClient, resource, jsonContent, organization, dryRun) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to apply resource: %v", err)), nil + } + + return mcp.NewToolResultText(result), nil +} + +// applyResource applies the resource based on its type +func applyResource(ctx context.Context, apiClient *sncloud.APIClient, resource Resource, jsonContent string, organization string, dryRun bool) (string, error) { + apiVersion := resource.APIVersion + kind := resource.Kind + + // Call different APIs based on resource type + switch { + case apiVersion == "cloud.streamnative.io/v1alpha1" && kind == "PulsarInstance": + return applyPulsarInstance(ctx, apiClient, jsonContent, organization, dryRun) + case apiVersion == "cloud.streamnative.io/v1alpha1" && kind == "PulsarCluster": + return applyPulsarCluster(ctx, apiClient, jsonContent, organization, dryRun) + // Can add handling for more resource types + default: + return "", fmt.Errorf("unsupported resource type: %s/%s", apiVersion, kind) + } +} + +// applyPulsarInstance applies PulsarInstance resource +func applyPulsarInstance(ctx context.Context, apiClient *sncloud.APIClient, jsonContent string, organization string, dryRun bool) (string, error) { + var instance sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarInstance + if err := json.Unmarshal([]byte(jsonContent), &instance); err != nil { + return "", fmt.Errorf("failed to unmarshal JSON to PulsarInstance: %v", err) + } + + // Ensure namespace is set correctly + if instance.Metadata == nil { + instance.Metadata = &sncloud.V1ObjectMeta{} + } + if instance.Metadata.Namespace == nil || *instance.Metadata.Namespace == "" { + ns := organization + instance.Metadata.Namespace = &ns + } + + name := "" + if instance.Metadata.Name != nil { + name = *instance.Metadata.Name + } + + // Check if resource already exists + exists := false + var existingResourceVersion *string + + if name != "" { + // Try to get existing resource + existingInstance, bdy, err := apiClient.CloudStreamnativeIoV1alpha1Api.ReadCloudStreamnativeIoV1alpha1NamespacedPulsarInstance(ctx, name, organization).Execute() + defer bdy.Body.Close() + if err == nil { + exists = true + if existingInstance.Metadata != nil && existingInstance.Metadata.ResourceVersion != nil { + existingResourceVersion = existingInstance.Metadata.ResourceVersion + } + } + } + + var verb string + + // Convert dryRun bool to string parameter required by API + dryRunStr := "All" + + // Create or update based on whether resource exists + var bdy *http.Response + var err error + if exists { + verb = "updated" + // Make sure resourceVersion is set to support updates + if existingResourceVersion != nil { + if instance.Metadata.ResourceVersion == nil { + instance.Metadata.ResourceVersion = existingResourceVersion + } + } + + // Use Replace method to update resource + request := apiClient.CloudStreamnativeIoV1alpha1Api.ReplaceCloudStreamnativeIoV1alpha1NamespacedPulsarInstance( + ctx, name, organization).Body(instance) + if dryRun { + request = request.DryRun(dryRunStr) + } + _, bdy, err = request.Execute() + defer bdy.Body.Close() + } else { + verb = "created" + // Create new resource + request := apiClient.CloudStreamnativeIoV1alpha1Api.CreateCloudStreamnativeIoV1alpha1NamespacedPulsarInstance( + ctx, organization).Body(instance) + if dryRun { + request = request.DryRun(dryRunStr) + } + _, bdy, err = request.Execute() + defer bdy.Body.Close() + } + + if err != nil { + body, innerErr := io.ReadAll(bdy.Body) + if innerErr != nil { + return "", fmt.Errorf("failed to read body: %v", innerErr) + } + return "", fmt.Errorf("failed to %s PulsarInstance: %v (%s)", verb, err, string(body)) + } + + if dryRun { + return fmt.Sprintf("PulsarInstance %q would be %s (dry run)", name, verb), nil + } + return fmt.Sprintf("PulsarInstance %q %s", name, verb), nil +} + +// applyPulsarCluster applies PulsarCluster resource +func applyPulsarCluster(ctx context.Context, apiClient *sncloud.APIClient, jsonContent string, organization string, dryRun bool) (string, error) { + var cluster sncloud.ComGithubStreamnativeCloudApiServerPkgApisCloudV1alpha1PulsarCluster + if err := json.Unmarshal([]byte(jsonContent), &cluster); err != nil { + return "", fmt.Errorf("failed to unmarshal JSON to PulsarCluster: %v", err) + } + + // Ensure namespace is set correctly + if cluster.Metadata == nil { + cluster.Metadata = &sncloud.V1ObjectMeta{} + } + if cluster.Metadata.Namespace == nil || *cluster.Metadata.Namespace == "" { + ns := organization + cluster.Metadata.Namespace = &ns + } + + name := "" + if cluster.Metadata.Name != nil { + name = *cluster.Metadata.Name + } + // Check if resource already exists + exists := false + var existingResourceVersion *string + + if name != "" { + // Try to get existing resource + existingCluster, bdy, err := apiClient.CloudStreamnativeIoV1alpha1Api.ReadCloudStreamnativeIoV1alpha1NamespacedPulsarCluster(ctx, name, organization).Execute() + defer bdy.Body.Close() + if err == nil { + exists = true + if existingCluster.Metadata != nil && existingCluster.Metadata.ResourceVersion != nil { + existingResourceVersion = existingCluster.Metadata.ResourceVersion + } + } + } + + var verb string + + // Convert dryRun bool to string parameter required by API + dryRunStr := "All" + + // Create or update based on whether resource exists + var bdy *http.Response + var err error + if exists { + verb = "updated" + // Make sure resourceVersion is set to support updates + if existingResourceVersion != nil { + if cluster.Metadata.ResourceVersion == nil { + cluster.Metadata.ResourceVersion = existingResourceVersion + } + } + + // Use Replace method to update resource + request := apiClient.CloudStreamnativeIoV1alpha1Api.ReplaceCloudStreamnativeIoV1alpha1NamespacedPulsarCluster( + ctx, name, organization).Body(cluster) + if dryRun { + request = request.DryRun(dryRunStr) + } + + _, bdy, err = request.Execute() + defer bdy.Body.Close() + } else { + verb = "created" + // Create new resource + request := apiClient.CloudStreamnativeIoV1alpha1Api.CreateCloudStreamnativeIoV1alpha1NamespacedPulsarCluster( + ctx, organization).Body(cluster) + if dryRun { + request = request.DryRun(dryRunStr) + } + _, bdy, err = request.Execute() + defer bdy.Body.Close() + } + + if err != nil { + body, innerErr := io.ReadAll(bdy.Body) + if innerErr != nil { + return "", fmt.Errorf("failed to read body: %v", innerErr) + } + return "", fmt.Errorf("failed to %s PulsarCluster: %v (%s)", verb, err, string(body)) + } + + if dryRun { + return fmt.Sprintf("PulsarCluster %q would be %s (dry run)", name, verb), nil + } + return fmt.Sprintf("PulsarCluster %q %s", name, verb), nil +} + +// handleStreamNativeResourcesDelete handles the streaming_cloud_resources_delete tool +func handleStreamNativeResourcesDelete(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + snConfig := getOptions(ctx) + organization := snConfig.Organization + + name, err := requiredParam[string](request.Params.Arguments, "name") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get name: %v", err)), nil + } + + resourceType, err := requiredParam[string](request.Params.Arguments, "type") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get type: %v", err)), nil + } + + apiClient, err := config.GetAPIClient() + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get API client: %v", err)), nil + } + + switch resourceType { + case "PulsarInstance": + //nolint:bodyclose + _, _, err = apiClient.CloudStreamnativeIoV1alpha1Api.DeleteCloudStreamnativeIoV1alpha1NamespacedPulsarInstance(ctx, name, organization).Execute() + case "PulsarCluster": + //nolint:bodyclose + _, _, err = apiClient.CloudStreamnativeIoV1alpha1Api.DeleteCloudStreamnativeIoV1alpha1NamespacedPulsarCluster(ctx, name, organization).Execute() + default: + return mcp.NewToolResultError(fmt.Sprintf("Unsupported resource type: %s", resourceType)), nil + } + + // the delete operation will return a V1Status object, which is not handled by the SDK + if err != nil && !strings.Contains(err.Error(), "json: cannot unmarshal") { + return mcp.NewToolResultError(fmt.Sprintf("failed to delete resource: %v", err)), nil + } + + return mcp.NewToolResultText(fmt.Sprintf("Resource %q %s deleted", name, resourceType)), nil +}