Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions mcp/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ func NewSSEHandler(getServer func(request *http.Request) *Server) *SSEHandler {
// [SSEServerTransport.ServeHTTP].
// - Close terminates the hanging GET.
type SSEServerTransport struct {
endpoint string
incoming chan JSONRPCMessage // queue of incoming messages; never closed
endpoint string
sessionID string
incoming chan JSONRPCMessage // queue of incoming messages; never closed

// We must guard both pushes to the incoming queue and writes to the response
// writer, because incoming POST requests are arbitrarily concurrent and we
Expand Down Expand Up @@ -228,6 +229,7 @@ func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

transport := NewSSEServerTransport(endpoint.RequestURI(), w)
transport.sessionID = sessionID

// The session is terminated when the request exits.
h.mu.Lock()
Expand Down Expand Up @@ -263,8 +265,7 @@ type sseServerConn struct {
t *SSEServerTransport
}

// TODO(jba): get the session ID. (Not urgent because SSE transports have been removed from the spec.)
func (s sseServerConn) SessionID() string { return "" }
func (s sseServerConn) SessionID() string { return s.t.sessionID }

// Read implements jsonrpc2.Reader.
func (s sseServerConn) Read(ctx context.Context) (JSONRPCMessage, error) {
Expand Down Expand Up @@ -393,6 +394,7 @@ func (c *SSEClientTransport) Connect(ctx context.Context) (Connection, error) {
s := &sseClientConn{
sseEndpoint: c.sseEndpoint,
msgEndpoint: msgEndpoint,
sessionID: msgEndpoint.Query().Get("sessionid"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is, unfortunately, not part of the spec.
Why do we want to have session IDs for the SSE connection? In theory, there is no way for the client to actually know its session id.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was requested in #98, and I obliged.
We could also say that SSE is unsupported, and drop this.

Copy link

@tylerwilliams tylerwilliams Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👋 I filed this bug.

The part of the spec that led me down this session ID path was this: https://modelcontextprotocol.io/docs/concepts/transports#session-management.

Maybe there's another way to approach this (setting or grabbing the header manually) that I should look into?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see SSE is deprecated https://modelcontextprotocol.io/docs/concepts/transports#server-sent-events-sse-deprecated so I think this is my bad and I should just be using mcp.NewStreamableHTTPHandler instead of mcp.NewSSEHandler. Sorry for the noise :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! Closing this PR.

incoming: make(chan []byte, 100),
body: resp.Body,
done: make(chan struct{}),
Expand Down Expand Up @@ -511,8 +513,9 @@ func scanEvents(r io.Reader) iter.Seq2[event, error] {
// - Reads are SSE 'message' events, and pushes them onto a buffered channel.
// - Close terminates the GET request.
type sseClientConn struct {
sseEndpoint *url.URL // SSE endpoint for the GET
msgEndpoint *url.URL // session endpoint for POSTs
sseEndpoint *url.URL // SSE endpoint for the GET
msgEndpoint *url.URL // session endpoint for POSTs
sessionID string
incoming chan []byte // queue of incoming messages

mu sync.Mutex
Expand All @@ -521,8 +524,7 @@ type sseClientConn struct {
done chan struct{} // closed when the stream is closed
}

// TODO(jba): get the session ID. (Not urgent because SSE transports have been removed from the spec.)
func (c *sseClientConn) SessionID() string { return "" }
func (c *sseClientConn) SessionID() string { return c.sessionID }

func (c *sseClientConn) isDone() bool {
c.mu.Lock()
Expand Down
11 changes: 9 additions & 2 deletions mcp/sse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ func TestSSEServer(t *testing.T) {
sseHandler := NewSSEHandler(func(*http.Request) *Server { return server })

conns := make(chan *ServerSession, 1)
sseHandler.onConnection = func(cc *ServerSession) {
sseHandler.onConnection = func(ss *ServerSession) {
if ss.ID() == "" {
t.Error("ServerSession has empty session ID")
}
select {
case conns <- cc:
case conns <- ss:
default:
}
}
Expand All @@ -41,6 +44,10 @@ func TestSSEServer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if cs.ID() == "" {
t.Error("ClientSession has empty ID")
}

if err := cs.Ping(ctx, nil); err != nil {
t.Fatal(err)
}
Expand Down
Loading