@@ -42,8 +42,8 @@ type StreamableHTTPHandler struct {
4242// StreamableHTTPOptions is a placeholder options struct for future
4343// configuration of the StreamableHTTP handler.
4444type StreamableHTTPOptions struct {
45- // TODO(rfindley) : support configurable session ID generation and event
46- // store, session retention, and event retention.
45+ // TODO: support configurable session ID generation (?)
46+ // TODO: support session retention (?)
4747}
4848
4949// NewStreamableHTTPHandler returns a new [StreamableHTTPHandler].
@@ -61,7 +61,7 @@ func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *Strea
6161// closeAll closes all ongoing sessions.
6262//
6363// TODO(rfindley): investigate the best API for callers to configure their
64- // session lifecycle.
64+ // session lifecycle. (?)
6565//
6666// Should we allow passing in a session store? That would allow the handler to
6767// be stateless.
@@ -118,7 +118,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
118118 return
119119 }
120120 h .sessionsMu .Lock ()
121- delete (h .sessions , session .id )
121+ delete (h .sessions , session .sessionID )
122122 h .sessionsMu .Unlock ()
123123 session .Close ()
124124 w .WriteHeader (http .StatusNoContent )
@@ -149,7 +149,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
149149 return
150150 }
151151 h .sessionsMu .Lock ()
152- h .sessions [s .id ] = s
152+ h .sessions [s .sessionID ] = s
153153 h .sessionsMu .Unlock ()
154154 session = s
155155 }
@@ -176,7 +176,7 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp
176176 opts = & StreamableServerTransportOptions {}
177177 }
178178 t := & StreamableServerTransport {
179- id : sessionID ,
179+ sessionID : sessionID ,
180180 incoming : make (chan jsonrpc.Message , 10 ),
181181 done : make (chan struct {}),
182182 streams : make (map [StreamID ]* stream ),
@@ -193,18 +193,18 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp
193193}
194194
195195func (t * StreamableServerTransport ) SessionID () string {
196- return t .id
196+ return t .sessionID
197197}
198198
199199// A StreamableServerTransport implements the [Transport] interface for a
200200// single session.
201201type StreamableServerTransport struct {
202202 nextStreamID atomic.Int64 // incrementing next stream ID
203203
204- id string
205- opts StreamableServerTransportOptions
206- incoming chan jsonrpc.Message // messages from the client to the server
207- done chan struct {}
204+ sessionID string
205+ opts StreamableServerTransportOptions
206+ incoming chan jsonrpc.Message // messages from the client to the server
207+ done chan struct {}
208208
209209 mu sync.Mutex
210210 // Sessions are closed exactly once.
@@ -217,17 +217,20 @@ type StreamableServerTransport struct {
217217 // Therefore, we use a logical connection ID to key the connection state, and
218218 // perform the accounting described below when incoming HTTP requests are
219219 // handled.
220- //
221- // TODO(rfindley): simplify.
222220
223221 // streams holds the logical streams for this session, keyed by their ID.
222+ // TODO: streams are never deleted, so the memory for a connection grows without
223+ // bound. If we deleted a stream when the response is sent, we would lose the ability
224+ // to replay if there was a cut just before the response was transmitted.
225+ // Perhaps we could have a TTL for streams that starts just after the response.
224226 streams map [StreamID ]* stream
225227
226228 // requestStreams maps incoming requests to their logical stream ID.
227229 //
228230 // Lifecycle: requestStreams persists for the duration of the session.
229231 //
230- // TODO(rfindley): clean up once requests are handled.
232+ // TODO(rfindley): clean up once requests are handled. See the TODO for streams
233+ // above.
231234 requestStreams map [jsonrpc.ID ]StreamID
232235}
233236
@@ -288,7 +291,7 @@ type StreamID int64
288291
289292// Connect implements the [Transport] interface.
290293//
291- // TODO(rfindley): Connect should return a new object.
294+ // TODO(rfindley): Connect should return a new object. (Why?)
292295func (s * StreamableServerTransport ) Connect (context.Context ) (Connection , error ) {
293296 return s , nil
294297}
@@ -411,6 +414,8 @@ func (t *StreamableServerTransport) servePOST(w http.ResponseWriter, req *http.R
411414 // TODO(rfindley): consider optimizing for a single incoming request, by
412415 // responding with application/json when there is only a single message in
413416 // the response.
417+ // (But how would we know there is only a single message? For example, couldn't
418+ // a progress notification be sent before a response on the same context?)
414419 return t .streamResponse (stream , w , req , - 1 )
415420}
416421
@@ -437,7 +442,7 @@ func (t *StreamableServerTransport) streamResponse(stream *stream, w http.Respon
437442 return true
438443 }
439444
440- w .Header ().Set (sessionIDHeader , t .id )
445+ w .Header ().Set (sessionIDHeader , t .sessionID )
441446 w .Header ().Set ("Content-Type" , "text/event-stream" ) // Accept checked in [StreamableHTTPHandler]
442447 w .Header ().Set ("Cache-Control" , "no-cache, no-transform" )
443448 w .Header ().Set ("Connection" , "keep-alive" )
@@ -563,11 +568,12 @@ func (t *StreamableServerTransport) Read(ctx context.Context) (jsonrpc.Message,
563568// Write implements the [Connection] interface.
564569func (t * StreamableServerTransport ) Write (ctx context.Context , msg jsonrpc.Message ) error {
565570 // Find the incoming request that this write relates to, if any.
566- var forRequest , replyTo jsonrpc.ID
571+ var forRequest jsonrpc.ID
572+ isResponse := false
567573 if resp , ok := msg .(* jsonrpc.Response ); ok {
568574 // If the message is a response, it relates to its request (of course).
569575 forRequest = resp .ID
570- replyTo = resp . ID
576+ isResponse = true
571577 } else {
572578 // Otherwise, we check to see if it request was made in the context of an
573579 // ongoing request. This may not be the case if the request way made with
@@ -612,9 +618,9 @@ func (t *StreamableServerTransport) Write(ctx context.Context, msg jsonrpc.Messa
612618 }
613619
614620 stream .outgoing = append (stream .outgoing , data )
615- if replyTo . IsValid () {
621+ if isResponse {
616622 // Once we've put the reply on the queue, it's no longer outstanding.
617- delete (stream .requests , replyTo )
623+ delete (stream .requests , forRequest )
618624 }
619625
620626 // Signal streamResponse that new work is available.
@@ -635,16 +641,16 @@ func (t *StreamableServerTransport) Close() error {
635641 if ! t .isDone {
636642 t .isDone = true
637643 close (t .done )
638- return t .opts .EventStore .SessionClosed (context .TODO (), t .id )
644+ // TODO: find a way to plumb a context here, or an event store with a long-running
645+ // close operation can take arbitrary time. Alternative: impose a fixed timeout here.
646+ return t .opts .EventStore .SessionClosed (context .TODO (), t .sessionID )
639647 }
640648 return nil
641649}
642650
643651// A StreamableClientTransport is a [Transport] that can communicate with an MCP
644652// endpoint serving the streamable HTTP transport defined by the 2025-03-26
645653// version of the spec.
646- //
647- // TODO(rfindley): support retries and resumption tokens.
648654type StreamableClientTransport struct {
649655 url string
650656 opts StreamableClientTransportOptions
0 commit comments