Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 100 additions & 90 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,44 @@ func (c *streamableClientConn) sessionUpdated(state clientSessionState) {
// § 2.5: A server using the Streamable HTTP transport MAY assign a session
// ID at initialization time, by including it in an Mcp-Session-Id header
// on the HTTP response containing the InitializeResult.
go c.handleSSE("standalone SSE stream", nil, true, nil)
c.connectStandaloneSSE()
}

func (c *streamableClientConn) connectStandaloneSSE() {
resp, err := c.connectSSE("")
if err != nil {
c.fail(fmt.Errorf("standalone SSE request failed (session ID: %v): %v", c.sessionID, err))
return
}

// [§2.2.3]: "The server MUST either return Content-Type:
// text/event-stream in response to this HTTP GET, or else return HTTP
// 405 Method Not Allowed, indicating that the server does not offer an
// SSE stream at this endpoint."
//
// [§2.2.3]: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#listening-for-messages-from-the-server
if resp.StatusCode == http.StatusMethodNotAllowed {
// The server doesn't support the standalone SSE stream.
resp.Body.Close()
return
}
if resp.StatusCode == http.StatusNotFound && !c.strict {
// modelcontextprotocol/gosdk#393: some servers return NotFound instead
// of MethodNotAllowed for the standalone SSE stream.
//
// Treat this like MethodNotAllowed in non-strict mode.
if c.logger != nil {
c.logger.Warn("got 404 instead of 405 for standalone SSE stream")
}
resp.Body.Close()
return
}
summary := "standalone SSE stream"
if err := c.checkResponse(summary, resp); err != nil {
c.fail(err)
return
}
go c.handleSSE(summary, resp, true, nil)
}

// fail handles an asynchronous error while reading.
Expand Down Expand Up @@ -1434,22 +1471,10 @@ func (c *streamableClientConn) Write(ctx context.Context, msg jsonrpc.Message) e
return fmt.Errorf("%s: %v", requestSummary, err)
}

