Enable log notifications from MCP agent server to client #401
> [!NOTE]
> Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

## Walkthrough

Adds upstream log forwarding from server/app/workflows to clients, introduces declared function tools (`@app.tool`/`@app.async_tool`) with bare-usage support, enhances the logging API (context binding, runtime level control), updates the asyncio examples (server/client) to use the new tools and server logging, exposes `set_logging_level` and tooling endpoints, and adds tests for upstream logging and capability registration.

## Changes
## Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant App as App/Workflow Code
    participant Log as Logger (bound_context)
    participant Bus as AsyncEventBus
    participant L as MCPUpstreamLoggingListener
    participant Sess as UpstreamServerSession
    App->>Log: emit info/debug/etc. (extra...)
    Note right of Log: extract upstream_session from bound_context
    Log->>Bus: emit Event{..., upstream_session}
    Bus-->>L: dispatch matched event
    L->>Sess: send_log_message(level, data, logger, related_request_id)
    Note over Sess,L: exceptions swallowed (best-effort forwarding)
```
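The forwarding path in this diagram can be reduced to a small, self-contained sketch. Note that `Event`, `UpstreamLoggingListener`, and `FakeUpstreamSession` below are simplified stand-ins for illustration, not the actual mcp-agent classes (`MCPUpstreamLoggingListener`, the server session, etc.):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class Event:
    level: str
    message: str
    upstream_session: Optional[Any] = None  # attached from the logger's bound context
    extra: dict = field(default_factory=dict)

class FakeUpstreamSession:
    """Stand-in for the client-facing server session."""
    def __init__(self):
        self.sent = []

    async def send_log_message(self, level, data, logger=None):
        self.sent.append((level, data))

class UpstreamLoggingListener:
    """Forward matched events to the event's upstream session, best-effort."""
    async def handle(self, event: Event):
        if event.upstream_session is None:
            return  # no session bound: nothing to forward to
        try:
            await event.upstream_session.send_log_message(event.level, event.message)
        except Exception:
            pass  # exceptions are swallowed: forwarding must never break the app

async def main():
    session = FakeUpstreamSession()
    listener = UpstreamLoggingListener()
    await listener.handle(Event("info", "workflow started", upstream_session=session))
    await listener.handle(Event("debug", "no session attached"))  # silently dropped
    return session.sent

print(asyncio.run(main()))
```

The key design point mirrored here is the best-effort contract: a missing session or a failing `send_log_message` degrades to a no-op rather than raising into application code.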
```mermaid
sequenceDiagram
    autonumber
    participant Client as Example Client
    participant MCP as MCP Server
    rect rgb(235,245,255)
        Note over Client,MCP: Synchronous tool
        Client->>MCP: tools/grade_story.run(story)
        MCP-->>Client: result (report)
    end
    rect rgb(245,235,255)
        Note over Client,MCP: Asynchronous tool with polling
        Client->>MCP: tools/grade_story_async.run(story)
        MCP-->>Client: accepted {run_id}
        loop poll
            Client->>MCP: tools/grade_story_async-get_status.run(run_id)
            MCP-->>Client: status {pending|running|completed|error}
        end
        alt completed
            MCP-->>Client: result (report)
        else error
            MCP-->>Client: error info
        end
    end
    Note over Client,MCP: Server logs forwarded upstream to client via logging callback
```
## Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

## Possibly related PRs
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/mcp_agent/app.py (1)

**473-477: Signal handler wrapper should await async callables**

`decorated_fn` may return a coroutine; the wrapper currently returns it without awaiting, causing "coroutine was never awaited" and lost execution. Fix by awaiting when needed.

```diff
 @functools.wraps(decorated_fn)
-async def wrapper(*args, **kwargs):
-    signal_handler_args = args[1:]
-    return decorated_fn(*signal_handler_args, **kwargs)
+async def wrapper(*args, **kwargs):
+    signal_handler_args = args[1:]
+    res = decorated_fn(*signal_handler_args, **kwargs)
+    if asyncio.iscoroutine(res):
+        return await res
+    return res
```

src/mcp_agent/server/app_server.py (2)
**68-76: Fix: ServerContext.register_workflow checks wrong mapping**

`self.context.workflows` doesn't exist (Context has no 'workflows'). Use the property `self.workflows` (delegates to app.workflows).

```diff
-        if workflow_name not in self.context.workflows:
+        if workflow_name not in self.workflows:
             self.workflows[workflow_name] = workflow_cls
```
**596-603: Async declared tools: add '-get_status' alias or correct docs/endpoints**

Top-level doc says async declares run+get_status, but only the run tool is registered. Either add the status alias or update docs and list output. Adding the alias improves UX and aligns with capabilities.

```diff
@@
-  - @app.async_tool registers alias tools <name>-run and <name>-get_status
-    that proxy to the workflow run/status utilities.
+  - @app.async_tool registers tools '<name>' (start run) and '<name>-get_status'
+    that proxy to the workflow run/status utilities.
@@
-        if getattr(workflow_cls, "__mcp_agent_sync_tool__", False):
-            endpoints = [
-                f"{workflow_name}",
-            ]
-        elif getattr(workflow_cls, "__mcp_agent_async_tool__", False):
-            endpoints = [
-                f"{workflow_name}",
-            ]
+        if getattr(workflow_cls, "__mcp_agent_sync_tool__", False):
+            endpoints = [f"{workflow_name}"]
+            caps = ["run"]
+        elif getattr(workflow_cls, "__mcp_agent_async_tool__", False):
+            endpoints = [f"{workflow_name}", f"{workflow_name}-get_status"]
+            caps = ["run", "get_status"]
         else:
             endpoints = [
                 f"workflows-{workflow_name}-run",
                 f"workflows-{workflow_name}-get_status",
             ]
+            caps = ["run", "get_status", "resume", "cancel"]
@@
-        result[workflow_name] = {
+        result[workflow_name] = {
             "name": workflow_name,
             "description": workflow_cls.__doc__ or run_fn_tool.description,
-            "capabilities": ["run", "resume", "cancel", "get_status"],
+            "capabilities": caps,
             "tool_endpoints": endpoints,
             "run_parameters": run_fn_tool.parameters,
         }
@@
         elif mode == "async":
             # Use the declared name as the async run endpoint
-            run_tool_name = f"{name_local}"
+            run_tool_name = f"{name_local}"
@@
             _async_wrapper.__signature__ = inspect.Signature(
                 parameters=params + [ctx_param], return_annotation=ann.get("return")
             )
@@
             _async_adapter = _make_async_adapter(ctx_param_name, _async_wrapper)
@@
             # Register the async run tool
             mcp.add_tool(
                 _async_adapter,
                 name=run_tool_name,
                 description=full_desc,
                 structured_output=False,
             )
             registered.add(run_tool_name)
+
+            # Also register '<name>-get_status' alias
+            status_tool_name = f"{name_local}-get_status"
+            if status_tool_name not in registered:
+                from typing import Dict as _Dict, Any as _Any
+
+                async def _async_status_wrapper(*, run_id: str, **kw):
+                    ctx: MCPContext = kw.pop("__context__")
+                    return await _workflow_status(ctx, run_id, workflow_name=wname_local)
+
+                # Annotations and signature
+                _ann_s = {"run_id": str}
+                if _Ctx is not None:
+                    _ann_s[ctx_param_name] = _Ctx
+                _ann_s["return"] = _Dict[str, _Any]
+                _async_status_wrapper.__annotations__ = _ann_s
+                _async_status_wrapper.__name__ = status_tool_name
+                _async_status_wrapper.__doc__ = (
+                    f"Get status for the asynchronously started '{wname_local}' workflow."
+                )
+                # Signature: run_id positional/kw + kw-only ctx
+                _params_s = [
+                    inspect.Parameter("run_id", kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str),
+                    inspect.Parameter(ctx_param_name, kind=inspect.Parameter.KEYWORD_ONLY, annotation=_Ctx)
+                    if _Ctx is not None
+                    else inspect.Parameter(ctx_param_name, kind=inspect.Parameter.KEYWORD_ONLY),
+                ]
+                _async_status_wrapper.__signature__ = inspect.Signature(parameters=_params_s)
+                # Adapter to inject __context__ and set upstream
+                _async_status_adapter = _make_async_adapter(ctx_param_name, _async_status_wrapper)
+                mcp.add_tool(
+                    _async_status_adapter,
+                    name=status_tool_name,
+                    description=_async_status_wrapper.__doc__,
+                    structured_output=False,
+                )
+                registered.add(status_tool_name)
```

Also applies to: 785-893, 357-377
🧹 Nitpick comments (23)
examples/mcp_agent_server/asyncio/client.py (5)

**25-30: Constrain CLI log-level to valid values.**

Add argparse choices and normalize case at parse time.

```diff
-    parser.add_argument(
-        "--server-log-level",
-        type=str,
-        default=None,
-        help="Set initial server logging level (debug, info, notice, warning, error, critical, alert, emergency)",
-    )
+    parser.add_argument(
+        "--server-log-level",
+        type=str.lower,
+        choices=["debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"],
+        default="info",
+        help="Set minimum server logging level (debug, info, notice, warning, error, critical, alert, emergency)",
+    )
```
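The suggested `type=str.lower` + `choices` combination validates and normalizes in one step, because argparse applies the type conversion before checking membership in `choices`. A standalone sketch (the flag name matches the example client; `build_parser` is an illustrative helper):

```python
import argparse

LOG_LEVELS = [
    "debug", "info", "notice", "warning",
    "error", "critical", "alert", "emergency",
]

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--server-log-level",
        type=str.lower,      # normalize case before validation
        choices=LOG_LEVELS,  # argparse rejects anything else with a clear error
        default="info",
        help="Set minimum server logging level",
    )
    return parser

# Upper-case input is lowered before the choices check, so "WARNING" is accepted
args = build_parser().parse_args(["--server-log-level", "WARNING"])
print(args.server_log_level)  # → warning
```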
**79-92: Graceful fallback on unsupported set_logging_level is good.**

Optional: narrow the exception to the protocol's "method not found" error if available.
**221-236: De-duplicate tool-result parsing.**

Extract the repeated "structuredContent/result or JSON from content" logic into a helper to reduce branching.

Add this helper (outside changed range):

```python
def extract_payload(result):
    sc = getattr(result, "structuredContent", None)
    if isinstance(sc, dict) and ("result" in sc):
        return sc["result"]
    payload = _tool_result_to_json(result)
    if payload is not None:
        return payload
    if getattr(result, "content", None) and getattr(result.content[0], "text", None):
        try:
            return json.loads(result.content[0].text)
        except Exception:
            return None
    return None
```

Then here:

```python
grade_payload = extract_payload(grade_result)
```
**263-279: Async status polling OK.**

Optional: add a max wait or deadline to prevent infinite loops.
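A deadline-bounded polling loop of the kind suggested might look like the sketch below; `poll_until_done` and the fake status source are illustrative helpers, not part of the example client, which would pass a real status tool call as `get_status`:

```python
import asyncio

async def poll_until_done(get_status, run_id, interval=0.5, timeout=60.0):
    """Poll get_status(run_id) until a terminal state or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status(run_id)
        if status.get("status") in ("completed", "error"):
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"run {run_id!r} did not finish within {timeout}s")
        await asyncio.sleep(interval)

# Demo with a fake status source that completes on the third poll
async def demo():
    calls = {"n": 0}

    async def fake_status(run_id):
        calls["n"] += 1
        return {"status": "completed" if calls["n"] >= 3 else "running"}

    return await poll_until_done(fake_status, "r1", interval=0.01, timeout=1.0)

print(asyncio.run(demo()))
```

Checking the deadline after each status call (rather than only before sleeping) guarantees the loop terminates even if `get_status` itself is slow.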
**290-296: Fix misleading comment in _tool_result_to_json.**

The code returns None on invalid JSON; the comment says otherwise.

```diff
-        # If it's not valid JSON, just use the text
+        # If it's not valid JSON, return None so callers can fall back to other shapes
```

examples/mcp_agent_server/README.md (2)
**55-61: Typo: "VSCode" → "VS Code".**

```diff
-| **Client Independence** | Connect from any MCP client: Claude, VSCode, Cursor, MCP Inspector, or custom apps |
+| **Client Independence** | Connect from any MCP client: Claude, VS Code, Cursor, MCP Inspector, or custom apps |
```
**102-118: MD040: add a language to the ASCII art code fence.**

Mark the fence as "text" to satisfy markdownlint.
examples/mcp_agent_server/asyncio/README.md (8)

**16-16: Grammar: use imperative voice.**

```diff
-- Preferred: Declaring MCP tools with `@app.tool` and `@app.async_tool`
+- Preferred: Declare MCP tools with `@app.tool` and `@app.async_tool`
```

**102-105: Add language to shell fence.**

````diff
-```
+```bash
 cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml
````

**108-113: Mark API keys block as YAML.**

````diff
-```
+```yaml
 anthropic:
   api_key: "your-anthropic-api-key"
 openai:
   api_key: "your-openai-api-key"
````

**121-125: Add language to shell fence.**

````diff
-```
+```bash
 # Make sure you're in the mcp_agent_server/asyncio directory
 uv run client.py
````

**139-144: Add language to server shell fence.**

````diff
-```
+```bash
 uv run basic_agent_server.py
 # Optionally, run with the example custom FastMCP settings
 uv run basic_agent_server.py --custom-fastmcp-settings
````

**148-153: Add language to client shell fence.**

````diff
-```
+```bash
 uv run client.py
 # Optionally, run with the example custom FastMCP settings
 uv run client.py --custom-fastmcp-settings
````

**195-201: Add language to inspector shell fence.**

````diff
-```
+```bash
 npx @modelcontextprotocol/inspector \
   uv \
   --directory /path/to/mcp-agent/examples/mcp_agent_server/asyncio \
   run \
   basic_agent_server.py
````

**233-238: Add language to which-paths shell fence.**

````diff
-```
+```bash
 which uvx
 which npx
````
src/mcp_agent/app.py (3)

**192-199: Avoid mutating Logger private attr; rebind via get_logger**

Touching `_bound_context` reaches into internals. Re-resolve via `get_logger(...)`, which updates the existing instance's bound context safely.

```diff
-        else:
-            # Update the logger's bound context in case upstream_session was set after logger creation
-            if self._context and hasattr(self._logger, "_bound_context"):
-                self._logger._bound_context = self._context
+        else:
+            # Rebind via get_logger to avoid touching private attrs
+            if self._context:
+                self._logger = get_logger(
+                    f"mcp_agent.{self.name}",
+                    session_id=self._context.session_id,
+                    context=self._context,
+                )
```
**559-576: Safer signature inspection and injection for app_ctx/Context**

- Use the public `inspect.Parameter.empty`.
- Don't overwrite a user-supplied `app_ctx` kwarg.

```diff
-            if param.annotation != _inspect.Parameter.empty:
+            if param.annotation is not _inspect.Parameter.empty:
 ...
-            if app_context_param_name and getattr(workflow_self, "_context", None):
-                call_kwargs[app_context_param_name] = workflow_self._context
+            if (
+                app_context_param_name
+                and getattr(workflow_self, "_context", None)
+                and app_context_param_name not in call_kwargs
+            ):
+                call_kwargs[app_context_param_name] = workflow_self._context
 ...
-            p.annotation is not _inspect._empty
+            p.annotation is not _inspect.Parameter.empty
```

Also applies to: 585-596
**351-351: Fix typo in comment ("Update")**

```diff
-        # Updatre progress display before logging is shut down
+        # Update progress display before logging is shut down
```

src/mcp_agent/logging/logger.py (3)
**362-382: Optional: make set_min_level robust if configure() was never called**

If `_event_filter_ref` is still `None`, instantiate one so calls from the MCP `set_logging_level` endpoint never no-op.

```diff
 @classmethod
 def set_min_level(cls, level: EventType | str) -> None:
-    """Update the minimum logging level on the shared event filter, if available."""
-    if cls._event_filter_ref is None:
-        return
+    """Update the minimum logging level on the shared event filter, if available."""
+    if cls._event_filter_ref is None:
+        cls._event_filter_ref = EventFilter()
```
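Reduced to essentials, the lazily-created shared filter pattern looks like this; `LoggingConfig` and `EventFilter` below are simplified stand-ins for the real classes in `mcp_agent.logging`, with a truncated level table for brevity:

```python
LEVEL_ORDER = {"debug": 0, "info": 1, "warning": 2, "error": 3}

class EventFilter:
    def __init__(self, min_level: str = "info"):
        self.min_level = min_level

    def matches(self, level: str) -> bool:
        return LEVEL_ORDER[level] >= LEVEL_ORDER[self.min_level]

class LoggingConfig:
    _event_filter_ref = None  # shared across the process

    @classmethod
    def set_min_level(cls, level: str) -> None:
        # Lazily create the shared filter so this call never silently no-ops,
        # even if configure() was never given an event_filter
        if cls._event_filter_ref is None:
            cls._event_filter_ref = EventFilter()
        cls._event_filter_ref.min_level = level

    @classmethod
    def matches(cls, level: str) -> bool:
        f = cls._event_filter_ref or EventFilter()
        return f.matches(level)

LoggingConfig.set_min_level("warning")
print(LoggingConfig.matches("info"), LoggingConfig.matches("error"))  # → False True
```

Because listeners hold a reference to the same filter object, mutating `min_level` in place takes effect for all of them without re-registration.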
**299-305: Use a single Final import consistently**

Minor consistency: you already import `Final` at the top; remove the inner alias.

```diff
-    from typing import Final as _Final
-
-    MCP_UPSTREAM_LISTENER_NAME: _Final[str] = "mcp_upstream"
+    MCP_UPSTREAM_LISTENER_NAME: Final[str] = "mcp_upstream"
```
**402-411: Docstring: 'context' is not deprecated; it's used for upstream forwarding**

Update parameter docs to reflect current behavior.

```diff
-        context: Deprecated/ignored. Present for backwards compatibility.
+        context: Optional bound application context (e.g., MCPApp Context). If provided,
+            the logger forwards events upstream using context.upstream_session.
```

src/mcp_agent/server/app_server.py (2)
**391-392: Doc: returns a list, not a dict**

Adjust the return description to match the implementation.

```diff
-    Returns:
-        A dictionary mapping workflow instance IDs to their detailed status information.
+    Returns:
+        A list of workflow instance status dictionaries.
```
**953-955: Doc: fix endpoint name in reference**

Use the hyphenated tool name, not a path with slashes.

```diff
-        run_id: The run ID of the running workflow, received from workflows/{workflow_name}/run.
+        run_id: The run ID of the running workflow, received from workflows-{workflow_name}-run.
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- examples/mcp_agent_server/README.md (4 hunks)
- examples/mcp_agent_server/asyncio/README.md (6 hunks)
- examples/mcp_agent_server/asyncio/client.py (7 hunks)
- src/mcp_agent/app.py (5 hunks)
- src/mcp_agent/logging/logger.py (8 hunks)
- src/mcp_agent/server/app_server.py (19 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-28T15:07:09.951Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:09.951Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/server/app_server.py
examples/mcp_agent_server/asyncio/client.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/server/app_server.py
examples/mcp_agent_server/README.md
examples/mcp_agent_server/asyncio/client.py
examples/mcp_agent_server/asyncio/README.md
🧬 Code graph analysis (4)
src/mcp_agent/logging/logger.py (4)

- src/mcp_agent/logging/events.py (3)
  - Event (39-61)
  - EventContext (23-36)
  - EventFilter (64-112)
- src/mcp_agent/app.py (3)
  - session_id (185-186)
  - context (145-150)
  - logger (189-199)
- src/mcp_agent/logging/transport.py (3)
  - AsyncEventBus (281-530)
  - get (308-315)
  - add_listener (442-444)
- src/mcp_agent/logging/listeners.py (1)
  - MCPUpstreamLoggingListener (235-297)

src/mcp_agent/server/app_server.py (4)

- src/mcp_agent/app.py (9)
  - logger (189-199)
  - MCPApp (40-860)
  - context (145-150)
  - upstream_session (169-170)
  - upstream_session (173-174)
  - workflows (177-178)
  - tool (639-685)
  - executor (161-162)
  - workflow (405-438)
- src/mcp_agent/logging/logger.py (3)
  - LoggingConfig (258-395)
  - set_min_level (363-381)
  - get_logger (402-430)
- src/mcp_agent/core/context.py (2)
  - mcp (98-99)
  - Context (57-99)
- src/mcp_agent/executor/temporal/workflow_registry.py (2)
  - get_workflow_status (137-168)
  - get_workflow (63-66)

src/mcp_agent/app.py (3)

- src/mcp_agent/logging/logger.py (1)
  - get_logger (402-430)
- src/mcp_agent/core/context.py (2)
  - mcp (98-99)
  - Context (57-99)
- tests/server/test_app_server.py (2)
  - decorator (296-298)
  - decorator (379-381)

examples/mcp_agent_server/asyncio/client.py (3)

- src/mcp_agent/mcp/gen_client.py (1)
  - gen_client (16-41)
- src/mcp_agent/mcp/mcp_agent_client_session.py (1)
  - MCPAgentClientSession (72-445)
- examples/mcp_agent_server/temporal/client.py (1)
  - _tool_result_to_json (114-124)
🪛 LanguageTool
examples/mcp_agent_server/README.md
[grammar] ~67-~67: There might be a mistake here.
Context: ...r experience: - Asyncio Implementation - [Temporal Implementation](./temporal/READ...
(QB_NEW_EN)
examples/mcp_agent_server/asyncio/README.md
[grammar] ~16-~16: There might be a mistake here.
Context: ...eclaring MCP tools with @app.tool
and @app.async_tool
- Connecting to an MCP server using `gen_c...
(QB_NEW_EN)
[grammar] ~17-~17: There might be a mistake here.
Context: ...ol- Connecting to an MCP server using
gen_client` - Running workflows remotely and monitorin...
(QB_NEW_EN)
[grammar] ~83-~83: There might be a mistake here.
Context: ...ol: grade_story
(returns final result) - Async tool: grade_story_async
(returns...
(QB_NEW_EN)
🪛 markdownlint-cli2 (0.17.2)
examples/mcp_agent_server/README.md
121-121: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (6)
examples/mcp_agent_server/asyncio/client.py (3)

**67-78: Session factory wiring is correct.**

Signature matches gen_client's expected `(read, write, timeout) -> ClientSession`, and the callback is injected properly.

**146-152: Status parsing fallback is reasonable.**

Good use of structuredContent/result before erroring.

**58-65: Verify async support for `logging_callback`.**

The `MCPAgentClientSession` constructor's `logging_callback` parameter (type `LoggingFnT` from `mcp.client.session`) is passed through to the underlying client; confirm that `LoggingFnT` supports async callables, or wrap the callback synchronously if it doesn't.

examples/mcp_agent_server/asyncio/README.md (2)

**77-87: Endpoint list is clear and accurate.**

**159-182: Client log-receiving example is solid and consistent with code.**

src/mcp_agent/server/app_server.py (1)

**344-349: Upstream binding: best-effort try/except is fine**

Setting the upstream session in `list_workflows` before emitting logs is a good guard for forwarding.
```python
                # Tolerant parsing of run IDs from tool result
                run_payload = _tool_result_to_json(run_result)
                if not run_payload:
                    sc = getattr(run_result, "structuredContent", None)
                    if isinstance(sc, dict):
                        run_payload = sc.get("result") or sc
                if not run_payload:
                    # Last resort: parse unstructured content if present and non-empty
                    if getattr(run_result, "content", None) and run_result.content[0].text:
                        run_payload = json.loads(run_result.content[0].text)
                    else:
                        raise RuntimeError(
                            "Unable to extract workflow run IDs from tool result"
                        )

                execution = WorkflowExecution(**run_payload)
```
🛠️ Refactor suggestion
**Harden run-ID extraction: catch JSON errors and validate keys.**

Currently `json.loads(...)` isn't caught, and missing keys can raise later.

```diff
 # Tolerant parsing of run IDs from tool result
 run_payload = _tool_result_to_json(run_result)
 if not run_payload:
     sc = getattr(run_result, "structuredContent", None)
     if isinstance(sc, dict):
         run_payload = sc.get("result") or sc
 if not run_payload:
     # Last resort: parse unstructured content if present and non-empty
-    if getattr(run_result, "content", None) and run_result.content[0].text:
-        run_payload = json.loads(run_result.content[0].text)
+    if getattr(run_result, "content", None) and getattr(run_result.content[0], "text", None):
+        try:
+            run_payload = json.loads(run_result.content[0].text)
+        except Exception as e:
+            raise RuntimeError(f"Invalid JSON in tool result content: {e}") from e
     else:
         raise RuntimeError(
             "Unable to extract workflow run IDs from tool result"
         )
-execution = WorkflowExecution(**run_payload)
+if not isinstance(run_payload, dict) or not all(k in run_payload for k in ("workflow_id", "run_id")):
+    raise RuntimeError(f"Tool result missing workflow_id/run_id: {run_payload!r}")
+execution = WorkflowExecution(**run_payload)
```
```python
                async_ids = (
                    (getattr(async_run_result, "structuredContent", {}) or {}).get(
                        "result"
                    )
                    or _tool_result_to_json(async_run_result)
                    or json.loads(async_run_result.content[0].text)
                )
                async_run_id = async_ids["run_id"]
                logger.info(
```
🛠️ Refactor suggestion
**Validate async IDs and guard JSON parsing.**

Avoid a KeyError when `run_id` is missing; catch JSON decode errors.

```diff
-async_ids = (
-    (getattr(async_run_result, "structuredContent", {}) or {}).get(
-        "result"
-    )
-    or _tool_result_to_json(async_run_result)
-    or json.loads(async_run_result.content[0].text)
-)
-async_run_id = async_ids["run_id"]
+async_ids = (
+    (getattr(async_run_result, "structuredContent", {}) or {}).get("result")
+    or _tool_result_to_json(async_run_result)
+    or (
+        json.loads(async_run_result.content[0].text)
+        if getattr(async_run_result, "content", None) and getattr(async_run_result.content[0], "text", None)
+        else None
+    )
+)
+if not isinstance(async_ids, dict) or "run_id" not in async_ids:
+    raise RuntimeError(f"grade_story_async: missing run_id in response: {async_ids!r}")
+async_run_id = async_ids["run_id"]
```
🤖 Prompt for AI Agents
In examples/mcp_agent_server/asyncio/client.py around lines 245 to 253, the code
assumes async_ids contains a dict with "run_id" and blindly calls json.loads on
content, which can raise KeyError or JSONDecodeError; update the code to: ensure
async_ids is a dict (if it's a list or None, normalize or pick the first
element), use .get("run_id") and check for its presence before using it (log and
raise a clear exception or return early if missing), wrap json.loads in a
try/except catching json.JSONDecodeError to handle malformed JSON (log the raw
content and re-raise or return a sensible error), and add minimal unit/log
messages so failures are observable rather than causing uncaught exceptions.
```python
from mcp_agent.app import MCPApp
from typing import Optional

app = MCPApp(name="basic_agent_server")

# Synchronous tool – returns the final result to the caller
@app.tool
async def grade_story(story: str, app_ctx: Optional[Context] = None) -> str:
    """
    Grade a student's short story and return a structured report.
    """
    # ... implement using your agents/LLMs ...
    return "Report..."

# Asynchronous tool – starts a workflow and returns IDs to poll later
@app.async_tool(name="grade_story_async")
async def grade_story_async(story: str, app_ctx: Optional[Context] = None) -> str:
    """
    Start grading the story asynchronously.
    This tool starts the workflow and returns 'workflow_id' and 'run_id'. Use the
    generic 'workflows-get_status' tool with the returned IDs to retrieve status/results.
    """
    # ... implement using your agents/LLMs ...
    return "(async run)"
```
💡 Verification agent
🧩 Analysis chain
**Fix sample: missing Context import and async return shape.**

Import Context (or remove the annotation) and return IDs from the async tool as documented.

```diff
 from mcp_agent.app import MCPApp
-from typing import Optional
+from typing import Optional
+from mcp_agent.context import Context  # verify correct import path
@@
 # Synchronous tool – returns the final result to the caller
 @app.tool
 async def grade_story(story: str, app_ctx: Optional[Context] = None) -> str:
@@
 # Asynchronous tool – starts a workflow and returns IDs to poll later
 @app.async_tool(name="grade_story_async")
-async def grade_story_async(story: str, app_ctx: Optional[Context] = None) -> str:
+async def grade_story_async(story: str, app_ctx: Optional[Context] = None) -> dict[str, str]:
@@
-    # ... implement using your agents/LLMs ...
-    return "(async run)"
+    # ... implement using your agents/LLMs ...
+    return {"workflow_id": "grade_story_async", "run_id": "<generated-run-id>"}
```

To locate the correct Context import:

🏁 Script executed:

```shell
#!/bin/bash
rg -nP '\bclass\s+Context\b|from\s+mcp_agent\..*import\s+Context' -C2
```

Length of output: 31203
**Update Context import and async tool return**

- In `examples/mcp_agent_server/asyncio/README.md`, change the import to `from mcp_agent.core.context import Context`.
- Update `grade_story_async` to return a `dict[str, str]` (e.g. `{"workflow_id": ..., "run_id": ...}`) instead of a plain string.
🤖 Prompt for AI Agents
In examples/mcp_agent_server/asyncio/README.md around lines 24 to 50, update the
Context import to use from mcp_agent.core.context import Context and change the
async tool signature and return to return a dict[str,str] (e.g. async def
grade_story_async(story: str, app_ctx: Optional[Context] = None) -> dict[str,
str]) and ensure the function returns a mapping like {"workflow_id": "<id>",
"run_id": "<id>"} instead of a plain string; add any necessary typing imports if
missing.
### Preferred: Declare tools with decorators

Instead of only defining workflow classes, you can expose tools directly from functions:

```python
from mcp_agent.app import MCPApp

app = MCPApp(name="my_agent_server")

@app.tool
async def do_something(arg: str) -> str:
    """Do something synchronously and return the final result."""
    return "done"

@app.async_tool(name="do_something_async")
async def do_something_async(arg: str) -> str:
    """
    Start work asynchronously.
    Returns 'workflow_id' and 'run_id'. Use 'workflows-get_status' with the returned
    IDs to retrieve status and results.
    """
    return "started"
```
🛠️ Refactor suggestion
**Align async tool example with described return contract.**

The example says it returns workflow_id/run_id but returns a string.

```diff
 @app.tool
 async def do_something(arg: str) -> str:
     """Do something synchronously and return the final result."""
     return "done"

 @app.async_tool(name="do_something_async")
-async def do_something_async(arg: str) -> str:
+async def do_something_async(arg: str) -> dict[str, str]:
     """
     Start work asynchronously.
     Returns 'workflow_id' and 'run_id'. Use 'workflows-get_status' with the returned
     IDs to retrieve status and results.
     """
-    return "started"
+    return {"workflow_id": "do_something_async", "run_id": "<generated-run-id>"}
```
🤖 Prompt for AI Agents
examples/mcp_agent_server/README.md lines 70-93: the async tool example claims
it returns workflow_id and run_id but the function currently returns a single
string; change the example so the async tool returns the expected identifiers
(either a tuple like (workflow_id, run_id) or a dict with keys "workflow_id" and
"run_id"), and update the function body and docstring accordingly to show the
actual return shape used by the workflows-get_status call.
```python
bus = AsyncEventBus.get(transport=transport)
# Keep a reference to the provided filter so we can update at runtime
if event_filter is not None:
    cls._event_filter_ref = event_filter

# If already initialized, ensure critical listeners exist and return
```
🛠️ Refactor suggestion

Guarantee a shared EventFilter so set_logging_level always works

If `configure()` is called without an `event_filter`, `_event_filter_ref` stays `None`; later `set_min_level` becomes a no-op and the upstream listener (created with `None`) won't respond to level changes. Create a default filter and pass the shared reference to listeners.
```diff
-        bus = AsyncEventBus.get(transport=transport)
-        # Keep a reference to the provided filter so we can update at runtime
-        if event_filter is not None:
-            cls._event_filter_ref = event_filter
+        bus = AsyncEventBus.get(transport=transport)
+        # Ensure a shared filter instance always exists for runtime updates
+        if event_filter is None:
+            event_filter = EventFilter()
+        cls._event_filter_ref = event_filter
 ...
-        MCP_UPSTREAM_LISTENER_NAME: Final[str] = "mcp_upstream"
-        bus.add_listener(
-            MCP_UPSTREAM_LISTENER_NAME,
-            MCPUpstreamLoggingListener(event_filter=event_filter),
-        )
+        MCP_UPSTREAM_LISTENER_NAME: Final[str] = "mcp_upstream"
+        bus.add_listener(
+            MCP_UPSTREAM_LISTENER_NAME,
+            MCPUpstreamLoggingListener(event_filter=cls._event_filter_ref),
+        )
```
Also applies to: 331-346
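To see why the shared filter reference matters, here is a minimal self-contained sketch — the class names are illustrative, not the real mcp-agent types. A listener holding a shared filter reference picks up runtime level changes, whereas one constructed with `None` never would:

```python
import logging

class EventFilter:
    """Minimal stand-in for the event bus filter: listeners consult a
    shared min_level that can be raised or lowered at runtime."""
    def __init__(self, min_level=logging.INFO):
        self.min_level = min_level

    def matches(self, level):
        return level >= self.min_level

class Listener:
    def __init__(self, event_filter):
        self.event_filter = event_filter  # shared reference, not a copy
        self.forwarded = []

    def on_event(self, level, msg):
        if self.event_filter.matches(level):
            self.forwarded.append(msg)

shared = EventFilter()            # created even if the caller passed None
listener = Listener(shared)
listener.on_event(logging.DEBUG, "dropped")   # below INFO, filtered out
shared.min_level = logging.DEBUG  # runtime set_logging_level takes effect
listener.on_event(logging.DEBUG, "kept")      # now forwarded
print(listener.forwarded)  # ['kept']
```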
```python
logger.error(f"grade_story: Error generating result: {e}")
return None
```
The return type annotation for `grade_story` is `str`, but the function returns `None` on error. This creates a type inconsistency that could cause type checking failures and potentially break client code expecting a string return value. Consider either:
- Returning an error message string instead of `None`
- Changing the return type annotation to `str | None`
- Raising an exception that can be properly handled by the caller

This pattern appears in both the sync and async versions of the function.
```diff
 logger.error(f"grade_story: Error generating result: {e}")
-return None
+return f"Error grading story: {str(e)}"
```
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
```python
logger.error(f"grade_story_async: Error generating result: {e}")
return None
```
The return type annotation for `grade_story_async` is `str`, but the function returns `None` on error. This creates a type inconsistency that could cause type checking failures and potentially break client expectations. Consider either:
- Changing the return type annotation to `str | None`
- Returning an error string instead of `None`
- Raising an exception that can be properly handled by the caller

This would maintain type consistency and provide clearer error handling semantics.
```diff
 logger.error(f"grade_story_async: Error generating result: {e}")
-return None
+return "ERROR: Failed to generate grading result"
```
Spotted by Diamond
```diff
 result = await workflow.execute_activity(
     activity_task,
-    args=args,
+    *args,
```
This change modifies the argument passing pattern from `args=args` to `*args`, which unpacks the arguments differently. The original code passed `args` as a named parameter to `workflow.execute_activity()`, but this change passes them as positional arguments. This could cause activity execution to fail if the receiving function expects a specific parameter structure. Consider maintaining the original behavior with `args=args` unless there's a specific reason for this change that isn't apparent from the diff.
```diff
-    *args,
+    args=args,
```
Spotted by Diamond
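A simplified stand-in — deliberately not Temporal's real `execute_activity` signature — shows how the two calling conventions diverge once an activity takes more than one argument:

```python
def execute_activity(activity, *call_args, args=None):
    """Hypothetical activity launcher whose multi-argument calls must go
    through the `args=` keyword. Just enough to show the pitfall: it is
    NOT the real temporalio API."""
    if args is not None and call_args:
        raise TypeError("pass either one positional arg or args=, not both")
    if len(call_args) > 1:
        raise TypeError("multiple arguments must be passed via args=[...]")
    final_args = list(args) if args is not None else list(call_args)
    return activity(*final_args)

def add(a, b):
    return a + b

# Keyword form: the whole argument list travels as one parameter.
print(execute_activity(add, args=[2, 3]))  # 5

# Unpacking with *args sends multiple positionals, which the launcher rejects:
try:
    execute_activity(add, *[2, 3])
except TypeError as exc:
    print(type(exc).__name__)  # TypeError
```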
```python
# Check for missing required (non-default) parameters
for p in params:
    if p.default is inspect._empty and p.name not in bound.arguments:
        raise ValueError(f"Missing required workflow argument '{p.name}'")
```
The code is using `inspect._empty` instead of the public API `inspect.Parameter.empty`. Although `_empty` currently aliases the same sentinel and works today, it is a private implementation detail that could change without notice. Consider updating to use the documented public attribute:

```python
if p.default is inspect.Parameter.empty and p.name not in bound.arguments:
    raise ValueError(f"Missing required workflow argument '{p.name}'")
```
```diff
 # Check for missing required (non-default) parameters
 for p in params:
-    if p.default is inspect._empty and p.name not in bound.arguments:
-        raise ValueError(f"Missing required workflow argument '{p.name}'")
+    if p.default is inspect.Parameter.empty and p.name not in bound.arguments:
+        raise ValueError(f"Missing required workflow argument '{p.name}'")
```
Spotted by Diamond
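The suggested public-API check can be exercised in isolation. `check_required` below is a hypothetical helper mirroring the validation logic (note that `inspect._empty` is in fact the same object as `Parameter.empty` today, so the fix is about depending on the documented name, not a runtime error):

```python
import inspect

def check_required(fn, /, *args, **kwargs):
    """Bind the call partially, then flag required parameters that were
    never supplied, using the public inspect.Parameter.empty sentinel."""
    sig = inspect.signature(fn)
    bound = sig.bind_partial(*args, **kwargs)
    missing = [
        p.name
        for p in sig.parameters.values()
        if p.default is inspect.Parameter.empty
        and p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
        and p.name not in bound.arguments
    ]
    if missing:
        raise ValueError(f"Missing required workflow argument(s): {missing}")

def run(story, grader="default"):
    pass

check_required(run, story="once upon a time")  # ok: grader has a default
try:
    check_required(run)
except ValueError as e:
    print(e)  # Missing required workflow argument(s): ['story']
```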
```python
    "result"
)
or _tool_result_to_json(async_run_result)
or json.loads(async_run_result.content[0].text)
```
This code attempts to parse JSON from `async_run_result.content[0].text` without first verifying that `content` exists and contains elements. This could lead to an `IndexError` if `content` is empty or `None`. Consider adding defensive checks:

```python
if getattr(async_run_result, "content", None) and async_run_result.content:
    async_ids = json.loads(async_run_result.content[0].text)
```
```diff
-    or json.loads(async_run_result.content[0].text)
+    or (getattr(async_run_result, "content", None) and async_run_result.content and json.loads(async_run_result.content[0].text))
```
Spotted by Diamond
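A defensive extraction helper along the lines the comment suggests might look like this; the `Part` and `Result` classes are stand-ins for an MCP `CallToolResult` and its text content parts, not the real SDK types:

```python
import json

def tool_result_to_json(result):
    """Best-effort extraction of a JSON payload from a tool result,
    guarding against missing/empty `content` before indexing into it."""
    content = getattr(result, "content", None)
    if not content:          # None or empty list: nothing to parse
        return None
    text = getattr(content[0], "text", None)
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None

class Part:
    def __init__(self, text):
        self.text = text

class Result:
    def __init__(self, content):
        self.content = content

print(tool_result_to_json(Result([Part('{"run_id": "abc"}')])))  # {'run_id': 'abc'}
print(tool_result_to_json(Result([])))  # None, instead of an IndexError
```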
MCP models logging via `notifications/message` messages: https://modelcontextprotocol.io/specification/2025-03-26/server/utilities/logging. We leverage this so that when an agent is set up as an MCP server, the logger emits notification messages back to the upstream client.

Note: currently this works for asyncio execution mode. Additional work is necessary for it to work in a Temporal context, which @roman-van-der-krogt is tackling as part of #386.
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests