Skip to content

Commit 564bcb2

Browse files
committed
Enhance Pulsar Function Management with Session ID Support
- Updated `PulsarFunctionManagedMcpTools` to accept a `sessionId` parameter, allowing for session-based tool management. - Modified the `NewPulsarFunctionManager` function to include `sessionId`, improving the handling of session-specific tools. - Enhanced the `updateFunctions` method to manage tools based on the presence of a session ID, ensuring proper addition and deletion of session tools. - Added comments for better clarity on the changes and their implications for session management.
1 parent b63de3e commit 564bcb2

File tree

4 files changed

+31
-7
lines changed

4 files changed

+31
-7
lines changed

pkg/cmd/mcp/sse.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ func runSseServer(configOpts *ServerOptions) error {
7676
}
7777

7878
// add Pulsar Functions as MCP tools
79-
mcpServer.PulsarFunctionManagedMcpTools(configOpts.ReadOnly, configOpts.Features)
79+
// SSE is not support session-based tools, so we pass an empty sessionId
80+
mcpServer.PulsarFunctionManagedMcpTools(configOpts.ReadOnly, configOpts.Features, "")
8081

8182
sseServer := server.NewSSEServer(
8283
mcpServer.MCPServer,

pkg/mcp/internal/pftools/manager.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type Server struct {
6262
}
6363

6464
// NewPulsarFunctionManager creates a new PulsarFunctionManager
65-
func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions) (*PulsarFunctionManager, error) {
65+
func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionId string) (*PulsarFunctionManager, error) {
6666
// Get Pulsar client and admin client
6767
if snServer.PulsarSession == nil {
6868
return nil, fmt.Errorf("Pulsar session not found in context")
@@ -104,6 +104,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
104104
circuitBreakers: make(map[string]*CircuitBreaker),
105105
tenantNamespaces: options.TenantNamespaces,
106106
strictExport: options.StrictExport,
107+
sessionId: sessionId,
107108
}
108109

109110
return manager, nil
@@ -197,9 +198,23 @@ func (m *PulsarFunctionManager) updateFunctions() {
197198
}
198199

199200
if changed {
200-
m.mcpServer.DeleteTools(fnTool.Tool.Name)
201+
if m.sessionId != "" {
202+
err := m.mcpServer.DeleteSessionTools(m.sessionId, fnTool.Tool.Name)
203+
if err != nil {
204+
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionId, err)
205+
}
206+
} else {
207+
m.mcpServer.DeleteTools(fnTool.Tool.Name)
208+
}
209+
}
210+
if m.sessionId != "" {
211+
err := m.mcpServer.AddSessionTool(m.sessionId, fnTool.Tool, m.handleToolCall(fnTool))
212+
if err != nil {
213+
log.Printf("Failed to add tool %s to session %s: %v", fnTool.Tool.Name, m.sessionId, err)
214+
}
215+
} else {
216+
m.mcpServer.AddTool(fnTool.Tool, m.handleToolCall(fnTool))
201217
}
202-
m.mcpServer.AddTool(fnTool.Tool, m.handleToolCall(fnTool))
203218

204219
// Add function to map
205220
m.mutex.Lock()
@@ -217,7 +232,14 @@ func (m *PulsarFunctionManager) updateFunctions() {
217232
m.mutex.Lock()
218233
for fullName, fnTool := range m.fnToToolMap {
219234
if !seenFunctions[fullName] {
220-
m.mcpServer.DeleteTools(fnTool.Tool.Name)
235+
if m.sessionId != "" {
236+
err := m.mcpServer.DeleteSessionTools(m.sessionId, fnTool.Tool.Name)
237+
if err != nil {
238+
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionId, err)
239+
}
240+
} else {
241+
m.mcpServer.DeleteTools(fnTool.Tool.Name)
242+
}
221243
delete(m.fnToToolMap, fullName)
222244
log.Printf("Removed function %s from MCP tools [%s]", fullName, fnTool.Tool.Name)
223245
}

pkg/mcp/internal/pftools/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type PulsarFunctionManager struct {
4747
circuitBreakers map[string]*CircuitBreaker
4848
tenantNamespaces []string
4949
strictExport bool
50+
sessionId string
5051
}
5152

5253
type FunctionTool struct {

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func StopAllPulsarFunctionManagers() {
5151
log.Println("All Pulsar Function managers stopped")
5252
}
5353

54-
func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string) {
54+
func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string, sessionId string) {
5555
if !slices.Contains(features, string(FeatureAll)) &&
5656
!slices.Contains(features, string(FeatureFunctionsAsTools)) &&
5757
!slices.Contains(features, string(FeatureStreamNativeCloud)) {
@@ -106,7 +106,7 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string)
106106
Logger: s.logger,
107107
}
108108

109-
manager, err := pftools.NewPulsarFunctionManager(pftoolsServer, readOnly, options)
109+
manager, err := pftools.NewPulsarFunctionManager(pftoolsServer, readOnly, options, sessionId)
110110
if err != nil {
111111
log.Printf("Failed to create Pulsar Function manager: %v", err)
112112
return

0 commit comments

Comments
 (0)