Skip to content

Commit 1168783

Browse files
authored
Enhance Pulsar admin functions and connectors tools with package URL support (#16)
- Updated function descriptions to mark triggerValue as required for 'trigger' operations. - Introduced IsPackageURLSupported function to validate package URLs for functions, sinks, and sources. - Refactored function creation and update logic to handle package URLs appropriately in Pulsar admin tools.
1 parent 0df3bd2 commit 1168783

File tree

5 files changed

+45
-23
lines changed

5 files changed

+45
-23
lines changed

pkg/mcp/pulsar_admin_functions_tools.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func PulsarAdminAddFunctionsTools(s *server.MCPServer, readOnly bool, features [
142142
"Used when triggering a function that consumes from multiple topics. "+
143143
"If not provided, the first input topic will be used.")),
144144
mcp.WithString("triggerValue",
145-
mcp.Description("The value with which to trigger the function. Optional for 'trigger' operation. "+
145+
mcp.Description("The value with which to trigger the function. Required for 'trigger' operation. "+
146146
"This value will be passed to the function as if it were a message from the input topic. "+
147147
"String values are sent as is; for typed values, ensure proper formatting based on function expectations. "+
148148
"The function processes this value just like a normal message.")),
@@ -411,11 +411,19 @@ func handleFunctionCreate(_ context.Context, client cmdutils.Client, tenant, nam
411411
functionConfig.UserConfig = userConfigMap
412412
}
413413

414-
// Create function
415-
err = client.Functions().CreateFunc(functionConfig, fileName)
416-
if err != nil {
417-
return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.",
418-
name, tenant, namespace, err)), nil
414+
if IsPackageURLSupported(fileName) {
415+
err = client.Functions().CreateFuncWithURL(functionConfig, fileName)
416+
if err != nil {
417+
return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.",
418+
name, tenant, namespace, err)), nil
419+
}
420+
} else {
421+
// Create function
422+
err = client.Functions().CreateFunc(functionConfig, fileName)
423+
if err != nil {
424+
return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify all parameters are correct and required resources exist.",
425+
name, tenant, namespace, err)), nil
426+
}
419427
}
420428

