Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions pkg/mcp/pulsar_admin_functions_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func PulsarAdminAddFunctionsTools(s *server.MCPServer, readOnly bool, features [
"Used when triggering a function that consumes from multiple topics. "+
"If not provided, the first input topic will be used.")),
mcp.WithString("triggerValue",
mcp.Description("The value with which to trigger the function. Optional for 'trigger' operation. "+
mcp.Description("The value with which to trigger the function. Required for 'trigger' operation. "+
"This value will be passed to the function as if it were a message from the input topic. "+
"String values are sent as is; for typed values, ensure proper formatting based on function expectations. "+
"The function processes this value just like a normal message.")),
Expand Down Expand Up @@ -411,11 +411,19 @@ func handleFunctionCreate(_ context.Context, client cmdutils.Client, tenant, nam
functionConfig.UserConfig = userConfigMap
}

// Create function
err = client.Functions().CreateFunc(functionConfig, fileName)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.",
name, tenant, namespace, err)), nil
if IsPackageURLSupported(fileName) {
err = client.Functions().CreateFuncWithURL(functionConfig, fileName)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.",
name, tenant, namespace, err)), nil
}
} else {
// Create function
err = client.Functions().CreateFunc(functionConfig, fileName)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.",
name, tenant, namespace, err)), nil
}
}

return mcp.NewToolResultText(fmt.Sprintf("Created function '%s' successfully in tenant '%s' namespace '%s'. The function will start processing messages from input topics.",
Expand Down Expand Up @@ -488,7 +496,12 @@ func handleFunctionUpdate(_ context.Context, client cmdutils.Client, tenant, nam
}

// Update function
err := client.Functions().UpdateFunction(functionConfig, fileName, updateOptions)
var err error
if IsPackageURLSupported(fileName) {
err = client.Functions().UpdateFunctionWithURL(functionConfig, fileName, updateOptions)
} else {
err = client.Functions().UpdateFunction(functionConfig, fileName, updateOptions)
}
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to update function '%s' in tenant '%s' namespace '%s': %v. Verify the function exists and all parameters are valid.",
name, tenant, namespace, err)), nil
Expand Down
21 changes: 21 additions & 0 deletions pkg/mcp/pulsar_admin_packages_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ import (
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
)

const (
HTTP = "http"
FILE = "file"
BUILTIN = "builtin"

FUNCTION = "function"
SINK = "sink"
SOURCE = "source"

PublicTenant = "public"
DefaultNamespace = "default"
)

func IsPackageURLSupported(functionPkgURL string) bool {
return functionPkgURL != "" && (strings.HasPrefix(functionPkgURL, HTTP) ||
strings.HasPrefix(functionPkgURL, FILE) ||
strings.HasPrefix(functionPkgURL, FUNCTION) ||
strings.HasPrefix(functionPkgURL, SINK) ||
strings.HasPrefix(functionPkgURL, SOURCE))
}

// PulsarAdminAddPackagesTools adds package-related tools to the MCP server
func PulsarAdminAddPackagesTools(s *server.MCPServer, readOnly bool, features []string) {
if !slices.Contains(features, string(FeaturePulsarAdminPackages)) && !slices.Contains(features, string(FeatureAll)) && !slices.Contains(features, string(FeatureAllPulsar)) && !slices.Contains(features, string(FeaturePulsarAdmin)) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcp/pulsar_admin_sinks_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func handleSinkCreate(_ context.Context, admin cmdutils.Client, arguments map[st
}

// Create the sink
if sinkData.Archive != "" && isPackageURLSupported(sinkData.Archive) {
if sinkData.Archive != "" && IsPackageURLSupported(sinkData.Archive) {
err = admin.Sinks().CreateSinkWithURL(sinkData.SinkConf, sinkData.Archive)
} else {
err = admin.Sinks().CreateSink(sinkData.SinkConf, sinkData.Archive)
Expand Down Expand Up @@ -453,7 +453,7 @@ func handleSinkUpdate(_ context.Context, admin cmdutils.Client, arguments map[st
}

// Update the sink
if sinkData.Archive != "" && isPackageURLSupported(sinkData.Archive) {
if sinkData.Archive != "" && IsPackageURLSupported(sinkData.Archive) {
err = admin.Sinks().UpdateSinkWithURL(sinkData.SinkConf, sinkData.Archive, updateOptions)
} else {
err = admin.Sinks().UpdateSink(sinkData.SinkConf, sinkData.Archive, updateOptions)
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcp/pulsar_admin_sources_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func handleSourceCreate(_ context.Context, admin cmdutils.Client, arguments map[
}

// Create the source
if sourceData.Archive != "" && isPackageURLSupported(sourceData.Archive) {
if sourceData.Archive != "" && IsPackageURLSupported(sourceData.Archive) {
err = admin.Sources().CreateSourceWithURL(sourceData.SourceConf, sourceData.Archive)
} else {
err = admin.Sources().CreateSource(sourceData.SourceConf, sourceData.Archive)
Expand Down Expand Up @@ -480,7 +480,7 @@ func handleSourceUpdate(_ context.Context, admin cmdutils.Client, arguments map[
}

// Update the source
if sourceData.Archive != "" && isPackageURLSupported(sourceData.Archive) {
if sourceData.Archive != "" && IsPackageURLSupported(sourceData.Archive) {
err = admin.Sources().UpdateSourceWithURL(sourceData.SourceConf, sourceData.Archive, updateOptions)
} else {
err = admin.Sources().UpdateSource(sourceData.SourceConf, sourceData.Archive, updateOptions)
Expand Down
12 changes: 0 additions & 12 deletions pkg/mcp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,6 @@ func isTokenAboutToExpire(cachedGrant *auth.AuthorizationGrant, window time.Dura
return timeUntilExpiry <= window, nil
}

// isPackageURLSupported checks if the provided URL protocol is supported for package download
func isPackageURLSupported(packageURL string) bool {
// Check if the URL has a supported protocol: http, https, file
supportedProtocols := []string{"http://", "https://", "file://"}
for _, protocol := range supportedProtocols {
if strings.HasPrefix(packageURL, protocol) {
return true
}
}
return false
}

// parseMessageConfigs parses a list of key=value strings into a map
func parseMessageConfigs(configs []string) (map[string]*string, error) {
result := make(map[string]*string)
Expand Down