diff --git a/src/mcp/server/fastmcp/server.py b/src/mcp/server/fastmcp/server.py index 719595916..12f010d91 100644 --- a/src/mcp/server/fastmcp/server.py +++ b/src/mcp/server/fastmcp/server.py @@ -51,7 +51,7 @@ from mcp.server.fastmcp.utilities.context_injection import find_context_parameter from mcp.server.fastmcp.utilities.logging import configure_logging, get_logger from mcp.server.lowlevel.helper_types import ReadResourceContents -from mcp.server.lowlevel.server import LifespanResultT +from mcp.server.lowlevel.server import LifespanResultT, NotificationOptions from mcp.server.lowlevel.server import Server as MCPServer from mcp.server.lowlevel.server import lifespan as default_lifespan from mcp.server.session import ServerSession, ServerSessionT @@ -220,6 +220,7 @@ def __init__( # noqa: PLR0913 self._custom_starlette_routes: list[Route] = [] self.dependencies = self.settings.dependencies self._session_manager: StreamableHTTPSessionManager | None = None + self._notification_options: NotificationOptions | None = None # Set up MCP protocol handlers self._setup_handlers() @@ -298,6 +299,28 @@ def _setup_handlers(self) -> None: self._mcp_server.get_prompt()(self.get_prompt) self._mcp_server.list_resource_templates()(self.list_resource_templates) + def set_notification_options( + self, + *, + prompts_changed: bool = False, + resources_changed: bool = False, + tools_changed: bool = False, + ) -> None: + """Configure which change notifications this server broadcasts + + Args: + prompts_changed: Whether to send prompt list changed notifications. + resources_changed: Whether to send resource list changed notifications. + tools_changed: Whether to send tool list changed notifications. + """ + from mcp.server.lowlevel.server import NotificationOptions + + self._notification_options = NotificationOptions( + prompts_changed=prompts_changed, + resources_changed=resources_changed, + tools_changed=tools_changed, + ) + async def list_tools(self) -> list[MCPTool]: """List all available tools.""" tools = self._tool_manager.list_tools() @@ -732,7 +755,7 @@ async def run_stdio_async(self) -> None: await self._mcp_server.run( read_stream, write_stream, - self._mcp_server.create_initialization_options(), + self._mcp_server.create_initialization_options(self._notification_options), ) async def run_sse_async(self, mount_path: str | None = None) -> None: @@ -821,7 +844,7 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send): await self._mcp_server.run( streams[0], streams[1], - self._mcp_server.create_initialization_options(), + self._mcp_server.create_initialization_options(self._notification_options), ) return Response() @@ -935,6 +958,7 @@ def streamable_http_app(self) -> Starlette: json_response=self.settings.json_response, stateless=self.settings.stateless_http, # Use the stateless setting security_settings=self.settings.transport_security, + notification_options=self._notification_options, ) # Create the ASGI handler diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 53d542d21..1d637b714 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -15,6 +15,7 @@ from starlette.responses import Response from starlette.types import Receive, Scope, Send +from mcp.server.lowlevel.server import NotificationOptions from mcp.server.lowlevel.server import Server as MCPServer from mcp.server.streamable_http import ( MCP_SESSION_ID_HEADER, @@ -51,6 +52,9 @@ class StreamableHTTPSessionManager: json_response: Whether to use JSON responses instead of SSE streams stateless: If True, creates a completely fresh transport for each request with no session tracking or state persistence between requests. + notification_options: + Specifies which change notifications (e.g. tools, resources, prompts) + this manager should advertise to clients. """ def __init__( @@ -60,6 +64,7 @@ def __init__( json_response: bool = False, stateless: bool = False, security_settings: TransportSecuritySettings | None = None, + notification_options: NotificationOptions | None = None, ): self.app = app self.event_store = event_store @@ -67,6 +72,9 @@ def __init__( self.stateless = stateless self.security_settings = security_settings + # Server notification options + self._notification_options = notification_options + # Session tracking (only used if not stateless) self._session_creation_lock = anyio.Lock() self._server_instances: dict[str, StreamableHTTPServerTransport] = {} @@ -175,7 +183,7 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA await self.app.run( read_stream, write_stream, - self.app.create_initialization_options(), + self.app.create_initialization_options(self._notification_options), stateless=True, ) except Exception: @@ -241,7 +249,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE await self.app.run( read_stream, write_stream, - self.app.create_initialization_options(), + self.app.create_initialization_options(self._notification_options), stateless=False, # Stateful mode ) except Exception as e: diff --git a/tests/server/test_notification_options_propagation.py b/tests/server/test_notification_options_propagation.py new file mode 100644 index 000000000..b15f89a9b --- /dev/null +++ b/tests/server/test_notification_options_propagation.py @@ -0,0 +1,113 @@ +"""Tests for enabling server notifications.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Any + +import pytest +from starlette.applications import Starlette + +from mcp.server.fastmcp import FastMCP +from mcp.server.lowlevel.server import NotificationOptions +from mcp.server.models import InitializationOptions +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager + + +@pytest.mark.anyio +async def test_fastmcp_sets_notification_options_affects_initialization(): + """Test that set_notification_options() correctly affects server initialization.""" + server = FastMCP("notification-test") + + # By default there should be no configuration + assert server._notification_options is None + + # Configure notifications + server.set_notification_options( + prompts_changed=True, + resources_changed=True, + tools_changed=False, + ) + + # Verify internal NotificationOptions created correctly + assert isinstance(server._notification_options, NotificationOptions) + assert server._notification_options.prompts_changed is True + assert server._notification_options.resources_changed is True + assert server._notification_options.tools_changed is False + + +@pytest.mark.anyio +async def test_streamable_http_session_manager_uses_notification_options() -> None: + # Create the FastMCP server and configure notifications + server = FastMCP("notification-test") + server.set_notification_options( + prompts_changed=True, + resources_changed=False, + tools_changed=True, + ) + + # Force creation of the StreamableHTTP session manager without starting uvicorn + app = server.streamable_http_app() + + assert isinstance(app, Starlette) + + # Get the StreamableHTTPSessionManager + assert server._session_manager is not None + session_manager: StreamableHTTPSessionManager = server._session_manager + + # Verify internal NotificationOptions created correctly + assert isinstance(session_manager._notification_options, NotificationOptions) + assert session_manager._notification_options.prompts_changed is True + assert session_manager._notification_options.resources_changed is False + assert session_manager._notification_options.tools_changed is True + + +@pytest.mark.anyio +async def test_run_stdio_uses_configured_notification_options(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify FastMCP passes NotificationOptions to the low-level server.run call.""" + called_with: dict[str, InitializationOptions] = {} + + async def fake_run( + read_stream: Any, + write_stream: Any, + initialization_options: InitializationOptions, + **kwargs: Any, + ) -> None: + """Fake run method capturing the initialization options.""" + called_with["init_opts"] = initialization_options + + # Create the FastMCP server instance + server = FastMCP("test-server") + + # Patch the low-level server.run method to our fake + monkeypatch.setattr(server._mcp_server, "run", fake_run) + + # Patch stdio_server to avoid touching real stdin/stdout + @asynccontextmanager + async def fake_stdio_server() -> AsyncIterator[tuple[str, str]]: + yield ("fake_read", "fake_write") + + monkeypatch.setattr("mcp.server.fastmcp.server.stdio_server", fake_stdio_server) + + # Configure notification options + server.set_notification_options( + prompts_changed=True, + resources_changed=True, + tools_changed=False, + ) + + # Execute run_stdio_async (uses patched run + stdio_server) + await server.run_stdio_async() + + # Verify our fake_run was actually called + assert "init_opts" in called_with, "Expected _mcp_server.run to be called with InitializationOptions" + + init_opts: InitializationOptions = called_with["init_opts"] + assert isinstance(init_opts, InitializationOptions) + + # Verify the NotificationOptions are reflected correctly in capabilities + caps = init_opts.capabilities + assert caps.prompts is not None and caps.prompts.listChanged is True + assert caps.resources is not None and caps.resources.listChanged is True + assert caps.tools is not None and caps.tools.listChanged is False