421429
return mcp.NewToolResultText(fmt.Sprintf("Created function '%s' successfully in tenant '%s' namespace '%s'. The function will start processing messages from input topics.",
@@ -488,7 +496,12 @@ func handleFunctionUpdate(_ context.Context, client cmdutils.Client, tenant, nam
488496
}
489497

490498
// Update function
491-
err := client.Functions().UpdateFunction(functionConfig, fileName, updateOptions)
499+
var err error
500+
if IsPackageURLSupported(fileName) {
501+
err = client.Functions().UpdateFunctionWithURL(functionConfig, fileName, updateOptions)
502+
} else {
503+
err = client.Functions().UpdateFunction(functionConfig, fileName, updateOptions)
504+
}
492505
if err != nil {
493506
return mcp.NewToolResultError(fmt.Sprintf("Failed to update function '%s' in tenant '%s' namespace '%s': %v. Verify the function exists and all parameters are valid.",
494507
name, tenant, namespace, err)), nil

pkg/mcp/pulsar_admin_packages_tools.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,27 @@ import (
3030
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
3131
)
3232

33+
const (
34+
HTTP = "http"
35+
FILE = "file"
36+
BUILTIN = "builtin"
37+
38+
FUNCTION = "function"
39+
SINK = "sink"
40+
SOURCE = "source"
41+
42+
PublicTenant = "public"
43+
DefaultNamespace = "default"
44+
)
45+
46+
func IsPackageURLSupported(functionPkgURL string) bool {
47+
return functionPkgURL != "" && (strings.HasPrefix(functionPkgURL, HTTP) ||
48+
strings.HasPrefix(functionPkgURL, FILE) ||
49+
strings.HasPrefix(functionPkgURL, FUNCTION) ||
50+
strings.HasPrefix(functionPkgURL, SINK) ||
51+
strings.HasPrefix(functionPkgURL, SOURCE))
52+
}
53+
3354
// PulsarAdminAddPackagesTools adds package-related tools to the MCP server
3455
func PulsarAdminAddPackagesTools(s *server.MCPServer, readOnly bool, features []string) {
3556
if !slices.Contains(features, string(FeaturePulsarAdminPackages)) && !slices.Contains(features, string(FeatureAll)) && !slices.Contains(features, string(FeatureAllPulsar)) && !slices.Contains(features, string(FeaturePulsarAdmin)) {

pkg/mcp/pulsar_admin_sinks_tools.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func handleSinkCreate(_ context.Context, admin cmdutils.Client, arguments map[st
350350
}
351351

352352
// Create the sink
353-
if sinkData.Archive != "" && isPackageURLSupported(sinkData.Archive) {
353+
if sinkData.Archive != "" && IsPackageURLSupported(sinkData.Archive) {
354354
err = admin.Sinks().CreateSinkWithURL(sinkData.SinkConf, sinkData.Archive)
355355
} else {
356356
err = admin.Sinks().CreateSink(sinkData.SinkConf, sinkData.Archive)
@@ -453,7 +453,7 @@ func handleSinkUpdate(_ context.Context, admin cmdutils.Client, arguments map[st
453453
}
454454

455455
// Update the sink
456-
if sinkData.Archive != "" && isPackageURLSupported(sinkData.Archive) {
456+
if sinkData.Archive != "" && IsPackageURLSupported(sinkData.Archive) {
457457
err = admin.Sinks().UpdateSinkWithURL(sinkData.SinkConf, sinkData.Archive, updateOptions)
458458
} else {
459459
err = admin.Sinks().UpdateSink(sinkData.SinkConf, sinkData.Archive, updateOptions)

pkg/mcp/pulsar_admin_sources_tools.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func handleSourceCreate(_ context.Context, admin cmdutils.Client, arguments map[
367367
}
368368

369369
// Create the source
370-
if sourceData.Archive != "" && isPackageURLSupported(sourceData.Archive) {
370+
if sourceData.Archive != "" && IsPackageURLSupported(sourceData.Archive) {
371371
err = admin.Sources().CreateSourceWithURL(sourceData.SourceConf, sourceData.Archive)
372372
} else {
373373
err = admin.Sources().CreateSource(sourceData.SourceConf, sourceData.Archive)
@@ -480,7 +480,7 @@ func handleSourceUpdate(_ context.Context, admin cmdutils.Client, arguments map[
480480
}
481481

482482
// Update the source
483-
if sourceData.Archive != "" && isPackageURLSupported(sourceData.Archive) {
483+
if sourceData.Archive != "" && IsPackageURLSupported(sourceData.Archive) {
484484
err = admin.Sources().UpdateSourceWithURL(sourceData.SourceConf, sourceData.Archive, updateOptions)
485485
} else {
486486
err = admin.Sources().UpdateSource(sourceData.SourceConf, sourceData.Archive, updateOptions)

pkg/mcp/utils.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -246,18 +246,6 @@ func isTokenAboutToExpire(cachedGrant *auth.AuthorizationGrant, window time.Dura
246246
return timeUntilExpiry <= window, nil
247247
}
248248

249-
// isPackageURLSupported checks if the provided URL protocol is supported for package download
250-
func isPackageURLSupported(packageURL string) bool {
251-
// Check if the URL has a supported protocol: http, https, file
252-
supportedProtocols := []string{"http://", "https://", "file://"}
253-
for _, protocol := range supportedProtocols {
254-
if strings.HasPrefix(packageURL, protocol) {
255-
return true
256-
}
257-
}
258-
return false
259-
}
260-
261249
// parseMessageConfigs parses a list of key=value strings into a map
262250
func parseMessageConfigs(configs []string) (map[string]*string, error) {
263251
result := make(map[string]*string)

0 commit comments

Comments
 (0)