Skip to content

Commit bccff75

Browse files
committed
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
2 parents da1df74 + 1387929 commit bccff75

File tree

13 files changed

+74
-42
lines changed

13 files changed

+74
-42
lines changed

examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,17 +141,14 @@ async def run_server():
141141
read_stream,
142142
write_stream,
143143
app.create_initialization_options(),
144-
# This allows the server to run without waiting for initialization
145-
require_initialization=False,
144+
# Runs in standalone mode for stateless deployments
145+
# where clients perform initialization with any node
146+
standalone_mode=True,
146147
)
147148

148149
# Start server task
149150
task_group.start_soon(run_server)
150151

151-
# Small delay to allow the server task to start
152-
# This helps prevent race conditions in stateless mode
153-
await anyio.sleep(0.001)
154-
155152
# Handle the HTTP request and return the response
156153
await http_transport.handle_request(scope, receive, send)
157154

examples/servers/simple-streamablehttp/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ A simple MCP server example demonstrating the StreamableHttp transport, which en
55
## Features
66

77
- Uses the StreamableHTTP transport for server-client communication
8+
- Supports REST API operations (POST, GET, DELETE) for `/mcp` endpoint
89
- Task management with anyio task groups
910
- Ability to send multiple notifications over time to the client
1011
- Proper resource cleanup and lifespan management

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ async def call_tool(
8383
level="info",
8484
data=f"Notification {i+1}/{count} from caller: {caller}",
8585
logger="notification_stream",
86+
# Associates this notification with the original request
87+
# Ensures notifications are sent to the correct response stream
88+
# Without this, notifications will either go to:
89+
# - a standalone SSE stream (if GET request is supported)
90+
# - nowhere (if GET request isn't supported)
8691
related_request_id=ctx.request_id,
8792
)
8893
if i < count - 1: # Don't wait after the last notification

src/mcp/server/fastmcp/server.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ async def run_stdio_async(self) -> None:
466466
async def run_sse_async(self) -> None:
467467
"""Run the server using SSE transport."""
468468
import uvicorn
469+
469470
starlette_app = self.sse_app()
470471

471472
config = uvicorn.Config(
@@ -673,7 +674,10 @@ async def log(
673674
**extra: Additional structured data to include
674675
"""
675676
await self.request_context.session.send_log_message(
676-
level=level, data=message, logger=logger_name
677+
level=level,
678+
data=message,
679+
logger=logger_name,
680+
related_request_id=self.request_id,
677681
)
678682

679683
@property

src/mcp/server/lowlevel/server.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -479,10 +479,11 @@ async def run(
479479
# but also make tracing exceptions much easier during testing and when using
480480
# in-process servers.
481481
raise_exceptions: bool = False,
482-
# When True, the server will wait for the client to send an initialization
483-
# message before processing any other messages.
484-
# False should be used for stateless servers.
485-
require_initialization: bool = True,
482+
# When True, the server runs in standalone mode for stateless deployments where
483+
# clients can perform initialization with any node. The client must still follow
484+
# the initialization lifecycle, but can do so with any available node
485+
# rather than requiring initialization for each connection.
486+
standalone_mode: bool = False,
486487
):
487488
async with AsyncExitStack() as stack:
488489
lifespan_context = await stack.enter_async_context(self.lifespan(self))
@@ -491,7 +492,7 @@ async def run(
491492
read_stream,
492493
write_stream,
493494
initialization_options,
494-
require_initialization,
495+
standalone_mode=standalone_mode,
495496
)
496497
)
497498

src/mcp/server/session.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ def __init__(
8585
read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],
8686
write_stream: MemoryObjectSendStream[types.JSONRPCMessage],
8787
init_options: InitializationOptions,
88-
require_initialization: bool = True,
88+
standalone_mode: bool = False,
8989
) -> None:
9090
super().__init__(
9191
read_stream, write_stream, types.ClientRequest, types.ClientNotification
9292
)
93-
if require_initialization:
94-
self._initialization_state = InitializationState.NotInitialized
95-
else:
93+
if standalone_mode:
9694
self._initialization_state = InitializationState.Initialized
95+
else:
96+
self._initialization_state = InitializationState.NotInitialized
9797
self._init_options = init_options
9898
self._incoming_message_stream_writer, self._incoming_message_stream_reader = (
9999
anyio.create_memory_object_stream[ServerRequestResponder](0)

src/mcp/server/streamableHttp.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -656,11 +656,6 @@ async def connect(
656656
"""
657657

658658
# Create the memory streams for this connection
659-
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception]
660-
read_stream_writer: MemoryObjectSendStream[JSONRPCMessage | Exception]
661-
662-
write_stream: MemoryObjectSendStream[JSONRPCMessage]
663-
write_stream_reader: MemoryObjectReceiveStream[JSONRPCMessage]
664659

665660
read_stream_writer, read_stream = anyio.create_memory_object_stream[
666661
JSONRPCMessage | Exception
@@ -684,10 +679,13 @@ async def message_router():
684679
if isinstance(
685680
message.root, JSONRPCNotification | JSONRPCRequest
686681
):
687-
# Extract related_request_id from params if it exists
688-
if (params := getattr(message.root, "params", None)) and (
689-
related_id := params.get("related_request_id")
690-
) is not None:
682+
# Extract related_request_id from meta if it exists
683+
if (
684+
(params := getattr(message.root, "params", None))
685+
and (meta := params.get("_meta"))
686+
and (related_id := meta.get("related_request_id"))
687+
is not None
688+
):
691689
target_request_id = str(related_id)
692690
else:
693691
target_request_id = str(message.root.id)

src/mcp/shared/session.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import Any, Generic, TypeVar
77

88
import anyio
9-
import anyio.lowlevel
109
import httpx
1110
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1211
from pydantic import BaseModel
@@ -24,6 +23,7 @@
2423
JSONRPCNotification,
2524
JSONRPCRequest,
2625
JSONRPCResponse,
26+
NotificationParams,
2727
RequestParams,
2828
ServerNotification,
2929
ServerRequest,
@@ -276,8 +276,19 @@ async def send_notification(
276276
Emits a notification, which is a one-way message that does not expect
277277
a response.
278278
"""
279+
# Some transport implementations may need to set the related_request_id
280+
# to attribute to the notifications to the request that triggered them.
279281
if related_request_id is not None and notification.root.params is not None:
280-
notification.root.params.related_request_id = related_request_id
282+
# Create meta if it doesn't exist
283+
if notification.root.params.meta is None:
284+
meta_dict = {"related_request_id": related_request_id}
285+
286+
else:
287+
meta_dict = notification.root.params.meta.model_dump(
288+
by_alias=True, mode="json", exclude_none=True
289+
)
290+
meta_dict["related_request_id"] = related_request_id
291+
notification.root.params.meta = NotificationParams.Meta(**meta_dict)
281292
jsonrpc_notification = JSONRPCNotification(
282293
jsonrpc="2.0",
283294
**notification.model_dump(by_alias=True, mode="json", exclude_none=True),

src/mcp/types.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class Meta(BaseModel):
5858
model_config = ConfigDict(extra="allow")
5959

6060
meta: Meta | None = Field(alias="_meta", default=None)
61-
related_request_id: RequestId | None = None
6261
"""
6362
This parameter name is reserved by MCP to allow clients and servers to attach
6463
additional metadata to their notifications.

tests/client/test_logging_callback.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from mcp.shared.session import RequestResponder
1010
from mcp.types import (
1111
LoggingMessageNotificationParams,
12+
NotificationParams,
1213
TextContent,
1314
)
1415

@@ -78,6 +79,11 @@ async def message_handler(
7879
)
7980
assert log_result.isError is False
8081
assert len(logging_collector.log_messages) == 1
81-
assert logging_collector.log_messages[0] == LoggingMessageNotificationParams(
82-
level="info", logger="test_logger", data="Test log message"
83-
)
82+
# Create meta object with related_request_id added dynamically
83+
meta = NotificationParams.Meta()
84+
setattr(meta, "related_request_id", "2")
85+
log = logging_collector.log_messages[0]
86+
assert log.level == "info"
87+
assert log.logger == "test_logger"
88+
assert log.data == "Test log message"
89+
assert log.meta == meta

0 commit comments

Comments
 (0)