Skip to content

Commit baa8547

Browse files
authored
Minor cleanup of how we set the upstream_session for the Agent App server (#404)
* Add @app.tool and @app.async_tool decorators * lint and format * mcp logging notifications * Fixes for logger * checkpoint * Cleanup * post-merge checkpoint * Working * Remove custom get-status tool * Fix example * Update readmes * Get @app.tool working with Temporal as well * Fix linter * Tests are passing * bump pyproject * Consolidate _set_upstream_from_request_ctx_if_available in one place * Some more cleanup
1 parent aeef2db commit baa8547

File tree

1 file changed

+7
-51
lines changed

1 file changed

+7
-51
lines changed

src/mcp_agent/server/app_server.py

Lines changed: 7 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def _resolve_workflows_and_context(
140140
"""Resolve the workflows mapping and underlying app context regardless of startup mode.
141141
142142
Tries lifespan ServerContext first (including compatible mocks), then attached app.
143+
Also ensures the app context is updated with the current upstream session once per request.
143144
"""
144145
# Try lifespan-provided ServerContext first
145146
lifespan_ctx = getattr(ctx.request_context, "lifespan_context", None)
@@ -148,6 +149,11 @@ def _resolve_workflows_and_context(
148149
and hasattr(lifespan_ctx, "workflows")
149150
and hasattr(lifespan_ctx, "context")
150151
):
152+
# Ensure upstream session once at resolution time
153+
try:
154+
_set_upstream_from_request_ctx_if_available(ctx)
155+
except Exception:
156+
pass
151157
return lifespan_ctx.workflows, lifespan_ctx.context
152158

153159
# Fall back to app attached to FastMCP
@@ -341,11 +347,6 @@ def list_workflows(ctx: MCPContext) -> Dict[str, Dict[str, Any]]:
341347
Returns information about each workflow type including name, description, and parameters.
342348
This helps in making an informed decision about which workflow to run.
343349
"""
344-
# Ensure upstream session is set for any logs emitted during this call
345-
try:
346-
_set_upstream_from_request_ctx_if_available(ctx)
347-
except Exception:
348-
pass
349350
result: Dict[str, Dict[str, Any]] = {}
350351
workflows, _ = _resolve_workflows_and_context(ctx)
351352
workflows = workflows or {}
@@ -390,12 +391,6 @@ async def list_workflow_runs(ctx: MCPContext) -> List[Dict[str, Any]]:
390391
Returns:
391392
A dictionary mapping workflow instance IDs to their detailed status information.
392393
"""
393-
# Ensure upstream session is set for any logs emitted during this call
394-
try:
395-
_set_upstream_from_request_ctx_if_available(ctx)
396-
except Exception:
397-
pass
398-
399394
server_context = getattr(
400395
ctx.request_context, "lifespan_context", None
401396
) or _get_attached_server_context(ctx.fastmcp)
@@ -428,11 +423,6 @@ async def run_workflow(
428423
A dict with workflow_id and run_id for the started workflow run, can be passed to
429424
workflows/get_status, workflows/resume, and workflows/cancel.
430425
"""
431-
# Ensure upstream session is set before starting the workflow
432-
try:
433-
_set_upstream_from_request_ctx_if_available(ctx)
434-
except Exception:
435-
pass
436426
return await _workflow_run(ctx, workflow_name, run_parameters, **kwargs)
437427

438428
@mcp.tool(name="workflows-get_status")
@@ -454,11 +444,6 @@ async def get_workflow_status(
454444
Returns:
455445
A dictionary with comprehensive information about the workflow status.
456446
"""
457-
# Ensure upstream session is available for any status-related logs
458-
try:
459-
_set_upstream_from_request_ctx_if_available(ctx)
460-
except Exception:
461-
pass
462447
return await _workflow_status(ctx, run_id=run_id, workflow_name=workflow_id)
463448

464449
@mcp.tool(name="workflows-resume")
@@ -486,11 +471,6 @@ async def resume_workflow(
486471
Returns:
487472
True if the workflow was resumed, False otherwise.
488473
"""
489-
# Ensure upstream session is available for any status-related logs
490-
try:
491-
_set_upstream_from_request_ctx_if_available(ctx)
492-
except Exception:
493-
pass
494474
server_context: ServerContext = ctx.request_context.lifespan_context
495475
workflow_registry = server_context.workflow_registry
496476

@@ -533,11 +513,6 @@ async def cancel_workflow(
533513
Returns:
534514
True if the workflow was cancelled, False otherwise.
535515
"""
536-
# Ensure upstream session is available for any status-related logs
537-
try:
538-
_set_upstream_from_request_ctx_if_available(ctx)
539-
except Exception:
540-
pass
541516
server_context: ServerContext = ctx.request_context.lifespan_context
542517
workflow_registry = server_context.workflow_registry
543518

@@ -597,7 +572,6 @@ def create_declared_function_tools(mcp: FastMCP, server_context: ServerContext):
597572
"""
598573
Register tools declared via @app.tool/@app.async_tool on the attached app.
599574
- @app.tool registers a synchronous tool with the same signature as the function
600-
that runs the auto-generated workflow and waits for completion.
601575
- @app.async_tool registers alias tools <name>-run and <name>-get_status
602576
that proxy to the workflow run/status utilities.
603577
"""
@@ -762,12 +736,6 @@ def _make_adapter(context_param_name: str, inner_wrapper):
762736
async def _adapter(**kw):
763737
if context_param_name not in kw:
764738
raise ToolError("Context not provided")
765-
_ctx_obj = kw.get(context_param_name)
766-
if _ctx_obj is not None:
767-
try:
768-
_set_upstream_from_request_ctx_if_available(_ctx_obj)
769-
except Exception:
770-
pass
771739
kw["__context__"] = kw.pop(context_param_name)
772740
return await inner_wrapper(**kw)
773741

@@ -865,17 +833,11 @@ async def _async_wrapper(**kwargs):
865833
parameters=params + [ctx_param], return_annotation=ann.get("return")
866834
)
867835

868-
# Adapter to map injected FastMCP context kwarg and set upstream
836+
# Adapter to map injected FastMCP context kwarg without additional propagation
869837
def _make_async_adapter(context_param_name: str, inner_wrapper):
870838
async def _adapter(**kw):
871839
if context_param_name not in kw:
872840
raise ToolError("Context not provided")
873-
_ctx_obj = kw.get(context_param_name)
874-
if _ctx_obj is not None:
875-
try:
876-
_set_upstream_from_request_ctx_if_available(_ctx_obj)
877-
except Exception:
878-
pass
879841
kw["__context__"] = kw.pop(context_param_name)
880842
return await inner_wrapper(**kw)
881843

@@ -948,7 +910,6 @@ async def run(
948910
ctx: MCPContext,
949911
run_parameters: Dict[str, Any] | None = None,
950912
) -> Dict[str, str]:
951-
_set_upstream_from_request_ctx_if_available(ctx)
952913
return await _workflow_run(ctx, workflow_name, run_parameters)
953914

954915
@mcp.tool(
@@ -961,7 +922,6 @@ async def run(
961922
""",
962923
)
963924
async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
964-
_set_upstream_from_request_ctx_if_available(ctx)
965925
return await _workflow_status(ctx, run_id=run_id, workflow_name=workflow_name)
966926

967927

@@ -1077,10 +1037,6 @@ async def _workflow_status(
10771037
ctx: MCPContext, run_id: str, workflow_name: str | None = None
10781038
) -> Dict[str, Any]:
10791039
# Ensure upstream session so status-related logs are forwarded
1080-
try:
1081-
_set_upstream_from_request_ctx_if_available(ctx)
1082-
except Exception:
1083-
pass
10841040
workflow_registry: WorkflowRegistry | None = _resolve_workflow_registry(ctx)
10851041

10861042
if not workflow_registry:

0 commit comments

Comments
 (0)