Skip to content

Commit 43ba8d0

Browse files
committed
Merge remote-tracking branch 'origin/main' into stateless-http
# Conflicts: # mcp/server.go # mcp/streamable.go
2 parents dc777d0 + 8dd9a81 commit 43ba8d0

File tree

9 files changed

+356
-147
lines changed

9 files changed

+356
-147
lines changed

design/design.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,13 +748,26 @@ Server sessions also support the spec methods `ListResources` and `ListResourceT
748748
749749
#### Subscriptions
750750
751-
ClientSessions can manage change notifications on particular resources:
751+
##### Client-Side Usage
752+
753+
Use the Subscribe and Unsubscribe methods on a ClientSession to start or stop receiving updates for a specific resource.
752754
753755
```go
754756
func (*ClientSession) Subscribe(context.Context, *SubscribeParams) error
755757
func (*ClientSession) Unsubscribe(context.Context, *UnsubscribeParams) error
756758
```
757759
760+
To process incoming update notifications, you must provide a ResourceUpdatedHandler in your ClientOptions. The SDK calls this function automatically whenever the server sends a notification for a resource you're subscribed to.
761+
762+
```go
763+
type ClientOptions struct {
764+
...
765+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
766+
}
767+
```
768+
769+
##### Server-Side Implementation
770+
758771
The server does not implement resource subscriptions. It passes along subscription requests to the user, and supplies a method to notify clients of changes. It tracks which sessions have subscribed to which resources so the user doesn't have to.
759772
760773
If a server author wants to support resource subscriptions, they must provide handlers to be called when clients subscribe and unsubscribe. It is an error to provide only one of these handlers.
@@ -772,7 +785,7 @@ type ServerOptions struct {
772785
User code should call `ResourceUpdated` when a subscribed resource changes.
773786
774787
```go
775-
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotification) error
788+
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotificationParams) error
776789
```
777790
778791
The server routes these notifications to the server sessions that subscribed to the resource.

mcp/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ClientOptions struct {
6060
ToolListChangedHandler func(context.Context, *ClientSession, *ToolListChangedParams)
6161
PromptListChangedHandler func(context.Context, *ClientSession, *PromptListChangedParams)
6262
ResourceListChangedHandler func(context.Context, *ClientSession, *ResourceListChangedParams)
63+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
6364
LoggingMessageHandler func(context.Context, *ClientSession, *LoggingMessageParams)
6465
ProgressNotificationHandler func(context.Context, *ClientSession, *ProgressNotificationParams)
6566
// If non-zero, defines an interval for regular "ping" requests.
@@ -293,6 +294,7 @@ var clientMethodInfos = map[string]methodInfo{
293294
notificationToolListChanged: newMethodInfo(clientMethod((*Client).callToolChangedHandler)),
294295
notificationPromptListChanged: newMethodInfo(clientMethod((*Client).callPromptChangedHandler)),
295296
notificationResourceListChanged: newMethodInfo(clientMethod((*Client).callResourceChangedHandler)),
297+
notificationResourceUpdated: newMethodInfo(clientMethod((*Client).callResourceUpdatedHandler)),
296298
notificationLoggingMessage: newMethodInfo(clientMethod((*Client).callLoggingHandler)),
297299
notificationProgress: newMethodInfo(sessionMethod((*ClientSession).callProgressNotificationHandler)),
298300
}
@@ -386,6 +388,20 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) (
386388
return handleSend[*CompleteResult](ctx, cs, methodComplete, orZero[Params](params))
387389
}
388390

391+
// Subscribe sends a "resources/subscribe" request to the server, asking for
392+
// notifications when the specified resource changes.
393+
func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error {
394+
_, err := handleSend[*emptyResult](ctx, cs, methodSubscribe, orZero[Params](params))
395+
return err
396+
}
397+
398+
// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling
399+
// a previous subscription.
400+
func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error {
401+
_, err := handleSend[*emptyResult](ctx, cs, methodUnsubscribe, orZero[Params](params))
402+
return err
403+
}
404+
389405
func (c *Client) callToolChangedHandler(ctx context.Context, s *ClientSession, params *ToolListChangedParams) (Result, error) {
390406
return callNotificationHandler(ctx, c.opts.ToolListChangedHandler, s, params)
391407
}
@@ -398,6 +414,10 @@ func (c *Client) callResourceChangedHandler(ctx context.Context, s *ClientSessio
398414
return callNotificationHandler(ctx, c.opts.ResourceListChangedHandler, s, params)
399415
}
400416

417+
func (c *Client) callResourceUpdatedHandler(ctx context.Context, s *ClientSession, params *ResourceUpdatedNotificationParams) (Result, error) {
418+
return callNotificationHandler(ctx, c.opts.ResourceUpdatedHandler, s, params)
419+
}
420+
401421
func (c *Client) callLoggingHandler(ctx context.Context, cs *ClientSession, params *LoggingMessageParams) (Result, error) {
402422
if h := c.opts.LoggingMessageHandler; h != nil {
403423
h(ctx, cs, params)

mcp/event.go

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,9 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] {
153153
//
154154
// All of an EventStore's methods must be safe for use by multiple goroutines.
155155
type EventStore interface {
156-
// AppendEvent appends data for an outgoing event to given stream, which is part of the
157-
// given session. It returns the index of the event in the stream, suitable for constructing
158-
// an event ID to send to the client.
159-
AppendEvent(_ context.Context, sessionID string, _ StreamID, data []byte) (int, error)
156+
// Append appends data for an outgoing event to given stream, which is part of the
157+
// given session.
158+
Append(_ context.Context, sessionID string, _ StreamID, data []byte) error
160159

161160
// After returns an iterator over the data for the given session and stream, beginning
162161
// just after the given index.
@@ -165,16 +164,15 @@ type EventStore interface {
165164
// dropped; it must not return partial results.
166165
After(_ context.Context, sessionID string, _ StreamID, index int) iter.Seq2[[]byte, error]
167166

168-
// StreamClosed informs the store that the given stream is finished.
169-
// A store cannot rely on this method being called for cleanup. It should institute
170-
// additional mechanisms, such as timeouts, to reclaim storage.
171-
StreamClosed(_ context.Context, sessionID string, streamID StreamID) error
172-
173167
// SessionClosed informs the store that the given session is finished, along
174168
// with all of its streams.
175169
// A store cannot rely on this method being called for cleanup. It should institute
176170
// additional mechanisms, such as timeouts, to reclaim storage.
171+
//
177172
SessionClosed(_ context.Context, sessionID string) error
173+
174+
// There is no StreamClosed method. A server doesn't know when a stream is finished, because
175+
// the client can always send a GET with a Last-Event-ID referring to the stream.
178176
}
179177

180178
// A dataList is a list of []byte.
@@ -210,15 +208,6 @@ func (dl *dataList) removeFirst() int {
210208
return r
211209
}
212210

213-
// lastIndex returns the index of the last data item in dl.
214-
// It panics if there are none.
215-
func (dl *dataList) lastIndex() int {
216-
if len(dl.data) == 0 {
217-
panic("empty dataList")
218-
}
219-
return dl.first + len(dl.data) - 1
220-
}
221-
222211
// A MemoryEventStore is an [EventStore] backed by memory.
223212
type MemoryEventStore struct {
224213
mu sync.Mutex
@@ -267,9 +256,8 @@ func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore {
267256
}
268257
}
269258

270-
// AppendEvent implements [EventStore.AppendEvent] by recording data
271-
// in memory.
272-
func (s *MemoryEventStore) AppendEvent(_ context.Context, sessionID string, streamID StreamID, data []byte) (int, error) {
259+
// Append implements [EventStore.Append] by recording data in memory.
260+
func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error {
273261
s.mu.Lock()
274262
defer s.mu.Unlock()
275263

@@ -288,9 +276,13 @@ func (s *MemoryEventStore) AppendEvent(_ context.Context, sessionID string, stre
288276
s.purge()
289277
dl.appendData(data)
290278
s.nBytes += len(data)
291-
return dl.lastIndex(), nil
279+
return nil
292280
}
293281

282+
// ErrEventsPurged is the error that [EventStore.After] should return if the event just after the
283+
// index is no longer available.
284+
var ErrEventsPurged = errors.New("data purged")
285+
294286
// After implements [EventStore.After].
295287
func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] {
296288
// Return the data items to yield.
@@ -306,10 +298,12 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S
306298
if !ok {
307299
return nil, fmt.Errorf("MemoryEventStore.After: unknown stream ID %v in session %q", streamID, sessionID)
308300
}
309-
if dl.first > index {
310-
return nil, fmt.Errorf("MemoryEventStore.After: data purged at index %d, stream ID %v, session %q", index, streamID, sessionID)
301+
start := index + 1
302+
if dl.first > start {
303+
return nil, fmt.Errorf("MemoryEventStore.After: index %d, stream ID %v, session %q: %w",
304+
index, streamID, sessionID, ErrEventsPurged)
311305
}
312-
return slices.Clone(dl.data[index-dl.first:]), nil
306+
return slices.Clone(dl.data[start-dl.first:]), nil
313307
}
314308

315309
return func(yield func([]byte, error) bool) {
@@ -326,26 +320,6 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S
326320
}
327321
}
328322

329-
// StreamClosed implements [EventStore.StreamClosed].
330-
func (s *MemoryEventStore) StreamClosed(_ context.Context, sessionID string, streamID StreamID) error {
331-
if sessionID == "" {
332-
panic("empty sessionID")
333-
}
334-
335-
s.mu.Lock()
336-
defer s.mu.Unlock()
337-
338-
sm := s.store[sessionID]
339-
dl := sm[streamID]
340-
s.nBytes -= dl.size
341-
delete(sm, streamID)
342-
if len(sm) == 0 {
343-
delete(s.store, sessionID)
344-
}
345-
s.validate()
346-
return nil
347-
}
348-
349323
// SessionClosed implements [EventStore.SessionClosed].
350324
func (s *MemoryEventStore) SessionClosed(_ context.Context, sessionID string) error {
351325
s.mu.Lock()

mcp/event_test.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestMemoryEventStoreState(t *testing.T) {
105105
ctx := context.Background()
106106

107107
appendEvent := func(s *MemoryEventStore, sess string, str StreamID, data string) {
108-
if _, err := s.AppendEvent(ctx, sess, str, []byte(data)); err != nil {
108+
if err := s.Append(ctx, sess, str, []byte(data)); err != nil {
109109
t.Fatal(err)
110110
}
111111
}
@@ -127,18 +127,6 @@ func TestMemoryEventStoreState(t *testing.T) {
127127
"S1 1 first=0 d1 d3; S1 2 first=0 d2; S2 8 first=0 d4",
128128
8,
129129
},
130-
{
131-
"stream close",
132-
func(s *MemoryEventStore) {
133-
appendEvent(s, "S1", 1, "d1")
134-
appendEvent(s, "S1", 2, "d2")
135-
appendEvent(s, "S1", 1, "d3")
136-
appendEvent(s, "S2", 8, "d4")
137-
s.StreamClosed(ctx, "S1", 1)
138-
},
139-
"S1 2 first=0 d2; S2 8 first=0 d4",
140-
4,
141-
},
142130
{
143131
"session close",
144132
func(s *MemoryEventStore) {
@@ -218,10 +206,10 @@ func TestMemoryEventStoreAfter(t *testing.T) {
218206
ctx := context.Background()
219207
s := NewMemoryEventStore(nil)
220208
s.SetMaxBytes(4)
221-
s.AppendEvent(ctx, "S1", 1, []byte("d1"))
222-
s.AppendEvent(ctx, "S1", 1, []byte("d2"))
223-
s.AppendEvent(ctx, "S1", 1, []byte("d3"))
224-
s.AppendEvent(ctx, "S1", 2, []byte("d4")) // will purge d1
209+
s.Append(ctx, "S1", 1, []byte("d1"))
210+
s.Append(ctx, "S1", 1, []byte("d2"))
211+
s.Append(ctx, "S1", 1, []byte("d3"))
212+
s.Append(ctx, "S1", 2, []byte("d4")) // will purge d1
225213
want := "S1 1 first=1 d2 d3; S1 2 first=0 d4"
226214
if got := s.debugString(); got != want {
227215
t.Fatalf("got state %q, want %q", got, want)
@@ -234,10 +222,10 @@ func TestMemoryEventStoreAfter(t *testing.T) {
234222
want []string
235223
wantErr string // if non-empty, error should contain this string
236224
}{
237-
{"S1", 1, 0, nil, "purge"},
238-
{"S1", 1, 1, []string{"d2", "d3"}, ""},
239-
{"S1", 1, 2, []string{"d3"}, ""},
240-
{"S1", 2, 0, []string{"d4"}, ""},
225+
{"S1", 1, 0, []string{"d2", "d3"}, ""},
226+
{"S1", 1, 1, []string{"d3"}, ""},
227+
{"S1", 1, 2, nil, ""},
228+
{"S1", 2, 0, nil, ""},
241229
{"S1", 3, 0, nil, "unknown stream ID"},
242230
{"S2", 0, 0, nil, "unknown session ID"},
243231
} {

mcp/mcp_test.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestEndToEnd(t *testing.T) {
6060

6161
// Channels to check if notification callbacks happened.
6262
notificationChans := map[string]chan int{}
63-
for _, name := range []string{"initialized", "roots", "tools", "prompts", "resources", "progress_server", "progress_client"} {
63+
for _, name := range []string{"initialized", "roots", "tools", "prompts", "resources", "progress_server", "progress_client", "resource_updated", "subscribe", "unsubscribe"} {
6464
notificationChans[name] = make(chan int, 1)
6565
}
6666
waitForNotification := func(t *testing.T, name string) {
@@ -78,6 +78,14 @@ func TestEndToEnd(t *testing.T) {
7878
ProgressNotificationHandler: func(context.Context, *ServerSession, *ProgressNotificationParams) {
7979
notificationChans["progress_server"] <- 0
8080
},
81+
SubscribeHandler: func(context.Context, *SubscribeParams) error {
82+
notificationChans["subscribe"] <- 0
83+
return nil
84+
},
85+
UnsubscribeHandler: func(context.Context, *UnsubscribeParams) error {
86+
notificationChans["unsubscribe"] <- 0
87+
return nil
88+
},
8189
}
8290
s := NewServer(testImpl, sopts)
8391
AddTool(s, &Tool{
@@ -128,6 +136,9 @@ func TestEndToEnd(t *testing.T) {
128136
ProgressNotificationHandler: func(context.Context, *ClientSession, *ProgressNotificationParams) {
129137
notificationChans["progress_client"] <- 0
130138
},
139+
ResourceUpdatedHandler: func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams) {
140+
notificationChans["resource_updated"] <- 0
141+
},
131142
}
132143
c := NewClient(testImpl, opts)
133144
rootAbs, err := filepath.Abs(filepath.FromSlash("testdata/files"))
@@ -421,6 +432,37 @@ func TestEndToEnd(t *testing.T) {
421432
waitForNotification(t, "progress_server")
422433
})
423434

435+
t.Run("resource_subscriptions", func(t *testing.T) {
436+
err := cs.Subscribe(ctx, &SubscribeParams{
437+
URI: "test",
438+
})
439+
if err != nil {
440+
t.Fatal(err)
441+
}
442+
waitForNotification(t, "subscribe")
443+
s.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{
444+
URI: "test",
445+
})
446+
waitForNotification(t, "resource_updated")
447+
err = cs.Unsubscribe(ctx, &UnsubscribeParams{
448+
URI: "test",
449+
})
450+
if err != nil {
451+
t.Fatal(err)
452+
}
453+
waitForNotification(t, "unsubscribe")
454+
455+
// Verify the client does not receive the update after unsubscribing.
456+
s.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{
457+
URI: "test",
458+
})
459+
select {
460+
case <-notificationChans["resource_updated"]:
461+
t.Fatalf("resource updated after unsubscription")
462+
case <-time.After(time.Second):
463+
}
464+
})
465+
424466
// Disconnect.
425467
cs.Close()
426468
clientWG.Wait()
@@ -617,7 +659,7 @@ func TestCancellation(t *testing.T) {
617659
return nil, nil
618660
}
619661
_, cs := basicConnection(t, func(s *Server) {
620-
s.AddTool(&Tool{Name: "slow"}, slowRequest)
662+
s.AddTool(&Tool{Name: "slow", InputSchema: &jsonschema.Schema{}}, slowRequest)
621663
})
622664
defer cs.Close()
623665

mcp/protocol.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,38 @@ type ToolListChangedParams struct {
859859
func (x *ToolListChangedParams) GetProgressToken() any { return getProgressToken(x) }
860860
func (x *ToolListChangedParams) SetProgressToken(t any) { setProgressToken(x, t) }
861861

862+
// Sent from the client to request resources/updated notifications from the
863+
// server whenever a particular resource changes.
864+
type SubscribeParams struct {
865+
// This property is reserved by the protocol to allow clients and servers to
866+
// attach additional metadata to their responses.
867+
Meta `json:"_meta,omitempty"`
868+
// The URI of the resource to subscribe to.
869+
URI string `json:"uri"`
870+
}
871+
872+
// Sent from the client to request cancellation of resources/updated
873+
// notifications from the server. This should follow a previous
874+
// resources/subscribe request.
875+
type UnsubscribeParams struct {
876+
// This property is reserved by the protocol to allow clients and servers to
877+
// attach additional metadata to their responses.
878+
Meta `json:"_meta,omitempty"`
879+
// The URI of the resource to unsubscribe from.
880+
URI string `json:"uri"`
881+
}
882+
883+
// A notification from the server to the client, informing it that a resource
884+
// has changed and may need to be read again. This should only be sent if the
885+
// client previously sent a resources/subscribe request.
886+
type ResourceUpdatedNotificationParams struct {
887+
// This property is reserved by the protocol to allow clients and servers to
888+
// attach additional metadata to their responses.
889+
Meta `json:"_meta,omitempty"`
890+
// The URI of the resource that has been updated. This might be a sub-resource of the one that the client actually subscribed to.
891+
URI string `json:"uri"`
892+
}
893+
862894
// TODO(jba): add CompleteRequest and related types.
863895

864896
// TODO(jba): add ElicitRequest and related types.

0 commit comments

Comments
 (0)