Skip to content

Commit 2124b60

Browse files
committed
Refactor session management and enhance MCP server functionality
- Updated `newMcpServer` function to accept a context parameter, improving the encapsulation of server initialization. - Refactored Kafka and Pulsar session management to utilize session-based context handling, enhancing code readability and maintainability. - Modified `PulsarFunctionManagedMcpTools` to use `sessionID` instead of `sessionId`, ensuring consistency across session management. - Improved error handling and added comments for better understanding of the code structure and functionality.
1 parent 4294781 commit 2124b60

File tree

9 files changed

+53
-62
lines changed

9 files changed

+53
-62
lines changed

pkg/cmd/mcp/server.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
3232
)
3333

34-
func newMcpServer(ctx context.Context, configOpts *ServerOptions, logrusLogger *logrus.Logger) (*mcp.Server, error) {
34+
func newMcpServer(_ context.Context, configOpts *ServerOptions, logrusLogger *logrus.Logger) (*mcp.Server, error) {
3535
snConfig := configOpts.Options.LoadConfigOrDie()
3636
var s *server.MCPServer
3737
var mcpServer *mcp.Server
@@ -51,8 +51,6 @@ func newMcpServer(ctx context.Context, configOpts *ServerOptions, logrusLogger *
5151
}
5252
mcpServer = mcp.NewServer("streamnative-mcp-server", "0.0.1", logrusLogger, server.WithInstructions(mcp.GetStreamNativeCloudServerInstructions(userName, snConfig)))
5353
mcpServer.SNCloudSession = session
54-
mcpServer.KafkaSession = &kafka.Session{}
55-
mcpServer.PulsarSession = &pulsar.Session{}
5654

5755
s = mcpServer.MCPServer
5856
mcp.RegisterPrompts(s)
@@ -97,7 +95,7 @@ func newMcpServer(ctx context.Context, configOpts *ServerOptions, logrusLogger *
9795
TLSTrustCertsFilePath: snConfig.ExternalPulsar.TLSTrustCertsFilePath,
9896
TLSCertFile: snConfig.ExternalPulsar.TLSCertFile,
9997
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
100-
}, nil, nil)
98+
})
10199
if err != nil {
102100
return nil, errors.Wrap(err, "failed to set external Pulsar context")
103101
}

pkg/kafka/connection.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,13 @@ func NewSession(ctx KafkaContext) (*Session, error) {
7676
Ctx: ctx,
7777
}
7878

79-
if err := session.SetKafkaContext(); err != nil {
79+
if err := session.SetKafkaContext(ctx); err != nil {
8080
return nil, fmt.Errorf("failed to set kafka context: %w", err)
8181
}
8282

8383
return session, nil
8484
}
8585

