Fix issue with resume and other signals in mcp-agent #521
Caution
Review failed
The pull request is closed.
Walkthrough
Adds guarded, optional Temporal integration in workflow decorators and info access, inserts a defensive None-status check in an example client's polling loop, bumps package version, and applies widespread formatting/whitespace cleanups. No public API signatures changed.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant App
participant Workflow as Executor.Workflow
participant Temporal as Temporal SDK (optional)
rect rgb(237,247,255)
note over App,Workflow: Module load / decorator binding
App->>Workflow: import workflow module
alt Temporal available
Workflow->>Temporal: bind decorators via temporal_workflow
note right of Temporal: signals/queries decorated dynamically
else No Temporal
Workflow-->>App: load without Temporal decorators
end
end
rect rgb(247,255,237)
note over App,Workflow: Runtime signal/query handling
alt Temporal available
App->>Temporal: Signal / Query
Temporal->>Workflow: Invoke handler (uses temporal_workflow.info())
Workflow-->>Temporal: Return response
Temporal-->>App: Deliver result
else No Temporal
note over App: Non-Temporal path unchanged
end
end
sequenceDiagram
autonumber
participant Client as Example Client
participant Server as Temporal Server
participant WF as Workflow Run
Client->>Server: Poll workflow status
Server->>WF: Query status
WF-->>Server: status (may be None)
Server-->>Client: return status
alt status is None
Client->>Client: Log error and break loop
note right of Client: prevents processing invalid status
else valid status
Client->>Client: Continue polling/handling
end
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
📒 Files selected for processing (1)
Wow, great find!
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/mcp_agent/mcp/client_proxy.py (1)
179-181: Sanitize method in signal_name to ensure validity.
method may include characters not suitable for signal names or logging/metrics. Recommend normalizing to a safe subset.
Apply this diff and add the import:
+import re
@@
- signal_name = f"mcp_rpc_{method}_{uuid.uuid4().hex}"
+ safe_method = re.sub(r'[^A-Za-z0-9_.:-]+', '_', method)
+ signal_name = f"mcp_rpc_{safe_method}_{uuid.uuid4().hex}"
Outside this hunk, add at the top-level:
import re
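A minimal standalone sketch of the sanitizing step suggested above, assuming the signal name is built exactly as in the diff. The helper name make_signal_name is hypothetical and is not part of client_proxy.py.

```python
import re
import uuid

# Any run of characters outside this allow-list is collapsed to a single "_".
# The allow-list is copied from the suggested diff.
_UNSAFE_CHARS = re.compile(r"[^A-Za-z0-9_.:-]+")


def make_signal_name(method: str) -> str:
    """Build a unique signal name from an RPC method (hypothetical helper)."""
    safe_method = _UNSAFE_CHARS.sub("_", method)
    return f"mcp_rpc_{safe_method}_{uuid.uuid4().hex}"
```

With this sketch, a method such as sampling/create message would yield a name starting with mcp_rpc_sampling_create_message_ followed by a 32-character hex suffix.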
🧹 Nitpick comments (11)
examples/human_input/temporal/main.py (3)
39-44: Fix docstring Args to reflect app_ctx parameter.
Docstring says “input: none” though the function accepts app_ctx. Update for clarity.
- Args:
-     input: none
-
- Returns:
-     str: The greeting result from the agent
+ Args:
+     app_ctx (Context): Workflow context provided by the framework; exposes app and executor.
+
+ Returns:
+     str: The greeting result from the agent.
31-31: Optional: make app_ctx non‑optional or guard against None.
If the framework always provides app_ctx for @app.tool, mark it non‑optional. Otherwise add a small guard before use.
Option A (preferred if guaranteed non‑None):
-async def greet(app_ctx: Context | None = None) -> str:
+async def greet(app_ctx: Context) -> str:
Option B (if None is possible):
- app = app_ctx.app
+ if app_ctx is None:
+     raise RuntimeError("app_ctx is required in workflow context")
+ app = app_ctx.app
Please confirm which contract the decorator enforces.
Also applies to: 46-47
63-63: Confirm logger supports structured kw argument ‘data’.
If using Python stdlib logging, logger.info won’t accept arbitrary kwargs. If this isn’t the mcp-agent structured logger, switch to extra=… or serialize the data.
Example alternatives:
- logger.info("[workflow-mode] greet_tool agent result", extra={"data": {"result": result}})
- logger.info("[workflow-mode] greet_tool agent result %s", {"result": result})
src/mcp_agent/mcp/sampling_handler.py (2)
33-39: Use a shared content-normalizer instead of hasattr checks.
Current hasattr-based check can produce awkward strings for lists/bytes and duplicates logic with response formatting. Prefer a small helper and isinstance(TextContent).
Apply this diff within the block:
- for i, msg in enumerate(params.messages):
-     content = msg.content.text if hasattr(msg.content, "text") else str(msg.content)
+ for i, msg in enumerate(params.messages):
+     content = _content_to_text(msg.content)
Add this helper near the top of the file (e.g., after logger definition):
def _content_to_text(content) -> str:
    if content is None:
        return ""
    if isinstance(content, TextContent):
        return content.text or ""
    if isinstance(content, (list, tuple)):
        parts = []
        for c in content:
            parts.append(_content_to_text(c))
        return " ".join(p for p in parts if p)
    if hasattr(content, "data"):
        try:
            # bytes -> utf-8, else str()
            data = content.data
            if isinstance(data, bytes):
                return data.decode("utf-8", errors="replace")
            return str(data)
        except Exception:
            return str(getattr(content, "data", content))
    if isinstance(content, bytes):
        return content.decode("utf-8", errors="replace")
    if isinstance(content, dict) and "text" in content:
        return str(content.get("text", ""))
    return str(content)
Additionally, consider truncating the preview consistently (e.g., 200 chars) via a small helper to keep prompts compact.
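The truncation helper mentioned above could look roughly like this. The name truncate_preview and the default limit of 200 are illustrative assumptions; the review only suggests the idea.

```python
def truncate_preview(text: str, limit: int = 200) -> str:
    """Return text unchanged if short enough, else the first `limit` chars plus '...'."""
    if limit < 0:
        raise ValueError("limit must be non-negative")
    if len(text) <= limit:
        return text
    return text[:limit] + "..."
```

Using one helper for both the request and response views would keep the two previews consistent.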
85-86: Align response formatting with request path and reuse normalizer.
Mirror the request path: normalize content via a helper and optionally truncate to prevent overly long human prompts.
Apply this diff:
- content = (
-     result.content.text if hasattr(result.content, "text") else str(result.content)
- )
+ content = _content_to_text(result.content)
(Optional) Truncate similarly to the request view:
preview = content[:500] + ("..." if len(content) > 500 else "")
examples/human_input/temporal/client.py (1)
17-24: Good defensive fallbacks for optional imports.
Minor nit: catch ImportError instead of broad Exception.
Apply this diff:
-try:
-    from exceptiongroup import ExceptionGroup as _ExceptionGroup # Python 3.10 backport
-except Exception: # pragma: no cover
+try:
+    from exceptiongroup import ExceptionGroup as _ExceptionGroup # Python 3.10 backport
+except ImportError: # pragma: no cover
     _ExceptionGroup = None # type: ignore
-try:
-    from anyio import BrokenResourceError as _BrokenResourceError
-except Exception: # pragma: no cover
+try:
+    from anyio import BrokenResourceError as _BrokenResourceError
+except ImportError: # pragma: no cover
     _BrokenResourceError = None # type: ignore
src/mcp_agent/server/app_server.py (2)
819-833: Ensure Temporal client is initialized before signaling.
Call ensure_client() (when available) before accessing executor.client to avoid None or uninitialized client in some worker lifecycles.
- if app and app.context and hasattr(app.context, "executor"):
-     executor = app.context.executor
-     if hasattr(executor, "client"):
-         client = executor.client
+ if app and app.context and hasattr(app.context, "executor"):
+     executor = app.context.executor
+     if hasattr(executor, "ensure_client"):
+         await executor.ensure_client()
+     if hasattr(executor, "client"):
+         client = executor.client
526-535: Harden Bearer token parsing.
partition()+strip() tolerates extra spaces/tabs and avoids partial tokens.
- bearer = request.headers.get("Authorization", "")
- bearer_token = (
-     bearer.split(" ", 1)[1] if bearer.lower().startswith("bearer ") else ""
- )
+ bearer = request.headers.get("Authorization", "")
+ scheme, _, token = bearer.partition(" ")
+ bearer_token = token.strip() if scheme.lower() == "bearer" else ""
src/mcp_agent/human_input/elicitation_handler.py (1)
102-114: Consider surfacing metadata in the response.
If result/content carries useful fields beyond “response”, optionally populate HumanInputResponse.metadata for downstream consumers.
src/mcp_agent/mcp/client_proxy.py (2)
190-206: Unbounded HTTP timeout by default.
httpx.Timeout(None) can hang indefinitely. In Temporal activities the activity timeout may bound it, but in other async contexts this can stall the system.
Apply this diff to default to a sane finite timeout (overrideable via env):
- timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
+ timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT", "60")
@@
- if timeout_float is None:
-     timeout = httpx.Timeout(None)
+ if timeout_float is None or timeout_float <= 0:
+     timeout = 60.0
Repeat a similar change in the non‑workflow branch below.
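One way to centralize the finite-timeout fallback so both branches share it, sketched with the standard library only. The helper name resolve_timeout and the 60-second default are assumptions taken from the suggestion above, not code from the PR; the returned float could then be passed to httpx.Timeout.

```python
import math
import os
from typing import Mapping, Optional

DEFAULT_TIMEOUT_SECONDS = 60.0  # assumed default, per the review suggestion


def resolve_timeout(env: Optional[Mapping[str, str]] = None) -> float:
    """Read MCP_GATEWAY_REQUEST_TIMEOUT and fall back to a finite default."""
    env = os.environ if env is None else env
    raw = env.get("MCP_GATEWAY_REQUEST_TIMEOUT", "")
    try:
        value = float(raw)
    except ValueError:
        # Missing or non-numeric values use the default.
        return DEFAULT_TIMEOUT_SECONDS
    # Reject zero, negative, NaN, and infinite values so the timeout stays bounded.
    if not math.isfinite(value) or value <= 0:
        return DEFAULT_TIMEOUT_SECONDS
    return value
```

Accepting an optional mapping keeps the helper easy to test without mutating the process environment.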
248-251: DRY: factor header building into a helper.
Header construction is duplicated across functions. Extract a small _auth_headers(gateway_token: Optional[str]) -> Dict[str, str] to reduce copy/paste and drift.
I can draft the helper and apply it across this file on request.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
examples/human_input/temporal/client.py (4 hunks)
examples/human_input/temporal/main.py (1 hunks)
examples/mcp/mcp_elicitation/temporal/client.py (4 hunks)
examples/mcp/mcp_elicitation/temporal/main.py (1 hunks)
src/mcp_agent/app.py (1 hunks)
src/mcp_agent/executor/temporal/session_proxy.py (3 hunks)
src/mcp_agent/executor/temporal/system_activities.py (1 hunks)
src/mcp_agent/executor/workflow.py (6 hunks)
src/mcp_agent/human_input/elicitation_handler.py (7 hunks)
src/mcp_agent/mcp/client_proxy.py (7 hunks)
src/mcp_agent/mcp/sampling_handler.py (2 hunks)
src/mcp_agent/server/app_server.py (25 hunks)
src/mcp_agent/workflows/factory.py (1 hunks)
tests/human_input/test_elicitation_handler.py (7 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/app.py
src/mcp_agent/executor/workflow.py
examples/human_input/temporal/main.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
examples/human_input/temporal/client.py
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/mcp/client_proxy.py
🧬 Code graph analysis (8)
tests/human_input/test_elicitation_handler.py (2)
  src/mcp_agent/human_input/elicitation_handler.py (1)
    _handle_elicitation_response (19-51)
  src/mcp_agent/human_input/types.py (1)
    HumanInputRequest (7-29)
src/mcp_agent/app.py (1)
  src/mcp_agent/executor/decorator_registry.py (1)
    get_workflow_run_decorator (73-82)
examples/human_input/temporal/client.py (3)
  src/mcp_agent/core/context.py (1)
    Context (57-103)
  examples/mcp/mcp_elicitation/temporal/client.py (1)
    make_session (125-137)
  examples/mcp_agent_server/asyncio/client.py (1)
    make_session (115-127)
examples/mcp/mcp_elicitation/temporal/main.py (1)
  src/mcp_agent/app.py (1)
    MCPApp (54-1003)
src/mcp_agent/executor/workflow.py (1)
  src/mcp_agent/executor/temporal/workflow_signal.py (1)
    signal (146-195)
examples/mcp/mcp_elicitation/temporal/client.py (3)
  src/mcp_agent/app.py (3)
    context (161-166)
    server_registry (173-174)
    logger (205-222)
  src/mcp_agent/core/context.py (1)
    Context (57-103)
  src/mcp_agent/executor/workflow.py (2)
    WorkflowExecution (71-78)
    run_id (140-145)
src/mcp_agent/human_input/elicitation_handler.py (2)
  src/mcp_agent/executor/temporal/session_proxy.py (2)
    request (108-161)
    request (360-363)
  src/mcp_agent/human_input/types.py (2)
    HumanInputRequest (7-29)
    HumanInputResponse (32-42)
src/mcp_agent/server/app_server.py (4)
  src/mcp_agent/executor/workflow.py (3)
    id (133-137)
    run_id (140-145)
    executor (125-130)
  src/mcp_agent/executor/temporal/session_proxy.py (3)
    request (108-161)
    request (360-363)
    send_request (183-204)
  src/mcp_agent/app.py (3)
    logger (205-222)
    context (161-166)
    executor (177-178)
  src/mcp_agent/executor/temporal/workflow_signal.py (1)
    signal (146-195)
🔇 Additional comments (29)
examples/human_input/temporal/main.py (1)
9-10: LGTM on adding asyncio import.
Import placement and usage with asyncio.run(main()) look correct.
src/mcp_agent/app.py (1)
710-712: LGTM: Temporal run-decorator retrieval looks correct.
Accessor call and placement are sound; no behavior change beyond formatting.
src/mcp_agent/workflows/factory.py (1)
107-110: Formatting-only change, OK.
Multi-line AgentSpec args improve readability; no semantic change.
src/mcp_agent/executor/temporal/system_activities.py (1)
93-98: Signature reflow, OK.
Parameter layout change only; behavior intact.
examples/mcp/mcp_elicitation/temporal/main.py (1)
16-16: Compact MCPApp init, OK.
Cosmetic; no functional change.
tests/human_input/test_elicitation_handler.py (3)
9-9: Importing private helper for tests, OK.
Explicit import of _handle_elicitation_response keeps tests clear.
26-27: Test argument reflow, OK.
Single‑line literals improve readability; semantics unchanged.
Also applies to: 35-35, 45-46
80-83: Monkeypatch and mock tweaks, OK.
Formatting-only; intent preserved.
Also applies to: 89-91, 95-95, 127-129, 149-151
examples/human_input/temporal/client.py (2)
123-127: Session factory reflow and call sites, OK.
Signature and tool call formatting changes only.
Also applies to: 138-141, 152-152
156-168: Benign shutdown tolerance, OK.
Guards against ExceptionGroup and BrokenResourceError look correct.
src/mcp_agent/executor/temporal/session_proxy.py (1)
109-110: Method signatures reflow, OK.
Pure formatting; no semantic impact.
Also applies to: 325-329, 361-362
examples/mcp/mcp_elicitation/temporal/client.py (3)
126-130: Formatting and argument consolidation, OK.
Cleaner signatures and dict literals; behavior unchanged.
Also applies to: 141-144, 157-159, 165-173, 175-178, 183-184
186-194: Nice defensive check for None workflow status.
Prevents tight-looping on invalid responses.
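The guard described above can be illustrated with a simplified, synchronous sketch. The function name poll_statuses, the terminal state names, and the max_polls cap are hypothetical; the example client is async and presumably waits between polls, which is omitted here.

```python
from typing import Callable, List, Optional

# Assumed terminal states for illustration; the real client may use other names.
TERMINAL_STATES = {"completed", "error", "cancelled"}


def poll_statuses(
    get_status: Callable[[], Optional[str]], max_polls: int = 20
) -> List[str]:
    """Collect statuses until a terminal state, a None status, or max_polls."""
    seen: List[str] = []
    for _ in range(max_polls):
        status = get_status()
        if status is None:
            # Report and stop rather than keep looping on an invalid response.
            print("error: workflow status is None; stopping poll loop")
            break
        seen.append(status)
        if status in TERMINAL_STATES:
            break
    return seen
```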
242-245: Benign shutdown tolerance, OK.
ExceptionGroup/BrokenResourceError guards are correct and safe.
Also applies to: 250-251
src/mcp_agent/server/app_server.py (4)
753-756: LGTM: clearer async request ingress logging.
1554-1556: Nice: safe unwrapping of WorkflowResult by discriminator.
1662-1664: LGTM: robust exclusion of FastMCP Context-typed params.
1765-1767: LGTM: explicit ctx/run_parameters signature on workflow run tool.
src/mcp_agent/human_input/elicitation_handler.py (1)
33-35: Slash-command short‑circuit looks good.
Early return for /decline|/cancel avoids downstream ambiguity.
src/mcp_agent/executor/workflow.py (3)
30-36: Opportunistic Temporal import fixes registration‑order bug.
This removes the sys.modules timing hazard and ensures decorators register when Temporal is present. Good alignment with prior guidance to import guardedly.
465-579: LGTM: queries are deterministic and guard missing counters.
Good defensive reads and best‑effort summaries without awaiting.
433-456: Dynamic signal handler signature is correct.
Temporal Python SDK expects def handler(self, name: str, args: Sequence[workflow.RawValue]) for dynamic signals; the current signature matches, no change required.
src/mcp_agent/mcp/client_proxy.py (7)
11-13: Good: keyword‑only parameters for URL resolution.
Precedence is clear and resilient to positional misuse. LGTM.
41-49: LGTM: clear signature and optional auth.
Types and kwargs are consistent with usage in this module.
119-126: LGTM: notify proxy signature.
Consistent with other helpers and preserves optional token handling.
164-175: Context detection is fine; minor overhead only.
Importing within the function is acceptable; exceptions are handled cleanly.
176-178: Good guard for non‑Temporal contexts.
Clear error for callers outside workflow/activity.
211-214: Good: include signal_name in async request.
Matches the PR objective to correlate responses via dynamic signal handling.
Please confirm the server endpoint accepts signal_name and uses it as the exact Temporal signal name (no transformation). If it transforms, we should mirror the transformation here to keep names aligned.
153-160: No workflow code path calls request_via_proxy(make_async_call=True)
AST-based scan of the repository found no callsites with make_async_call=True inside @workflow.defn functions or elsewhere.
async def ask_via_proxy(
    execution_id: str,
    prompt: str,
    metadata: Dict[str, Any] | None = None,
    *,
    gateway_url: Optional[str] = None,
    gateway_token: Optional[str] = None,
) -> Dict[str, Any]:
Return type may not match if JSON is a list.
r.json() can return non‑dict JSON. Either validate the shape or widen the return type to Any.
Apply this diff to coerce to dict or return a structured error:
- try:
- return r.json() if r.content else {"error": "invalid_response"}
+ try:
+ body = r.json() if r.content else None
+ return body if isinstance(body, dict) else {"error": "invalid_response"}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/client_proxy.py around lines 82 to 89, the function
ask_via_proxy currently assumes r.json() returns a dict but JSON can be a list
or other types; update the function to validate the parsed JSON shape and either
coerce it to a dict (e.g., wrap non-dict responses as {"data": parsed_json}) or
raise/return a structured error dict indicating unexpected type, and adjust the
function's return type to Dict[str, Any] | Any (or widen to Any) accordingly so
callers handle non-dict responses safely.
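The shape check can be sketched without any HTTP client by working on the raw response bytes. The helper name parse_dict_response is hypothetical; the error payload reuses the {"error": "invalid_response"} value from the diff above.

```python
import json
from typing import Any, Dict

INVALID_RESPONSE: Dict[str, Any] = {"error": "invalid_response"}


def parse_dict_response(content: bytes) -> Dict[str, Any]:
    """Parse a JSON body and return it only if the top-level value is an object."""
    if not content:
        return dict(INVALID_RESPONSE)
    try:
        body = json.loads(content)
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        return dict(INVALID_RESPONSE)
    # Lists, strings, numbers, and null are valid JSON but not the promised dict.
    return body if isinstance(body, dict) else dict(INVALID_RESPONSE)
```

Returning a structured error keeps the declared Dict[str, Any] return type honest without widening it to Any.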
    make_async_call: bool,
    execution_id: str,
    method: str,
    params: Dict[str, Any] | None = None,
    *,
    gateway_url: Optional[str] = None,
    gateway_token: Optional[str] = None,
) -> Dict[str, Any]:
Potential non‑determinism: HTTP inside workflow context.
This function performs an HTTP request even when workflow.in_workflow() is true. Direct I/O in workflow code breaks determinism and can cause duplicate side‑effects on replay. Prefer executing an activity to perform the HTTP and return the signal_name back to the workflow.
Would you like a sketch to refactor this path to workflow.execute_activity(...)?
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/client_proxy.py around lines 153 to 160: the function
performs direct HTTP I/O even when workflow.in_workflow() is true, which breaks
Temporal determinism; change the path so that when workflow.in_workflow()
returns True you do not perform HTTP directly but instead call
workflow.execute_activity(...) to run an activity that performs the HTTP request
and returns the signal_name (pass gateway_url/gateway_token/params through
activity args), await the activity result inside the workflow with appropriate
scheduleToClose/startToClose timeouts, and keep the existing non-workflow path
to perform the HTTP call directly; also add/update the corresponding activity
implementation and registration to perform the actual HTTP request and return
the same response shape so callers are unchanged.
Context
wait_for_signal continues to block.
Root cause
The dynamic signal handler defined in workflow.py only existed when temporalio.workflow was already imported at class-definition time (if "temporalio.workflow" in sys.modules). In cloud builds that module wasn’t loaded yet when Workflow was imported (after change 87ab941), so the handler never registered. Without it, incoming signals never update the workflow SignalMailbox, so the workflow sees no signal despite Temporal recording the event.
Fix
Replace the sys.modules guard with an opportunistic import at module load. We try to import temporalio.workflow/RawValue; if available, we register the dynamic signal handler and related queries immediately, regardless of earlier import order. When Temporal isn’t installed (asyncio engine), the import cleanly fails and the code remains a no-op.
With the handler guaranteed to exist whenever Temporal is present, the mailbox is updated, wait_for_signal observes the version bump, and paused workflows resume in cloud exactly as they do locally.
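The difference between the two guards can be sketched in isolation. This is a simplified illustration, not the code in workflow.py: import_if_available, loaded_already, and should_register_handlers are hypothetical names, and the real change registers Temporal decorators rather than returning a flag.

```python
import importlib
import sys
from types import ModuleType
from typing import Optional


def loaded_already(module_name: str) -> bool:
    """The fragile guard: true only if some earlier import loaded the module."""
    return module_name in sys.modules


def import_if_available(module_name: str) -> Optional[ModuleType]:
    """The opportunistic guard: import now, or return None if not installed."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


# Evaluated at module load, independent of what was imported before.
temporal_workflow = import_if_available("temporalio.workflow")


def should_register_handlers() -> bool:
    """Stand-in for the decision to bind signal/query decorators."""
    return temporal_workflow is not None
```

The first check depends on import order, which is what differed between local and cloud builds; the second depends only on whether the package is installed.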
Summary by CodeRabbit
New Features
Bug Fixes
Style
Chores