Skip to content

Commit 3f8f246

Browse files
authored
Merge branch 'modelcontextprotocol:main' into feat-devcontainer-config
2 parents b803799 + 8dd9a81 commit 3f8f246

File tree

10 files changed

+1077
-119
lines changed

10 files changed

+1077
-119
lines changed

design/design.md

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ server.AddReceivingMiddleware(withLogging)
471471
472472
#### Rate Limiting
473473
474-
Rate limiting can be configured using middleware. Please see [examples/rate-limiting](<https://github.com/modelcontextprotocol/go-sdk/tree/main/examples/rate-limiting>] for an example on how to implement this.
474+
Rate limiting can be configured using middleware. Please see [examples/rate-limiting](<https://github.com/modelcontextprotocol/go-sdk/tree/main/examples/rate-limiting>) for an example on how to implement this.
475475
476476
### Errors
477477
@@ -609,7 +609,7 @@ A tool handler accepts `CallToolParams` and returns a `CallToolResult`. However,
609609
```go
610610
type CallToolParamsFor[In any] struct {
611611
Meta Meta `json:"_meta,omitempty"`
612-
Arguments In `json:"arguments,omitempty"`
612+
Arguments In `json:"arguments,omitempty"`
613613
Name string `json:"name"`
614614
}
615615

@@ -748,13 +748,26 @@ Server sessions also support the spec methods `ListResources` and `ListResourceT
748748
749749
#### Subscriptions
750750
751-
ClientSessions can manage change notifications on particular resources:
751+
##### Client-Side Usage
752+
753+
Use the Subscribe and Unsubscribe methods on a ClientSession to start or stop receiving updates for a specific resource.
752754
753755
```go
754756
func (*ClientSession) Subscribe(context.Context, *SubscribeParams) error
755757
func (*ClientSession) Unsubscribe(context.Context, *UnsubscribeParams) error
756758
```
757759
760+
To process incoming update notifications, you must provide a ResourceUpdatedHandler in your ClientOptions. The SDK calls this function automatically whenever the server sends a notification for a resource you're subscribed to.
761+
762+
```go
763+
type ClientOptions struct {
764+
...
765+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
766+
}
767+
```
768+
769+
##### Server-Side Implementation
770+
758771
The server does not implement resource subscriptions. It passes along subscription requests to the user, and supplies a method to notify clients of changes. It tracks which sessions have subscribed to which resources so the user doesn't have to.
759772
760773
If a server author wants to support resource subscriptions, they must provide handlers to be called when clients subscribe and unsubscribe. It is an error to provide only one of these handlers.
@@ -772,7 +785,7 @@ type ServerOptions struct {
772785
User code should call `ResourceUpdated` when a subscribed resource changes.
773786
774787
```go
775-
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotification) error
788+
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotificationParams) error
776789
```
777790
778791
The server routes these notifications to the server sessions that subscribed to the resource.

mcp/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ClientOptions struct {
6060
ToolListChangedHandler func(context.Context, *ClientSession, *ToolListChangedParams)
6161
PromptListChangedHandler func(context.Context, *ClientSession, *PromptListChangedParams)
6262
ResourceListChangedHandler func(context.Context, *ClientSession, *ResourceListChangedParams)
63+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
6364
LoggingMessageHandler func(context.Context, *ClientSession, *LoggingMessageParams)
6465
ProgressNotificationHandler func(context.Context, *ClientSession, *ProgressNotificationParams)
6566
// If non-zero, defines an interval for regular "ping" requests.
@@ -293,6 +294,7 @@ var clientMethodInfos = map[string]methodInfo{
293294
notificationToolListChanged: newMethodInfo(clientMethod((*Client).callToolChangedHandler)),
294295
notificationPromptListChanged: newMethodInfo(clientMethod((*Client).callPromptChangedHandler)),
295296
notificationResourceListChanged: newMethodInfo(clientMethod((*Client).callResourceChangedHandler)),
297+
notificationResourceUpdated: newMethodInfo(clientMethod((*Client).callResourceUpdatedHandler)),
296298
notificationLoggingMessage: newMethodInfo(clientMethod((*Client).callLoggingHandler)),
297299
notificationProgress: newMethodInfo(sessionMethod((*ClientSession).callProgressNotificationHandler)),
298300
}
@@ -386,6 +388,20 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) (
386388
return handleSend[*CompleteResult](ctx, cs, methodComplete, orZero[Params](params))
387389
}
388390

391+
// Subscribe sends a "resources/subscribe" request to the server, asking for
392+
// notifications when the specified resource changes.
393+
func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error {
394+
_, err := handleSend[*emptyResult](ctx, cs, methodSubscribe, orZero[Params](params))
395+
return err
396+
}
397+
398+
// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling
399+
// a previous subscription.
400+
func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error {
401+
_, err := handleSend[*emptyResult](ctx, cs, methodUnsubscribe, orZero[Params](params))
402+
return err
403+
}
404+
389405
func (c *Client) callToolChangedHandler(ctx context.Context, s *ClientSession, params *ToolListChangedParams) (Result, error) {
390406
return callNotificationHandler(ctx, c.opts.ToolListChangedHandler, s, params)
391407
}
@@ -398,6 +414,10 @@ func (c *Client) callResourceChangedHandler(ctx context.Context, s *ClientSessio
398414
return callNotificationHandler(ctx, c.opts.ResourceListChangedHandler, s, params)
399415
}
400416

417+
func (c *Client) callResourceUpdatedHandler(ctx context.Context, s *ClientSession, params *ResourceUpdatedNotificationParams) (Result, error) {
418+
return callNotificationHandler(ctx, c.opts.ResourceUpdatedHandler, s, params)
419+
}
420+
401421
func (c *Client) callLoggingHandler(ctx context.Context, cs *ClientSession, params *LoggingMessageParams) (Result, error) {
402422
if h := c.opts.LoggingMessageHandler; h != nil {
403423
h(ctx, cs, params)

mcp/event.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,22 @@ package mcp
1010
import (
1111
"bufio"
1212
"bytes"
13+
"context"
1314
"errors"
1415
"fmt"
1516
"io"
1617
"iter"
18+
"maps"
1719
"net/http"
20+
"slices"
1821
"strings"
22+
"sync"
1923
)
2024

25+
// If true, MemoryEventStore will do frequent validation to check invariants, slowing it down.
26+
// Remove when we're confident in the code.
27+
const validateMemoryEventStore = true
28+
2129
// An Event is a server-sent event.
2230
// See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields.
2331
type Event struct {
@@ -136,3 +144,259 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] {
136144
}
137145
}
138146
}
147+
148+
// An EventStore tracks data for SSE streams.
149+
// A single EventStore suffices for all sessions, since session IDs are
150+
// globally unique. So one EventStore can be created per process, for
151+
// all Servers in the process.
152+
// Such a store is able to bound resource usage for the entire process.
153+
//
154+
// All of an EventStore's methods must be safe for use by multiple goroutines.
155+
type EventStore interface {
156+
// Append appends data for an outgoing event to given stream, which is part of the
157+
// given session.
158+
Append(_ context.Context, sessionID string, _ StreamID, data []byte) error
159+
160+
// After returns an iterator over the data for the given session and stream, beginning
161+
// just after the given index.
162+
// Once the iterator yields a non-nil error, it will stop.
163+
// After's iterator must return an error immediately if any data after index was
164+
// dropped; it must not return partial results.
165+
After(_ context.Context, sessionID string, _ StreamID, index int) iter.Seq2[[]byte, error]
166+
167+
// SessionClosed informs the store that the given session is finished, along
168+
// with all of its streams.
169+
// A store cannot rely on this method being called for cleanup. It should institute
170+
// additional mechanisms, such as timeouts, to reclaim storage.
171+
//
172+
SessionClosed(_ context.Context, sessionID string) error
173+
174+
// There is no StreamClosed method. A server doesn't know when a stream is finished, because
175+
// the client can always send a GET with a Last-Event-ID referring to the stream.
176+
}
177+
178+
// A dataList is a list of []byte.
179+
// The zero dataList is ready to use.
180+
type dataList struct {
181+
size int // total size of data bytes
182+
first int // the stream index of the first element in data
183+
data [][]byte
184+
}
185+
186+
func (dl *dataList) appendData(d []byte) {
187+
// If we allowed empty data, we would consume memory without incrementing the size.
188+
// We could of course account for that, but we keep it simple and assume there is no
189+
// empty data.
190+
if len(d) == 0 {
191+
panic("empty data item")
192+
}
193+
dl.data = append(dl.data, d)
194+
dl.size += len(d)
195+
}
196+
197+
// removeFirst removes the first data item in dl, returning the size of the item.
198+
// It panics if dl is empty.
199+
func (dl *dataList) removeFirst() int {
200+
if len(dl.data) == 0 {
201+
panic("empty dataList")
202+
}
203+
r := len(dl.data[0])
204+
dl.size -= r
205+
dl.data[0] = nil // help GC
206+
dl.data = dl.data[1:]
207+
dl.first++
208+
return r
209+
}
210+
211+
// A MemoryEventStore is an [EventStore] backed by memory.
212+
type MemoryEventStore struct {
213+
mu sync.Mutex
214+
maxBytes int // max total size of all data
215+
nBytes int // current total size of all data
216+
store map[string]map[StreamID]*dataList // session ID -> stream ID -> *dataList
217+
}
218+
219+
// MemoryEventStoreOptions are options for a [MemoryEventStore].
220+
type MemoryEventStoreOptions struct{}
221+
222+
// MaxBytes returns the maximum number of bytes that the store will retain before
223+
// purging data.
224+
func (s *MemoryEventStore) MaxBytes() int {
225+
s.mu.Lock()
226+
defer s.mu.Unlock()
227+
return s.maxBytes
228+
}
229+
230+
// SetMaxBytes sets the maximum number of bytes the store will retain before purging
231+
// data. The argument must not be negative. If it is zero, a suitable default will be used.
232+
// SetMaxBytes can be called at any time. The size of the store will be adjusted
233+
// immediately.
234+
func (s *MemoryEventStore) SetMaxBytes(n int) {
235+
s.mu.Lock()
236+
defer s.mu.Unlock()
237+
switch {
238+
case n < 0:
239+
panic("negative argument")
240+
case n == 0:
241+
s.maxBytes = defaultMaxBytes
242+
default:
243+
s.maxBytes = n
244+
}
245+
s.purge()
246+
}
247+
248+
const defaultMaxBytes = 10 << 20 // 10 MiB
249+
250+
// NewMemoryEventStore creates a [MemoryEventStore] with the default value
251+
// for MaxBytes.
252+
func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore {
253+
return &MemoryEventStore{
254+
maxBytes: defaultMaxBytes,
255+
store: make(map[string]map[StreamID]*dataList),
256+
}
257+
}
258+
259+
// Append implements [EventStore.Append] by recording data in memory.
260+
func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error {
261+
s.mu.Lock()
262+
defer s.mu.Unlock()
263+
264+
streamMap, ok := s.store[sessionID]
265+
if !ok {
266+
streamMap = make(map[StreamID]*dataList)
267+
s.store[sessionID] = streamMap
268+
}
269+
dl, ok := streamMap[streamID]
270+
if !ok {
271+
dl = &dataList{}
272+
streamMap[streamID] = dl
273+
}
274+
// Purge before adding, so at least the current data item will be present.
275+
// (That could result in nBytes > maxBytes, but we'll live with that.)
276+
s.purge()
277+
dl.appendData(data)
278+
s.nBytes += len(data)
279+
return nil
280+
}
281+
282+
// ErrEventsPurged is the error that [EventStore.After] should return if the event just after the
283+
// index is no longer available.
284+
var ErrEventsPurged = errors.New("data purged")
285+
286+
// After implements [EventStore.After].
287+
func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] {
288+
// Return the data items to yield.
289+
// We must copy, because dataList.removeFirst nils out slice elements.
290+
copyData := func() ([][]byte, error) {
291+
s.mu.Lock()
292+
defer s.mu.Unlock()
293+
streamMap, ok := s.store[sessionID]
294+
if !ok {
295+
return nil, fmt.Errorf("MemoryEventStore.After: unknown session ID %q", sessionID)
296+
}
297+
dl, ok := streamMap[streamID]
298+
if !ok {
299+
return nil, fmt.Errorf("MemoryEventStore.After: unknown stream ID %v in session %q", streamID, sessionID)
300+
}
301+
start := index + 1
302+
if dl.first > start {
303+
return nil, fmt.Errorf("MemoryEventStore.After: index %d, stream ID %v, session %q: %w",
304+
index, streamID, sessionID, ErrEventsPurged)
305+
}
306+
return slices.Clone(dl.data[start-dl.first:]), nil
307+
}
308+
309+
return func(yield func([]byte, error) bool) {
310+
ds, err := copyData()
311+
if err != nil {
312+
yield(nil, err)
313+
return
314+
}
315+
for _, d := range ds {
316+
if !yield(d, nil) {
317+
return
318+
}
319+
}
320+
}
321+
}
322+
323+
// SessionClosed implements [EventStore.SessionClosed].
324+
func (s *MemoryEventStore) SessionClosed(_ context.Context, sessionID string) error {
325+
s.mu.Lock()
326+
defer s.mu.Unlock()
327+
for _, dl := range s.store[sessionID] {
328+
s.nBytes -= dl.size
329+
}
330+
delete(s.store, sessionID)
331+
s.validate()
332+
return nil
333+
}
334+
335+
// purge removes data until no more than s.maxBytes bytes are in use.
336+
// It must be called with s.mu held.
337+
func (s *MemoryEventStore) purge() {
338+
// Remove the first element of every dataList until below the max.
339+
for s.nBytes > s.maxBytes {
340+
changed := false
341+
for _, sm := range s.store {
342+
for _, dl := range sm {
343+
if dl.size > 0 {
344+
r := dl.removeFirst()
345+
if r > 0 {
346+
changed = true
347+
s.nBytes -= r
348+
}
349+
}
350+
}
351+
}
352+
if !changed {
353+
panic("no progress during purge")
354+
}
355+
}
356+
s.validate()
357+
}
358+
359+
// validate checks that the store's data structures are valid.
360+
// It must be called with s.mu held.
361+
func (s *MemoryEventStore) validate() {
362+
if !validateMemoryEventStore {
363+
return
364+
}
365+
// Check that we're accounting for the size correctly.
366+
n := 0
367+
for _, sm := range s.store {
368+
for _, dl := range sm {
369+
for _, d := range dl.data {
370+
n += len(d)
371+
}
372+
}
373+
}
374+
if n != s.nBytes {
375+
panic("sizes don't add up")
376+
}
377+
}
378+
379+
// debugString returns a string containing the state of s.
380+
// Used in tests.
381+
func (s *MemoryEventStore) debugString() string {
382+
s.mu.Lock()
383+
defer s.mu.Unlock()
384+
var b strings.Builder
385+
for i, sess := range slices.Sorted(maps.Keys(s.store)) {
386+
if i > 0 {
387+
fmt.Fprintf(&b, "; ")
388+
}
389+
sm := s.store[sess]
390+
for i, sid := range slices.Sorted(maps.Keys(sm)) {
391+
if i > 0 {
392+
fmt.Fprintf(&b, "; ")
393+
}
394+
dl := sm[sid]
395+
fmt.Fprintf(&b, "%s %d first=%d", sess, sid, dl.first)
396+
for _, d := range dl.data {
397+
fmt.Fprintf(&b, " %s", d)
398+
}
399+
}
400+
}
401+
return b.String()
402+
}

0 commit comments

Comments
 (0)