86-
func (s *Session) ChangeContext(ctx KafkaContext) error {
87-
s.Ctx = ctx
88-
return s.SetKafkaContext()
89-
}
90-
9186
type SASLConfig struct {
9287
Mechanism string
9388
Username string
@@ -152,7 +147,8 @@ func saslOpt(config *SASLConfig, opts []kgo.Opt) ([]kgo.Opt, error) {
152147
return opts, nil
153148
}
154149

155-
func (s *Session) SetKafkaContext() error {
150+
func (s *Session) SetKafkaContext(ctx KafkaContext) error {
151+
s.Ctx = ctx
156152
kc := &s.Ctx
157153
var err error
158154
s.Options = []kgo.Opt{}

pkg/mcp/internal/pftools/manager.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type Server struct {
6262
}
6363

6464
// NewPulsarFunctionManager creates a new PulsarFunctionManager
65-
func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionId string) (*PulsarFunctionManager, error) {
65+
func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerOptions, sessionID string) (*PulsarFunctionManager, error) {
6666
// Get Pulsar client and admin client
6767
if snServer.PulsarSession == nil {
6868
return nil, fmt.Errorf("Pulsar session not found in context")
@@ -104,7 +104,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
104104
circuitBreakers: make(map[string]*CircuitBreaker),
105105
tenantNamespaces: options.TenantNamespaces,
106106
strictExport: options.StrictExport,
107-
sessionId: sessionId,
107+
sessionID: sessionID,
108108
}
109109

110110
return manager, nil
@@ -198,19 +198,19 @@ func (m *PulsarFunctionManager) updateFunctions() {
198198
}
199199

200200
if changed {
201-
if m.sessionId != "" {
202-
err := m.mcpServer.DeleteSessionTools(m.sessionId, fnTool.Tool.Name)
201+
if m.sessionID != "" {
202+
err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name)
203203
if err != nil {
204-
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionId, err)
204+
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionID, err)
205205
}
206206
} else {
207207
m.mcpServer.DeleteTools(fnTool.Tool.Name)
208208
}
209209
}
210-
if m.sessionId != "" {
211-
err := m.mcpServer.AddSessionTool(m.sessionId, fnTool.Tool, m.handleToolCall(fnTool))
210+
if m.sessionID != "" {
211+
err := m.mcpServer.AddSessionTool(m.sessionID, fnTool.Tool, m.handleToolCall(fnTool))
212212
if err != nil {
213-
log.Printf("Failed to add tool %s to session %s: %v", fnTool.Tool.Name, m.sessionId, err)
213+
log.Printf("Failed to add tool %s to session %s: %v", fnTool.Tool.Name, m.sessionID, err)
214214
}
215215
} else {
216216
m.mcpServer.AddTool(fnTool.Tool, m.handleToolCall(fnTool))
@@ -232,10 +232,10 @@ func (m *PulsarFunctionManager) updateFunctions() {
232232
m.mutex.Lock()
233233
for fullName, fnTool := range m.fnToToolMap {
234234
if !seenFunctions[fullName] {
235-
if m.sessionId != "" {
236-
err := m.mcpServer.DeleteSessionTools(m.sessionId, fnTool.Tool.Name)
235+
if m.sessionID != "" {
236+
err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name)
237237
if err != nil {
238-
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionId, err)
238+
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionID, err)
239239
}
240240
} else {
241241
m.mcpServer.DeleteTools(fnTool.Tool.Name)

pkg/mcp/internal/pftools/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type PulsarFunctionManager struct {
4747
circuitBreakers map[string]*CircuitBreaker
4848
tenantNamespaces []string
4949
strictExport bool
50-
sessionId string
50+
sessionID string
5151
}
5252

5353
type FunctionTool struct {

pkg/mcp/pulsar_admin_topic_policy_tools.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2828
"github.com/mark3labs/mcp-go/mcp"
2929
"github.com/mark3labs/mcp-go/server"
30-
3130
)
3231

3332
// PulsarAdminAddTopicPolicyTools adds topic policy-related tools to the MCP server

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func StopAllPulsarFunctionManagers() {
5151
log.Println("All Pulsar Function managers stopped")
5252
}
5353

54-
func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string, sessionId string) {
54+
func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string, sessionID string) {
5555
if !slices.Contains(features, string(FeatureAll)) &&
5656
!slices.Contains(features, string(FeatureFunctionsAsTools)) &&
5757
!slices.Contains(features, string(FeatureStreamNativeCloud)) {
@@ -60,6 +60,11 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
6060

6161
options := pftools.DefaultManagerOptions()
6262

63+
if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" {
64+
log.Printf("Skipping Pulsar Functions as MCP Tools because both organization, pulsar instance and pulsar cluster are not set")
65+
return
66+
}
67+
6368
if pollIntervalStr := os.Getenv("FUNCTIONS_AS_TOOLS_POLL_INTERVAL"); pollIntervalStr != "" {
6469
if seconds, err := strconv.Atoi(pollIntervalStr); err == nil && seconds > 0 {
6570
options.PollInterval = time.Duration(seconds) * time.Second
@@ -106,7 +111,7 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
106111
Logger: s.logger,
107112
}
108113

109-
manager, err := pftools.NewPulsarFunctionManager(pftoolsServer, readOnly, options, sessionId)
114+
manager, err := pftools.NewPulsarFunctionManager(pftoolsServer, readOnly, options, sessionID)
110115
if err != nil {
111116
log.Printf("Failed to create Pulsar Function manager: %v", err)
112117
return

pkg/mcp/server.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ func addOpts(opts ...server.ServerOption) []server.ServerOption {
5353

5454
func createSNCloudMCPServer(s *server.MCPServer, logger *logrus.Logger) *Server {
5555
mcpserver := &Server{
56-
MCPServer: s,
57-
logger: logger,
56+
MCPServer: s,
57+
logger: logger,
58+
SNCloudSession: &config.Session{},
59+
KafkaSession: &kafka.Session{},
60+
PulsarSession: &pulsar.Session{},
5861
}
5962

6063
return mcpserver

pkg/mcp/sncontext_utils.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,11 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus
159159
if psession == nil {
160160
return fmt.Errorf("failed to get pulsar session")
161161
}
162-
err = psession.ChangeContext(pulsar.PulsarContext{
162+
err = psession.SetPulsarContext(pulsar.PulsarContext{
163163
WebServiceURL: getBasePath(snConfig.ProxyLocation, options.Organization, clusterUID),
164164
ServiceURL: getServiceURL(dnsName),
165165
Token: accessToken,
166-
}, issuer, &options.AuthOptions.Store)
166+
})
167167
if err != nil {
168168
return fmt.Errorf("failed to change pulsar context: %v", err)
169169
}
@@ -187,10 +187,14 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus
187187
if ksession == nil {
188188
return fmt.Errorf("failed to get kafka session")
189189
}
190-
err = ksession.ChangeContext(kctx)
190+
err = ksession.SetKafkaContext(kctx)
191191
if err != nil {
192192
return fmt.Errorf("failed to change kafka context: %v", err)
193193
}
194194

195+
// TODO: check if need to set log client
196+
// if issuer != nil && options.AuthOptions.Store != nil {
197+
// }
198+
195199
return nil
196200
}

pkg/pulsar/connection.go

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ import (
2525
"github.com/apache/pulsar-client-go/pulsar"
2626
pulsaradminconfig "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
2727
"github.com/streamnative/pulsarctl/pkg/cmdutils"
28-
"github.com/streamnative/streamnative-mcp-server/pkg/auth"
29-
"github.com/streamnative/streamnative-mcp-server/pkg/auth/store"
3028
)
3129

3230
const (
@@ -49,51 +47,39 @@ type PulsarContext struct {
4947

5048
// Session represents a Pulsar session
5149
type Session struct {
52-
Ctx PulsarContext
53-
Client pulsar.Client
54-
AdminClient cmdutils.Client
55-
AdminV3Client cmdutils.Client
56-
ClientOptions pulsar.ClientOptions
57-
mutex sync.RWMutex
58-
}
59-
60-
func init() {
61-
cmdutils.PulsarCtlConfig = &cmdutils.ClusterConfig{}
50+
Ctx PulsarContext
51+
Client pulsar.Client
52+
AdminClient cmdutils.Client
53+
AdminV3Client cmdutils.Client
54+
ClientOptions pulsar.ClientOptions
55+
PulsarCtlConfig *cmdutils.ClusterConfig
56+
mutex sync.RWMutex
6257
}
6358

6459
// NewSession creates a new Pulsar session with the given context
6560
// This function dynamically constructs clients without relying on global state
66-
func NewSession(ctx PulsarContext, issuer *auth.Issuer, tokenStore *store.Store) (*Session, error) {
61+
func NewSession(ctx PulsarContext) (*Session, error) {
6762
session := &Session{
6863
Ctx: ctx,
6964
}
7065

71-
if err := session.SetPulsarContext(); err != nil {
66+
if err := session.SetPulsarContext(ctx); err != nil {
7267
return nil, fmt.Errorf("failed to set pulsar context: %w", err)
7368
}
7469

7570
return session, nil
7671
}
7772

78-
func (s *Session) ChangeContext(ctx PulsarContext, issuer *auth.Issuer, tokenStore *store.Store) error {
73+
func (s *Session) SetPulsarContext(ctx PulsarContext) error {
7974
s.mutex.Lock()
8075
defer s.mutex.Unlock()
81-
8276
s.Ctx = ctx
83-
84-
return s.SetPulsarContext()
85-
}
86-
87-
func (s *Session) SetPulsarContext() error {
88-
s.mutex.Lock()
89-
defer s.mutex.Unlock()
90-
9177
pc := &s.Ctx
9278
var err error
9379
// Configure pulsarctl with the token
9480
switch {
9581
case pc.Token != "":
96-
cmdutils.PulsarCtlConfig = &cmdutils.ClusterConfig{
82+
s.PulsarCtlConfig = &cmdutils.ClusterConfig{
9783
WebServiceURL: pc.WebServiceURL,
9884
AuthPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken",
9985
AuthParams: fmt.Sprintf("token:%s", pc.Token),
@@ -117,7 +103,7 @@ func (s *Session) SetPulsarContext() error {
117103
TLSKeyFilePath: pc.TLSKeyFile,
118104
}
119105
case pc.AuthPlugin != "" && pc.AuthParams != "":
120-
cmdutils.PulsarCtlConfig = &cmdutils.ClusterConfig{
106+
s.PulsarCtlConfig = &cmdutils.ClusterConfig{
121107
WebServiceURL: pc.WebServiceURL,
122108
AuthPlugin: pc.AuthPlugin,
123109
AuthParams: pc.AuthParams,
@@ -145,7 +131,7 @@ func (s *Session) SetPulsarContext() error {
145131
}
146132
default:
147133
// No authentication provided
148-
cmdutils.PulsarCtlConfig = &cmdutils.ClusterConfig{
134+
s.PulsarCtlConfig = &cmdutils.ClusterConfig{
149135
WebServiceURL: pc.WebServiceURL,
150136
TLSAllowInsecureConnection: pc.TLSAllowInsecureConnection,
151137
TLSEnableHostnameVerification: pc.TLSEnableHostnameVerification,
@@ -167,8 +153,8 @@ func (s *Session) SetPulsarContext() error {
167153
}
168154
}
169155

170-
s.AdminClient = cmdutils.NewPulsarClient()
171-
s.AdminV3Client = cmdutils.NewPulsarClientWithAPIVersion(pulsaradminconfig.V3)
156+
s.AdminClient = s.PulsarCtlConfig.Client(pulsaradminconfig.V2)
157+
s.AdminV3Client = s.PulsarCtlConfig.Client(pulsaradminconfig.V3)
172158

173159
s.Client, err = pulsar.NewClient(s.ClientOptions)
174160
if err != nil {
@@ -182,7 +168,7 @@ func (s *Session) GetAdminClient() (cmdutils.Client, error) {
182168
s.mutex.RLock()
183169
defer s.mutex.RUnlock()
184170

185-
if cmdutils.PulsarCtlConfig.WebServiceURL == "" {
171+
if s.PulsarCtlConfig.WebServiceURL == "" {
186172
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
187173
}
188174
return s.AdminClient, nil
@@ -192,7 +178,7 @@ func (s *Session) GetAdminV3Client() (cmdutils.Client, error) {
192178
s.mutex.RLock()
193179
defer s.mutex.RUnlock()
194180

195-
if cmdutils.PulsarCtlConfig.WebServiceURL == "" {
181+
if s.PulsarCtlConfig.WebServiceURL == "" {
196182
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
197183
}
198184
return s.AdminV3Client, nil

0 commit comments

Comments
 (0)