Skip to content

Commit 421f3f7

Browse files
committed
remove extraneous eventstore append
1 parent bf2306e commit 421f3f7

File tree

2 files changed

+1
-20
lines changed

2 files changed

+1
-20
lines changed

mcp/event.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,12 +274,6 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID
274274
// Purge before adding, so at least the current data item will be present.
275275
// (That could result in nBytes > maxBytes, but we'll live with that.)
276276
s.purge()
277-
278-
// An empty data slice signals that a stream has been registered.
279-
// We ignore it since it contains no content and shouldn't affect the total size.
280-
if data == nil {
281-
return nil
282-
}
283277
dl.appendData(data)
284278
s.nBytes += len(data)
285279
return nil

mcp/streamable.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -509,12 +509,11 @@ type stream struct {
509509
}
510510

511511
func newStream(id StreamID, jsonResponse bool) *stream {
512-
s := &stream{
512+
return &stream{
513513
id: id,
514514
jsonResponse: jsonResponse,
515515
requests: make(map[jsonrpc.ID]struct{}),
516516
}
517-
return s
518517
}
519518

520519
func signalChanPtr() *chan struct{} {
@@ -675,11 +674,6 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques
675674
c.mu.Unlock()
676675
stream.signal.Store(signalChanPtr())
677676
defer stream.signal.Store(nil)
678-
// Register this stream with the event store.
679-
if err := c.eventStore.Append(req.Context(), c.SessionID(), stream.id, nil); err != nil {
680-
http.Error(w, fmt.Sprintf("error storing event: %v", err), http.StatusInternalServerError)
681-
return
682-
}
683677
}
684678

685679
// Publish incoming messages.
@@ -805,12 +799,6 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per
805799
for {
806800
for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) {
807801
if err != nil {
808-
yield(nil, err)
809-
return
810-
}
811-
// The stream exists, but does not contain any messages on the stream.
812-
// Do not yield this data.
813-
if data == nil {
814802
break
815803
}
816804
if !yield(data, nil) {
@@ -821,7 +809,6 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per
821809
c.mu.Lock()
822810
nOutstanding := len(stream.requests)
823811
c.mu.Unlock()
824-
825812
// If all requests have been handled and replied to, we should terminate this connection.
826813
// "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream."
827814
// §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server

0 commit comments

Comments
 (0)