Skip to content

Commit 53b77a5

Browse files
authored
feat(mcp): add multi-session Pulsar support with auth middleware (#64)
1 parent ad4b79f commit 53b77a5

File tree

5 files changed

+148
-27
lines changed

5 files changed

+148
-27
lines changed

CLAUDE.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,24 @@ From sessions:
130130
1. **StreamNative Cloud**: `--organization` + `--key-file`
131131
2. **External Kafka**: `--use-external-kafka` + Kafka params
132132
3. **External Pulsar**: `--use-external-pulsar` + Pulsar params
133+
4. **Multi-Session Pulsar** (SSE only): `--use-external-pulsar` + `--multi-session-pulsar`
133134

134135
Pre-configured context: `--pulsar-instance` + `--pulsar-cluster` disables context management tools.
135136

137+
### Multi-Session Pulsar Mode
138+
139+
When `--multi-session-pulsar` is enabled (SSE server with external Pulsar only):
140+
141+
- **No global PulsarSession**: Each request must provide its own token via `Authorization: Bearer <token>` header
142+
- **HTTP 401 on auth failure**: Requests without valid tokens are rejected with HTTP 401 Unauthorized
143+
- **Per-user session caching**: Sessions are cached using LRU with configurable size and TTL
144+
- **Session management**: See `pkg/mcp/session/pulsar_session_manager.go`
145+
146+
Key files:
147+
- `pkg/cmd/mcp/sse.go` - Auth middleware wraps SSEHandler()/MessageHandler()
148+
- `pkg/mcp/session/pulsar_session_manager.go` - LRU session cache with TTL cleanup
149+
- `pkg/cmd/mcp/server.go` - Skips global PulsarSession when multi-session enabled
150+
136151
## Error Handling
137152

138153
- Wrap errors: `fmt.Errorf("failed to X: %w", err)`

README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,41 @@ snmcp sse --http-addr :9090 --http-path /mcp --use-external-pulsar --pulsar-web-
124124
docker run -i --rm -e SNMCP_ORGANIZATION=my-org -e SNMCP_KEY_FILE=/key.json -v /path/to/key-file.json:/key.json -p 9090:9090 streamnative/snmcp sse
125125
```
126126

127+
#### Multi-Session Pulsar Mode (SSE only)
128+
129+
When running the SSE server with external Pulsar, you can enable **multi-session mode** to support per-user authentication. In this mode, each HTTP request must include an `Authorization: Bearer <token>` header, and the server will create separate Pulsar sessions for each unique token.
130+
131+
```bash
132+
# Start SSE server with multi-session Pulsar mode
133+
snmcp sse --http-addr :9090 --http-path /mcp \
134+
--use-external-pulsar \
135+
--pulsar-web-service-url http://pulsar.example.com:8080 \
136+
--multi-session-pulsar \
137+
--session-cache-size 100 \
138+
--session-ttl-minutes 30
139+
```
140+
141+
**Key features:**
142+
- **Per-user sessions**: Each user's Pulsar token creates a separate session
143+
- **LRU caching**: Sessions are cached with LRU eviction when the cache is full
144+
- **TTL-based cleanup**: Idle sessions are automatically cleaned up after the configured TTL
145+
- **Strict authentication**: Requests without a valid `Authorization` header receive HTTP 401 Unauthorized
146+
147+
**Authentication flow:**
148+
1. Client connects to SSE endpoint with `Authorization: Bearer <pulsar-jwt-token>` header
149+
2. Server validates the token by attempting to create a Pulsar session
150+
3. If valid, the session is cached and reused for subsequent requests
151+
4. If invalid or missing, server returns HTTP 401 Unauthorized
152+
153+
**Configuration options:**
154+
| Flag | Default | Description |
155+
|------|---------|-------------|
156+
| `--multi-session-pulsar` | `false` | Enable per-user Pulsar sessions |
157+
| `--session-cache-size` | `100` | Maximum number of cached sessions |
158+
| `--session-ttl-minutes` | `30` | Session idle timeout before eviction |
159+
160+
> **Note:** Multi-session mode is only available for external Pulsar mode (`--use-external-pulsar`) and only works with the SSE server, not stdio.
161+
127162
### Command-line Options
128163

129164
```
@@ -175,6 +210,9 @@ Flags:
175210
--use-external-pulsar Use external Pulsar
176211
--http-addr string HTTP server address (default ":9090")
177212
--http-path string HTTP server path for SSE endpoint (default "/mcp")
213+
--multi-session-pulsar Enable per-user Pulsar sessions based on Authorization header tokens (only for external Pulsar mode)
214+
--session-cache-size int Maximum number of cached Pulsar sessions when multi-session is enabled (default 100)
215+
--session-ttl-minutes int Session TTL in minutes before eviction when multi-session is enabled (default 30)
178216
-v, --version version for snmcp
179217
```
180218

pkg/cmd/mcp/server.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,24 +83,29 @@ func newMcpServer(_ context.Context, configOpts *ServerOptions, logrusLogger *lo
8383
}
8484
case snConfig.ExternalPulsar != nil:
8585
{
86-
psession, err := pulsar.NewSession(pulsar.PulsarContext{
87-
ServiceURL: snConfig.ExternalPulsar.ServiceURL,
88-
WebServiceURL: snConfig.ExternalPulsar.WebServiceURL,
89-
AuthPlugin: snConfig.ExternalPulsar.AuthPlugin,
90-
AuthParams: snConfig.ExternalPulsar.AuthParams,
91-
Token: snConfig.ExternalPulsar.Token,
92-
TLSAllowInsecureConnection: snConfig.ExternalPulsar.TLSAllowInsecureConnection,
93-
TLSEnableHostnameVerification: snConfig.ExternalPulsar.TLSEnableHostnameVerification,
94-
TLSTrustCertsFilePath: snConfig.ExternalPulsar.TLSTrustCertsFilePath,
95-
TLSCertFile: snConfig.ExternalPulsar.TLSCertFile,
96-
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
97-
})
98-
if err != nil {
99-
return nil, errors.Wrap(err, "failed to set external Pulsar context")
100-
}
10186
mcpServer = mcp.NewServer("streamnative-mcp-server", "0.0.1", logrusLogger, server.WithInstructions(mcp.GetExternalPulsarServerInstructions(snConfig.ExternalPulsar.WebServiceURL)))
102-
mcpServer.PulsarSession = psession
10387
s = mcpServer.MCPServer
88+
89+
// Only create global PulsarSession if not in multi-session mode
90+
// In multi-session mode, each request must provide its own token via Authorization header
91+
if !configOpts.MultiSessionPulsar {
92+
psession, err := pulsar.NewSession(pulsar.PulsarContext{
93+
ServiceURL: snConfig.ExternalPulsar.ServiceURL,
94+
WebServiceURL: snConfig.ExternalPulsar.WebServiceURL,
95+
AuthPlugin: snConfig.ExternalPulsar.AuthPlugin,
96+
AuthParams: snConfig.ExternalPulsar.AuthParams,
97+
Token: snConfig.ExternalPulsar.Token,
98+
TLSAllowInsecureConnection: snConfig.ExternalPulsar.TLSAllowInsecureConnection,
99+
TLSEnableHostnameVerification: snConfig.ExternalPulsar.TLSEnableHostnameVerification,
100+
TLSTrustCertsFilePath: snConfig.ExternalPulsar.TLSTrustCertsFilePath,
101+
TLSCertFile: snConfig.ExternalPulsar.TLSCertFile,
102+
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
103+
})
104+
if err != nil {
105+
return nil, errors.Wrap(err, "failed to set external Pulsar context")
106+
}
107+
mcpServer.PulsarSession = psession
108+
}
104109
}
105110
default:
106111
{

pkg/cmd/mcp/sse.go

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ func runSseServer(configOpts *ServerOptions) error {
110110
TLSKeyFile: snConfig.ExternalPulsar.TLSKeyFile,
111111
},
112112
}
113-
pulsarSessionManager = session.NewPulsarSessionManager(managerConfig, mcpServer.PulsarSession, logger)
113+
// Pass nil as globalSession - in multi-session mode, every request must have a valid token
114+
pulsarSessionManager = session.NewPulsarSessionManager(managerConfig, nil, logger)
114115
logger.Info("Multi-session Pulsar mode enabled")
115116
}
116117

@@ -125,14 +126,16 @@ func runSseServer(configOpts *ServerOptions) error {
125126
// Handle per-user Pulsar sessions
126127
if pulsarSessionManager != nil {
127128
token := session.ExtractBearerToken(r)
129+
// Token is already validated in auth middleware, this should always succeed
128130
if pulsarSession, err := pulsarSessionManager.GetOrCreateSession(ctx, token); err == nil {
129131
c = context2.WithPulsarSession(c, pulsarSession)
130132
if token != "" {
131133
c = session.WithUserTokenHash(c, pulsarSessionManager.HashTokenForLog(token))
132134
}
133135
} else {
134-
logger.WithError(err).Warn("Failed to get per-user Pulsar session, using global")
135-
c = context2.WithPulsarSession(c, mcpServer.PulsarSession)
136+
// Should not happen since middleware validates token first
137+
logger.WithError(err).Error("Unexpected auth error after middleware validation")
138+
// Don't set PulsarSession - tool handlers will fail gracefully with "session not found"
136139
}
137140
} else {
138141
c = context2.WithPulsarSession(c, mcpServer.PulsarSession)
@@ -144,16 +147,62 @@ func runSseServer(configOpts *ServerOptions) error {
144147

145148
// 4. Expose the full SSE URL to the user
146149
ssePath := sseServer.CompleteSsePath()
150+
msgPath := sseServer.CompleteMessagePath()
147151
fmt.Fprintf(os.Stderr, "StreamNative Cloud MCP Server listening on http://%s%s\n",
148152
configOpts.HTTPAddr, ssePath)
149153

150154
// 5. Run the HTTP listener in a goroutine
151155
errCh := make(chan error, 1)
152-
go func() {
153-
if err := sseServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
154-
errCh <- err // bubble up real crashes
156+
var httpServer *http.Server
157+
158+
if pulsarSessionManager != nil {
159+
// Multi-session mode: use custom handlers with auth middleware
160+
mux := http.NewServeMux()
161+
162+
// Auth middleware wrapper that validates token before processing
163+
authMiddleware := func(next http.Handler) http.Handler {
164+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
165+
token := session.ExtractBearerToken(r)
166+
if token == "" {
167+
w.Header().Set("Content-Type", "application/json")
168+
http.Error(w, `{"error":"missing Authorization header"}`, http.StatusUnauthorized)
169+
return
170+
}
171+
// Pre-validate token by attempting to get/create session
172+
if _, err := pulsarSessionManager.GetOrCreateSession(r.Context(), token); err != nil {
173+
logger.WithError(err).Warn("Authentication failed")
174+
w.Header().Set("Content-Type", "application/json")
175+
http.Error(w, `{"error":"authentication failed"}`, http.StatusUnauthorized)
176+
return
177+
}
178+
next.ServeHTTP(w, r)
179+
})
155180
}
156-
}()
181+
182+
// Mount handlers with auth middleware
183+
mux.Handle(ssePath, authMiddleware(sseServer.SSEHandler()))
184+
mux.Handle(msgPath, authMiddleware(sseServer.MessageHandler()))
185+
186+
// Start custom HTTP server
187+
httpServer = &http.Server{
188+
Addr: configOpts.HTTPAddr,
189+
Handler: mux,
190+
ReadHeaderTimeout: 10 * time.Second, // Prevent Slowloris attacks
191+
}
192+
go func() {
193+
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
194+
errCh <- err
195+
}
196+
}()
197+
logger.Info("SSE server started with authentication middleware")
198+
} else {
199+
// Non-multi-session mode: use default Start()
200+
go func() {
201+
if err := sseServer.Start(configOpts.HTTPAddr); err != nil && !errors.Is(err, http.ErrServerClosed) {
202+
errCh <- err // bubble up real crashes
203+
}
204+
}()
205+
}
157206

158207
// Give the server a moment to start
159208
time.Sleep(100 * time.Millisecond)
@@ -177,10 +226,20 @@ func runSseServer(configOpts *ServerOptions) error {
177226
pulsarSessionManager.Stop()
178227
}
179228

180-
// First try to shut down the SSE server
181-
if err := sseServer.Shutdown(shCtx); err != nil {
182-
if !errors.Is(err, http.ErrServerClosed) {
183-
logger.Errorf("Error shutting down SSE server: %v", err)
229+
// Shut down the HTTP server
230+
if httpServer != nil {
231+
// Multi-session mode: shut down custom HTTP server
232+
if err := httpServer.Shutdown(shCtx); err != nil {
233+
if !errors.Is(err, http.ErrServerClosed) {
234+
logger.Errorf("Error shutting down HTTP server: %v", err)
235+
}
236+
}
237+
} else {
238+
// Non-multi-session mode: shut down SSE server
239+
if err := sseServer.Shutdown(shCtx); err != nil {
240+
if !errors.Is(err, http.ErrServerClosed) {
241+
logger.Errorf("Error shutting down SSE server: %v", err)
242+
}
184243
}
185244
}
186245

pkg/mcp/session/pulsar_session_manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ func NewPulsarSessionManager(
9999
func (m *PulsarSessionManager) GetOrCreateSession(_ context.Context, token string) (*pulsar.Session, error) {
100100
if token == "" {
101101
// Return global session when no token provided
102+
// If no global session exists (multi-session mode), return error
103+
if m.globalSession == nil {
104+
return nil, fmt.Errorf("authentication required: missing Authorization header")
105+
}
102106
return m.globalSession, nil
103107
}
104108

0 commit comments

Comments
 (0)