diff --git a/pkg/mcp/pulsar_admin_functions_tools.go b/pkg/mcp/pulsar_admin_functions_tools.go index 3af0a25..45f6bbd 100644 --- a/pkg/mcp/pulsar_admin_functions_tools.go +++ b/pkg/mcp/pulsar_admin_functions_tools.go @@ -142,7 +142,7 @@ func PulsarAdminAddFunctionsTools(s *server.MCPServer, readOnly bool, features [ "Used when triggering a function that consumes from multiple topics. "+ "If not provided, the first input topic will be used.")), mcp.WithString("triggerValue", - mcp.Description("The value with which to trigger the function. Optional for 'trigger' operation. "+ + mcp.Description("The value with which to trigger the function. Required for 'trigger' operation. "+ "This value will be passed to the function as if it were a message from the input topic. "+ "String values are sent as is; for typed values, ensure proper formatting based on function expectations. "+ "The function processes this value just like a normal message.")), @@ -411,11 +411,19 @@ func handleFunctionCreate(_ context.Context, client cmdutils.Client, tenant, nam functionConfig.UserConfig = userConfigMap } - // Create function - err = client.Functions().CreateFunc(functionConfig, fileName) - if err != nil { - return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.", - name, tenant, namespace, err)), nil + if IsPackageURLSupported(fileName) { + err = client.Functions().CreateFuncWithURL(functionConfig, fileName) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.", + name, tenant, namespace, err)), nil + } + } else { + // Create function + err = client.Functions().CreateFunc(functionConfig, fileName) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.", + name, tenant, namespace, err)), nil + } } return mcp.NewToolResultText(fmt.Sprintf("Created function '%s' successfully in tenant '%s' namespace '%s'. The function will start processing messages from input topics.", @@ -488,7 +496,12 @@ func handleFunctionUpdate(_ context.Context, client cmdutils.Client, tenant, nam } // Update function - err := client.Functions().UpdateFunction(functionConfig, fileName, updateOptions) + var err error + if IsPackageURLSupported(fileName) { + err = client.Functions().UpdateFunctionWithURL(functionConfig, fileName, updateOptions) + } else { + err = client.Functions().UpdateFunction(functionConfig, fileName, updateOptions) + } if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to update function '%s' in tenant '%s' namespace '%s': %v. Verify the function exists and all parameters are valid.", name, tenant, namespace, err)), nil diff --git a/pkg/mcp/pulsar_admin_packages_tools.go b/pkg/mcp/pulsar_admin_packages_tools.go index 31b043a..7c33f54 100644 --- a/pkg/mcp/pulsar_admin_packages_tools.go +++ b/pkg/mcp/pulsar_admin_packages_tools.go @@ -30,6 +30,27 @@ import ( "github.com/streamnative/streamnative-mcp-server/pkg/pulsar" ) +const ( + HTTP = "http" + FILE = "file" + BUILTIN = "builtin" + + FUNCTION = "function" + SINK = "sink" + SOURCE = "source" + + PublicTenant = "public" + DefaultNamespace = "default" +) + +func IsPackageURLSupported(functionPkgURL string) bool { + return functionPkgURL != "" && (strings.HasPrefix(functionPkgURL, HTTP) || + strings.HasPrefix(functionPkgURL, FILE) || + strings.HasPrefix(functionPkgURL, FUNCTION) || + strings.HasPrefix(functionPkgURL, SINK) || + strings.HasPrefix(functionPkgURL, SOURCE)) +} + // PulsarAdminAddPackagesTools adds package-related tools to the MCP server func PulsarAdminAddPackagesTools(s *server.MCPServer, readOnly bool, features []string) { if !slices.Contains(features, string(FeaturePulsarAdminPackages)) && !slices.Contains(features, string(FeatureAll)) && !slices.Contains(features, string(FeatureAllPulsar)) && !slices.Contains(features, string(FeaturePulsarAdmin)) { diff --git a/pkg/mcp/pulsar_admin_sinks_tools.go b/pkg/mcp/pulsar_admin_sinks_tools.go index b6c138d..83d6a61 100644 --- a/pkg/mcp/pulsar_admin_sinks_tools.go +++ b/pkg/mcp/pulsar_admin_sinks_tools.go @@ -350,7 +350,7 @@ func handleSinkCreate(_ context.Context, admin cmdutils.Client, arguments map[st } // Create the sink - if sinkData.Archive != "" && isPackageURLSupported(sinkData.Archive) { + if sinkData.Archive != "" && IsPackageURLSupported(sinkData.Archive) { err = admin.Sinks().CreateSinkWithURL(sinkData.SinkConf, sinkData.Archive) } else { err = admin.Sinks().CreateSink(sinkData.SinkConf, sinkData.Archive) @@ -453,7 +453,7 @@ func handleSinkUpdate(_ context.Context, admin cmdutils.Client, arguments map[st } // Update the sink - if sinkData.Archive != "" && isPackageURLSupported(sinkData.Archive) { + if sinkData.Archive != "" && IsPackageURLSupported(sinkData.Archive) { err = admin.Sinks().UpdateSinkWithURL(sinkData.SinkConf, sinkData.Archive, updateOptions) } else { err = admin.Sinks().UpdateSink(sinkData.SinkConf, sinkData.Archive, updateOptions) diff --git a/pkg/mcp/pulsar_admin_sources_tools.go b/pkg/mcp/pulsar_admin_sources_tools.go index 157655e..98cac07 100644 --- a/pkg/mcp/pulsar_admin_sources_tools.go +++ b/pkg/mcp/pulsar_admin_sources_tools.go @@ -367,7 +367,7 @@ func handleSourceCreate(_ context.Context, admin cmdutils.Client, arguments map[ } // Create the source - if sourceData.Archive != "" && isPackageURLSupported(sourceData.Archive) { + if sourceData.Archive != "" && IsPackageURLSupported(sourceData.Archive) { err = admin.Sources().CreateSourceWithURL(sourceData.SourceConf, sourceData.Archive) } else { err = admin.Sources().CreateSource(sourceData.SourceConf, sourceData.Archive) @@ -480,7 +480,7 @@ func handleSourceUpdate(_ context.Context, admin cmdutils.Client, arguments map[ } // Update the source - if sourceData.Archive != "" && isPackageURLSupported(sourceData.Archive) { + if sourceData.Archive != "" && IsPackageURLSupported(sourceData.Archive) { err = admin.Sources().UpdateSourceWithURL(sourceData.SourceConf, sourceData.Archive, updateOptions) } else { err = admin.Sources().UpdateSource(sourceData.SourceConf, sourceData.Archive, updateOptions) diff --git a/pkg/mcp/utils.go b/pkg/mcp/utils.go index a6799a0..fef2ca3 100644 --- a/pkg/mcp/utils.go +++ b/pkg/mcp/utils.go @@ -246,18 +246,6 @@ func isTokenAboutToExpire(cachedGrant *auth.AuthorizationGrant, window time.Dura return timeUntilExpiry <= window, nil } -// isPackageURLSupported checks if the provided URL protocol is supported for package download -func isPackageURLSupported(packageURL string) bool { - // Check if the URL has a supported protocol: http, https, file - supportedProtocols := []string{"http://", "https://", "file://"} - for _, protocol := range supportedProtocols { - if strings.HasPrefix(packageURL, protocol) { - return true - } - } - return false -} - // parseMessageConfigs parses a list of key=value strings into a map func parseMessageConfigs(configs []string) (map[string]*string, error) { result := make(map[string]*string)