Skip to content

Commit 1842209

Browse files
Make Context bound to the request, allowing shared MCPApp connections (#573)
* Fix workflow resume issue * temp wip * Fix workflow resume issue * temp wip * add audience validation * add comfigured token support; add workflow_pre_auth; oauth example against github * working oauth example with workflow_pre_auth * fixes to oauth discovery; add dynamic oath example * full e2e workflow * improve how we're dealing with no oauth user * all tests passing * reformat * rework oauth flow * Update readme * Fix user conflation on temporal; start of moving cache to app side * cache tokens requested from temporal flow in app * Implement local loopback OAuth callback server for MCPApp client-only runs * Tests and more fixes for loopback, including browser launch * various fixes * additional fixes suggested by cursor * merge and format * Various updates to get the "interactive_tool" example working * All examples working * Regenerate schema and add docstrings * Remove TESTING_GUIDE.md * Fixes to make sure oauth identities are properly isolated across multiple users * remove * Make Context bound to the request, allowing shared MCPApp connections * Fix failing tests * Address PR feedback * address more PR feedback, plus an example * Move examples * more fixes --------- Co-authored-by: Roman van der Krogt <[email protected]>
1 parent 6f3a22f commit 1842209

File tree

22 files changed

+1723
-507
lines changed

22 files changed

+1723
-507
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Context Isolation Demo
2+
3+
This example shows how per-request context scoping prevents logs and
4+
notifications from bleeding between concurrent MCP clients.
5+
6+
## Setup
7+
8+
- Install the example dependencies from this folder:
9+
```bash
10+
uv pip install -r examples/mcp_agent_server/context_isolation/requirements.txt
11+
```
12+
- Optional: adjust `mcp_agent.config.yaml` if you want to tweak logging transports or
13+
register additional MCP backends.
14+
15+
## Running the example
16+
17+
1. Start the SSE server in one terminal:
18+
19+
```bash
20+
uv run python examples/mcp_agent_server/context_isolation/server.py
21+
```
22+
23+
The server listens on `http://127.0.0.1:8000/sse` and exposes a single tool
24+
(`emit_log`) that logs messages using the request-scoped context.
25+
26+
2. In a second terminal, run the clients script. It launches two concurrent
27+
clients that connect to the server, set independent logging levels, and call
28+
the tool.
29+
30+
```bash
31+
uv run python examples/mcp_agent_server/context_isolation/clients.py
32+
```
33+
34+
Each client prints the logs and `demo/echo` notifications it receives. Client
35+
A (set to `debug`) sees all messages it emits, while client B (set to
36+
`error`) only receives error-level output. Notifications are tagged with the
37+
originating session so you can observe the strict separation between the two
38+
clients.
39+
40+
## Expected output
41+
42+
- Server console highlights two `SetLevelRequest` operations (one per client) followed
43+
by a pair of `CallToolRequest` entries. You should also see an `emit_log` workflow
44+
execution for each client with parameters matching the client payloads.
45+
46+
- Client A prints both `debug` and `info` log notifications (one per tool call) and
47+
the `demo/echo` notification containing its session id:
48+
49+
```text
50+
[A] log debug: ...
51+
[A] log info: Workflow emit_log started execution ...
52+
[A] tool result: ... "level": "debug"
53+
```
54+
55+
- Client B only prints the `error` log notification—even after the second tool call—
56+
confirming that the per-session
57+
log level (`error`) filters out the info/debug output:
58+
59+
```text
60+
[B] log error: ...
61+
[B] tool result: ... "level": "error"
62+
```
63+
64+
If Client B ever receives an `info` or `debug` log entry, the request-scoped logging
65+
override is not working and should be investigated.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""Connect two clients concurrently to demonstrate context isolation."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from datetime import timedelta
7+
from typing import Any
8+
9+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10+
from mcp import ClientSession
11+
12+
from mcp_agent.app import MCPApp
13+
from mcp_agent.config import MCPServerSettings, MCPSettings, Settings
14+
from mcp_agent.core.context import Context
15+
from mcp_agent.mcp.gen_client import gen_client
16+
from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession
17+
18+
19+
SERVER_NAME = "context-isolation-server"
20+
SERVER_URL = "http://127.0.0.1:8000/sse"
21+
22+
23+
async def run_client(
24+
client_name: str,
25+
log_level: str,
26+
payloads: list[str],
27+
*,
28+
delay_between_calls: float = 0.5,
29+
) -> None:
30+
"""Connect to the server, set logging, and invoke the emit_log tool for each payload."""
31+
32+
settings = Settings(
33+
execution_engine="asyncio",
34+
mcp=MCPSettings(
35+
servers={
36+
SERVER_NAME: MCPServerSettings(
37+
name=SERVER_NAME,
38+
description="Context isolation demo server",
39+
transport="sse",
40+
url=SERVER_URL,
41+
)
42+
}
43+
),
44+
)
45+
46+
app = MCPApp(name=f"client-{client_name}", settings=settings)
47+
48+
async with app.run() as running_app:
49+
context = running_app.context
50+
51+
async def on_log(params: Any) -> None:
52+
try:
53+
message = params.data.get("message") if params.data else None
54+
except Exception:
55+
message = None
56+
print(f"[{client_name}] log {params.level}: {message}")
57+
58+
class DemoClientSession(MCPAgentClientSession):
59+
async def _received_notification(self, notification): # type: ignore[override]
60+
method = getattr(getattr(notification, "root", None), "method", None)
61+
if method and method != "notifications/message":
62+
print(
63+
f"[{client_name}] notify {method}: {notification.model_dump()}"
64+
)
65+
return await super()._received_notification(notification)
66+
67+
def make_session(
68+
read_stream: MemoryObjectReceiveStream,
69+
write_stream: MemoryObjectSendStream,
70+
read_timeout_seconds: timedelta | None,
71+
context: Context | None = None,
72+
) -> ClientSession:
73+
return DemoClientSession(
74+
read_stream=read_stream,
75+
write_stream=write_stream,
76+
read_timeout_seconds=read_timeout_seconds,
77+
logging_callback=on_log,
78+
context=context,
79+
)
80+
81+
async with gen_client(
82+
SERVER_NAME,
83+
context.server_registry,
84+
client_session_factory=make_session,
85+
) as server:
86+
await server.set_logging_level(log_level)
87+
for idx, payload in enumerate(payloads, start=1):
88+
result = await server.call_tool(
89+
"emit_log",
90+
arguments={"level": log_level, "message": payload},
91+
)
92+
print(f"[{client_name}] call {idx} result: {result}")
93+
await asyncio.sleep(delay_between_calls)
94+
95+
96+
async def main() -> None:
97+
await asyncio.gather(
98+
run_client("A", "debug", ["hello from A", "A second info"]),
99+
run_client("B", "error", ["hello from B", "B second info"]),
100+
)
101+
102+
103+
if __name__ == "__main__":
104+
asyncio.run(main())
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
$schema: ../../schema/mcp-agent.config.schema.json
2+
3+
execution_engine: asyncio
4+
5+
logger:
6+
transports: [console]
7+
level: info
8+
9+
mcp:
10+
servers: {}
11+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Core framework dependency
2+
mcp-agent @ file://../../../ # Link to the local mcp-agent project root
3+
4+
# Additional helper packages used by the client script
5+
anyio
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Simple SSE server demonstrating per-client context isolation."""
2+
3+
import asyncio
4+
5+
from mcp_agent.app import MCPApp
6+
from mcp_agent.core.context import Context
7+
from mcp_agent.server.app_server import create_mcp_server_for_app
8+
9+
10+
app = MCPApp(name="context-isolation-server")
11+
12+
13+
@app.tool("emit_log")
14+
async def emit_log(context: Context, level: str = "info", message: str = "hi") -> dict:
15+
"""Log a message at the requested level and emit a notification."""
16+
17+
session = context.request_session_id or "unknown"
18+
await context.log(level, f"[{session}] {message}")
19+
try:
20+
await context.send_notification(
21+
"demo/echo",
22+
{
23+
"session": session,
24+
"level": level,
25+
"message": message,
26+
},
27+
)
28+
except Exception:
29+
pass
30+
return {"logged": message, "level": level, "session": session}
31+
32+
33+
async def main() -> None:
34+
async with app.run() as running_app:
35+
server = create_mcp_server_for_app(running_app)
36+
await server.run_sse_async()
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())

src/mcp_agent/core/context.py

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from mcp_agent.logging.logger import get_logger
3636
from mcp_agent.tracing.token_counter import TokenCounter
3737
from mcp_agent.oauth.identity import OAuthUserIdentity
38+
from mcp_agent.core.request_context import get_current_request_context
3839

3940

4041
if TYPE_CHECKING:
@@ -73,7 +74,6 @@ class Context(MCPContext):
7374
human_input_handler: Optional[HumanInputCallback] = None
7475
elicitation_handler: Optional[ElicitationCallback] = None
7576
signal_notification: Optional[SignalWaitCallback] = None
76-
upstream_session: Optional[ServerSession] = None
7777
model_selector: Optional[ModelSelector] = None
7878
session_id: str | None = None
7979
app: Optional["MCPApp"] = None
@@ -105,12 +105,51 @@ class Context(MCPContext):
105105
token_store: Optional[TokenStore] = None
106106
token_manager: Optional[TokenManager] = None
107107
identity_registry: Dict[str, OAuthUserIdentity] = Field(default_factory=dict)
108+
request_session_id: str | None = None
109+
request_identity: OAuthUserIdentity | None = None
108110

109111
model_config = ConfigDict(
110112
extra="allow",
111113
arbitrary_types_allowed=True, # Tell Pydantic to defer type evaluation
112114
)
113115

116+
@property
117+
def upstream_session(self) -> ServerSession | None: # type: ignore[override]
118+
"""
119+
Resolve the active upstream session, preferring the request-scoped clone.
120+
121+
The base application context keeps an optional session used by scripts or
122+
tests that set MCPApp.upstream_session directly. During an MCP request the
123+
request-bound context is stored in a ContextVar; whenever callers reach the
124+
base context while that request is active we return the request's session
125+
instead of whichever client touched the base context last.
126+
"""
127+
request_ctx = get_current_request_context()
128+
if request_ctx is not None:
129+
if request_ctx is self:
130+
return getattr(self, "_upstream_session", None)
131+
132+
current = request_ctx
133+
while current is not None:
134+
parent_ctx = getattr(current, "_parent_context", None)
135+
if parent_ctx is self:
136+
return getattr(current, "_upstream_session", None)
137+
current = parent_ctx
138+
139+
explicit = getattr(self, "_upstream_session", None)
140+
if explicit is not None:
141+
return explicit
142+
143+
parent = getattr(self, "_parent_context", None)
144+
if parent is not None:
145+
return getattr(parent, "_upstream_session", None)
146+
147+
return None
148+
149+
@upstream_session.setter
150+
def upstream_session(self, value: ServerSession | None) -> None:
151+
object.__setattr__(self, "_upstream_session", value)
152+
114153
@property
115154
def mcp(self) -> FastMCP | None:
116155
return self.app.mcp if self.app else None
@@ -144,9 +183,10 @@ def session(self) -> ServerSession | None:
144183
145184
Returns None when no session can be resolved (e.g., local scripts).
146185
"""
147-
# 1) Explicit upstream session set by app/workflow
148-
if getattr(self, "upstream_session", None) is not None:
149-
return self.upstream_session
186+
# 1) Explicit upstream session set by app/workflow (handles request clones)
187+
explicit = getattr(self, "upstream_session", None)
188+
if explicit is not None:
189+
return explicit
150190

151191
# 2) Try request-scoped session from FastMCP Context (may raise outside requests)
152192
try:
@@ -216,6 +256,13 @@ def bind_request(
216256
"""
217257
# Shallow copy to preserve references to registries/loggers while keeping isolation
218258
bound: Context = self.model_copy(deep=False)
259+
object.__setattr__(bound, "_upstream_session", None)
260+
try:
261+
object.__setattr__(bound, "_parent_context", self)
262+
except Exception:
263+
pass
264+
bound.request_session_id = None
265+
bound.request_identity = None
219266
try:
220267
setattr(bound, "_request_context", request_context)
221268
except Exception:
@@ -260,11 +307,16 @@ async def log(
260307
"""
261308
# If we have a live FastMCP request context, delegate to parent
262309
try:
263-
# will raise if request_context is not available
264310
_ = self.request_context # type: ignore[attr-defined]
265-
return await super().log(level, message, logger_name=logger_name) # type: ignore[misc]
266311
except Exception:
267312
pass
313+
else:
314+
try:
315+
return await super().log( # type: ignore[misc]
316+
level, message, logger_name=logger_name
317+
)
318+
except Exception:
319+
pass
268320

269321
# Fall back to local logger if available
270322
try:
@@ -305,12 +357,10 @@ async def read_resource(self, uri: Any) -> Any: # type: ignore[override]
305357
"""
306358
# Use the parent implementation if request-bound fastmcp is available
307359
try:
308-
if getattr(self, "_fastmcp", None) is not None:
309-
return await super().read_resource(uri) # type: ignore[misc]
360+
return await super().read_resource(uri) # type: ignore[misc]
310361
except Exception:
311362
pass
312363

313-
# Fall back to app-managed FastMCP if present
314364
try:
315365
mcp = self.mcp
316366
if mcp is not None:
@@ -501,6 +551,9 @@ def get_current_context() -> Context:
501551
Synchronous initializer/getter for global application context.
502552
For async usage, use aget_current_context instead.
503553
"""
554+
request_ctx = get_current_request_context()
555+
if request_ctx is not None:
556+
return request_ctx
504557
global _global_context
505558
if _global_context is None:
506559
try:

0 commit comments

Comments
 (0)