@@ -26,6 +26,7 @@ import (
2626 "sync"
2727 "time"
2828
29+ "github.com/apache/pulsar-client-go/pulsar"
2930 "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
3031 "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
3132 "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
@@ -34,7 +35,7 @@ import (
3435 "github.com/mark3labs/mcp-go/mcp"
3536 "github.com/mark3labs/mcp-go/server"
3637 "github.com/streamnative/pulsarctl/pkg/cmdutils"
37- "github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
38+ pulsarutils "github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
3839 "github.com/streamnative/streamnative-mcp-server/pkg/schema"
3940)
4041
@@ -56,7 +57,7 @@ var DefaultStringSchemaInfo = &SchemaInfo{
5657// NewPulsarFunctionManager creates a new PulsarFunctionManager
5758func NewPulsarFunctionManager (mcpServer * server.MCPServer , readOnly bool , options * ManagerOptions ) (* PulsarFunctionManager , error ) {
5859 // Get Pulsar client and admin client
59- pulsarClient , err := pulsar .GetPulsarClient ()
60+ pulsarClient , err := pulsarutils .GetPulsarClient ()
6061 if err != nil {
6162 return nil , fmt .Errorf ("failed to get Pulsar client: %w" , err )
6263 }
@@ -74,6 +75,8 @@ func NewPulsarFunctionManager(mcpServer *server.MCPServer, readOnly bool, option
7475 pulsarClient : pulsarClient ,
7576 fnToToolMap : make (map [string ]* FunctionTool ),
7677 mutex : sync.RWMutex {},
78+ producerCache : make (map [string ]pulsar.Producer ),
79+ producerMutex : sync.RWMutex {},
7780 pollInterval : options .PollInterval ,
7881 stopCh : make (chan struct {}),
7982 callInProgressMap : make (map [string ]context.CancelFunc ),
@@ -96,6 +99,15 @@ func (m *PulsarFunctionManager) Start() {
9699// Stop stops polling for functions
97100func (m * PulsarFunctionManager ) Stop () {
98101 close (m .stopCh )
102+
103+ m .producerMutex .Lock ()
104+ defer m .producerMutex .Unlock ()
105+ for topic , producer := range m .producerCache {
106+ log .Printf ("Closing producer for topic: %s" , topic )
107+ producer .Close ()
108+ }
109+ m .producerCache = make (map [string ]pulsar.Producer )
110+ log .Println ("All cached producers closed and cache cleared." )
99111}
100112
101113// pollFunctions polls for functions periodically
@@ -406,7 +418,7 @@ func (m *PulsarFunctionManager) handleToolCall(fnTool *FunctionTool) func(ctx co
406418 }
407419
408420 // Create function invoker
409- invoker := NewFunctionInvoker (m . pulsarClient )
421+ invoker := NewFunctionInvoker (m )
410422
411423 // Create context with timeout
412424 timeoutCtx , cancel := context .WithTimeout (ctx , m .defaultTimeout )
@@ -484,3 +496,33 @@ func retrieveToolDescription(fn *utils.FunctionConfig) string {
484496 }
485497 return fallbackDescription
486498}
499+
500+ // GetProducer retrieves a producer from the cache or creates a new one if not found.
501+ func (m * PulsarFunctionManager ) GetProducer (topic string ) (pulsar.Producer , error ) {
502+ m .producerMutex .RLock ()
503+ producer , found := m .producerCache [topic ]
504+ m .producerMutex .RUnlock ()
505+
506+ if found {
507+ return producer , nil
508+ }
509+
510+ m .producerMutex .Lock ()
511+ defer m .producerMutex .Unlock ()
512+
513+ producer , found = m .producerCache [topic ]
514+ if found {
515+ return producer , nil
516+ }
517+
518+ newProducer , err := m .pulsarClient .CreateProducer (pulsar.ProducerOptions {
519+ Topic : topic ,
520+ })
521+ if err != nil {
522+ return nil , fmt .Errorf ("failed to create producer for topic %s: %w" , topic , err )
523+ }
524+
525+ m .producerCache [topic ] = newProducer
526+ log .Printf ("Created and cached producer for topic: %s" , topic )
527+ return newProducer , nil
528+ }
0 commit comments