Skip to content

Commit 5e55e08

Browse files
committed
mcp: remove the StreamID type
The streamID type did not carry its weight. Remove it. Fixes #484
1 parent b636b16 commit 5e55e08

File tree

4 files changed

+33
-35
lines changed

4 files changed

+33
-35
lines changed

mcp/event.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,19 +156,21 @@ type EventStore interface {
156156
// Open prepares the event store for a given stream. It ensures that the
157157
// underlying data structure for the stream is initialized, making it
158158
// ready to store event streams.
159-
Open(_ context.Context, sessionID string, streamID StreamID) error
159+
//
160+
// streamIDs must be globally unique.
161+
Open(_ context.Context, sessionID, streamID string) error
160162

161163
// Append appends data for an outgoing event to given stream, which is part of the
162164
// given session.
163-
Append(_ context.Context, sessionID string, _ StreamID, data []byte) error
165+
Append(_ context.Context, sessionID, streamID string, data []byte) error
164166

165167
// After returns an iterator over the data for the given session and stream, beginning
166168
// just after the given index.
167169
// Once the iterator yields a non-nil error, it will stop.
168170
// After's iterator must return an error immediately if any data after index was
169171
// dropped; it must not return partial results.
170172
// The stream must have been opened previously (see [EventStore.Open]).
171-
After(_ context.Context, sessionID string, _ StreamID, index int) iter.Seq2[[]byte, error]
173+
After(_ context.Context, sessionID, streamID string, index int) iter.Seq2[[]byte, error]
172174

173175
// SessionClosed informs the store that the given session is finished, along
174176
// with all of its streams.
@@ -217,9 +219,9 @@ func (dl *dataList) removeFirst() int {
217219
// A MemoryEventStore is an [EventStore] backed by memory.
218220
type MemoryEventStore struct {
219221
mu sync.Mutex
220-
maxBytes int // max total size of all data
221-
nBytes int // current total size of all data
222-
store map[string]map[StreamID]*dataList // session ID -> stream ID -> *dataList
222+
maxBytes int // max total size of all data
223+
nBytes int // current total size of all data
224+
store map[string]map[string]*dataList // session ID -> stream ID -> *dataList
223225
}
224226

225227
// MemoryEventStoreOptions are options for a [MemoryEventStore].
@@ -258,13 +260,13 @@ const defaultMaxBytes = 10 << 20 // 10 MiB
258260
func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore {
259261
return &MemoryEventStore{
260262
maxBytes: defaultMaxBytes,
261-
store: make(map[string]map[StreamID]*dataList),
263+
store: make(map[string]map[string]*dataList),
262264
}
263265
}
264266

265267
// Open implements [EventStore.Open]. It ensures that the underlying data
266268
// structures for the given session are initialized and ready for use.
267-
func (s *MemoryEventStore) Open(_ context.Context, sessionID string, streamID StreamID) error {
269+
func (s *MemoryEventStore) Open(_ context.Context, sessionID, streamID string) error {
268270
s.mu.Lock()
269271
defer s.mu.Unlock()
270272
s.init(sessionID, streamID)
@@ -275,10 +277,10 @@ func (s *MemoryEventStore) Open(_ context.Context, sessionID string, streamID St
275277
// given sessionID and streamID exists, creating it if necessary. It returns the
276278
// dataList associated with the specified IDs.
277279
// Requires s.mu.
278-
func (s *MemoryEventStore) init(sessionID string, streamID StreamID) *dataList {
280+
func (s *MemoryEventStore) init(sessionID, streamID string) *dataList {
279281
streamMap, ok := s.store[sessionID]
280282
if !ok {
281-
streamMap = make(map[StreamID]*dataList)
283+
streamMap = make(map[string]*dataList)
282284
s.store[sessionID] = streamMap
283285
}
284286
dl, ok := streamMap[streamID]
@@ -290,7 +292,7 @@ func (s *MemoryEventStore) init(sessionID string, streamID StreamID) *dataList {
290292
}
291293

292294
// Append implements [EventStore.Append] by recording data in memory.
293-
func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error {
295+
func (s *MemoryEventStore) Append(_ context.Context, sessionID, streamID string, data []byte) error {
294296
s.mu.Lock()
295297
defer s.mu.Unlock()
296298
dl := s.init(sessionID, streamID)
@@ -307,7 +309,7 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID
307309
var ErrEventsPurged = errors.New("data purged")
308310

309311
// After implements [EventStore.After].
310-
func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] {
312+
func (s *MemoryEventStore) After(_ context.Context, sessionID, streamID string, index int) iter.Seq2[[]byte, error] {
311313
// Return the data items to yield.
312314
// We must copy, because dataList.removeFirst nils out slice elements.
313315
copyData := func() ([][]byte, error) {

mcp/event_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ func TestScanEvents(t *testing.T) {
105105
func TestMemoryEventStoreState(t *testing.T) {
106106
ctx := context.Background()
107107

108-
appendEvent := func(s *MemoryEventStore, sess string, str StreamID, data string) {
109-
if err := s.Append(ctx, sess, str, []byte(data)); err != nil {
108+
appendEvent := func(s *MemoryEventStore, sess, stream string, data string) {
109+
if err := s.Append(ctx, sess, stream, []byte(data)); err != nil {
110110
t.Fatal(err)
111111
}
112112
}
@@ -218,7 +218,7 @@ func TestMemoryEventStoreAfter(t *testing.T) {
218218

219219
for _, tt := range []struct {
220220
sessionID string
221-
streamID StreamID
221+
streamID string
222222
index int
223223
want []string
224224
wantErr string // if non-empty, error should contain this string
@@ -277,11 +277,11 @@ func BenchmarkMemoryEventStore(b *testing.B) {
277277
store.SetMaxBytes(test.limit)
278278
ctx := context.Background()
279279
sessionIDs := make([]string, test.sessions)
280-
streamIDs := make([][3]StreamID, test.sessions)
280+
streamIDs := make([][3]string, test.sessions)
281281
for i := range sessionIDs {
282282
sessionIDs[i] = fmt.Sprint(i)
283283
for j := range 3 {
284-
streamIDs[i][j] = StreamID(randText())
284+
streamIDs[i][j] = randText()
285285
}
286286
}
287287
payload := make([]byte, test.datasize)

mcp/streamable.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,8 @@ func (t *StreamableServerTransport) Connect(ctx context.Context) (Connection, er
396396
jsonResponse: t.jsonResponse,
397397
incoming: make(chan jsonrpc.Message, 10),
398398
done: make(chan struct{}),
399-
streams: make(map[StreamID]*stream),
400-
requestStreams: make(map[jsonrpc.ID]StreamID),
399+
streams: make(map[string]*stream),
400+
requestStreams: make(map[jsonrpc.ID]string),
401401
}
402402
if t.connection.eventStore == nil {
403403
t.connection.eventStore = NewMemoryEventStore(nil)
@@ -442,14 +442,14 @@ type streamableServerConn struct {
442442
// bound. If we deleted a stream when the response is sent, we would lose the ability
443443
// to replay if there was a cut just before the response was transmitted.
444444
// Perhaps we could have a TTL for streams that starts just after the response.
445-
streams map[StreamID]*stream
445+
streams map[string]*stream
446446

447447
// requestStreams maps incoming requests to their logical stream ID.
448448
//
449449
// Lifecycle: requestStreams persist for the duration of the session.
450450
//
451451
// TODO: clean up once requests are handled. See the TODO for streams above.
452-
requestStreams map[jsonrpc.ID]StreamID
452+
requestStreams map[jsonrpc.ID]string
453453
}
454454

455455
func (c *streamableServerConn) SessionID() string {
@@ -466,7 +466,7 @@ func (c *streamableServerConn) SessionID() string {
466466
type stream struct {
467467
// id is the logical ID for the stream, unique within a session.
468468
// an empty string is used for messages that don't correlate with an incoming request.
469-
id StreamID
469+
id string
470470

471471
// If isInitialize is set, the stream is in response to an initialize request,
472472
// and therefore should include the session ID header.
@@ -500,7 +500,7 @@ type stream struct {
500500
requests map[jsonrpc.ID]struct{}
501501
}
502502

503-
func (c *streamableServerConn) newStream(ctx context.Context, id StreamID, isInitialize, jsonResponse bool) (*stream, error) {
503+
func (c *streamableServerConn) newStream(ctx context.Context, id string, isInitialize, jsonResponse bool) (*stream, error) {
504504
if err := c.eventStore.Open(ctx, c.sessionID, id); err != nil {
505505
return nil, err
506506
}
@@ -517,10 +517,6 @@ func signalChanPtr() *chan struct{} {
517517
return &c
518518
}
519519

520-
// A StreamID identifies a stream of SSE events. It is globally unique.
521-
// [ServerSession].
522-
type StreamID string
523-
524520
// We track the incoming request ID inside the handler context using
525521
// idContextValue, so that notifications and server->client calls that occur in
526522
// the course of handling incoming requests are correlated with the incoming
@@ -569,7 +565,7 @@ func (t *StreamableServerTransport) ServeHTTP(w http.ResponseWriter, req *http.R
569565
// It returns an HTTP status code and error message.
570566
func (c *streamableServerConn) serveGET(w http.ResponseWriter, req *http.Request) {
571567
// connID 0 corresponds to the default GET request.
572-
id := StreamID("")
568+
id := ""
573569
// By default, we haven't seen a last index. Since indices start at 0, we represent
574570
// that by -1. This is incremented just before each event is written, in streamResponse
575571
// around L407.
@@ -669,7 +665,7 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques
669665
// notifications or server->client requests made in the course of handling.
670666
// Update accounting for this incoming payload.
671667
if len(requests) > 0 {
672-
stream, err = c.newStream(req.Context(), StreamID(randText()), isInitialize, c.jsonResponse)
668+
stream, err = c.newStream(req.Context(), randText(), isInitialize, c.jsonResponse)
673669
if err != nil {
674670
http.Error(w, fmt.Sprintf("storing stream: %v", err), http.StatusInternalServerError)
675671
return
@@ -860,25 +856,25 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per
860856
// streamID and message index idx.
861857
//
862858
// See also [parseEventID].
863-
func formatEventID(sid StreamID, idx int) string {
859+
func formatEventID(sid string, idx int) string {
864860
return fmt.Sprintf("%s_%d", sid, idx)
865861
}
866862

867863
// parseEventID parses a Last-Event-ID value into a logical stream id and
868864
// index.
869865
//
870866
// See also [formatEventID].
871-
func parseEventID(eventID string) (sid StreamID, idx int, ok bool) {
867+
func parseEventID(eventID string) (streamID string, idx int, ok bool) {
872868
parts := strings.Split(eventID, "_")
873869
if len(parts) != 2 {
874870
return "", 0, false
875871
}
876-
stream := StreamID(parts[0])
872+
streamID = parts[0]
877873
idx, err := strconv.Atoi(parts[1])
878874
if err != nil || idx < 0 {
879875
return "", 0, false
880876
}
881-
return StreamID(stream), idx, true
877+
return streamID, idx, true
882878
}
883879

884880
// Read implements the [Connection] interface.
@@ -922,7 +918,7 @@ func (c *streamableServerConn) Write(ctx context.Context, msg jsonrpc.Message) e
922918
//
923919
// For messages sent outside of a request context, this is the default
924920
// connection "".
925-
var forStream StreamID
921+
var forStream string
926922
if forRequest.IsValid() {
927923
c.mu.Lock()
928924
forStream = c.requestStreams[forRequest]

mcp/streamable_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1139,7 +1139,7 @@ func mustMarshal(v any) json.RawMessage {
11391139

11401140
func TestEventID(t *testing.T) {
11411141
tests := []struct {
1142-
sid StreamID
1142+
sid string
11431143
idx int
11441144
}{
11451145
{"0", 0},

0 commit comments

Comments
 (0)