Skip to content

Commit 3fc6125

Browse files
committed
mcp: add resource subscriptions
This CL adds the ability for clients to subscribe and receive updates for resources as described in https://modelcontextprotocol.io/specification/2025-06-18/server/resources#subscriptions
1 parent de4b788 commit 3fc6125

File tree

6 files changed

+213
-5
lines changed

6 files changed

+213
-5
lines changed

design/design.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,13 +748,26 @@ Server sessions also support the spec methods `ListResources` and `ListResourceT
748748
749749
#### Subscriptions
750750
751-
ClientSessions can manage change notifications on particular resources:
751+
##### Client-Side Usage
752+
753+
Use the Subscribe and Unsubscribe methods on a ClientSession to start or stop receiving updates for a specific resource.
752754
753755
```go
754756
func (*ClientSession) Subscribe(context.Context, *SubscribeParams) error
755757
func (*ClientSession) Unsubscribe(context.Context, *UnsubscribeParams) error
756758
```
757759
760+
To process incoming update notifications, you must provide a ResourceUpdatedHandler in your ClientOptions. The SDK calls this function automatically whenever the server sends a notification for a resource you're subscribed to.
761+
762+
```go
763+
type ClientOptions struct {
764+
...
765+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
766+
}
767+
```
768+
769+
##### Server-Side Implementation
770+
758771
The server does not implement resource subscriptions. It passes along subscription requests to the user, and supplies a method to notify clients of changes. It tracks which sessions have subscribed to which resources so the user doesn't have to.
759772
760773
If a server author wants to support resource subscriptions, they must provide handlers to be called when clients subscribe and unsubscribe. It is an error to provide only one of these handlers.
@@ -772,7 +785,7 @@ type ServerOptions struct {
772785
User code should call `ResourceUpdated` when a subscribed resource changes.
773786
774787
```go
775-
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotification) error
788+
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotificationParams) error
776789
```
777790
778791
The server routes these notifications to the server sessions that subscribed to the resource.

mcp/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ClientOptions struct {
6060
ToolListChangedHandler func(context.Context, *ClientSession, *ToolListChangedParams)
6161
PromptListChangedHandler func(context.Context, *ClientSession, *PromptListChangedParams)
6262
ResourceListChangedHandler func(context.Context, *ClientSession, *ResourceListChangedParams)
63+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
6364
LoggingMessageHandler func(context.Context, *ClientSession, *LoggingMessageParams)
6465
ProgressNotificationHandler func(context.Context, *ClientSession, *ProgressNotificationParams)
6566
// If non-zero, defines an interval for regular "ping" requests.
@@ -293,6 +294,7 @@ var clientMethodInfos = map[string]methodInfo{
293294
notificationToolListChanged: newMethodInfo(clientMethod((*Client).callToolChangedHandler)),
294295
notificationPromptListChanged: newMethodInfo(clientMethod((*Client).callPromptChangedHandler)),
295296
notificationResourceListChanged: newMethodInfo(clientMethod((*Client).callResourceChangedHandler)),
297+
notificationResourceUpdated: newMethodInfo(clientMethod((*Client).callResourceUpdatedHandler)),
296298
notificationLoggingMessage: newMethodInfo(clientMethod((*Client).callLoggingHandler)),
297299
notificationProgress: newMethodInfo(sessionMethod((*ClientSession).callProgressNotificationHandler)),
298300
}
@@ -386,6 +388,20 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) (
386388
return handleSend[*CompleteResult](ctx, cs, methodComplete, orZero[Params](params))
387389
}
388390

391+
// Subscribe sends a "resources/subscribe" request to the server, asking for
392+
// notifications when the specified resource changes.
393+
func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error {
394+
_, err := handleSend[*emptyResult](ctx, cs, methodSubscribe, orZero[Params](params))
395+
return err
396+
}
397+
398+
// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling
399+
// a previous subscription.
400+
func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error {
401+
_, err := handleSend[*emptyResult](ctx, cs, methodUnsubscribe, orZero[Params](params))
402+
return err
403+
}
404+
389405
func (c *Client) callToolChangedHandler(ctx context.Context, s *ClientSession, params *ToolListChangedParams) (Result, error) {
390406
return callNotificationHandler(ctx, c.opts.ToolListChangedHandler, s, params)
391407
}
@@ -398,6 +414,10 @@ func (c *Client) callResourceChangedHandler(ctx context.Context, s *ClientSessio
398414
return callNotificationHandler(ctx, c.opts.ResourceListChangedHandler, s, params)
399415
}
400416

417+
func (c *Client) callResourceUpdatedHandler(ctx context.Context, s *ClientSession, params *ResourceUpdatedNotificationParams) (Result, error) {
418+
return callNotificationHandler(ctx, c.opts.ResourceUpdatedHandler, s, params)
419+
}
420+
401421
func (c *Client) callLoggingHandler(ctx context.Context, cs *ClientSession, params *LoggingMessageParams) (Result, error) {
402422
if h := c.opts.LoggingMessageHandler; h != nil {
403423
h(ctx, cs, params)

mcp/mcp_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestEndToEnd(t *testing.T) {
6060

6161
// Channels to check if notification callbacks happened.
6262
notificationChans := map[string]chan int{}
63-
for _, name := range []string{"initialized", "roots", "tools", "prompts", "resources", "progress_server", "progress_client"} {
63+
for _, name := range []string{"initialized", "roots", "tools", "prompts", "resources", "progress_server", "progress_client", "resource_updated", "subscribe", "unsubscribe"} {
6464
notificationChans[name] = make(chan int, 1)
6565
}
6666
waitForNotification := func(t *testing.T, name string) {
@@ -78,6 +78,14 @@ func TestEndToEnd(t *testing.T) {
7878
ProgressNotificationHandler: func(context.Context, *ServerSession, *ProgressNotificationParams) {
7979
notificationChans["progress_server"] <- 0
8080
},
81+
SubscribeHandler: func(context.Context, *SubscribeParams) error {
82+
notificationChans["subscribe"] <- 0
83+
return nil
84+
},
85+
UnsubscribeHandler: func(context.Context, *UnsubscribeParams) error {
86+
notificationChans["unsubscribe"] <- 0
87+
return nil
88+
},
8189
}
8290
s := NewServer(testImpl, sopts)
8391
AddTool(s, &Tool{
@@ -128,6 +136,9 @@ func TestEndToEnd(t *testing.T) {
128136
ProgressNotificationHandler: func(context.Context, *ClientSession, *ProgressNotificationParams) {
129137
notificationChans["progress_client"] <- 0
130138
},
139+
ResourceUpdatedHandler: func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams) {
140+
notificationChans["resource_updated"] <- 0
141+
},
131142
}
132143
c := NewClient(testImpl, opts)
133144
rootAbs, err := filepath.Abs(filepath.FromSlash("testdata/files"))
@@ -421,6 +432,37 @@ func TestEndToEnd(t *testing.T) {
421432
waitForNotification(t, "progress_server")
422433
})
423434

435+
t.Run("resource_subscriptions", func(t *testing.T) {
436+
err := cs.Subscribe(ctx, &SubscribeParams{
437+
URI: "test",
438+
})
439+
if err != nil {
440+
t.Fatal(err)
441+
}
442+
waitForNotification(t, "subscribe")
443+
s.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{
444+
URI: "test",
445+
})
446+
waitForNotification(t, "resource_updated")
447+
err = cs.Unsubscribe(ctx, &UnsubscribeParams{
448+
URI: "test",
449+
})
450+
if err != nil {
451+
t.Fatal(err)
452+
}
453+
waitForNotification(t, "unsubscribe")
454+
455+
// Verify the client does not receive the update after unsubscribing.
456+
s.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{
457+
URI: "test",
458+
})
459+
select {
460+
case <-notificationChans["resource_updated"]:
461+
t.Fatalf("resource updated after unsubscription")
462+
case <-time.After(time.Second):
463+
}
464+
})
465+
424466
// Disconnect.
425467
cs.Close()
426468
clientWG.Wait()

mcp/protocol.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,38 @@ type ToolListChangedParams struct {
859859
func (x *ToolListChangedParams) GetProgressToken() any { return getProgressToken(x) }
860860
func (x *ToolListChangedParams) SetProgressToken(t any) { setProgressToken(x, t) }
861861

862+
// Sent from the client to request resources/updated notifications from the
863+
// server whenever a particular resource changes.
864+
type SubscribeParams struct {
865+
// This property is reserved by the protocol to allow clients and servers to
866+
// attach additional metadata to their responses.
867+
Meta `json:"_meta,omitempty"`
868+
// The URI of the resource to subscribe to.
869+
URI string `json:"uri"`
870+
}
871+
872+
// Sent from the client to request cancellation of resources/updated
873+
// notifications from the server. This should follow a previous
874+
// resources/subscribe request.
875+
type UnsubscribeParams struct {
876+
// This property is reserved by the protocol to allow clients and servers to
877+
// attach additional metadata to their responses.
878+
Meta `json:"_meta,omitempty"`
879+
// The URI of the resource to unsubscribe to.
880+
URI string `json:"uri"`
881+
}
882+
883+
// A notification from the server to the client, informing it that a resource
884+
// has changed and may need to be read again. This should only be sent if the
885+
// client previously sent a resources/subscribe request.
886+
type ResourceUpdatedNotificationParams struct {
887+
// This property is reserved by the protocol to allow clients and servers to
888+
// attach additional metadata to their responses.
889+
Meta `json:"_meta,omitempty"`
890+
// The URI of the resource that has been updated. This might be a sub-resource of the one that the client actually subscribed to.
891+
URI string `json:"uri"`
892+
}
893+
862894
// TODO(jba): add CompleteRequest and related types.
863895

864896
// TODO(jba): add ElicitRequest and related types.

mcp/server.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Server struct {
4343
sessions []*ServerSession
4444
sendingMethodHandler_ MethodHandler[*ServerSession]
4545
receivingMethodHandler_ MethodHandler[*ServerSession]
46+
resourceSubscriptions map[string][]*ServerSession // uri -> session
4647
}
4748

4849
// ServerOptions is used to configure behavior of the server.
@@ -64,6 +65,10 @@ type ServerOptions struct {
6465
// If the peer fails to respond to pings originating from the keepalive check,
6566
// the session is automatically closed.
6667
KeepAlive time.Duration
68+
// Function called when a client session subscribes to a resource.
69+
SubscribeHandler func(context.Context, *SubscribeParams) error
70+
// Function called when a client session unsubscribes from a resource.
71+
UnsubscribeHandler func(context.Context, *UnsubscribeParams) error
6772
}
6873

6974
// NewServer creates a new MCP server. The resulting server has no features:
@@ -88,6 +93,12 @@ func NewServer(impl *Implementation, opts *ServerOptions) *Server {
8893
if opts.PageSize == 0 {
8994
opts.PageSize = DefaultPageSize
9095
}
96+
if opts.SubscribeHandler != nil && opts.UnsubscribeHandler == nil {
97+
panic("SubscribeHandler requires UnsubscribeHandler")
98+
}
99+
if opts.UnsubscribeHandler != nil && opts.SubscribeHandler == nil {
100+
panic("UnsubscribeHandler requires SubscribeHandler")
101+
}
91102
return &Server{
92103
impl: impl,
93104
opts: *opts,
@@ -97,6 +108,7 @@ func NewServer(impl *Implementation, opts *ServerOptions) *Server {
97108
resourceTemplates: newFeatureSet(func(t *serverResourceTemplate) string { return t.resourceTemplate.URITemplate }),
98109
sendingMethodHandler_: defaultSendingMethodHandler[*ServerSession],
99110
receivingMethodHandler_: defaultReceivingMethodHandler[*ServerSession],
111+
resourceSubscriptions: make(map[string][]*ServerSession),
100112
}
101113
}
102114

@@ -224,6 +236,12 @@ func (s *Server) capabilities() *serverCapabilities {
224236
if s.resources.len() > 0 || s.resourceTemplates.len() > 0 {
225237
caps.Resources = &resourceCapabilities{ListChanged: true}
226238
}
239+
if s.opts.SubscribeHandler != nil {
240+
if caps.Resources == nil {
241+
caps.Resources = &resourceCapabilities{}
242+
}
243+
caps.Resources.Subscribe = true
244+
}
227245
return caps
228246
}
229247

@@ -426,6 +444,55 @@ func fileResourceHandler(dir string) ResourceHandler {
426444
}
427445
}
428446

447+
func (s *Server) ResourceUpdated(ctx context.Context, params *ResourceUpdatedNotificationParams) error {
448+
s.mu.Lock()
449+
sessions := slices.Clone(s.resourceSubscriptions[params.URI])
450+
s.mu.Unlock()
451+
if len(sessions) == 0 {
452+
return nil
453+
}
454+
notifySessions(sessions, notificationResourceUpdated, params)
455+
return nil
456+
}
457+
458+
func (s *Server) subscribe(ctx context.Context, ss *ServerSession, params *SubscribeParams) (*emptyResult, error) {
459+
if s.opts.SubscribeHandler == nil {
460+
return nil, fmt.Errorf("%w: server does not support resource subscriptions", jsonrpc2.ErrMethodNotFound)
461+
}
462+
if err := s.opts.SubscribeHandler(ctx, params); err != nil {
463+
return nil, err
464+
}
465+
s.mu.Lock()
466+
defer s.mu.Unlock()
467+
uri := params.URI
468+
subscribers := s.resourceSubscriptions[uri]
469+
if !slices.Contains(subscribers, ss) {
470+
s.resourceSubscriptions[uri] = append(subscribers, ss)
471+
}
472+
return &emptyResult{}, nil
473+
}
474+
475+
func (s *Server) unsubscribe(ctx context.Context, ss *ServerSession, params *UnsubscribeParams) (*emptyResult, error) {
476+
if s.opts.UnsubscribeHandler == nil {
477+
return nil, jsonrpc2.ErrMethodNotFound
478+
}
479+
480+
if err := s.opts.UnsubscribeHandler(ctx, params); err != nil {
481+
return nil, err
482+
}
483+
484+
s.mu.Lock()
485+
defer s.mu.Unlock()
486+
487+
uri := params.URI
488+
if sessions, ok := s.resourceSubscriptions[uri]; ok {
489+
s.resourceSubscriptions[uri] = slices.DeleteFunc(sessions, func(s *ServerSession) bool {
490+
return s == ss
491+
})
492+
}
493+
return &emptyResult{}, nil
494+
}
495+
429496
// Run runs the server over the given transport, which must be persistent.
430497
//
431498
// Run blocks until the client terminates the connection or the provided
@@ -473,6 +540,11 @@ func (s *Server) disconnect(cc *ServerSession) {
473540
s.sessions = slices.DeleteFunc(s.sessions, func(cc2 *ServerSession) bool {
474541
return cc2 == cc
475542
})
543+
for uri, sessions := range s.resourceSubscriptions {
544+
s.resourceSubscriptions[uri] = slices.DeleteFunc(sessions, func(cc2 *ServerSession) bool {
545+
return cc2 == cc
546+
})
547+
}
476548
}
477549

478550
// Connect connects the MCP server over the given transport and starts handling
@@ -614,6 +686,8 @@ var serverMethodInfos = map[string]methodInfo{
614686
methodListResourceTemplates: newMethodInfo(serverMethod((*Server).listResourceTemplates)),
615687
methodReadResource: newMethodInfo(serverMethod((*Server).readResource)),
616688
methodSetLevel: newMethodInfo(sessionMethod((*ServerSession).setLevel)),
689+
methodSubscribe: newMethodInfo(serverMethod((*Server).subscribe)),
690+
methodUnsubscribe: newMethodInfo(serverMethod((*Server).unsubscribe)),
617691
notificationInitialized: newMethodInfo(serverMethod((*Server).callInitializedHandler)),
618692
notificationRootsListChanged: newMethodInfo(serverMethod((*Server).callRootsListChangedHandler)),
619693
notificationProgress: newMethodInfo(sessionMethod((*ServerSession).callProgressNotificationHandler)),

mcp/server_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package mcp
66

77
import (
8+
"context"
89
"log"
910
"slices"
1011
"testing"
@@ -232,6 +233,7 @@ func TestServerCapabilities(t *testing.T) {
232233
testCases := []struct {
233234
name string
234235
configureServer func(s *Server)
236+
serverOpts ServerOptions
235237
wantCapabilities *serverCapabilities
236238
}{
237239
{
@@ -275,6 +277,23 @@ func TestServerCapabilities(t *testing.T) {
275277
Resources: &resourceCapabilities{ListChanged: true},
276278
},
277279
},
280+
{
281+
name: "With resource subscriptions",
282+
configureServer: func(s *Server) {},
283+
serverOpts: ServerOptions{
284+
SubscribeHandler: func(ctx context.Context, sp *SubscribeParams) error {
285+
return nil
286+
},
287+
UnsubscribeHandler: func(ctx context.Context, up *UnsubscribeParams) error {
288+
return nil
289+
},
290+
},
291+
wantCapabilities: &serverCapabilities{
292+
Completions: &completionCapabilities{},
293+
Logging: &loggingCapabilities{},
294+
Resources: &resourceCapabilities{Subscribe: true},
295+
},
296+
},
278297
{
279298
name: "With tools",
280299
configureServer: func(s *Server) {
@@ -294,19 +313,27 @@ func TestServerCapabilities(t *testing.T) {
294313
s.AddResourceTemplate(&ResourceTemplate{URITemplate: "file:///rt"}, nil)
295314
s.AddTool(&Tool{Name: "t"}, nil)
296315
},
316+
serverOpts: ServerOptions{
317+
SubscribeHandler: func(ctx context.Context, sp *SubscribeParams) error {
318+
return nil
319+
},
320+
UnsubscribeHandler: func(ctx context.Context, up *UnsubscribeParams) error {
321+
return nil
322+
},
323+
},
297324
wantCapabilities: &serverCapabilities{
298325
Completions: &completionCapabilities{},
299326
Logging: &loggingCapabilities{},
300327
Prompts: &promptCapabilities{ListChanged: true},
301-
Resources: &resourceCapabilities{ListChanged: true},
328+
Resources: &resourceCapabilities{ListChanged: true, Subscribe: true},
302329
Tools: &toolCapabilities{ListChanged: true},
303330
},
304331
},
305332
}
306333

307334
for _, tc := range testCases {
308335
t.Run(tc.name, func(t *testing.T) {
309-
server := NewServer(testImpl, nil)
336+
server := NewServer(testImpl, &tc.serverOpts)
310337
tc.configureServer(server)
311338
gotCapabilities := server.capabilities()
312339
if diff := cmp.Diff(tc.wantCapabilities, gotCapabilities); diff != "" {

0 commit comments

Comments
 (0)