Skip to content

Commit 6ad41a2

Browse files
committed
Enhance session management and improve MCP server functionality
- Updated `PulsarFunctionManagedMcpTools` to use a fixed `sessionId` instead of an empty string, ensuring proper session handling. - Refactored API and log client retrieval methods in the `Session` struct to utilize `sync.Once`, improving thread safety and performance. - Added validation for `sessionID` in `PulsarFunctionManagedMcpTools` to prevent processing with empty session IDs. - Enhanced context retrieval functions to safely return empty strings when context values are not present, improving robustness. - Updated function names to follow consistent naming conventions, enhancing code readability and maintainability.
1 parent 2124b60 commit 6ad41a2

File tree

5 files changed

+46
-47
lines changed

5 files changed

+46
-47
lines changed

pkg/cmd/mcp/sse.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ func runSseServer(configOpts *ServerOptions) error {
8989
}
9090

9191
// add Pulsar Functions as MCP tools
92-
// SSE is not support session-based tools, so we pass an empty sessionId
93-
mcpServer.PulsarFunctionManagedMcpTools(configOpts.ReadOnly, configOpts.Features, "")
92+
// SSE is not support session-based tools, so we pass an fixed sessionId
93+
mcpServer.PulsarFunctionManagedMcpTools(configOpts.ReadOnly, configOpts.Features, "FIXED_SESSION_ID")
9494

9595
sseServer := server.NewSSEServer(
9696
mcpServer.MCPServer,

pkg/config/apiclient.go

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type Session struct {
5555
TokenRefresher *OAuth2TokenRefresher
5656
Configuration *sncloud.Configuration
5757
mutex sync.RWMutex
58+
apiClientOnce sync.Once
59+
logClientOnce sync.Once
5860
}
5961

6062
// OAuth2TokenRefresher implements oauth2.TokenSource interface for refreshing OAuth2 tokens
@@ -182,27 +184,15 @@ func (s *Session) initializeTokenRefresher() error {
182184

183185
// GetAPIClient returns the API client for the session, initializing it if necessary
184186
func (s *Session) GetAPIClient() (*sncloud.APIClient, error) {
185-
s.mutex.RLock()
186-
if s.APIClient != nil {
187-
defer s.mutex.RUnlock()
188-
return s.APIClient, nil
189-
}
190-
s.mutex.RUnlock()
191-
192-
// Need to initialize the client
193-
s.mutex.Lock()
194-
defer s.mutex.Unlock()
195-
196-
// Double check after acquiring write lock
197-
if s.APIClient != nil {
198-
return s.APIClient, nil
199-
}
200-
201-
// Initialize the API client
202-
if err := s.initializeAPIClient(); err != nil {
187+
var err error
188+
s.apiClientOnce.Do(func() {
189+
err = s.initializeAPIClient()
190+
})
191+
192+
if err != nil {
203193
return nil, errors.Wrap(err, "failed to initialize API client")
204194
}
205-
195+
206196
return s.APIClient, nil
207197
}
208198

@@ -245,27 +235,15 @@ func (s *Session) initializeAPIClient() error {
245235

246236
// GetLogClient returns the log client for the session, initializing it if necessary
247237
func (s *Session) GetLogClient() (*http.Client, error) {
248-
s.mutex.RLock()
249-
if s.LogClient != nil {
250-
defer s.mutex.RUnlock()
251-
return s.LogClient, nil
252-
}
253-
s.mutex.RUnlock()
254-
255-
// Need to initialize the client
256-
s.mutex.Lock()
257-
defer s.mutex.Unlock()
258-
259-
// Double check after acquiring write lock
260-
if s.LogClient != nil {
261-
return s.LogClient, nil
262-
}
263-
264-
// Initialize the log client
265-
if err := s.initializeLogClient(); err != nil {
238+
var err error
239+
s.logClientOnce.Do(func() {
240+
err = s.initializeLogClient()
241+
})
242+
243+
if err != nil {
266244
return nil, errors.Wrap(err, "failed to initialize log client")
267245
}
268-
246+
269247
return s.LogClient, nil
270248
}
271249

pkg/mcp/ctx.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,32 @@ func WithKafkaSession(ctx context.Context, session *kafka.Session) context.Conte
6868

6969
// GetSNCloudOrganization gets the SNCloud organization from the context
7070
func GetSNCloudOrganization(ctx context.Context) string {
71-
return ctx.Value(SNCloudOrganizationContextKey).(string)
71+
if val := ctx.Value(SNCloudOrganizationContextKey); val != nil {
72+
if str, ok := val.(string); ok {
73+
return str
74+
}
75+
}
76+
return ""
7277
}
7378

7479
// GetSNCloudInstance gets the SNCloud instance from the context
7580
func GetSNCloudInstance(ctx context.Context) string {
76-
return ctx.Value(SNCloudInstanceContextKey).(string)
81+
if val := ctx.Value(SNCloudInstanceContextKey); val != nil {
82+
if str, ok := val.(string); ok {
83+
return str
84+
}
85+
}
86+
return ""
7787
}
7888

7989
// GetSNCloudCluster gets the SNCloud cluster from the context
8090
func GetSNCloudCluster(ctx context.Context) string {
81-
return ctx.Value(SNCloudClusterContextKey).(string)
91+
if val := ctx.Value(SNCloudClusterContextKey); val != nil {
92+
if str, ok := val.(string); ok {
93+
return str
94+
}
95+
}
96+
return ""
8297
}
8398

8499
// GetSNCloudSession gets the SNCloud session from the context

pkg/mcp/pulsar_functions_as_tools.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
5858
return
5959
}
6060

61+
// Validate sessionID
62+
if sessionID == "" {
63+
log.Printf("Skipping Pulsar Functions as MCP Tools because sessionID is empty")
64+
return
65+
}
66+
6167
options := pftools.DefaultManagerOptions()
6268

6369
if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" {

pkg/mcp/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ type Server struct {
3535

3636
func NewServer(name, version string, logger *logrus.Logger, opts ...server.ServerOption) *Server {
3737
// Create a new MCP server
38-
opts = addOpts(opts...)
38+
opts = AddOpts(opts...)
3939
s := server.NewMCPServer(name, version, opts...)
40-
mcpserver := createSNCloudMCPServer(s, logger)
40+
mcpserver := CreateSNCloudMCPServer(s, logger)
4141
return mcpserver
4242
}
4343

44-
func addOpts(opts ...server.ServerOption) []server.ServerOption {
44+
func AddOpts(opts ...server.ServerOption) []server.ServerOption {
4545
defaultOpts := []server.ServerOption{
4646
server.WithResourceCapabilities(true, true),
4747
server.WithRecovery(),
@@ -51,7 +51,7 @@ func addOpts(opts ...server.ServerOption) []server.ServerOption {
5151
return opts
5252
}
5353

54-
func createSNCloudMCPServer(s *server.MCPServer, logger *logrus.Logger) *Server {
54+
func CreateSNCloudMCPServer(s *server.MCPServer, logger *logrus.Logger) *Server {
5555
mcpserver := &Server{
5656
MCPServer: s,
5757
logger: logger,

0 commit comments

Comments
 (0)