@@ -42,8 +42,8 @@ type StreamableHTTPHandler struct {
4242// StreamableHTTPOptions is a placeholder options struct for future
4343// configuration of the StreamableHTTP handler.
4444type StreamableHTTPOptions struct {
45- // TODO(rfindley) : support configurable session ID generation and event
46- // store, session retention, and event retention.
45+ // TODO: support configurable session ID generation (?)
46+ // TODO: support session retention (?)
4747}
4848
4949// NewStreamableHTTPHandler returns a new [StreamableHTTPHandler].
@@ -61,7 +61,7 @@ func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *Strea
6161// closeAll closes all ongoing sessions.
6262//
6363// TODO(rfindley): investigate the best API for callers to configure their
64- // session lifecycle.
64+ // session lifecycle. (?)
6565//
6666// Should we allow passing in a session store? That would allow the handler to
6767// be stateless.
@@ -118,7 +118,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
118118 return
119119 }
120120 h .sessionsMu .Lock ()
121- delete (h .sessions , session .id )
121+ delete (h .sessions , session .sessionID )
122122 h .sessionsMu .Unlock ()
123123 session .Close ()
124124 w .WriteHeader (http .StatusNoContent )
@@ -149,7 +149,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
149149 return
150150 }
151151 h .sessionsMu .Lock ()
152- h .sessions [s .id ] = s
152+ h .sessions [s .sessionID ] = s
153153 h .sessionsMu .Unlock ()
154154 session = s
155155 }
@@ -176,7 +176,7 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp
176176 opts = & StreamableServerTransportOptions {}
177177 }
178178 t := & StreamableServerTransport {
179- id : sessionID ,
179+ sessionID : sessionID ,
180180 incoming : make (chan jsonrpc.Message , 10 ),
181181 done : make (chan struct {}),
182182 streams : make (map [StreamID ]* stream ),
@@ -193,18 +193,18 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp
193193}
194194
195195func (t * StreamableServerTransport ) SessionID () string {
196- return t .id
196+ return t .sessionID
197197}
198198
199199// A StreamableServerTransport implements the [Transport] interface for a
200200// single session.
201201type StreamableServerTransport struct {
202202 nextStreamID atomic.Int64 // incrementing next stream ID
203203
204- id string
205- opts StreamableServerTransportOptions
206- incoming chan jsonrpc.Message // messages from the client to the server
207- done chan struct {}
204+ sessionID string
205+ opts StreamableServerTransportOptions
206+ incoming chan jsonrpc.Message // messages from the client to the server
207+ done chan struct {}
208208
209209 mu sync.Mutex
210210 // Sessions are closed exactly once.
@@ -217,17 +217,20 @@ type StreamableServerTransport struct {
217217 // Therefore, we use a logical connection ID to key the connection state, and
218218 // perform the accounting described below when incoming HTTP requests are
219219 // handled.
220- //
221- // TODO(rfindley): simplify.
222220
223221 // streams holds the logical streams for this session, keyed by their ID.
222+ // TODO: streams are never deleted, so the memory for a connection grows without
223+ // bound. If we deleted a stream when the response is sent, we would lose the ability
224+ // to replay if there was a cut just before the response was transmitted.
225+ // Perhaps we could have a TTL for streams that starts just after the response.
224226 streams map [StreamID ]* stream
225227
226228 // requestStreams maps incoming requests to their logical stream ID.
227229 //
228230 // Lifecycle: requestStreams persists for the duration of the session.
229231 //
230- // TODO(rfindley): clean up once requests are handled.
232+ // TODO(rfindley): clean up once requests are handled. See the TODO for streams
233+ // above.
231234 requestStreams map [jsonrpc.ID ]StreamID
232235}
233236
@@ -288,7 +291,7 @@ type StreamID int64
288291
289292// Connect implements the [Transport] interface.
290293//
291- // TODO(rfindley): Connect should return a new object.
294+ // TODO(rfindley): Connect should return a new object. (Why?)
292295func (s * StreamableServerTransport ) Connect (context.Context ) (Connection , error ) {
293296 return s , nil
294297}
@@ -411,6 +414,8 @@ func (t *StreamableServerTransport) servePOST(w http.ResponseWriter, req *http.R
411414 // TODO(rfindley): consider optimizing for a single incoming request, by
412415 // responding with application/json when there is only a single message in
413416 // the response.
417+ // (But how would we know there is only a single message? For example, couldn't
418+ // a progress notification be sent before a response on the same context?)
414419 return t .streamResponse (stream , w , req , - 1 )
415420}
416421
@@ -437,7 +442,7 @@ func (t *StreamableServerTransport) streamResponse(stream *stream, w http.Respon
437442 return true
438443 }
439444
440- w .Header ().Set (sessionIDHeader , t .id )
445+ w .Header ().Set (sessionIDHeader , t .sessionID )
441446 w .Header ().Set ("Content-Type" , "text/event-stream" ) // Accept checked in [StreamableHTTPHandler]
442447 w .Header ().Set ("Cache-Control" , "no-cache, no-transform" )
443448 w .Header ().Set ("Connection" , "keep-alive" )
@@ -486,7 +491,9 @@ stream:
486491 // If all requests have been handled and replied to, we should terminate this connection.
487492 // "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream."
488493 // §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server
489- // TODO(jba,findleyr): why not terminate regardless of http method?
494+ // We only want to terminate POSTs, and GETs that are replaying. The general-purpose GET
495+ // (stream ID 0) will never have requests, and should remain open indefinitely.
496+ // TODO: implement the GET case.
490497 if req .Method == http .MethodPost && nOutstanding == 0 {
491498 if writes == 0 {
492499 // Spec: If the server accepts the input, the server MUST return HTTP
@@ -563,11 +570,12 @@ func (t *StreamableServerTransport) Read(ctx context.Context) (jsonrpc.Message,
563570// Write implements the [Connection] interface.
564571func (t * StreamableServerTransport ) Write (ctx context.Context , msg jsonrpc.Message ) error {
565572 // Find the incoming request that this write relates to, if any.
566- var forRequest , replyTo jsonrpc.ID
573+ var forRequest jsonrpc.ID
574+ isResponse := false
567575 if resp , ok := msg .(* jsonrpc.Response ); ok {
568576 // If the message is a response, it relates to its request (of course).
569577 forRequest = resp .ID
570- replyTo = resp . ID
578+ isResponse = true
571579 } else {
572580 // Otherwise, we check to see if it request was made in the context of an
573581 // ongoing request. This may not be the case if the request way made with
@@ -611,10 +619,12 @@ func (t *StreamableServerTransport) Write(ctx context.Context, msg jsonrpc.Messa
611619 stream = t .streams [0 ]
612620 }
613621
622+ // TODO: if there is nothing to send these messages to (as would happen, for example, if forConn == 0
623+ // and the client never did a GET), then memory will grow without bound. Consider a mitigation.
614624 stream .outgoing = append (stream .outgoing , data )
615- if replyTo . IsValid () {
625+ if isResponse {
616626 // Once we've put the reply on the queue, it's no longer outstanding.
617- delete (stream .requests , replyTo )
627+ delete (stream .requests , forRequest )
618628 }
619629
620630 // Signal streamResponse that new work is available.
@@ -635,16 +645,16 @@ func (t *StreamableServerTransport) Close() error {
635645 if ! t .isDone {
636646 t .isDone = true
637647 close (t .done )
638- return t .opts .EventStore .SessionClosed (context .TODO (), t .id )
648+ // TODO: find a way to plumb a context here, or an event store with a long-running
649+ // close operation can take arbitrary time. Alternative: impose a fixed timeout here.
650+ return t .opts .EventStore .SessionClosed (context .TODO (), t .sessionID )
639651 }
640652 return nil
641653}
642654
643655// A StreamableClientTransport is a [Transport] that can communicate with an MCP
644656// endpoint serving the streamable HTTP transport defined by the 2025-03-26
645657// version of the spec.
646- //
647- // TODO(rfindley): support retries and resumption tokens.
648658type StreamableClientTransport struct {
649659 url string
650660 opts StreamableClientTransportOptions
0 commit comments