Skip to content

Commit b5f9ef5

Browse files
committed
Refactor Pulsar Functions Management and Introduce New Tools
- Updated the Pulsar functions management to utilize a new internal package for better modularity and maintainability. - Refactored function manager to handle function invocation and result tracking more efficiently. - Introduced new files for circuit breaker, error handling, invocation logic, and schema management, enhancing the overall structure. - Improved error handling and added comments for better clarity and understanding of the code. - Ensured consistent usage of the new package across all relevant tools, improving code organization and readability.
1 parent 6ee1627 commit b5f9ef5

File tree

7 files changed

+5
-5
lines changed

7 files changed

+5
-5
lines changed
File renamed without changes.

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"sync"
2727
"time"
2828

29-
"github.com/streamnative/streamnative-mcp-server/pkg/mcp/internal/pftools"
29+
pftools2 "github.com/streamnative/streamnative-mcp-server/pkg/mcp/pftools"
3030
)
3131

3232
var (
33-
functionManagers = make(map[string]*pftools.PulsarFunctionManager)
33+
functionManagers = make(map[string]*pftools2.PulsarFunctionManager)
3434
functionManagersLock sync.RWMutex
3535
)
3636

@@ -64,7 +64,7 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
6464
return
6565
}
6666

67-
options := pftools.DefaultManagerOptions()
67+
options := pftools2.DefaultManagerOptions()
6868

6969
if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" {
7070
log.Printf("Skipping Pulsar Functions as MCP Tools because both organization, pulsar instance and pulsar cluster are not set")
@@ -110,14 +110,14 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
110110
}
111111

112112
// Convert Server to the internal pftools.Server type
113-
pftoolsServer := &pftools.Server{
113+
pftoolsServer := &pftools2.Server{
114114
MCPServer: s.MCPServer,
115115
KafkaSession: s.KafkaSession,
116116
PulsarSession: s.PulsarSession,
117117
Logger: s.logger,
118118
}
119119

120-
manager, err := pftools.NewPulsarFunctionManager(pftoolsServer, readOnly, options, sessionID)
120+
manager, err := pftools2.NewPulsarFunctionManager(pftoolsServer, readOnly, options, sessionID)
121121
if err != nil {
122122
log.Printf("Failed to create Pulsar Function manager: %v", err)
123123
return

0 commit comments

Comments
 (0)