Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from mcp.server.fastmcp.utilities.context_injection import find_context_parameter
from mcp.server.fastmcp.utilities.logging import configure_logging, get_logger
from mcp.server.lowlevel.helper_types import ReadResourceContents
from mcp.server.lowlevel.server import LifespanResultT
from mcp.server.lowlevel.server import LifespanResultT, NotificationOptions
from mcp.server.lowlevel.server import Server as MCPServer
from mcp.server.lowlevel.server import lifespan as default_lifespan
from mcp.server.session import ServerSession, ServerSessionT
Expand Down Expand Up @@ -220,6 +220,7 @@ def __init__( # noqa: PLR0913
self._custom_starlette_routes: list[Route] = []
self.dependencies = self.settings.dependencies
self._session_manager: StreamableHTTPSessionManager | None = None
self._notification_options: NotificationOptions | None = None

# Set up MCP protocol handlers
self._setup_handlers()
Expand Down Expand Up @@ -298,6 +299,28 @@ def _setup_handlers(self) -> None:
self._mcp_server.get_prompt()(self.get_prompt)
self._mcp_server.list_resource_templates()(self.list_resource_templates)

def set_notification_options(
self,
*,
prompts_changed: bool = False,
resources_changed: bool = False,
tools_changed: bool = False,
) -> None:
"""Configure which change notifications this server broadcasts

Args:
prompts_changed: Whether to send prompt list changed notifications.
resources_changed: Whether to send resource list changed notifications.
tools_changed: Whether to send tool list changed notifications.
"""
from mcp.server.lowlevel.server import NotificationOptions

self._notification_options = NotificationOptions(
prompts_changed=prompts_changed,
resources_changed=resources_changed,
tools_changed=tools_changed,
)

async def list_tools(self) -> list[MCPTool]:
"""List all available tools."""
tools = self._tool_manager.list_tools()
Expand Down Expand Up @@ -732,7 +755,7 @@ async def run_stdio_async(self) -> None:
await self._mcp_server.run(
read_stream,
write_stream,
self._mcp_server.create_initialization_options(),
self._mcp_server.create_initialization_options(self._notification_options),
)

async def run_sse_async(self, mount_path: str | None = None) -> None:
Expand Down Expand Up @@ -821,7 +844,7 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send):
await self._mcp_server.run(
streams[0],
streams[1],
self._mcp_server.create_initialization_options(),
self._mcp_server.create_initialization_options(self._notification_options),
)
return Response()

Expand Down Expand Up @@ -935,6 +958,7 @@ def streamable_http_app(self) -> Starlette:
json_response=self.settings.json_response,
stateless=self.settings.stateless_http, # Use the stateless setting
security_settings=self.settings.transport_security,
notification_options=self._notification_options,
)

# Create the ASGI handler
Expand Down
12 changes: 10 additions & 2 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from starlette.responses import Response
from starlette.types import Receive, Scope, Send

from mcp.server.lowlevel.server import NotificationOptions
from mcp.server.lowlevel.server import Server as MCPServer
from mcp.server.streamable_http import (
MCP_SESSION_ID_HEADER,
Expand Down Expand Up @@ -51,6 +52,9 @@ class StreamableHTTPSessionManager:
json_response: Whether to use JSON responses instead of SSE streams
stateless: If True, creates a completely fresh transport for each request
with no session tracking or state persistence between requests.
notification_options:
Specifies which change notifications (e.g. tools, resources, prompts)
this manager should advertise to clients.
"""

def __init__(
Expand All @@ -60,13 +64,17 @@ def __init__(
json_response: bool = False,
stateless: bool = False,
security_settings: TransportSecuritySettings | None = None,
notification_options: NotificationOptions | None = None,
):
self.app = app
self.event_store = event_store
self.json_response = json_response
self.stateless = stateless
self.security_settings = security_settings

# Server notification options
self._notification_options = notification_options

# Session tracking (only used if not stateless)
self._session_creation_lock = anyio.Lock()
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
Expand Down Expand Up @@ -175,7 +183,7 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
self.app.create_initialization_options(self._notification_options),
stateless=True,
)
except Exception:
Expand Down Expand Up @@ -241,7 +249,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
self.app.create_initialization_options(self._notification_options),
stateless=False, # Stateful mode
)
except Exception as e:
Expand Down
113 changes: 113 additions & 0 deletions tests/server/test_notification_options_propagation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Tests for enabling server notifications."""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import pytest
from starlette.applications import Starlette

from mcp.server.fastmcp import FastMCP
from mcp.server.lowlevel.server import NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager


@pytest.mark.anyio
async def test_fastmcp_sets_notification_options_affects_initialization():
"""Test that set_notification_options() correctly affects server initialization."""
server = FastMCP("notification-test")

# By default there should be no configuration
assert server._notification_options is None

# Configure notifications
server.set_notification_options(
prompts_changed=True,
resources_changed=True,
tools_changed=False,
)

# Verify internal NotificationOptions created correctly
assert isinstance(server._notification_options, NotificationOptions)
assert server._notification_options.prompts_changed is True
assert server._notification_options.resources_changed is True
assert server._notification_options.tools_changed is False


@pytest.mark.anyio
async def test_streamable_http_session_manager_uses_notification_options() -> None:
# Create the FastMCP server and configure notifications
server = FastMCP("notification-test")
server.set_notification_options(
prompts_changed=True,
resources_changed=False,
tools_changed=True,
)

# Force creation of the StreamableHTTP session manager without starting uvicorn
app = server.streamable_http_app()

assert isinstance(app, Starlette)

# Get the StreamableHTTPSessionManager
assert server._session_manager is not None
session_manager: StreamableHTTPSessionManager = server._session_manager

# Verify internal NotificationOptions created correctly
assert isinstance(session_manager._notification_options, NotificationOptions)
assert session_manager._notification_options.prompts_changed is True
assert session_manager._notification_options.resources_changed is False
assert session_manager._notification_options.tools_changed is True


@pytest.mark.anyio
async def test_run_stdio_uses_configured_notification_options(monkeypatch: pytest.MonkeyPatch) -> None:
"""Verify FastMCP passes NotificationOptions to the low-level server.run call."""
called_with: dict[str, InitializationOptions] = {}

async def fake_run(
read_stream: Any,
write_stream: Any,
initialization_options: InitializationOptions,
**kwargs: Any,
) -> None:
"""Fake run method capturing the initialization options."""
called_with["init_opts"] = initialization_options

# Create the FastMCP server instance
server = FastMCP("test-server")

# Patch the low-level server.run method to our fake
monkeypatch.setattr(server._mcp_server, "run", fake_run)

# Patch stdio_server to avoid touching real stdin/stdout
@asynccontextmanager
async def fake_stdio_server() -> AsyncIterator[tuple[str, str]]:
yield ("fake_read", "fake_write")

monkeypatch.setattr("mcp.server.fastmcp.server.stdio_server", fake_stdio_server)

# Configure notification options
server.set_notification_options(
prompts_changed=True,
resources_changed=True,
tools_changed=False,
)

# Execute run_stdio_async (uses patched run + stdio_server)
await server.run_stdio_async()

# Verify our fake_run was actually called
assert "init_opts" in called_with, "Expected _mcp_server.run to be called with InitializationOptions"

init_opts: InitializationOptions = called_with["init_opts"]
assert isinstance(init_opts, InitializationOptions)

# Verify the NotificationOptions are reflected correctly in capabilities
caps = init_opts.capabilities
assert caps.prompts is not None and caps.prompts.listChanged is True
assert caps.resources is not None and caps.resources.listChanged is True
assert caps.tools is not None and caps.tools.listChanged is False
Loading