Skip to content

Commit c5ddc05

Browse files
committed
Refactor Pulsar Function Manager to Use Type-Safe Interfaces
- Updated the Pulsar Function Manager to utilize type-safe interfaces for Kafka and Pulsar sessions, improving code clarity and maintainability. - Changed the session fields in the Server struct to specific types, enhancing type safety. - Simplified the retrieval of the Pulsar client by removing unnecessary interface assertions, streamlining the code. - Updated producer cache to use the new type-safe producer, ensuring consistency across the codebase.
1 parent 772cfe2 commit c5ddc05

File tree

1 file changed

+13
-17
lines changed

1 file changed

+13
-17
lines changed

pkg/mcp/pftools/manager.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"sync"
2727
"time"
2828

29-
"github.com/apache/pulsar-client-go/pulsar"
29+
pulsarclient "github.com/apache/pulsar-client-go/pulsar"
3030
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
3131
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
3232
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
@@ -35,6 +35,8 @@ import (
3535
"github.com/mark3labs/mcp-go/mcp"
3636
"github.com/mark3labs/mcp-go/server"
3737
"github.com/streamnative/pulsarctl/pkg/cmdutils"
38+
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
39+
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
3840
"github.com/streamnative/streamnative-mcp-server/pkg/schema"
3941
)
4042

@@ -56,8 +58,8 @@ var DefaultStringSchemaInfo = &SchemaInfo{
5658
// Server is imported directly to avoid circular dependency
5759
type Server struct {
5860
MCPServer *server.MCPServer
59-
KafkaSession interface{}
60-
PulsarSession interface{}
61+
KafkaSession *kafka.Session
62+
PulsarSession *pulsar.Session
6163
Logger interface{}
6264
}
6365

@@ -68,16 +70,10 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
6870
return nil, fmt.Errorf("Pulsar session not found in context")
6971
}
7072

71-
// Get Pulsar client from session using interface assertion
72-
var pulsarClient pulsar.Client
73-
if ps, ok := snServer.PulsarSession.(interface{ GetPulsarClient() (pulsar.Client, error) }); ok {
74-
var err error
75-
pulsarClient, err = ps.GetPulsarClient()
76-
if err != nil {
77-
return nil, fmt.Errorf("failed to get Pulsar client: %w", err)
78-
}
79-
} else {
80-
return nil, fmt.Errorf("Pulsar session does not support GetPulsarClient method")
73+
// Get Pulsar client from session using type-safe interface
74+
pulsarClient, err := snServer.PulsarSession.GetPulsarClient()
75+
if err != nil {
76+
return nil, fmt.Errorf("failed to get Pulsar client: %w", err)
8177
}
8278

8379
adminClient := cmdutils.NewPulsarClientWithAPIVersion(config.V3)
@@ -93,7 +89,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
9389
pulsarClient: pulsarClient,
9490
fnToToolMap: make(map[string]*FunctionTool),
9591
mutex: sync.RWMutex{},
96-
producerCache: make(map[string]pulsar.Producer),
92+
producerCache: make(map[string]pulsarclient.Producer),
9793
producerMutex: sync.RWMutex{},
9894
pollInterval: options.PollInterval,
9995
stopCh: make(chan struct{}),
@@ -125,7 +121,7 @@ func (m *PulsarFunctionManager) Stop() {
125121
log.Printf("Closing producer for topic: %s", topic)
126122
producer.Close()
127123
}
128-
m.producerCache = make(map[string]pulsar.Producer)
124+
m.producerCache = make(map[string]pulsarclient.Producer)
129125
log.Println("All cached producers closed and cache cleared.")
130126
}
131127

@@ -538,7 +534,7 @@ func retrieveToolDescription(fn *utils.FunctionConfig) string {
538534
}
539535

540536
// GetProducer retrieves a producer from the cache or creates a new one if not found.
541-
func (m *PulsarFunctionManager) GetProducer(topic string) (pulsar.Producer, error) {
537+
func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer, error) {
542538
m.producerMutex.RLock()
543539
producer, found := m.producerCache[topic]
544540
m.producerMutex.RUnlock()
@@ -555,7 +551,7 @@ func (m *PulsarFunctionManager) GetProducer(topic string) (pulsar.Producer, erro
555551
return producer, nil
556552
}
557553

558-
newProducer, err := m.pulsarClient.CreateProducer(pulsar.ProducerOptions{
554+
newProducer, err := m.pulsarClient.CreateProducer(pulsarclient.ProducerOptions{
559555
Topic: topic,
560556
})
561557
if err != nil {

0 commit comments

Comments
 (0)