Skip to content

Commit bae0929

Browse files
authored
mcp: address review comments on streamable cleanup (#716)
Follow up on some post-submit comments from jba.
1 parent b3399e6 commit bae0929

File tree

1 file changed

+24
-18
lines changed

1 file changed

+24
-18
lines changed

mcp/streamable.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -627,12 +627,15 @@ type stream struct {
627627

628628
// If pendingJSONMessages is non-nil, this is a JSON stream and messages are
629629
// collected here until the stream is complete, at which point they are
630-
// flushed as a single JSON response.
630+
// flushed as a single JSON response. Note that the non-nilness of this field
631+
// is significant, as it signals the expected content type.
632+
//
633+
// Note: if we remove support for batching, this could just be a bool.
631634
pendingJSONMessages []json.RawMessage
632635

633636
// w is the HTTP response writer for this stream. A non-nil w indicates
634-
// that the stream is claimed by an HTTP request; it is set to nil when
635-
// the request completes.
637+
// that the stream is claimed by an HTTP request (the hanging POST or GET);
638+
// it is set to nil when the request completes.
636639
w http.ResponseWriter
637640

638641
// done is closed to release the hanging HTTP request.
@@ -646,9 +649,8 @@ type stream struct {
646649
// It starts at -1 since indices start at 0.
647650
lastIdx int
648651

649-
// supportsPrimeClose indicates whether the client supports prime and close
650-
// events (protocol version 2025-11-25 or later).
651-
supportsPrimeClose bool
652+
// protocolVersion is the protocol version for this stream.
653+
protocolVersion string
652654

653655
// requests is the set of unanswered incoming requests for the stream.
654656
//
@@ -659,8 +661,8 @@ type stream struct {
659661
requests map[jsonrpc.ID]struct{}
660662
}
661663

662-
// close sends a 'close' event to the client (if supportsPrimeClose is true and
663-
// reconnectAfter > 0) and closes the done channel.
664+
// close sends a 'close' event to the client (if protocolVersion >= 2025-11-25
665+
// and reconnectAfter > 0) and closes the done channel.
664666
//
665667
// The done channel is set to nil after closing, so that done != nil implies
666668
// the stream is active and done is open. This simplifies checks elsewhere.
@@ -670,7 +672,7 @@ func (s *stream) close(reconnectAfter time.Duration) {
670672
if s.done == nil {
671673
return // stream not connected or already closed
672674
}
673-
if s.supportsPrimeClose && reconnectAfter > 0 {
675+
if s.protocolVersion >= protocolVersion20251125 && reconnectAfter > 0 {
674676
reconnectStr := strconv.FormatInt(reconnectAfter.Milliseconds(), 10)
675677
if _, err := writeEvent(s.w, Event{
676678
Name: "close",
@@ -704,9 +706,13 @@ func (s *stream) release() {
704706
//
705707
// s.mu must be held when calling this method.
706708
func (s *stream) deliverLocked(data []byte, eventID string, responseTo jsonrpc.ID) (done bool, err error) {
709+
// First, record the response. We must do this *before* returning an error
710+
// below, as even if the stream is disconnected we want to update our
711+
// accounting.
707712
if responseTo.IsValid() {
708713
delete(s.requests, responseTo)
709714
}
715+
// Now, try to deliver the message to the client.
710716
done = len(s.requests) == 0 && s.id != ""
711717
if s.done == nil {
712718
return done, fmt.Errorf("stream not connected or already closed")
@@ -846,9 +852,8 @@ func (c *streamableServerConn) serveGET(w http.ResponseWriter, req *http.Request
846852
if protocolVersion == "" {
847853
protocolVersion = protocolVersion20250326
848854
}
849-
supportsPrimeClose := protocolVersion >= protocolVersion20251125
850855

851-
stream, done := c.acquireStream(ctx, w, streamID, lastIdx, supportsPrimeClose)
856+
stream, done := c.acquireStream(ctx, w, streamID, lastIdx, protocolVersion)
852857
if stream == nil {
853858
return
854859
}
@@ -883,9 +888,9 @@ func (c *streamableServerConn) hangResponse(ctx context.Context, done <-chan str
883888
// all messages, so that no delivery or storage of new messages occurs while
884889
// the stream is still replaying.
885890
//
886-
// supportsPrimeClose indicates whether the client supports the prime and close
887-
// events (SEP-1699, added in protocol version 2025-11-25 or later).
888-
func (c *streamableServerConn) acquireStream(ctx context.Context, w http.ResponseWriter, streamID string, lastIdx int, supportsPrimeClose bool) (*stream, chan struct{}) {
891+
// protocolVersion is the protocol version for this stream, used to determine
892+
// feature support (e.g. prime and close events were added in 2025-11-25).
893+
func (c *streamableServerConn) acquireStream(ctx context.Context, w http.ResponseWriter, streamID string, lastIdx int, protocolVersion string) (*stream, chan struct{}) {
889894
// if tempStream is set, the stream is done and we're just replaying messages.
890895
//
891896
// We record a temporary stream to claim exclusive replay rights. The spec
@@ -986,7 +991,7 @@ func (c *streamableServerConn) acquireStream(ctx context.Context, w http.Respons
986991
s.w = w
987992
s.done = make(chan struct{})
988993
s.lastIdx = lastIdx
989-
s.supportsPrimeClose = supportsPrimeClose
994+
s.protocolVersion = protocolVersion
990995
return s, s.done
991996
}
992997

@@ -1091,7 +1096,6 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques
10911096
if isInitialize && initializeProtocolVersion != "" {
10921097
effectiveVersion = initializeProtocolVersion
10931098
}
1094-
supportsPrimeClose := effectiveVersion >= protocolVersion20251125
10951099

10961100
// If we don't have any calls, we can just publish the incoming messages and return.
10971101
// No need to track a logical stream.
@@ -1143,13 +1147,15 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques
11431147
stream.w = w
11441148
done := make(chan struct{})
11451149
stream.done = done
1146-
stream.supportsPrimeClose = supportsPrimeClose
1150+
stream.protocolVersion = effectiveVersion
11471151
if c.jsonResponse {
11481152
// JSON mode: collect messages in pendingJSONMessages until done.
1153+
// Set pendingJSONMessages to a non-nil value to signal that this is an
1154+
// application/json stream.
11491155
stream.pendingJSONMessages = []json.RawMessage{}
11501156
} else {
11511157
// SSE mode: write a priming event if supported.
1152-
if c.eventStore != nil && supportsPrimeClose {
1158+
if c.eventStore != nil && effectiveVersion >= protocolVersion20251125 {
11531159
// Write a priming event, as defined by [§2.1.6] of the spec.
11541160
//
11551161
// [§2.1.6]: https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#sending-messages-to-the-server

0 commit comments

Comments
 (0)