Skip to content

Commit badcba0

Browse files
committed
Advertise subscription capabilities era-aware and apply review feedback
The big one: MCPServer served subscriptions/listen but still advertised listChanged: false / subscribe: false everywhere, so a spec-following client - which capability-gates its listen filter - would never subscribe, making the publish surface unreachable. get_capabilities now takes an optional protocol_version: at 2026-07-28+ the listChanged and resources.subscribe bits derive from whether subscriptions/listen is served (that is what those bits mean on a wire whose only delivery channel is the listen stream); the handshake-era derivation is unchanged and remains the default. server/discover passes the request's version. The everything-server gains test_trigger_tool_change / test_trigger_prompt_change diagnostic tools (backed by a new MCPServer.remove_prompt mirroring remove_tool), so the conformance list-changed SHOULD checks now run and pass: server-stateless is 30/30. Review follow-ups: - The bus is no longer exposed as a public MCPServer property; Context receives it at construction and the notify_* methods publish through it directly. Publishing from outside a request means keeping a reference to the bus you constructed. - json_response=True no longer hangs subscriptions/listen: a listen response is a notification stream, so it takes the SSE path regardless of the JSON-response preference (TypeScript/Go parity). - ListenHandler gains max_subscriptions (rejected pre-ack past the cap) and max_buffered_events (a stream whose client stopped reading is ended at the cap; the client re-listens - no replay means the backlog was already lossy). - Honored resource URIs are matched via a frozenset instead of a list scan on every publish. - InMemorySubscriptionBus isolates raising listeners (logged + skipped) so one bad listener cannot starve the others or fail the publisher. - close() docs now say it initiates closure; each stream flushes from its own handler task. - Story and docs corrections: cancelling the parked listen request only releases the local task over HTTP today (the stream ends with the connection); capability docs/tests updated for the era-aware bits; stories index row promoted; small example hardening.
1 parent a06581f commit badcba0

21 files changed

Lines changed: 406 additions & 77 deletions

File tree

docs/advanced/subscriptions.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class RedisSubscriptionBus:
5757
mcp = MCPServer("Notebook", subscriptions=RedisSubscriptionBus(...))
5858
```
5959

60-
The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. The same instance is reachable as `mcp.subscriptions`, which is also how you publish from outside a request: `await mcp.subscriptions.publish(ToolsListChanged())`.
60+
The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. To publish from outside a request, keep a reference to the bus you constructed and `await bus.publish(ToolsListChanged())` — the server holds the same instance.
6161

6262
## The low-level composition
6363

docs/client/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ The resource verbs come in pairs: two ways to list, one way to read.
145145

146146
`read_resource` returns `contents`, a list of `TextResourceContents` or `BlobResourceContents`. Same idea as tool content: narrow with `isinstance`, then read `.text` (or `.blob`).
147147

148-
A client can also **subscribe** to a resource and be told when it changes: `subscribe_resource(uri)` and `unsubscribe_resource(uri)`, same shape as everything else here. `MCPServer` doesn't implement that half. It says so up front (`server_capabilities.resources.subscribe` is `False`) and answers the request with an `MCPError`: `-32601`, *Method not found*. A server that does support subscriptions is built on the low-level `Server` (**[The low-level Server](../advanced/low-level-server.md)**).
148+
A client can also be told when a resource changes. On 2025-era connections that is `subscribe_resource(uri)` / `unsubscribe_resource(uri)` - a method pair `MCPServer` doesn't implement, so on the 2026-07-28 wire (where those verbs no longer exist) the request answers `-32601`, *Method not found*. The 2026 replacement is a `subscriptions/listen` stream, which `MCPServer` *does* serve - `server_capabilities.resources.subscribe` is `True` there, and the server side of the story is **[Subscriptions](../advanced/subscriptions.md)**.
149149

150150
## Prompts
151151

docs/tutorial/first-steps.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ asyncio.run(main())
9797
```
9898

9999
```text
100-
{'prompts': {'list_changed': False}, 'resources': {'subscribe': False, 'list_changed': False}, 'tools': {'list_changed': False}}
100+
{'prompts': {'list_changed': True}, 'resources': {'subscribe': True, 'list_changed': True}, 'tools': {'list_changed': True}}
101101
```
102102

103103
That dictionary is the server's half of the handshake:

examples/servers/everything-server/mcp_everything_server/server.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import click
1717
from mcp.server import ServerRequestContext
1818
from mcp.server.mcpserver import Context, MCPServer
19-
from mcp.server.mcpserver.prompts.base import UserMessage
19+
from mcp.server.mcpserver.prompts.base import Prompt, UserMessage
2020
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
2121
from mcp.shared.exceptions import MCPError
2222
from mcp_types import (
@@ -603,6 +603,34 @@ async def test_reconnection(ctx: Context) -> str:
603603
return "Reconnection test completed"
604604

605605

606+
def _dynamic_tool() -> str:
607+
"""A tool registered and removed by test_trigger_tool_change."""
608+
return "dynamic"
609+
610+
611+
def _dynamic_prompt() -> str:
612+
"""A prompt registered and removed by test_trigger_prompt_change."""
613+
return "dynamic"
614+
615+
616+
@mcp.tool()
617+
async def test_trigger_tool_change(ctx: Context) -> str:
618+
"""Mutates the tool list and announces it to subscriptions/listen streams (SEP-2575)"""
619+
mcp.add_tool(_dynamic_tool, name="test_dynamic_tool")
620+
mcp.remove_tool("test_dynamic_tool")
621+
await ctx.notify_tools_changed()
622+
return "tool list changed"
623+
624+
625+
@mcp.tool()
626+
async def test_trigger_prompt_change(ctx: Context) -> str:
627+
"""Mutates the prompt list and announces it to subscriptions/listen streams (SEP-2575)"""
628+
mcp.add_prompt(Prompt.from_function(_dynamic_prompt, name="test_dynamic_prompt", description="dynamic"))
629+
mcp.remove_prompt("test_dynamic_prompt")
630+
await ctx.notify_prompts_changed()
631+
return "prompt list changed"
632+
633+
606634
# Resources
607635
@mcp.resource("test://static-text")
608636
def static_text_resource() -> str:

examples/stories/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ opens with a banner saying what replaces it.
148148
| [`starlette_mount`](starlette_mount/) | mounting `streamable_http_app()` under a Starlette/FastAPI sub-path | current |
149149
| [`sse_polling`](sse_polling/) | SEP-1699 `closeSSE()` + `Last-Event-ID` resume via `EventStore` | legacy |
150150
| [`standalone_get`](standalone_get/) | server-initiated `list_changed` over the sessionful GET stream | legacy |
151+
| [`subscriptions`](subscriptions/) | `subscriptions/listen` streams: `ctx.notify_*`, `SubscriptionBus`, `ListenHandler` | current |
151152
| [`reconnect`](reconnect/) | explicit `discover()`, persist `DiscoverResult`, zero-RTT reconnect | current |
152153
| [`bearer_auth`](bearer_auth/) | `TokenVerifier` + `AuthSettings` bearer gate, PRM metadata, `get_access_token()` | current |
153154
| [`oauth`](oauth/) | full `authorization_code` grant against an in-process AS | current |
154155
| [`oauth_client_credentials`](oauth_client_credentials/) | `client_credentials` grant; minimal in-process token endpoint | current |
155156
| [`identity_assertion`](identity_assertion/) | SEP-990 enterprise IdP flow: present an ID-JAG under the `jwt-bearer` grant | current |
156157
| **— deferred (README only) —** | | |
157158
| [`caching`](caching/) | `CacheableResult` ttl/scope hints; client honouring | not yet implemented |
158-
| [`subscriptions`](subscriptions/) | `subscriptions/listen`, `ServerEventBus`, `Client.listen()` | not yet implemented — [#2901](https://github.com/modelcontextprotocol/python-sdk/issues/2901) |
159159
| [`tasks`](tasks/) | `io.modelcontextprotocol/tasks` extension | not yet implemented |
160160
| [`apps`](apps/) | MCP Apps: `ui://` resource + `_meta.ui` | not yet implemented — [#2896](https://github.com/modelcontextprotocol/python-sdk/issues/2896) |
161161
| [`skills`](skills/) | SEP-2640 skills extension | not yet implemented — [#2896](https://github.com/modelcontextprotocol/python-sdk/issues/2896) |

examples/stories/subscriptions/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ per-stream filtering, subscription-id tagging). Replaces the handshake-era
1010
The client edits a note it did not subscribe to (silence), edits the one it
1111
did (a tagged `notifications/resources/updated`), registers a tool at runtime
1212
(`notifications/tools/list_changed`, then re-lists and calls it), and finally
13-
closes the stream by cancelling the parked request.
13+
stops listening - cancelling the parked request releases the local task, and
14+
closing the connection ends the stream server-side.
1415

1516
## Run it
1617

@@ -27,9 +28,10 @@ uv run python -m stories.subscriptions.client --http --server server_lowlevel
2728
- `client.py` — stream frames arrive as ordinary server notifications via the
2829
constructor-only `message_handler=`. There is no client-side listen API yet,
2930
so opening the stream drops to the `client.session` escape hatch; the request
30-
parks for the stream's lifetime and the client closes the stream by
31-
cancelling it. Every frame's `_meta["io.modelcontextprotocol/subscriptionId"]`
32-
is the listen request's JSON-RPC id.
31+
parks for the stream's lifetime. Cancelling it releases the local task; over
32+
HTTP the server-side stream ends when the connection closes. Every frame's
33+
`_meta["io.modelcontextprotocol/subscriptionId"]` is the listen request's
34+
JSON-RPC id.
3335
- `server.py` — publishing is one `await ctx.notify_*()` line per change; the
3436
filter, the tagging, and the ack ordering are the SDK's job. Publishing with
3537
no subscribers is a no-op.

examples/stories/subscriptions/client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ async def wait_for(count: int) -> None:
3838

3939
async with anyio.create_task_group() as tg:
4040
# There is no client-side listen API yet, so the story drops to the
41-
# `client.session` escape hatch: the request parks for the stream's
42-
# lifetime, so it runs as a task and the client closes the stream by
43-
# cancelling it (the spec's client-side close).
41+
# `client.session` escape hatch. The request parks for the stream's
42+
# lifetime, so it runs as a task; cancelling it releases the local
43+
# awaiting scope. In-memory that also ends the server's stream; over
44+
# HTTP today nothing aborts the POST, so the server-side stream ends
45+
# when the connection closes (the `Client` exit right below).
4446
async def listen() -> None:
4547
request = types.SubscriptionsListenRequest(
4648
params=types.SubscriptionsListenRequestParams(
@@ -69,16 +71,17 @@ async def listen() -> None:
6971
updated = received[1]
7072
assert isinstance(updated, types.ResourceUpdatedNotification), updated
7173
assert updated.params.uri == "note://todo"
72-
assert updated.params.meta == ack.params.meta
74+
assert updated.params.meta is not None
75+
assert updated.params.meta[SUBSCRIPTION_ID] == ack.params.meta[SUBSCRIPTION_ID]
7376
assert len(received) == 2, "the journal edit must not have been delivered"
7477

7578
# ── a runtime tool registration announces itself ──
7679
await client.call_tool("enable_search", {})
7780
await wait_for(3)
7881
assert isinstance(received[2], types.ToolListChangedNotification), received[2]
7982

80-
# The client is done listening: closing the stream is cancelling the
81-
# parked request's scope.
83+
# The client is done listening: cancel the parked request and let
84+
# the connection teardown below end the stream server-side.
8285
tg.cancel_scope.cancel()
8386

8487
# list_changed told us to re-fetch - the new tool is callable, and the

examples/stories/subscriptions/server.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,16 @@ async def edit_note(name: str, text: str, ctx: Context) -> str:
2222
def search(query: str) -> list[str]:
2323
return [name for name, text in notes.items() if query in text]
2424

25+
enabled = False
26+
2527
@mcp.tool()
2628
async def enable_search(ctx: Context) -> str:
2729
"""Register the `search` tool at runtime and tell subscribers the list changed."""
28-
mcp.add_tool(search)
29-
await ctx.notify_tools_changed()
30+
nonlocal enabled
31+
if not enabled:
32+
enabled = True
33+
mcp.add_tool(search)
34+
await ctx.notify_tools_changed()
3035
return "search is live"
3136

3237
return mcp

examples/stories/subscriptions/server_lowlevel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolReques
5656
search_enabled = True
5757
await bus.publish(ToolsListChanged())
5858
return types.CallToolResult(content=[types.TextContent(text="search is live")])
59-
assert params.name == "search"
59+
assert params.name == "search" and search_enabled
6060
matches = [name for name, text in notes.items() if args["query"] in text]
6161
return types.CallToolResult(content=[types.TextContent(text=", ".join(matches))])
6262

src/mcp/server/_streamable_http_modern.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,10 @@ async def handle_modern_request(
272272
progress_token=progress_token_from_params(req.params),
273273
)
274274

275-
if json_response:
275+
if json_response and req.method != "subscriptions/listen":
276+
# A listen response IS a notification stream, so it always takes the
277+
# SSE path below regardless of the JSON-response preference (the
278+
# TypeScript and Go SDKs route it the same way).
276279
msg = await _to_jsonrpc_response(
277280
req.id, serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state)
278281
)

0 commit comments

Comments
 (0)