Skip to content

Commit 104d39c

Browse files
authored
mcp: allow disabling stream replay; disable by default (#585)
This CL implements two changes that must be logically connected. The first is to provide a mechanism for disabling stream replay. Previously, our documentation stated that if StreamableServerTransport.EventStore was nil, a default in-memory event store would be used. In that case, there would be no way to completely disable stream replay, and as described in #576, there are many cases where replay is undesirable. But of course changing the behavior of the transport zero value implicitly affects its default behavior, and so this CL also implements the proposal #580: stream replay is disabled by default. Implementing this change required a significant refactoring, as previously we were relying on the event store for serialized message delivery from the JSON-RPC layer to the MCP layer: the connection would write to the event store, and independently the stream (be it an incoming POST or replay GET) would iterate and serve messages in the stream. In order to achieve the goals of this CL, it was necessary to decouple storage from delivery. The 'stream' abstraction now tracks a delivery callback that writes straight to the HTTP response. It would have been convenient to store the ongoing http.ResponseWriter directly in the stream (this is how typescript does it), but due to the design of our EventStore API, only the HTTP handler knows the next event index, so a 'deliver' abstraction was an unfortunate requirement (suggestions for how to further simplify this are welcome). More simplification is possible: in particular, as a result of this refactoring it should be entirely possible to clean up streams once we've received all responses. Any replay would only need access to the EventStore, if at all. This is left to a follow-up CL, to limit this already significant change. Furthermore, a nice consequence of this refactoring is that, when not using event storage, servers can get synchronous feedback that message delivery failed, which should avoid unnecessary work. We can additionally cancel ongoing requests on early client termination, but that is also left to a follow-up CL. Throughout, the terminology 'standalone SSE stream' replaced 'hanging GET' when referring to the non-replay GET request issued by the client. This is consistent with other SDKs. Fixes #576 Updates #580
1 parent 010bdbc commit 104d39c

File tree

5 files changed

+420
-307
lines changed

5 files changed

+420
-307
lines changed

mcp/event.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,9 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] {
153153
//
154154
// All of an EventStore's methods must be safe for use by multiple goroutines.
155155
type EventStore interface {
156-
// Open prepares the event store for a given stream. It ensures that the
157-
// underlying data structure for the stream is initialized, making it
158-
// ready to store event streams.
159-
//
160-
// streamIDs must be globally unique.
156+
// Open is called when a new stream is created. It may be used to ensure that
157+
// the underlying data structure for the stream is initialized, making it
158+
// ready to store and replay event streams.
161159
Open(_ context.Context, sessionID, streamID string) error
162160

163161
// Append appends data for an outgoing event to given stream, which is part of the
@@ -166,6 +164,7 @@ type EventStore interface {
166164

167165
// After returns an iterator over the data for the given session and stream, beginning
168166
// just after the given index.
167+
//
169168
// Once the iterator yields a non-nil error, it will stop.
170169
// After's iterator must return an error immediately if any data after index was
171170
// dropped; it must not return partial results.
@@ -174,6 +173,7 @@ type EventStore interface {
174173

175174
// SessionClosed informs the store that the given session is finished, along
176175
// with all of its streams.
176+
//
177177
// A store cannot rely on this method being called for cleanup. It should institute
178178
// additional mechanisms, such as timeouts, to reclaim storage.
179179
SessionClosed(_ context.Context, sessionID string) error

0 commit comments

Comments
 (0)