Skip to content

Commit acb89b4

Browse files
committed
Add in-memory transport
1 parent 1a60e1b commit acb89b4

File tree

8 files changed

+174
-8
lines changed

8 files changed

+174
-8
lines changed

mcp_python/client/session.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ def __init__(
3636
self,
3737
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
3838
write_stream: MemoryObjectSendStream[JSONRPCMessage],
39+
read_timeout_seconds: int | float | None = None,
3940
) -> None:
40-
super().__init__(read_stream, write_stream, ServerRequest, ServerNotification)
41+
super().__init__(read_stream, write_stream, ServerRequest, ServerNotification, read_timeout_seconds=read_timeout_seconds)
4142

4243
async def initialize(self) -> InitializeResult:
4344
from mcp_python.types import (

mcp_python/server/__init__.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,13 @@ def pkg_version(package: str) -> str:
6363
try:
6464
from importlib.metadata import version
6565

66-
return version(package)
66+
v = version(package)
67+
if v is not None:
68+
return v
6769
except Exception:
68-
return "unknown"
70+
pass
71+
72+
return "unknown"
6973

7074
return types.InitializationOptions(
7175
server_name=self.name,
@@ -330,6 +334,11 @@ async def run(
330334
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
331335
write_stream: MemoryObjectSendStream[JSONRPCMessage],
332336
initialization_options: types.InitializationOptions,
337+
# When True, exceptions are returned as messages to the client.
338+
# When False, exceptions are raised, which will cause the server to shut down
339+
# but also make tracing exceptions much easier during testing and when using
340+
# in-process servers.
341+
raise_exceptions: bool = False,
333342
):
334343
with warnings.catch_warnings(record=True) as w:
335344
async with ServerSession(
@@ -349,6 +358,7 @@ async def run(
349358
f"Dispatching request of type {type(req).__name__}"
350359
)
351360

361+
token = None
352362
try:
353363
# Set our global state that can be retrieved via
354364
# app.get_request_context()
@@ -360,20 +370,24 @@ async def run(
360370
)
361371
)
362372
response = await handler(req)
363-
# Reset the global state after we are done
364-
request_ctx.reset(token)
365373
except Exception as err:
374+
if raise_exceptions:
375+
raise err
366376
response = ErrorData(
367377
code=0, message=str(err), data=None
368378
)
379+
finally:
380+
# Reset the global state after we are done
381+
if token is not None:
382+
request_ctx.reset(token)
369383

370384
await message.respond(response)
371385
else:
372386
await message.respond(
373387
ErrorData(
374388
code=METHOD_NOT_FOUND,
375389
message="Method not found",
376-
)
390+
),
377391
)
378392

379393
logger.debug("Response sent")

mcp_python/server/memory.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""
2+
In-memory transports
3+
"""
4+
5+
from contextlib import asynccontextmanager
6+
from typing import AsyncGenerator, Tuple
7+
8+
import anyio
9+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10+
11+
from mcp_python.client.session import ClientSession
12+
from mcp_python.server import Server
13+
from mcp_python.server.session import ServerSession
14+
from mcp_python.types import ErrorData, JSONRPCMessage
15+
16+
@asynccontextmanager
17+
async def create_client_server_memory_streams() -> AsyncGenerator[ Tuple[
18+
Tuple[MemoryObjectReceiveStream[JSONRPCMessage | Exception], MemoryObjectSendStream[JSONRPCMessage]],
19+
Tuple[MemoryObjectReceiveStream[JSONRPCMessage | Exception], MemoryObjectSendStream[JSONRPCMessage]]
20+
], None]:
21+
"""
22+
Creates a pair of bidirectional memory streams for client-server communication.
23+
24+
Returns:
25+
A tuple of (client_streams, server_streams) where each is a tuple of
26+
(read_stream, write_stream)
27+
"""
28+
# Create streams for both directions
29+
server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[JSONRPCMessage | Exception](1)
30+
client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[JSONRPCMessage | Exception](1)
31+
32+
# Return streams grouped by client/server
33+
client_streams = (server_to_client_receive, client_to_server_send)
34+
server_streams = (client_to_server_receive, server_to_client_send)
35+
36+
async with (server_to_client_receive, client_to_server_send,
37+
client_to_server_receive, server_to_client_send):
38+
yield client_streams, server_streams
39+
40+
41+
@asynccontextmanager
42+
async def create_connected_server_and_client_session(server: Server) -> AsyncGenerator[ClientSession, None]:
43+
"""Creates a ServerSession that is connected to the `server`."""
44+
async with create_client_server_memory_streams() as (client_streams, server_streams):
45+
# Unpack the streams
46+
client_read, client_write = client_streams
47+
server_read, server_write = server_streams
48+
print("stream-1")
49+
50+
# Create a cancel scope for the server task
51+
async with anyio.create_task_group() as tg:
52+
53+
tg.start_soon(
54+
server.run,
55+
server_read,
56+
server_write,
57+
server.create_initialization_options()
58+
)
59+
60+
print("stream2")
61+
62+
try:
63+
# Client session could be created here using client_read and client_write
64+
# This would allow testing the server with a client in the same process
65+
async with ClientSession(
66+
read_stream=client_read, write_stream=client_write
67+
) as client_session:
68+
await client_session.initialize()
69+
yield client_session
70+
finally:
71+
tg.cancel_scope.cancel()

mcp_python/shared/session.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,16 @@ def __init__(
8787
write_stream: MemoryObjectSendStream[JSONRPCMessage],
8888
receive_request_type: type[ReceiveRequestT],
8989
receive_notification_type: type[ReceiveNotificationT],
90+
# If none, reading will never time out
91+
read_timeout_seconds: int | float | None = None,
9092
) -> None:
9193
self._read_stream = read_stream
9294
self._write_stream = write_stream
9395
self._response_streams = {}
9496
self._request_id = 0
9597
self._receive_request_type = receive_request_type
9698
self._receive_notification_type = receive_notification_type
99+
self._read_timeout_seconds = read_timeout_seconds
97100

98101
self._incoming_message_stream_writer, self._incoming_message_stream_reader = (
99102
anyio.create_memory_object_stream[
@@ -147,7 +150,13 @@ async def send_request(
147150

148151
await self._write_stream.send(JSONRPCMessage(jsonrpc_request))
149152

150-
response_or_error = await response_stream_reader.receive()
153+
try:
154+
with anyio.fail_after(self._read_timeout_seconds):
155+
response_or_error = await response_stream_reader.receive()
156+
except TimeoutError:
157+
# TODO: make sure this response comes back correctly to the client
158+
raise McpError(ErrorData(code=408, message=f"Timed out while waiting for response to {request.__class__.__name__}. Waited {READ_TIMEOUT_SECONDS} seconds."))
159+
151160
if isinstance(response_or_error, JSONRPCError):
152161
raise McpError(response_or_error.error)
153162
else:

mcp_python/types.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Any, Generic, Literal, TypeVar
22

33
from pydantic import BaseModel, ConfigDict, RootModel
4+
from pydantic.fields import Field
45
from pydantic.networks import AnyUrl
56

67
"""
@@ -141,16 +142,19 @@ class ErrorData(BaseModel):
141142

142143
code: int
143144
"""The error type that occurred."""
145+
144146
message: str
145147
"""
146148
A short description of the error. The message SHOULD be limited to a concise single
147149
sentence.
148150
"""
151+
149152
data: Any | None = None
150153
"""
151154
Additional information about the error. The value of this member is defined by the
152155
sender (e.g. detailed error information, nested errors etc.).
153156
"""
157+
154158
model_config = ConfigDict(extra="allow")
155159

156160

tests/conftest.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import pytest
2+
from pydantic import HttpUrl
3+
4+
from mcp_python.server import Server
5+
from mcp_python.server.types import InitializationOptions
6+
from mcp_python.types import Resource, ServerCapabilities
7+
8+
9+
TEST_INITIALIZATION_OPTIONS = InitializationOptions(
10+
server_name="my_mcp_server",
11+
server_version="0.1.0",
12+
capabilities=ServerCapabilities(),
13+
)
14+
15+
@pytest.fixture
16+
def mcp_server() -> Server:
17+
server = Server(name="test_server")
18+
19+
# Add a simple resource for testing
20+
@server.list_resources()
21+
async def handle_list_resources():
22+
return [
23+
Resource(
24+
uri=HttpUrl("memory://test"),
25+
name="Test Resource",
26+
description="A test resource"
27+
)
28+
]
29+
30+
return server

tests/test_memory.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from typing_extensions import AsyncGenerator
2+
import anyio
3+
from pydantic import HttpUrl
4+
from pydantic_core import Url
5+
import pytest
6+
7+
from mcp_python.client.session import ClientSession
8+
from mcp_python.server import Server
9+
from mcp_python.server.memory import create_client_server_memory_streams, create_connected_server_and_client_session
10+
from mcp_python.server.session import ServerSession
11+
from mcp_python.server.types import InitializationOptions
12+
from mcp_python.types import (
13+
ClientNotification,
14+
EmptyResult,
15+
InitializedNotification,
16+
JSONRPCMessage,
17+
Resource,
18+
ServerCapabilities,
19+
)
20+
21+
22+
@pytest.fixture
23+
async def client_connected_to_server(mcp_server: Server) -> AsyncGenerator[ClientSession, None]:
24+
print('11111')
25+
async with create_connected_server_and_client_session(mcp_server) as client_session:
26+
print('2222k')
27+
yield client_session
28+
print('33')
29+
30+
31+
@pytest.mark.anyio
32+
async def test_memory_server_and_client_connection(client_connected_to_server: ClientSession):
33+
"""Shows how a client and server can communicate over memory streams."""
34+
response = await client_connected_to_server.send_ping()
35+
print('foo')
36+
assert isinstance(response, EmptyResult)
37+
print('bar')

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)