Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 19 additions & 45 deletions mcp/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] {
//
// All of an EventStore's methods must be safe for use by multiple goroutines.
type EventStore interface {
// AppendEvent appends data for an outgoing event to given stream, which is part of the
// given session. It returns the index of the event in the stream, suitable for constructing
// an event ID to send to the client.
AppendEvent(_ context.Context, sessionID string, _ StreamID, data []byte) (int, error)
// Append appends data for an outgoing event to given stream, which is part of the
// given session.
Append(_ context.Context, sessionID string, _ StreamID, data []byte) error

// After returns an iterator over the data for the given session and stream, beginning
// just after the given index.
Expand All @@ -165,16 +164,15 @@ type EventStore interface {
// dropped; it must not return partial results.
After(_ context.Context, sessionID string, _ StreamID, index int) iter.Seq2[[]byte, error]

// StreamClosed informs the store that the given stream is finished.
// A store cannot rely on this method being called for cleanup. It should institute
// additional mechanisms, such as timeouts, to reclaim storage.
StreamClosed(_ context.Context, sessionID string, streamID StreamID) error

// SessionClosed informs the store that the given session is finished, along
// with all of its streams.
// A store cannot rely on this method being called for cleanup. It should institute
// additional mechanisms, such as timeouts, to reclaim storage.
//
SessionClosed(_ context.Context, sessionID string) error

// There is no StreamClosed method. A server doesn't know when a stream is finished, because
// the client can always send a GET with a Last-Event-ID referring to the stream.
}

// A dataList is a list of []byte.
Expand Down Expand Up @@ -210,15 +208,6 @@ func (dl *dataList) removeFirst() int {
return r
}

// lastIndex returns the index of the last data item in dl.
// It panics if there are none.
func (dl *dataList) lastIndex() int {
if len(dl.data) == 0 {
panic("empty dataList")
}
return dl.first + len(dl.data) - 1
}

// A MemoryEventStore is an [EventStore] backed by memory.
type MemoryEventStore struct {
mu sync.Mutex
Expand Down Expand Up @@ -267,9 +256,8 @@ func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore {
}
}

// AppendEvent implements [EventStore.AppendEvent] by recording data
// in memory.
func (s *MemoryEventStore) AppendEvent(_ context.Context, sessionID string, streamID StreamID, data []byte) (int, error) {
// Append implements [EventStore.Append] by recording data in memory.
func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -288,9 +276,13 @@ func (s *MemoryEventStore) AppendEvent(_ context.Context, sessionID string, stre
s.purge()
dl.appendData(data)
s.nBytes += len(data)
return dl.lastIndex(), nil
return nil
}

// ErrEventsPurged is the error that [EventStore.After] should return if the event just after the
// index is no longer available.
var ErrEventsPurged = errors.New("data purged")

// After implements [EventStore.After].
func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] {
// Return the data items to yield.
Expand All @@ -306,10 +298,12 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S
if !ok {
return nil, fmt.Errorf("MemoryEventStore.After: unknown stream ID %v in session %q", streamID, sessionID)
}
if dl.first > index {
return nil, fmt.Errorf("MemoryEventStore.After: data purged at index %d, stream ID %v, session %q", index, streamID, sessionID)
start := index + 1
if dl.first > start {
return nil, fmt.Errorf("MemoryEventStore.After: index %d, stream ID %v, session %q: %w",
index, streamID, sessionID, ErrEventsPurged)
}
return slices.Clone(dl.data[index-dl.first:]), nil
return slices.Clone(dl.data[start-dl.first:]), nil
}

return func(yield func([]byte, error) bool) {
Expand All @@ -326,26 +320,6 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S
}
}

// StreamClosed implements [EventStore.StreamClosed].
func (s *MemoryEventStore) StreamClosed(_ context.Context, sessionID string, streamID StreamID) error {
if sessionID == "" {
panic("empty sessionID")
}

s.mu.Lock()
defer s.mu.Unlock()

sm := s.store[sessionID]
dl := sm[streamID]
s.nBytes -= dl.size
delete(sm, streamID)
if len(sm) == 0 {
delete(s.store, sessionID)
}
s.validate()
return nil
}

// SessionClosed implements [EventStore.SessionClosed].
func (s *MemoryEventStore) SessionClosed(_ context.Context, sessionID string) error {
s.mu.Lock()
Expand Down
30 changes: 9 additions & 21 deletions mcp/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestMemoryEventStoreState(t *testing.T) {
ctx := context.Background()

appendEvent := func(s *MemoryEventStore, sess string, str StreamID, data string) {
if _, err := s.AppendEvent(ctx, sess, str, []byte(data)); err != nil {
if err := s.Append(ctx, sess, str, []byte(data)); err != nil {
t.Fatal(err)
}
}
Expand All @@ -127,18 +127,6 @@ func TestMemoryEventStoreState(t *testing.T) {
"S1 1 first=0 d1 d3; S1 2 first=0 d2; S2 8 first=0 d4",
8,
},
{
"stream close",
func(s *MemoryEventStore) {
appendEvent(s, "S1", 1, "d1")
appendEvent(s, "S1", 2, "d2")
appendEvent(s, "S1", 1, "d3")
appendEvent(s, "S2", 8, "d4")
s.StreamClosed(ctx, "S1", 1)
},
"S1 2 first=0 d2; S2 8 first=0 d4",
4,
},
{
"session close",
func(s *MemoryEventStore) {
Expand Down Expand Up @@ -218,10 +206,10 @@ func TestMemoryEventStoreAfter(t *testing.T) {
ctx := context.Background()
s := NewMemoryEventStore(nil)
s.SetMaxBytes(4)
s.AppendEvent(ctx, "S1", 1, []byte("d1"))
s.AppendEvent(ctx, "S1", 1, []byte("d2"))
s.AppendEvent(ctx, "S1", 1, []byte("d3"))
s.AppendEvent(ctx, "S1", 2, []byte("d4")) // will purge d1
s.Append(ctx, "S1", 1, []byte("d1"))
s.Append(ctx, "S1", 1, []byte("d2"))
s.Append(ctx, "S1", 1, []byte("d3"))
s.Append(ctx, "S1", 2, []byte("d4")) // will purge d1
want := "S1 1 first=1 d2 d3; S1 2 first=0 d4"
if got := s.debugString(); got != want {
t.Fatalf("got state %q, want %q", got, want)
Expand All @@ -234,10 +222,10 @@ func TestMemoryEventStoreAfter(t *testing.T) {
want []string
wantErr string // if non-empty, error should contain this string
}{
{"S1", 1, 0, nil, "purge"},
{"S1", 1, 1, []string{"d2", "d3"}, ""},
{"S1", 1, 2, []string{"d3"}, ""},
{"S1", 2, 0, []string{"d4"}, ""},
{"S1", 1, 0, []string{"d2", "d3"}, ""},
{"S1", 1, 1, []string{"d3"}, ""},
{"S1", 1, 2, nil, ""},
{"S1", 2, 0, nil, ""},
{"S1", 3, 0, nil, "unknown stream ID"},
{"S2", 0, 0, nil, "unknown session ID"},
} {
Expand Down
2 changes: 2 additions & 0 deletions mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ func NewServer(impl *Implementation, opts *ServerOptions) *Server {
if opts.PageSize < 0 {
panic(fmt.Errorf("invalid page size %d", opts.PageSize))
}
// TODO(jba): don't modify opts, modify Server.opts.
if opts.PageSize == 0 {
opts.PageSize = DefaultPageSize
}

return &Server{
impl: impl,
opts: *opts,
Expand Down
Loading
Loading