Skip to content

Commit 4294781

Browse files
committed
Refactor SNCloud session management and enhance MCP server functionality
- Introduced a new `Session` struct to encapsulate SNCloud context and client management, improving thread safety with mutex locks. - Updated various functions to utilize the new session methods for obtaining SNCloud clients, enhancing code readability and maintainability. - Refactored MCP server initialization to support SNCloud sessions, including error handling improvements. - Removed deprecated context management code and replaced it with session-based approaches for better encapsulation. - Added comments for better understanding of the code structure and functionality.
1 parent 564bcb2 commit 4294781

File tree

14 files changed

+280
-238
lines changed

14 files changed

+280
-238
lines changed

pkg/cmd/mcp/mcp.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package mcp
1919

2020
import (
2121
"slices"
22-
"time"
2322

2423
"github.com/pkg/errors"
2524
"github.com/spf13/cobra"
@@ -73,20 +72,6 @@ func (o *ServerOptions) Complete() error {
7372
return errors.Wrap(err, "Unable to store the authorization data")
7473
}
7574

76-
err = config.InitSNCloudClient(
77-
issuer.IssuerEndpoint, issuer.Audience, o.KeyFile, o.Options.Server, 30*time.Second, o.Options.Store)
78-
if err != nil {
79-
return errors.Wrap(err, "failed to initialize StreamNative Cloud client")
80-
}
81-
82-
if o.Options.PulsarInstance != "" && o.Options.PulsarCluster != "" {
83-
err = mcp.SetContext(o.Options, o.Options.PulsarInstance, o.Options.PulsarCluster)
84-
if err != nil {
85-
mcp.ResetMcpContext()
86-
return errors.Wrap(err, "failed to set StreamNative Cloud context")
87-
}
88-
}
89-
9075
if len(o.Features) != 0 {
9176
requiredFeatures := []mcp.Feature{
9277
mcp.FeatureStreamNativeCloud,

pkg/cmd/mcp/server.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/mark3labs/mcp-go/server"
2626
"github.com/pkg/errors"
2727
"github.com/sirupsen/logrus"
28+
"github.com/streamnative/streamnative-mcp-server/pkg/config"
2829
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
2930
"github.com/streamnative/streamnative-mcp-server/pkg/mcp"
3031
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
@@ -43,17 +44,16 @@ func newMcpServer(ctx context.Context, configOpts *ServerOptions, logrusLogger *
4344
stdlog.Fatalf("failed to get user name: %v", err)
4445
os.Exit(1)
4546
}
46-
// sncloudClient, err := config.GetAPIClient()
47-
// if err != nil {
48-
// stdlog.Fatalf("failed to get SNCloud client: %v", err)
49-
// os.Exit(1)
50-
// }
51-
// sncloudLogClient, err := config.GetSNCloudLogClient()
52-
// if err != nil {
53-
// stdlog.Fatalf("failed to get SNCloud log client: %v", err)
54-
// os.Exit(1)
55-
// }
47+
// Create StreamNative Cloud session and set as default
48+
session, err := config.NewSNCloudSessionFromOptions(configOpts.Options)
49+
if err != nil {
50+
return nil, errors.Wrap(err, "failed to create StreamNative Cloud session")
51+
}
5652
mcpServer = mcp.NewServer("streamnative-mcp-server", "0.0.1", logrusLogger, server.WithInstructions(mcp.GetStreamNativeCloudServerInstructions(userName, snConfig)))
53+
mcpServer.SNCloudSession = session
54+
mcpServer.KafkaSession = &kafka.Session{}
55+
mcpServer.PulsarSession = &pulsar.Session{}
56+
5757
s = mcpServer.MCPServer
5858
mcp.RegisterPrompts(s)
5959
mcp.RegisterContextTools(s, configOpts.Features)

pkg/cmd/mcp/sse.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ func runSseServer(configOpts *ServerOptions) error {
7575
return fmt.Errorf("failed to create MCP server: %w", err)
7676
}
7777

78+
// 4. Set the context
79+
ctx = mcp.WithSNCloudSession(ctx, mcpServer.SNCloudSession)
80+
ctx = mcp.WithPulsarSession(ctx, mcpServer.PulsarSession)
81+
ctx = mcp.WithKafkaSession(ctx, mcpServer.KafkaSession)
82+
if configOpts.Options.KeyFile != "" {
83+
if configOpts.Options.PulsarInstance != "" && configOpts.Options.PulsarCluster != "" {
84+
err = mcp.SetContext(ctx, configOpts.Options, configOpts.Options.PulsarInstance, configOpts.Options.PulsarCluster)
85+
if err != nil {
86+
return errors.Wrap(err, "failed to set StreamNative Cloud context")
87+
}
88+
}
89+
}
90+
7891
// add Pulsar Functions as MCP tools
7992
// SSE is not support session-based tools, so we pass an empty sessionId
8093
mcpServer.PulsarFunctionManagedMcpTools(configOpts.ReadOnly, configOpts.Features, "")
@@ -86,6 +99,7 @@ func runSseServer(configOpts *ServerOptions) error {
8699
c := context.WithValue(ctx, common.OptionsKey, configOpts.Options)
87100
c = mcp.WithKafkaSession(c, mcpServer.KafkaSession)
88101
c = mcp.WithPulsarSession(c, mcpServer.PulsarSession)
102+
c = mcp.WithSNCloudSession(c, mcpServer.SNCloudSession)
89103
return c
90104
}),
91105
)

0 commit comments

Comments
 (0)