// §2.5.3: "The server MAY terminate the session at any time, after
// which it MUST respond to requests containing that session ID with HTTP
// 404 Not Found."
if resp.StatusCode == http.StatusNotFound {
// Fail the session immediately, rather than relying on jsonrpc2 to fail
// (and close) it, because we want the call to Close to know that this
// session is missing (and therefore not send the DELETE).
err := fmt.Errorf("%s: failed to send: %w", requestSummary, errSessionMissing)
if err := c.checkResponse(requestSummary, resp); err != nil {
c.fail(err)
resp.Body.Close()
return err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
resp.Body.Close()
return fmt.Errorf("broken session: %v", resp.Status)
}

if sessionID := resp.Header.Get(sessionIDHeader); sessionID != "" {
c.mu.Lock()
Expand All @@ -1463,6 +1488,8 @@ func (c *streamableClientConn) Write(ctx context.Context, msg jsonrpc.Message) e
return fmt.Errorf("mismatching session IDs %q and %q", hadSessionID, sessionID)
}
}
// TODO(rfindley): this logic isn't quite right.
// We should keep going even if the server returns 202, if we have a call.
if resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusAccepted {
// [§2.1.4]: "If the input is a JSON-RPC response or notification:
// If the server accepts the input, the server MUST return HTTP status code 202 Accepted with no body."
Expand Down Expand Up @@ -1543,73 +1570,63 @@ func (c *streamableClientConn) handleJSON(requestSummary string, resp *http.Resp
//
// If forCall is set, it is the call that initiated the stream, and the
// stream is complete when we receive its response.
func (c *streamableClientConn) handleSSE(requestSummary string, initialResp *http.Response, persistent bool, forCall *jsonrpc2.Request) {
resp := initialResp
var lastEventID string
func (c *streamableClientConn) handleSSE(requestSummary string, resp *http.Response, persistent bool, forCall *jsonrpc2.Request) {
for {
// Connection was successful. Continue the loop with the new response.
// TODO: we should set a reasonable limit on the number of times we'll try
// getting a response for a given request.
//
// Eventually, if we don't get the response, we should stop trying and
// fail the request.
if resp != nil {
eventID, clientClosed := c.processStream(requestSummary, resp, forCall)
lastEventID = eventID
lastEventID, clientClosed := c.processStream(requestSummary, resp, forCall)

// If the connection was closed by the client, we're done.
if clientClosed {
return
}
// If the stream has ended, then do not reconnect if the stream is
// temporary (POST initiated SSE).
if lastEventID == "" && !persistent {
return
}
// If the connection was closed by the client, we're done.
if clientClosed {
return
}
// If the stream has ended, then do not reconnect if the stream is
// temporary (POST initiated SSE).
if lastEventID == "" && !persistent {
return
}

// The stream was interrupted or ended by the server. Attempt to reconnect.
newResp, err := c.reconnect(lastEventID)
newResp, err := c.connectSSE(lastEventID)
if err != nil {
// All reconnection attempts failed: fail the connection.
c.fail(fmt.Errorf("%s: failed to reconnect (session ID: %v): %v", requestSummary, c.sessionID, err))
return
}
resp = newResp
if resp.StatusCode == http.StatusMethodNotAllowed && persistent {
// [§2.2.3]: "The server MUST either return Content-Type:
// text/event-stream in response to this HTTP GET, or else return HTTP
// 405 Method Not Allowed, indicating that the server does not offer an
// SSE stream at this endpoint."
//
// [§2.2.3]: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#listening-for-messages-from-the-server

// The server doesn't support the standalone SSE stream.
resp.Body.Close()
return
}
if resp.StatusCode == http.StatusNotFound && persistent && !c.strict {
// modelcontextprotocol/gosdk#393: some servers return NotFound instead
// of MethodNotAllowed for the standalone SSE stream.
//
// Treat this like MethodNotAllowed in non-strict mode.
if c.logger != nil {
c.logger.Warn("got 404 instead of 405 for standalonw SSE stream")
}
resp.Body.Close()
return
}
// (see equivalent handling in [streamableClientConn.Write]).
if resp.StatusCode == http.StatusNotFound {
c.fail(fmt.Errorf("%s: failed to reconnect (session ID: %v): %w", requestSummary, c.sessionID, errSessionMissing))
if err := c.checkResponse(requestSummary, resp); err != nil {
c.fail(err)
return
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
}
}

// checkResponse checks the status code of the provided response, and
// translates it into an error if the request was unsuccessful.
//
// The response body is close if a non-nil error is returned.
func (c *streamableClientConn) checkResponse(requestSummary string, resp *http.Response) (err error) {
defer func() {
if err != nil {
resp.Body.Close()
c.fail(fmt.Errorf("%s: failed to reconnect: %v", requestSummary, http.StatusText(resp.StatusCode)))
return
}
// Reconnection was successful. Continue the loop with the new response.
}()
// §2.5.3: "The server MAY terminate the session at any time, after
// which it MUST respond to requests containing that session ID with HTTP
// 404 Not Found."
if resp.StatusCode == http.StatusNotFound {
// Return an errSessionMissing to avoid sending a redundant DELETE when the
// session is already gone.
return fmt.Errorf("%s: failed to connect (session ID: %v): %w", requestSummary, c.sessionID, errSessionMissing)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("%s: failed to connect: %v", requestSummary, http.StatusText(resp.StatusCode))
}
return nil
}

// processStream reads from a single response body, sending events to the
Expand All @@ -1620,6 +1637,7 @@ func (c *streamableClientConn) processStream(requestSummary string, resp *http.R
defer resp.Body.Close()
for evt, err := range scanEvents(resp.Body) {
if err != nil {
// TODO: we should differentiate EOF from other errors here.
break
}

Expand Down Expand Up @@ -1664,39 +1682,48 @@ func (c *streamableClientConn) processStream(requestSummary string, resp *http.R
return lastEventID, false
}

// reconnect handles the logic of retrying a connection with an exponential
// backoff strategy. It returns a new, valid HTTP response if successful, or
// an error if all retries are exhausted.
func (c *streamableClientConn) reconnect(lastEventID string) (*http.Response, error) {
// connectSSE handles the logic of connecting a text/event-stream connection.
//
// If lastEventID is set, it is the last-event ID of a stream being resumed.
//
// If connection fails, connectSSE retries with an exponential backoff
// strategy. It returns a new, valid HTTP response if successful, or an error
// if all retries are exhausted.
func (c *streamableClientConn) connectSSE(lastEventID string) (*http.Response, error) {
var finalErr error

// We can reach the 'reconnect' path through the standlone SSE request, in which case
// lastEventID will be "".
//
// In this case, we need an initial attempt.
// If lastEventID is set, we've already connected successfully once, so
// consider that to be the first attempt.
attempt := 0
if lastEventID != "" {
attempt = 1
}

for ; attempt <= c.maxRetries; attempt++ {
select {
case <-c.done:
return nil, fmt.Errorf("connection closed by client during reconnect")
case <-time.After(calculateReconnectDelay(attempt)):
resp, err := c.establishSSE(lastEventID)
req, err := http.NewRequestWithContext(c.ctx, http.MethodGet, c.url, nil)
if err != nil {
return nil, err
}
c.setMCPHeaders(req)
if lastEventID != "" {
req.Header.Set("Last-Event-ID", lastEventID)
}
req.Header.Set("Accept", "text/event-stream")
resp, err := c.client.Do(req)
if err != nil {
finalErr = err // Store the error and try again.
continue
}
return resp, nil
}
}
// If the loop completes, all retries have failed.
// If the loop completes, all retries have failed, or the client is closing.
if finalErr != nil {
return nil, fmt.Errorf("connection failed after %d attempts: %w", c.maxRetries, finalErr)
}
return nil, fmt.Errorf("connection failed after %d attempts", c.maxRetries)
return nil, fmt.Errorf("connection aborted after %d attempts", c.maxRetries)
}

// Close implements the [Connection] interface.
Expand All @@ -1723,23 +1750,6 @@ func (c *streamableClientConn) Close() error {
return c.closeErr
}

// establishSSE establishes the persistent SSE listening stream.
// It is used for reconnect attempts using the Last-Event-ID header to
// resume a broken stream where it left off.
func (c *streamableClientConn) establishSSE(lastEventID string) (*http.Response, error) {
req, err := http.NewRequestWithContext(c.ctx, http.MethodGet, c.url, nil)
if err != nil {
return nil, err
}
c.setMCPHeaders(req)
if lastEventID != "" {
req.Header.Set("Last-Event-ID", lastEventID)
}
req.Header.Set("Accept", "text/event-stream")

return c.client.Do(req)
}

// calculateReconnectDelay calculates a delay using exponential backoff with full jitter.
func calculateReconnectDelay(attempt int) time.Duration {
if attempt == 0 {
Expand Down
Loading