Skip to content

Commit b3399e6

Browse files
authored
mcp: use args struct for CloseSSEStream (#718)
Change CloseSSEStream from func(time.Duration) to func(CloseSSEStreamArgs) for forward compatibility, allowing new fields to be added without breaking existing callers.
1 parent 1a964ae commit b3399e6

File tree

4 files changed

+21
-19
lines changed

4 files changed

+21
-19
lines changed

examples/server/conformance/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,8 +503,8 @@ func jsonSchema202012Handler(ctx context.Context, req *mcp.CallToolRequest, inpu
503503

504504
func testReconnectionHandler(ctx context.Context, req *mcp.CallToolRequest, _ any) (*mcp.CallToolResult, any, error) {
505505
// Close the SSE stream to trigger client reconnection (SEP-1699)
506-
if req.Extra != nil && req.Extra.CloseStream != nil {
507-
req.Extra.CloseStream(10 * time.Millisecond)
506+
if req.Extra != nil && req.Extra.CloseSSEStream != nil {
507+
req.Extra.CloseSSEStream(mcp.CloseSSEStreamArgs{RetryAfter: 10 * time.Millisecond})
508508
}
509509

510510
// Wait for client to reconnect

mcp/shared.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -479,22 +479,24 @@ type RequestExtra struct {
479479
TokenInfo *auth.TokenInfo // bearer token info (e.g. from OAuth) if any
480480
Header http.Header // header from HTTP request, if any
481481

482-
// If set, CloseStream explicitly closes the current request stream.
482+
// If set, CloseSSEStream explicitly closes the current SSE request stream.
483483
//
484484
// [SEP-1699] introduced server-side SSE stream disconnection: for
485485
// long-running requests, servers may opt to close the SSE stream and
486-
// ask the client to retry at a later time. CloseStream implements this
487-
// feature; if reconnectAfter is set, an event is sent with a `retry:` field
486+
// ask the client to retry at a later time. CloseSSEStream implements this
487+
// feature; if RetryAfter is set, an event is sent with a `retry:` field
488488
// to configure the reconnection delay.
489489
//
490490
// [SEP-1699]: https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699
491-
CloseStream func(reconnectAfter time.Duration)
491+
CloseSSEStream func(CloseSSEStreamArgs)
492492
}
493493

494-
// TODO(cleanup): switch to an args struct here for forwards compatibility.
495-
// type CloseStreamArgs struct {
496-
// After time.Duration
497-
// }
494+
// CloseSSEStreamArgs are arguments for [RequestExtra.CloseSSEStream].
495+
type CloseSSEStreamArgs struct {
496+
// RetryAfter configures the reconnection delay sent to the client via the
497+
// SSE retry field. If zero, no retry field is sent.
498+
RetryAfter time.Duration
499+
}
498500

499501
func (*ClientRequest[P]) isRequest() {}
500502
func (*ServerRequest[P]) isRequest() {}

mcp/streamable.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,9 +1065,9 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques
10651065
}
10661066
if jreq.IsCall() {
10671067
calls[jreq.ID] = struct{}{}
1068-
// See the doc for CloseStream: allow the request handler to explicitly
1069-
// close the ongoing stream.
1070-
jreq.Extra.(*RequestExtra).CloseStream = func(reconnectAfter time.Duration) {
1068+
// See the doc for CloseSSEStream: allow the request handler to
1069+
// explicitly close the ongoing stream.
1070+
jreq.Extra.(*RequestExtra).CloseSSEStream = func(args CloseSSEStreamArgs) {
10711071
c.mu.Lock()
10721072
streamID, ok := c.requestStreams[jreq.ID]
10731073
var stream *stream
@@ -1077,7 +1077,7 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques
10771077
c.mu.Unlock()
10781078

10791079
if stream != nil {
1080-
stream.close(reconnectAfter)
1080+
stream.close(args.RetryAfter)
10811081
}
10821082
}
10831083
}

mcp/streamable_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,10 +528,10 @@ func TestStreamableServerDisconnect(t *testing.T) {
528528
testStream := func(ctx context.Context, session *ServerSession, extra *RequestExtra) {
529529
// Close the stream before the first message. We should have sent an
530530
// initial priming message already, so the client will be able to replay
531-
extra.CloseStream(10 * time.Millisecond)
531+
extra.CloseSSEStream(CloseSSEStreamArgs{RetryAfter: 10 * time.Millisecond})
532532
session.NotifyProgress(ctx, &ProgressNotificationParams{Message: "msg1"})
533533
time.Sleep(20 * time.Millisecond)
534-
extra.CloseStream(10 * time.Millisecond) // Closing twice should still be supported.
534+
extra.CloseSSEStream(CloseSSEStreamArgs{RetryAfter: 10 * time.Millisecond}) // Closing twice should still be supported.
535535
session.NotifyProgress(ctx, &ProgressNotificationParams{Message: "msg2"})
536536
}
537537

@@ -1069,7 +1069,7 @@ func TestStreamableServerTransport(t *testing.T) {
10691069
name: "no close message on old protocol",
10701070
replay: true,
10711071
tool: func(t *testing.T, _ context.Context, req *CallToolRequest) {
1072-
req.Extra.CloseStream(time.Millisecond)
1072+
req.Extra.CloseSSEStream(CloseSSEStreamArgs{RetryAfter: time.Millisecond})
10731073
},
10741074
requests: []streamableRequest{
10751075
initialize,
@@ -1105,7 +1105,7 @@ func TestStreamableServerTransport(t *testing.T) {
11051105
name: "close message on 2025-11-25",
11061106
replay: true,
11071107
tool: func(t *testing.T, _ context.Context, req *CallToolRequest) {
1108-
req.Extra.CloseStream(time.Millisecond)
1108+
req.Extra.CloseSSEStream(CloseSSEStreamArgs{RetryAfter: time.Millisecond})
11091109
},
11101110
requests: []streamableRequest{
11111111
initialize20251125,
@@ -1126,7 +1126,7 @@ func TestStreamableServerTransport(t *testing.T) {
11261126
name: "no close message",
11271127
replay: true,
11281128
tool: func(t *testing.T, _ context.Context, req *CallToolRequest) {
1129-
req.Extra.CloseStream(0)
1129+
req.Extra.CloseSSEStream(CloseSSEStreamArgs{})
11301130
},
11311131
requests: []streamableRequest{
11321132
initialize,

0 commit comments

Comments
 (0)