Sampling/Elicitation for workflows via sampling #502

Conversation
Walkthrough

Adds a Temporal-enabled MCP elicitation example (tool, workflow, worker, client, configs), implements an async-request signaling path between Temporal workflows and the MCP server, centralizes gateway authentication, enriches human sampling prompts with summaries/metadata, and extends create_llm to accept agent-name strings.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant WF as Temporal Workflow
    participant Act as SystemActivities (activity)
    participant CP as ClientProxy
    participant S as MCP Server (app_server)
    participant Sess as Upstream Session
    rect #eef7ff
        note over WF: Workflow initiates an in-workflow request (async)
        WF->>Act: relay_request(make_async_call=true, exec_id, method, params)
        Act->>CP: request_via_proxy(make_async_call=true, ...)
        CP->>S: POST /internal/session/by-run/{workflow_id}/{exec_id}/async-request
        S-->>CP: {"error":"", "signal_name": "..."}
        CP-->>Act: {"signal_name": "..."}
        Act-->>WF: returns signal_name
    end
    rect #f6ffed
        note over S,Sess: Server processes request in background and signals workflow
        S->>Sess: send_request(method, params)
        Sess-->>S: result
        S->>WF: workflow_handle.signal("_user_response", result)
    end
    WF->>WF: wait_for_signal("_user_response")
    WF-->>WF: receive result → continue
```

```mermaid
sequenceDiagram
    autonumber
    participant C as Example Client
    participant BAS as basic_agent_server (SSE)
    participant W as Temporal Worker
    participant App as MCP App
    C->>BAS: call tool book_table(...)
    BAS-->>C: tool result
    C->>BAS: start workflow TestWorkflow.run(args)
    BAS->>W: schedule workflow task
    W->>App: execute workflow.run(args)
    App->>App: elicit + generate haiku via upstream session
    W-->>BAS: workflow started (ids)
    loop poll status
        C->>BAS: workflows-get_status(run_id)
        BAS-->>C: status (running|completed|error|...)
    end
```
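The two diagrams above compress the core idea: the workflow never blocks inside an activity while waiting for a human; it enqueues the request and then parks on a signal. The sketch below illustrates that shape with Temporal's Python SDK. It is a minimal illustration, not the PR's implementation: the activity name `mcp_relay_request`, the `_user_response` signal, and the argument order are taken from the diagrams, and the booking prompt is made up.

```python
from datetime import timedelta
from typing import Any, Dict, Optional

from temporalio import workflow


@workflow.defn
class AsyncElicitSketch:
    """Minimal sketch of the enqueue-then-wait-for-signal pattern shown above."""

    def __init__(self) -> None:
        self._response: Optional[Dict[str, Any]] = None

    @workflow.signal(name="_user_response")
    async def _user_response(self, payload: Dict[str, Any]) -> None:
        # The MCP server signals the workflow once the upstream session answers.
        self._response = payload

    @workflow.run
    async def run(self, exec_id: str) -> Dict[str, Any]:
        # Ask an activity to POST the request to the gateway's async-request route.
        await workflow.execute_activity(
            "mcp_relay_request",
            args=[True, exec_id, "elicitation/create", {"message": "Confirm booking?"}],
            schedule_to_close_timeout=timedelta(minutes=5),
        )
        # Suspend deterministically until the server delivers the result via the signal.
        await workflow.wait_condition(lambda: self._response is not None)
        return self._response
```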
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/mcp_agent/mcp/sampling_handler.py (1)
169-173: Avoid leaking full request payloads in metadata.

`params.model_dump()` can include user prompts/PII; send only what you need. Apply this diff:

```diff
-            metadata={
-                "type": "sampling_request_approval",
-                "original_params": params.model_dump(),
-            },
+            metadata={
+                "type": "sampling_request_approval",
+                # Exclude verbose/sensitive fields like messages
+                "original_params": params.model_dump(exclude={"messages"}),
+            },
```
🧹 Nitpick comments (25)
examples/mcp/mcp_elicitation/temporal/mcp_agent.secrets.yaml.example (1)
3-7: Keep real secrets out of VCS and consider env interpolation.

Good to ship as .example. If the loader supports it, prefer ${OPENAI_API_KEY} / ${ANTHROPIC_API_KEY} to nudge users toward env-based secrets.
examples/mcp/mcp_elicitation/temporal/requirements.txt (1)
2-6: Pin or constrain versions for reproducible example runs.

Unpinned deps can break the example unexpectedly; either use a constraints file or minimal upper/lower bounds.
examples/mcp/mcp_elicitation/main.py (2)
18-19: Don't register `example_usage` as a tool.

This function boots the app and runs a demo; registering it as a tool is confusing and could cause nested runs if invoked via MCP.

Apply this diff:

```diff
-@app.tool
 async def example_usage():
```

33-34: Avoid duplicate output; prefer logger.

Drop the print to keep output routing consistent.

```diff
-    logger.info(f"Result: {res}")
-    print(f"Result: {res}")
+    logger.info(f"Result: {res}")
```

src/mcp_agent/mcp/sampling_handler.py (6)
37-43: Handle message.content being a list or non-text type.

MCP messages often carry a list of content parts; current logic assumes a single object.

Apply this diff:

```diff
-        for i, msg in enumerate(params.messages):
-            content = (
-                msg.content.text if hasattr(msg.content, "text") else str(msg.content)
-            )
-            messages_text += f"  Message {i + 1} ({msg.role}): {content[:200]}{'...' if len(content) > 200 else ''}\n"
+        for i, msg in enumerate(params.messages):
+            obj = getattr(msg, "content", None)
+            parts: list[str] = []
+            if isinstance(obj, list):
+                for c in obj:
+                    if hasattr(c, "text") and getattr(c, "text"):
+                        parts.append(c.text)  # type: ignore[attr-defined]
+                    elif hasattr(c, "data") and getattr(c, "data"):
+                        parts.append(str(c.data))  # type: ignore[attr-defined]
+                    else:
+                        parts.append(str(c))
+                content = " ".join(parts)
+            else:
+                if hasattr(obj, "text") and getattr(obj, "text"):
+                    content = obj.text  # type: ignore[attr-defined]
+                elif hasattr(obj, "data") and getattr(obj, "data"):
+                    content = str(obj.data)  # type: ignore[attr-defined]
+                else:
+                    content = str(obj)
+            preview = content[:200]
+            messages_text += f"  Message {i + 1} ({msg.role}): {preview}{'...' if len(content) > 200 else ''}\n"
```
86-92: Ditto for result.content: support list content.

Prevent poor summaries when providers return multi-part content.

```diff
-    content = (
-        result.content.text
-        if hasattr(result.content, "text")
-        else str(result.content)
-    )
+    obj = result.content
+    if isinstance(obj, list):
+        parts: list[str] = []
+        for c in obj:
+            if hasattr(c, "text") and getattr(c, "text"):
+                parts.append(c.text)  # type: ignore[attr-defined]
+            elif hasattr(c, "data") and getattr(c, "data"):
+                parts.append(str(c.data))  # type: ignore[attr-defined]
+            else:
+                parts.append(str(c))
+        content = " ".join(parts)
+    else:
+        if hasattr(obj, "text") and getattr(obj, "text"):
+            content = obj.text  # type: ignore[attr-defined]
+        elif hasattr(obj, "data") and getattr(obj, "data"):
+            content = str(obj.data)  # type: ignore[attr-defined]
+        else:
+            content = str(obj)
```
175-178: Be lenient on approvals.

Accept common variants to reduce friction.

```diff
-        text = (resp.response or "").strip().lower()
-        return (
-            (params, "") if text == "approve" else (None, resp.response or "rejected")
-        )
+        text = (resp.response or "").strip().lower()
+        approved = text in {"approve", "approved", "yes", "y"}
+        return ((params, "") if approved else (None, resp.response or "rejected"))
```
200-202: Redact response content in metadata.

Avoid embedding full model output in request metadata.

```diff
-            metadata={
-                "type": "sampling_response_approval",
-                "original_result": result.model_dump(),
-            },
+            metadata={
+                "type": "sampling_response_approval",
+                "original_result": result.model_dump(exclude={"content"}),
+            },
```
161-173: Consider adding a timeout to human approvals.

Use the existing `timeout_seconds` field to prevent indefinite waits.

```diff
         req = HumanInputRequest(
             prompt=(
                 "MCP server requests LLM sampling. Respond 'approve' to proceed, "
                 "anything else to reject (your input will be recorded as reason)."
                 f"\n\n{request_summary}"
             ),
             description="MCP Sampling Request Approval",
             request_id=f"sampling_request_{uuid4()}",
+            timeout_seconds=300,
             metadata={
```
190-203: Apply the same timeout to response approvals.

```diff
         req = HumanInputRequest(
             prompt=(
                 "LLM has generated a response. Respond 'approve' to send, "
                 "anything else to reject (your input will be recorded as reason)."
                 f"\n\n{response_summary}"
             ),
             description="MCP Sampling Response Approval",
             request_id=f"sampling_response_{uuid4()}",
+            timeout_seconds=300,
             metadata={
```
examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml (1)
15-17: Ensure logs directory exists or switch to rotating console for examples.

File logging at debug is fine; just avoid failures due to missing dirs.
examples/mcp/mcp_elicitation/temporal/worker.py (1)
1-7: Docstring accuracy and small cleanup

- The comment mentions TemporalExecutor.start_worker but this script uses create_temporal_worker_for_app directly.
- logger is defined but unused.

```diff
-This leverages the TemporalExecutor's start_worker method to handle the worker setup.
+This leverages create_temporal_worker_for_app to construct and run a Temporal Worker.
```

Optionally log startup to use the logger:

```diff
 async def main():
@@
-    async with create_temporal_worker_for_app(app) as worker:
+    logger.info("Starting Temporal worker...")
+    async with create_temporal_worker_for_app(app) as worker:
         await worker.run()
```

src/mcp_agent/executor/temporal/session_proxy.py (3)
23-25: Per-run global state needs cleanup

_workflow_states grows without bounds. Clean up after consuming to avoid memory growth across many requests.
121-143: Unbounded wait and key mismatch with _user_response

- This path waits on _workflow_states[exec_id], but current _user_response writes under run_id; unify on execution_id as proposed in workflow.py/app.py fixes.
- Add a timeout to avoid indefinite hangs; return a clear error on timeout.
- Clean up the state entry after returning.

```diff
         act = self._context.task_registry.get_activity("mcp_relay_request")
         await self._executor.execute(
             act,
             True,
             exec_id,
             method,
             params or {},
         )
-
-        # Wait for the _elicitation_response signal to be triggered
-        await workflow.wait_condition(
-            lambda: _workflow_states.get(exec_id, {}).get('response_received', False)
-        )
-
-        return _workflow_states.get(exec_id, {}).get('response_data', {"error": "no_response"})
+        # Wait for the response signal to be triggered (bounded)
+        from datetime import timedelta
+        await workflow.wait_condition(
+            lambda: _workflow_states.get(exec_id, {}).get("response_received", False),
+            timeout=timedelta(minutes=10),
+        )
+        resp = _workflow_states.get(exec_id, {}).get("response_data", {"error": "no_response"})
+        # Best-effort cleanup
+        try:
+            _workflow_states.pop(exec_id, None)
+        except Exception:
+            pass
+        return resp
```
325-330: Lazy import temporalio to match repo convention

Module-level `from temporalio import workflow as _twf` makes this module a hard dep when imported. Consider moving the import inside _in_workflow_runtime() with try/except ImportError.
src/mcp_agent/app.py (1)
733-735: Minor: superfluous parentheses

The extra parentheses around get_workflow_run_decorator(...) are harmless; you may remove for consistency.
src/mcp_agent/mcp/client_proxy.py (2)
160-219: Async branch returns None on success; tighten contract.

Function advertises Dict[str, Any] but returns None when the async POST succeeds. Return a small status payload to keep callers consistent.

```diff
         if r.status_code >= 400:
             return {"error": r.text}
+        # Success: async path enqueued work; nothing to return from server now
+        return {"ok": True, "status": "queued"}
```

196-211: Timeout defaults: consider a finite connect timeout.

httpx.Timeout(None) disables all timeouts; a stalled TCP connect can hang a worker thread. Keep no read timeout if desired, but set a small connect timeout (e.g., 5s).

```diff
-    if timeout_float is None:
-        timeout = httpx.Timeout(None)
+    if timeout_float is None:
+        timeout = httpx.Timeout(connect=5.0, read=None, write=None, pool=None)
```

examples/mcp/mcp_elicitation/temporal/main.py (4)
32-35: Use the provided request context consistently.

Call elicit via app_ctx to avoid accidentally coupling to global app state.

```diff
-    result = await app.context.upstream_session.elicit(
+    result = await app_ctx.upstream_session.elicit(
```

58-59: Be resilient to CreateMessageResult shape.

Accessing haiku.content.text assumes single TextContent. Guard for list/union to avoid AttributeError.

```diff
-    app.logger.info(f"Haiku: {haiku.content.text}")
+    text = getattr(getattr(haiku, "content", None), "text", None)
+    if text is None:
+        # Try list-of-contents form
+        items = getattr(haiku, "content", []) or []
+        text = next((getattr(c, "text", None) for c in items if getattr(c, "type", "") == "text"), None)
+    app.logger.info(f"Haiku: {text or '<no text content>'}")
```

75-83: Duplicate ConfirmBooking model.

Defined twice (tool and workflow). Extract once at module scope.

```diff
-    class ConfirmBooking(BaseModel):
-        confirm: bool = Field(description="Confirm booking?")
-        notes: str = Field(default="", description="Special requests")
+    # Reuse module-scoped ConfirmBooking model to avoid drift.
```

(Define ConfirmBooking once near top alongside imports.)

105-106: Same content-shape guard inside workflow.

Mirror the safer haiku text extraction here too.

```diff
-    app.logger.info(f"Haiku: {haiku.content.text}")
+    text = getattr(getattr(haiku, "content", None), "text", None)
+    if text is None:
+        items = getattr(haiku, "content", []) or []
+        text = next((getattr(c, "text", None) for c in items if getattr(c, "type", "") == "text"), None)
+    app.logger.info(f"Haiku: {text or '<no text content>'}")
```

examples/mcp/mcp_elicitation/temporal/client.py (1)
269-279: _tool_result_to_json: broaden tolerance slightly.

If content[0].text isn't a JSON string, return the raw text to improve debuggability.

```diff
-    except (json.JSONDecodeError, TypeError):
-        # If it's not valid JSON, just use the text
-        return None
+    except (json.JSONDecodeError, TypeError):
+        return text
```

src/mcp_agent/server/app_server.py (2)
61-71: Cleanup idempotency keys on unregister to avoid leaks.

Free keys in _IDEMPOTENCY_KEYS_SEEN when a run is removed.

```diff
 async def _unregister_session(run_id: str) -> None:
     async with _RUN_SESSION_LOCK:
         execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None)
         if execution_id:
             _RUN_SESSION_REGISTRY.pop(execution_id, None)
+            # Also clear any idempotency keys tracked for this execution
+            async with _IDEMPOTENCY_KEYS_LOCK:
+                _IDEMPOTENCY_KEYS_SEEN.pop(execution_id, None)
```

371-385: Idempotency map may grow unbounded.

Now that cleanup is proposed on unregister, consider also pruning on terminal states in _workflow_status for belt-and-suspenders.

```diff
-    if state in ("completed", "error", "cancelled"):
+    if state in ("completed", "error", "cancelled"):
         try:
             await _unregister_session(run_id)
         except Exception:
             pass
```

(_unregister_session now clears idempotency keys too.)
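The idempotency-key cleanup asked for above is easiest to reason about as a tiny registry with an explicit forget step. The sketch below reuses the registry and lock names from the review (`_IDEMPOTENCY_KEYS_SEEN`, `_IDEMPOTENCY_KEYS_LOCK`); the helper functions themselves are illustrative, not the actual app_server.py API.

```python
import asyncio
from typing import Dict, Set

_IDEMPOTENCY_KEYS_SEEN: Dict[str, Set[str]] = {}
_IDEMPOTENCY_KEYS_LOCK = asyncio.Lock()


async def remember_idempotency_key(execution_id: str, key: str) -> bool:
    """Return True if the key is new for this execution, False if it is a replay."""
    async with _IDEMPOTENCY_KEYS_LOCK:
        seen = _IDEMPOTENCY_KEYS_SEEN.setdefault(execution_id, set())
        if key in seen:
            return False
        seen.add(key)
        return True


async def forget_idempotency_keys(execution_id: str) -> None:
    """Drop all keys for an execution when its run is unregistered."""
    async with _IDEMPOTENCY_KEYS_LOCK:
        _IDEMPOTENCY_KEYS_SEEN.pop(execution_id, None)
```

Calling the forget step from _unregister_session (and on terminal workflow states) keeps the map bounded over long server lifetimes.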
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- examples/mcp/mcp_elicitation/main.py (1 hunks)
- examples/mcp/mcp_elicitation/temporal/client.py (1 hunks)
- examples/mcp/mcp_elicitation/temporal/main.py (1 hunks)
- examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml (1 hunks)
- examples/mcp/mcp_elicitation/temporal/mcp_agent.secrets.yaml.example (1 hunks)
- examples/mcp/mcp_elicitation/temporal/requirements.txt (1 hunks)
- examples/mcp/mcp_elicitation/temporal/worker.py (1 hunks)
- src/mcp_agent/app.py (4 hunks)
- src/mcp_agent/executor/temporal/session_proxy.py (2 hunks)
- src/mcp_agent/executor/temporal/system_activities.py (1 hunks)
- src/mcp_agent/executor/workflow.py (1 hunks)
- src/mcp_agent/mcp/client_proxy.py (5 hunks)
- src/mcp_agent/mcp/sampling_handler.py (3 hunks)
- src/mcp_agent/server/app_server.py (21 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
examples/mcp/mcp_elicitation/temporal/requirements.txt
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
examples/mcp/mcp_elicitation/temporal/worker.py
src/mcp_agent/executor/temporal/session_proxy.py
src/mcp_agent/app.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/workflows/conversation_workflow.py : Implement conversation-as-workflow with internal state management and user input waiting, rather than turn-as-workflow.
Applied to files:
src/mcp_agent/executor/workflow.py
🧬 Code graph analysis (11)
examples/mcp/mcp_elicitation/temporal/client.py (8)
src/mcp_agent/app.py (8)
MCPApp
(42-1003)config
(157-158)context
(149-154)executor
(165-166)workflow
(422-456)logger
(193-210)run
(397-420)server_registry
(161-162)src/mcp_agent/config.py (3)
LoggerSettings
(535-580)MCPSettings
(116-124)MCPServerSettings
(52-113)src/mcp_agent/elicitation/handler.py (1)
console_elicitation_callback
(138-167)src/mcp_agent/core/context.py (2)
Context
(57-103)mcp
(102-103)src/mcp_agent/executor/workflow.py (4)
executor
(118-123)WorkflowExecution
(64-71)run
(163-174)run_id
(133-138)src/mcp_agent/mcp/gen_client.py (1)
gen_client
(17-49)src/mcp_agent/mcp/mcp_agent_client_session.py (1)
MCPAgentClientSession
(73-428)src/mcp_agent/human_input/handler.py (1)
console_input_callback
(68-78)
src/mcp_agent/executor/temporal/system_activities.py (1)
src/mcp_agent/mcp/client_proxy.py (1)
request_via_proxy
(151-257)
examples/mcp/mcp_elicitation/temporal/worker.py (3)
src/mcp_agent/app.py (3)
executor
(165-166)logger
(193-210)run
(397-420)src/mcp_agent/executor/temporal/__init__.py (1)
create_temporal_worker_for_app
(498-554)examples/mcp/mcp_elicitation/temporal/main.py (2)
main
(109-121)run
(66-106)
src/mcp_agent/executor/temporal/session_proxy.py (3)
src/mcp_agent/executor/task_registry.py (1)
get_activity
(25-28)src/mcp_agent/executor/temporal/__init__.py (1)
execute
(212-228)src/mcp_agent/executor/temporal/system_activities.py (1)
relay_request
(92-104)
src/mcp_agent/mcp/sampling_handler.py (1)
src/mcp_agent/human_input/types.py (1)
HumanInputRequest
(7-29)
examples/mcp/mcp_elicitation/main.py (1)
src/mcp_agent/app.py (1)
tool
(753-805)
examples/mcp/mcp_elicitation/temporal/main.py (5)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/server/app_server.py (4)
app
(135-137)create_mcp_server_for_app
(313-1291)run
(1669-1674)workflows
(140-142)src/mcp_agent/app.py (10)
MCPApp
(42-1003)executor
(165-166)workflow
(422-456)tool
(753-805)context
(149-154)upstream_session
(173-174)upstream_session
(177-178)workflow_run
(508-548)run
(397-420)workflows
(181-182)src/mcp_agent/executor/workflow.py (4)
executor
(118-123)Workflow
(74-851)WorkflowResult
(55-61)run
(163-174)src/mcp_agent/executor/temporal/session_proxy.py (2)
elicit
(306-322)create_message
(265-304)
src/mcp_agent/app.py (2)
src/mcp_agent/executor/workflow.py (3)
_user_response
(433-451)executor
(118-123)run_id
(133-138)src/mcp_agent/executor/decorator_registry.py (2)
get_workflow_signal_decorator
(131-140)get_workflow_run_decorator
(73-82)
src/mcp_agent/executor/workflow.py (2)
src/mcp_agent/app.py (3)
workflow
(422-456)_user_response
(668-686)executor
(165-166)src/mcp_agent/executor/temporal/workflow_signal.py (1)
signal
(145-194)
src/mcp_agent/mcp/client_proxy.py (1)
tests/executor/temporal/test_execution_id_and_interceptor.py (2)
post
(75-79)json
(62-63)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/workflow.py (3)
id
(126-130)run_id
(133-138)executor
(118-123)src/mcp_agent/executor/temporal/session_proxy.py (3)
request
(111-143)request
(342-345)send_request
(165-186)src/mcp_agent/executor/temporal/workflow_signal.py (1)
signal
(145-194)
🔇 Additional comments (8)
src/mcp_agent/mcp/sampling_handler.py (2)
188-195
: Nice usability improvement with the response summary.Appending a concise, readable summary should help reviewers.
266-271
: No change needed —generate_str
accepts list inputs.Multiple
generate_str
signatures acceptmessage: str | MessageParamT | List[MessageParamT]
(e.g. src/mcp_agent/workflows/parallel/parallel_llm.py:170, src/mcp_agent/workflows/orchestrator/orchestrator.py:289), so passingmessages
(a list) is supported.examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml (1)
6-9
: Confirm task_queue/namespace match the workerexamples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml sets namespace='default' and task_queue='mcp-agent' (executor reads these in src/mcp_agent/executor/temporal/init.py).
No worker registration/startup was found in the repo (searched for Worker.new/Worker.start/WorkerOptions/Client.new_worker/taskQueue) — verify the worker process/deployment binds to namespace 'default' and task_queue 'mcp-agent'.examples/mcp/mcp_elicitation/temporal/worker.py (1)
15-15
: Import robustness for examplesfrom main import app works when run from this directory; if users run from repo root, it may fail. Consider adding a README note or switching to a package-style import if examples become packages.
src/mcp_agent/app.py (1)
702-714
: Signal decoration approach looks goodApplying the engine-specific signal decorator to the dynamically-added _user_response is correct and keeps Temporal optional.
examples/mcp/mcp_elicitation/temporal/client.py (1)
138-146
: Non-fatal: set server logging level is best-effort.Looks good; the try/except already handles older servers. No changes needed.
src/mcp_agent/server/app_server.py (2)
780-784
: Auth centralization LGTM.Shared gateway auth helper is concise and correctly checks either header.
516-543
: Security posture: constant-time compare used correctly.Good use of secrets.compare_digest; fallback to open access when no token is configured is intentional.
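For readers skimming the thread, the approved pattern is roughly the following: a shared helper checks either header with a constant-time comparison, and an unset token intentionally means open access. This is a hedged sketch; the helper name and header names are assumptions, not the exact app_server.py code.

```python
import secrets
from typing import Mapping, Optional


def gateway_auth_ok(headers: Mapping[str, str], configured_token: Optional[str]) -> bool:
    if not configured_token:
        # No token configured: intentionally fall back to open access.
        return True
    # Accept the token from either a bearer Authorization header or a dedicated header.
    presented = (
        headers.get("Authorization", "").removeprefix("Bearer ").strip()
        or headers.get("X-MCP-Gateway-Token", "")
    )
    # compare_digest avoids timing side channels on the token comparison.
    return secrets.compare_digest(presented, configured_token)
```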
src/mcp_agent/app.py
```python
        # Create signal handler for elicitation response
        async def _user_response(self, response: dict[str, Any]):
            """Signal handler that receives elicitation responses."""
            # Import here to avoid circular dependencies
            try:
                from temporalio import workflow
                from mcp_agent.executor.temporal.session_proxy import _workflow_states

                if workflow.in_workflow():
                    workflow_info = workflow.info()
                    workflow_key = f"{workflow_info.run_id}"

                    if workflow_key not in _workflow_states:
                        _workflow_states[workflow_key] = {}

                    _workflow_states[workflow_key]['response_data'] = response
                    _workflow_states[workflow_key]['response_received'] = True
            except ImportError:
                # Fallback for non-temporal environments
                pass
```
Same correlation bug in AutoWorkflow signal handler
This duplicate _user_response also writes under run_id; align with execution_id to match SessionProxy.request.
Apply:
- async def _user_response(self, response: dict[str,Any]):
+ async def _user_response(self, payload: dict[str,Any]):
"""Signal handler that receives elicitation responses."""
# Import here to avoid circular dependencies
try:
from temporalio import workflow
from mcp_agent.executor.temporal.session_proxy import _workflow_states
-
- if workflow.in_workflow():
- workflow_info = workflow.info()
- workflow_key = f"{workflow_info.run_id}"
-
- if workflow_key not in _workflow_states:
- _workflow_states[workflow_key] = {}
-
- _workflow_states[workflow_key]['response_data'] = response
- _workflow_states[workflow_key]['response_received'] = True
+ exec_id = None
+ try:
+ exec_id = payload.get("execution_id") or payload.get("exec_id") or payload.get("executionId")
+ except Exception:
+ exec_id = None
+ if not exec_id:
+ try:
+ from mcp_agent.executor.temporal.temporal_context import get_execution_id as _get_exec_id
+ exec_id = _get_exec_id()
+ except Exception:
+ exec_id = None
+ if not exec_id and workflow.in_workflow():
+ exec_id = workflow.info().run_id
+ if exec_id:
+ if exec_id not in _workflow_states:
+ _workflow_states[exec_id] = {}
+ _workflow_states[exec_id]['response_data'] = payload
+ _workflow_states[exec_id]['response_received'] = True
except ImportError:
# Fallback for non-temporal environments
pass
📝 Committable suggestion
```python
        # Create signal handler for elicitation response
        async def _user_response(self, payload: dict[str, Any]):
            """Signal handler that receives elicitation responses."""
            # Import here to avoid circular dependencies
            try:
                from temporalio import workflow
                from mcp_agent.executor.temporal.session_proxy import _workflow_states

                exec_id = None
                try:
                    exec_id = payload.get("execution_id") or payload.get("exec_id") or payload.get("executionId")
                except Exception:
                    exec_id = None
                if not exec_id:
                    try:
                        from mcp_agent.executor.temporal.temporal_context import get_execution_id as _get_exec_id
                        exec_id = _get_exec_id()
                    except Exception:
                        exec_id = None
                if not exec_id and workflow.in_workflow():
                    exec_id = workflow.info().run_id
                if exec_id:
                    if exec_id not in _workflow_states:
                        _workflow_states[exec_id] = {}
                    _workflow_states[exec_id]['response_data'] = payload
                    _workflow_states[exec_id]['response_received'] = True
            except ImportError:
                # Fallback for non-temporal environments
                pass
```
🤖 Prompt for AI Agents
In src/mcp_agent/app.py around lines 667 to 687, the _user_response signal
handler currently keys state by workflow_info.run_id which duplicates the same
bug; change it to key by the workflow execution_id to match SessionProxy.request
correlation. Replace building workflow_key from run_id with code that uses
workflow_info.execution_id (falling back to run_id only if execution_id is not
present) and ensure the state is stored under that execution_id so
_workflow_states aligns with SessionProxy.request.
```python
        self, make_async_call: bool, execution_id: str, method: str, params: Dict[str, Any] | None = None
    ) -> Dict[str, Any]:
        gateway_url = getattr(self.context, "gateway_url", None)
        gateway_token = getattr(self.context, "gateway_token", None)
        return await request_via_proxy(
            make_async_call=make_async_call,
            execution_id=execution_id,
```
🧩 Analysis chain
Activity signature change may break in-flight workflows; prefer compatibility or versioned activity
Changing mcp_relay_request’s positional signature is risky for running Temporal workflows. Either:
- Add a new activity name (e.g., mcp_relay_request_async) and keep the old one intact, or
- Accept both shapes via a shim (less clean), or
- Make make_async_call keyword-only with a default and update all call sites to pass by name (still breaks old runs).
I recommend a new activity for the async path to avoid replay incompatibilities.
Run to confirm no other call sites rely on the old 3‑arg signature:
🏁 Script executed:
#!/bin/bash
rg -n -C2 -e 'get_activity\("mcp_relay_request"\)' -e '@activity\.defn\(name="mcp_relay_request"\)' -e '\brelay_request\(' --type=py
Length of output: 1492
Do not change the positional signature of mcp_relay_request — add a new activity for the async path
Changing the positional signature will break replay / in‑flight Temporal workflows. Occurrences: src/mcp_agent/executor/temporal/system_activities.py:91–94 (activity.defn name="mcp_relay_request"); src/mcp_agent/executor/temporal/session_proxy.py:123 (get_activity) and 141–143 (direct call self._system_activities.relay_request(False, ...)). Add a new activity name (e.g., mcp_relay_request_async) for async behavior and keep the original activity unchanged.
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/system_activities.py around lines 93 to 99,
the review warns that you must not change the positional signature of the
existing mcp_relay_request activity because that breaks Temporal replay;
instead, leave the existing function and its signature intact and add a new
activity function (e.g., mcp_relay_request_async) that implements the async path
(accepts the additional boolean/flag as a keyword-only or different positional
signature as needed). Register the new activity under a new activity.defn name
(e.g., "mcp_relay_request_async"), update
src/mcp_agent/executor/temporal/session_proxy.py where get_activity and direct
calls are made (around lines 123 and 141–143) to select or call the new async
activity when async behavior is required, and keep all existing calls to the
original mcp_relay_request unchanged to preserve replay compatibility.
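A sketch of the versioned-activity route this comment recommends: keep the original three-argument `mcp_relay_request` untouched and register a second activity for the async path. The class shape and the `_relay` helper are assumptions for illustration; only the activity names come from the review, and the actual HTTP hop (request_via_proxy in the PR) is left as a placeholder.

```python
from typing import Any, Dict

from temporalio import activity


class SystemActivitiesSketch:
    @activity.defn(name="mcp_relay_request")
    async def relay_request(
        self, execution_id: str, method: str, params: Dict[str, Any] | None = None
    ) -> Dict[str, Any]:
        # Original three-argument shape stays intact for replay compatibility.
        return await self._relay(False, execution_id, method, params or {})

    @activity.defn(name="mcp_relay_request_async")
    async def relay_request_async(
        self, execution_id: str, method: str, params: Dict[str, Any] | None = None
    ) -> Dict[str, Any]:
        # New name carries the async-request behavior; old histories never see it.
        return await self._relay(True, execution_id, method, params or {})

    async def _relay(
        self, make_async_call: bool, execution_id: str, method: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        # Placeholder for the HTTP hop to the gateway (request_via_proxy in the PR).
        raise NotImplementedError
```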
src/mcp_agent/executor/workflow.py
```python
    @workflow.signal()
    async def _user_response(self, response: Dict[str, Any]):
        """Signal handler that receives user responses."""
        # Import here to avoid circular dependencies
        try:
            from temporalio import workflow
            from mcp_agent.executor.temporal.session_proxy import _workflow_states

            if workflow.in_workflow():
                workflow_info = workflow.info()
                workflow_key = f"{workflow_info.run_id}"

                if workflow_key not in _workflow_states:
                    _workflow_states[workflow_key] = {}

                _workflow_states[workflow_key]['response_data'] = response
                _workflow_states[workflow_key]['response_received'] = True
        except ImportError:
            # Fallback for non-temporal environments
            pass
```
Signal/result correlation bug: run_id vs execution_id mismatch will hang wait_condition
_session_proxy.py waits on _workflow_states[exec_id], but this handler writes to a key derived from workflow_info.run_id. This can cause indefinite waiting. Write into the execution_id bucket (or both), preferably using an execution_id in the payload with a fallback to run_id.
Apply:
- @workflow.signal()
- async def _user_response(self, response: Dict[str, Any]):
+ @workflow.signal(name="_user_response")
+ async def _user_response(self, payload: Dict[str, Any]):
"""Signal handler that receives user responses."""
# Import here to avoid circular dependencies
try:
from temporalio import workflow
from mcp_agent.executor.temporal.session_proxy import _workflow_states
-
- if workflow.in_workflow():
- workflow_info = workflow.info()
- workflow_key = f"{workflow_info.run_id}"
-
- if workflow_key not in _workflow_states:
- _workflow_states[workflow_key] = {}
-
- _workflow_states[workflow_key]['response_data'] = response
- _workflow_states[workflow_key]['response_received'] = True
+ # Prefer explicit execution_id in payload; fallback to helper or run_id
+ exec_id = None
+ try:
+ exec_id = payload.get("execution_id") or payload.get("exec_id") or payload.get("executionId")
+ except Exception:
+ exec_id = None
+ if not exec_id:
+ try:
+ from mcp_agent.executor.temporal.temporal_context import get_execution_id as _get_exec_id
+ exec_id = _get_exec_id()
+ except Exception:
+ exec_id = None
+ if not exec_id and workflow.in_workflow():
+ exec_id = workflow.info().run_id
+ if exec_id:
+ if exec_id not in _workflow_states:
+ _workflow_states[exec_id] = {}
+ _workflow_states[exec_id]['response_data'] = payload
+ _workflow_states[exec_id]['response_received'] = True
except ImportError:
# Fallback for non-temporal environments
pass
📝 Committable suggestion
```python
    @workflow.signal(name="_user_response")
    async def _user_response(self, payload: Dict[str, Any]):
        """Signal handler that receives user responses."""
        # Import here to avoid circular dependencies
        try:
            from temporalio import workflow
            from mcp_agent.executor.temporal.session_proxy import _workflow_states

            # Prefer explicit execution_id in payload; fallback to helper or run_id
            exec_id = None
            try:
                exec_id = payload.get("execution_id") or payload.get("exec_id") or payload.get("executionId")
            except Exception:
                exec_id = None
            if not exec_id:
                try:
                    from mcp_agent.executor.temporal.temporal_context import get_execution_id as _get_exec_id
                    exec_id = _get_exec_id()
                except Exception:
                    exec_id = None
            if not exec_id and workflow.in_workflow():
                exec_id = workflow.info().run_id
            if exec_id:
                if exec_id not in _workflow_states:
                    _workflow_states[exec_id] = {}
                _workflow_states[exec_id]['response_data'] = payload
                _workflow_states[exec_id]['response_received'] = True
        except ImportError:
            # Fallback for non-temporal environments
            pass
```
🤖 Prompt for AI Agents
src/mcp_agent/executor/workflow.py around lines 432 to 451: the signal handler
currently keys _workflow_states by workflow_info.run_id but session_proxy
expects execution_id, causing waits to hang; change the handler to extract
execution_id from the incoming response payload if present (fall back to
workflow_info.execution_id or run_id), then write response_data and
response_received into _workflow_states[execution_id] (and optionally also
mirror into the run_id key) so the session proxy can reliably find the signal.
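One way to make that correlation explicit on the sending side is to embed the execution_id in the signal payload itself, so the handler never has to guess between run_id and execution_id. A small hedged sketch, assuming a Temporal client handle and the PR's `_user_response` signal name:

```python
from temporalio.client import Client


async def signal_user_response(
    client: Client, workflow_id: str, execution_id: str, result: dict
) -> None:
    handle = client.get_workflow_handle(workflow_id=workflow_id, run_id=execution_id)
    # Carry the correlation key inside the payload instead of relying on run_id.
    await handle.signal("_user_response", {"execution_id": execution_id, **result})
```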
Generally looks good, but concerned about the use of a global _workflow_states.
At minimum would be great to consolidate that into WorkflowRegistry, or have APIs to interact with it so callers aren't mutating the dict directly.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/mcp_agent/mcp/client_proxy.py (1)
184-189: URL path-safety: also quote workflow_id.

Unescaped workflow_id can break routing for IDs with special chars.

```diff
-    url = f"{base}/internal/session/by-run/{workflow_id}/{quote(execution_id, safe='')}/async-request"
+    url = f"{base}/internal/session/by-run/{quote(workflow_id, safe='')}/{quote(execution_id, safe='')}/async-request"
```

src/mcp_agent/executor/workflow.py (1)
432-447: Fix signal/result correlation: key by execution_id, not run_id (prevents hangs).

Current handler writes under run_id while wait_condition checks execution_id, causing indefinite waits. Use execution_id from payload/context with run_id fallback, and name the signal explicitly.
Apply:
- @workflow.signal() - async def _user_response(self, response: Dict[str, Any]): + @workflow.signal(name="_user_response") + async def _user_response(self, payload: Dict[str, Any]): """Signal handler that receives user responses.""" # Import here to avoid circular dependencies try: from temporalio import workflow from mcp_agent.executor.temporal.session_proxy import set_signal_response - - if workflow.in_workflow(): - workflow_info = workflow.info() - workflow_key = f"{workflow_info.run_id}" - - set_signal_response(workflow_key, response) + exec_id = None + if isinstance(payload, dict): + exec_id = ( + payload.get("execution_id") + or payload.get("exec_id") + or payload.get("executionId") + ) + if not exec_id: + try: + from mcp_agent.executor.temporal.temporal_context import ( + get_execution_id as _get_exec_id, + ) + exec_id = _get_exec_id() + except Exception: + exec_id = None + if not exec_id and workflow.in_workflow(): + exec_id = workflow.info().run_id + if exec_id: + set_signal_response(exec_id, payload) except ImportError: # Fallback for non-temporal environments passsrc/mcp_agent/app.py (1)
667-683: Same correlation bug in AutoWorkflow handler — write by execution_id.

Mirror the workflow.py fix so async elicitation doesn't hang.
- async def _user_response(self, response: dict[str,Any]): + async def _user_response(self, payload: dict[str,Any]): """Signal handler that receives elicitation responses.""" # Import here to avoid circular dependencies try: from temporalio import workflow from mcp_agent.executor.temporal.session_proxy import set_signal_response - - if workflow.in_workflow(): - workflow_info = workflow.info() - workflow_key = f"{workflow_info.run_id}" - - set_signal_response(workflow_key, response) + exec_id = None + if isinstance(payload, dict): + exec_id = ( + payload.get("execution_id") + or payload.get("exec_id") + or payload.get("executionId") + ) + if not exec_id: + try: + from mcp_agent.executor.temporal.temporal_context import ( + get_execution_id as _get_exec_id, + ) + exec_id = _get_exec_id() + except Exception: + exec_id = None + if not exec_id and workflow.in_workflow(): + exec_id = workflow.info().run_id + if exec_id: + set_signal_response(exec_id, payload) except ImportError: # Fallback for non-temporal environments pass
🧹 Nitpick comments (2)
src/mcp_agent/executor/temporal/session_proxy.py (2)
22-30: Comment mismatch: it's keyed by execution_id, not run_id.

Update to avoid confusion during debugging.

```diff
-# run_id -> {
+# execution_id -> {
```

183-189: Nit: comment names the wrong signal.

It says "_elicitation_response"; actual signal is "_user_response".
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml (1 hunks)
- examples/mcp/mcp_elicitation/temporal/mcp_agent.secrets.yaml.example (1 hunks)
- examples/mcp/mcp_elicitation/temporal/requirements.txt (1 hunks)
- src/mcp_agent/app.py (4 hunks)
- src/mcp_agent/executor/temporal/session_proxy.py (9 hunks)
- src/mcp_agent/executor/temporal/system_activities.py (1 hunks)
- src/mcp_agent/executor/workflow.py (1 hunks)
- src/mcp_agent/mcp/client_proxy.py (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- examples/mcp/mcp_elicitation/temporal/mcp_agent.secrets.yaml.example
- examples/mcp/mcp_elicitation/temporal/requirements.txt
- src/mcp_agent/executor/temporal/system_activities.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/app.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml
🧬 Code graph analysis (4)
src/mcp_agent/app.py (3)
src/mcp_agent/executor/workflow.py (3)
_user_response
(433-447)executor
(118-123)run_id
(133-138)src/mcp_agent/executor/temporal/session_proxy.py (1)
set_signal_response
(44-53)src/mcp_agent/executor/decorator_registry.py (2)
get_workflow_signal_decorator
(131-140)get_workflow_run_decorator
(73-82)
src/mcp_agent/mcp/client_proxy.py (2)
src/mcp_agent/executor/temporal/session_proxy.py (1)
reset_signal_response
(32-41)tests/executor/temporal/test_execution_id_and_interceptor.py (2)
post
(75-79)json
(62-63)
src/mcp_agent/executor/workflow.py (2)
src/mcp_agent/app.py (3)
workflow
(422-456)_user_response
(668-682)executor
(165-166)src/mcp_agent/executor/temporal/session_proxy.py (1)
set_signal_response
(44-53)
src/mcp_agent/executor/temporal/session_proxy.py (2)
src/mcp_agent/executor/temporal/__init__.py (1)
execute
(212-228)src/mcp_agent/executor/temporal/system_activities.py (1)
relay_request
(92-104)
🔇 Additional comments (5)
examples/mcp/mcp_elicitation/temporal/mcp_agent.config.yaml (1)
1-1
: $schema path fixed — LGTMPath now points to repo-root schema correctly for editor validation.
src/mcp_agent/app.py (4)
451-451
: No functional change.Benign whitespace/format tweak.
689-689
: Class dict wiring looks good.Exposes _user_response on the generated class.
698-709
: Good: engine-specific signal decoration applied with explicit name.No change requested.
729-731
: No-op parenthesis change.Safe; no behavior impact.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mcp_agent/executor/temporal/session_proxy.py (2)
352-356: Serialize IncludeContext

`include_context` is a pydantic model; must be JSON-serialized before sending.

```diff
-        if include_context is not None:
-            params["includeContext"] = include_context
+        if include_context is not None:
+            params["includeContext"] = include_context.model_dump(by_alias=True, mode="json")
```

382-389: Serialize RequestedSchema

`requestedSchema` must be JSON-serialized; sending the model will fail JSON encoding.

```diff
-        params: Dict[str, Any] = {
-            "message": message,
-            "requestedSchema": requestedSchema,
-        }
+        params: Dict[str, Any] = {
+            "message": message,
+            "requestedSchema": requestedSchema.model_dump(by_alias=True, mode="json"),
+        }
```
♻️ Duplicate comments (4)
src/mcp_agent/executor/temporal/system_activities.py (1)
91-106
: Do not changemcp_relay_request
positional signature; add a new activity for async pathChanging the activity signature breaks replay and in-flight workflows. Keep
mcp_relay_request(execution_id, method, params)
intact and addmcp_relay_request_async(...)
.- @activity.defn(name="mcp_relay_request") - async def relay_request( - self, make_async_call: bool, execution_id: str, method: str, params: Dict[str, Any] | None = None - ) -> Dict[str, Any]: + @activity.defn(name="mcp_relay_request") + async def relay_request( + self, execution_id: str, method: str, params: Dict[str, Any] | None = None + ) -> Dict[str, Any]: gateway_url = getattr(self.context, "gateway_url", None) gateway_token = getattr(self.context, "gateway_token", None) - - return await request_via_proxy( - make_async_call=make_async_call, + return await request_via_proxy( + make_async_call=False, execution_id=execution_id, method=method, params=params or {}, gateway_url=gateway_url, gateway_token=gateway_token, ) + + @activity.defn(name="mcp_relay_request_async") + async def relay_request_async( + self, execution_id: str, method: str, params: Dict[str, Any] | None = None + ) -> Dict[str, Any]: + gateway_url = getattr(self.context, "gateway_url", None) + gateway_token = getattr(self.context, "gateway_token", None) + return await request_via_proxy( + make_async_call=True, + execution_id=execution_id, + method=method, + params=params or {}, + gateway_url=gateway_url, + gateway_token=gateway_token, + )src/mcp_agent/executor/temporal/session_proxy.py (2)
22-30
: Drop asyncio.Lock and make signal-state helpers synchronous; compatible with Temporal signal handlersSignal handlers cannot
await
, and usingasyncio.Lock
inside workflow code risks non-determinism. Make the helpers pure sync and operate on the in-memory dict.-# } -_WORKFLOW_SIGNAL_LOCK = asyncio.Lock() -_workflow_signal_states: Dict[str, Dict[str, Any]] = {} +# } +_workflow_signal_states: Dict[str, Dict[str, Any]] = {} @@ -async def reset_signal_response(execution_id: str) -> None: +def reset_signal_response(execution_id: str) -> None: @@ - async with _WORKFLOW_SIGNAL_LOCK: - if execution_id not in _workflow_signal_states: - _workflow_signal_states[execution_id] = {} - - _workflow_signal_states[execution_id]['response_data'] = None - _workflow_signal_states[execution_id]['response_received'] = False + state = _workflow_signal_states.setdefault(execution_id, {}) + state["response_data"] = None + state["response_received"] = False @@ -async def set_signal_response(execution_id: str, data: Any) -> None: +def set_signal_response(execution_id: str, data: Any) -> None: @@ - async with _WORKFLOW_SIGNAL_LOCK: - if execution_id not in _workflow_signal_states: - _workflow_signal_states[execution_id] = {} - - _workflow_signal_states[execution_id]['response_data'] = data - _workflow_signal_states[execution_id]['response_received'] = True + state = _workflow_signal_states.setdefault(execution_id, {}) + state["response_data"] = data + state["response_received"] = True @@ -async def has_signal_response(execution_id: str) -> bool: +def has_signal_response(execution_id: str) -> bool: @@ - async with _WORKFLOW_SIGNAL_LOCK: - if execution_id not in _workflow_signal_states: - return False - return _workflow_signal_states[execution_id]['response_received'] + state = _workflow_signal_states.get(execution_id) + return bool(state and state.get("response_received")) @@ -async def get_signal_response(execution_id: str) -> Any: +def get_signal_response(execution_id: str) -> Any: @@ - async with _WORKFLOW_SIGNAL_LOCK: - if execution_id not in _workflow_signal_states or \ - not _workflow_signal_states[execution_id]['response_received']: - raise RuntimeError("No signal response received yet") - return _workflow_signal_states[execution_id]['response_data'] + state = _workflow_signal_states.get(execution_id) + if not state or not state.get("response_received"): + raise RuntimeError("No signal response received yet") + return state["response_data"]Also applies to: 32-75
189-199
: Do not change the signature of the existing activity; call a new async variant insteadSwitch to a new activity name (e.g.,
mcp_relay_request_async
) for the async path to preserve replay compatibility ofmcp_relay_request
.- act = self._context.task_registry.get_activity("mcp_relay_request") + act = self._context.task_registry.get_activity("mcp_relay_request_async")src/mcp_agent/mcp/client_proxy.py (1)
184-189
: Quote workflow_id in URL path
workflow_id
may contain unsafe characters. Quote it likeexecution_id
.- url = f"{base}/internal/session/by-run/{workflow_id}/{quote(execution_id, safe='')}/async-request" + url = f"{base}/internal/session/by-run/{quote(workflow_id, safe='')}/{quote(execution_id, safe='')}/async-request"
🧹 Nitpick comments (4)
src/mcp_agent/workflows/factory.py (1)
68-75
: Type the provider overloads consistentlyUse the
SupportedLLMProviders
alias in overloads for consistency and better type checking.@overload def create_llm( - agent: Agent | AgentSpec, - provider: str | None = "openai", + agent: Agent | AgentSpec, + provider: SupportedLLMProviders | None = "openai", model: str | ModelPreferences | None = None, request_params: RequestParams | None = None, context: Context | None = None, ) -> AugmentedLLM: ...src/mcp_agent/executor/temporal/session_proxy.py (1)
200-204
: Nit: comment names the wrong signalComment mentions
_elicitation_response
but PR adds_user_response
. Update for clarity.src/mcp_agent/mcp/client_proxy.py (2)
212-214
: Return a value on async path to match the declared return typeReturn a small ack to keep the return type consistent.
- if r.status_code >= 400: - return {"error": r.text} + if r.status_code >= 400: + return {"error": r.text} + return {"ok": True}
151-159
: Consider widening return type hintOptional: if you prefer not to return an ack for async path, change the type to
Dict[str, Any] | None
.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
src/mcp_agent/app.py
(4 hunks)src/mcp_agent/executor/temporal/session_proxy.py
(9 hunks)src/mcp_agent/executor/temporal/system_activities.py
(1 hunks)src/mcp_agent/executor/workflow.py
(1 hunks)src/mcp_agent/mcp/client_proxy.py
(5 hunks)src/mcp_agent/workflows/factory.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/mcp_agent/app.py
- src/mcp_agent/executor/workflow.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/workflows/factory.py
🧬 Code graph analysis (4)
src/mcp_agent/executor/temporal/system_activities.py (1)
src/mcp_agent/mcp/client_proxy.py (1)
request_via_proxy
(151-252)
src/mcp_agent/mcp/client_proxy.py (2)
src/mcp_agent/executor/temporal/session_proxy.py (1)
reset_signal_response
(32-41)tests/executor/temporal/test_execution_id_and_interceptor.py (2)
post
(75-79)json
(62-63)
src/mcp_agent/workflows/factory.py (4)
src/mcp_agent/workflows/llm/augmented_llm.py (2)
AugmentedLLM
(251-845)RequestParams
(126-196)src/mcp_agent/core/context.py (1)
Context
(57-103)src/mcp_agent/agents/agent.py (1)
Agent
(62-1162)src/mcp_agent/agents/agent_spec.py (1)
AgentSpec
(8-43)
src/mcp_agent/executor/temporal/session_proxy.py (2)
src/mcp_agent/executor/temporal/__init__.py (1)
execute
(212-228)src/mcp_agent/executor/temporal/system_activities.py (1)
relay_request
(92-105)
src/mcp_agent/mcp/client_proxy.py
```python
    # Reset the signal response state, so we're ready to accept a new response signal
    await reset_signal_response(execution_id)
```
Call reset_signal_response synchronously (after making it sync)

After the refactor, drop `await` here.

```diff
-    await reset_signal_response(execution_id)
+    reset_signal_response(execution_id)
```
📝 Committable suggestion
```python
    # Reset the signal response state, so we're ready to accept a new response signal
    reset_signal_response(execution_id)
```
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/client_proxy.py around lines 179 to 181, the call to
reset_signal_response is currently awaited but should be invoked synchronously
after the refactor that made it a regular (non-async) function; remove the
leading await and call reset_signal_response(execution_id) directly, and ensure
the reset_signal_response definition has been converted from async def to def
(or otherwise exposed as a synchronous function) so no awaiting is required.
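Taken together with the session_proxy comments, the target shape is plain synchronous helpers over a module-level dict, which signal handlers can call without awaiting. A rough sketch under those assumptions (the dict name follows the PR's convention; field names are assumed):

```python
from typing import Any, Dict

_workflow_signal_states: Dict[str, Dict[str, Any]] = {}


def reset_signal_response(execution_id: str) -> None:
    # Prepare the slot so a fresh response can be received for this execution.
    state = _workflow_signal_states.setdefault(execution_id, {})
    state["response_data"] = None
    state["response_received"] = False


def set_signal_response(execution_id: str, data: Any) -> None:
    # Called from the signal handler; must not await, so it stays synchronous.
    state = _workflow_signal_states.setdefault(execution_id, {})
    state["response_data"] = data
    state["response_received"] = True


def has_signal_response(execution_id: str) -> bool:
    state = _workflow_signal_states.get(execution_id)
    return bool(state and state.get("response_received"))
```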
```python
@overload
def create_llm(
    agent: str,
    server_names: List[str] | None = None,
    instruction: str | None = None,
    provider: str = "openai",
    model: str | ModelPreferences | None = None,
    request_params: RequestParams | None = None,
    context: Context | None = None,
) -> AugmentedLLM: ...
```
Fix create_llm: string overload not handled; passing `agent` as `str` will break at runtime

Implementation only checks `agent_name` for `str`. Calls like `create_llm("my-agent", ...)` will pass a `str` into `provider_cls(agent=...)` and crash. Normalize when `agent` is a `str`, and guard against both `agent` (str) and `agent_name` being provided.
def create_llm(
- agent: Agent | AgentSpec | None = None,
- agent_name: str | None = None,
+ agent: Agent | AgentSpec | str | None = None,
+ agent_name: str | None = None,
server_names: List[str] | None = None,
instruction: str | None = None,
provider: str = "openai",
model: str | ModelPreferences | None = None,
request_params: RequestParams | None = None,
context: Context | None = None,
) -> AugmentedLLM:
@@
- if isinstance(agent_name, str):
+ # Accept either `agent` as str (positional) or `agent_name`, but not both
+ if isinstance(agent, str):
+ if agent_name:
+ raise ValueError("Provide either 'agent' as str or 'agent_name', not both.")
+ agent_name = agent
+ agent = None
+
+ if isinstance(agent_name, str):
# Handle the case where first argument is agent_name (string)
agent_obj = agent_from_spec(
AgentSpec(
name=agent_name, instruction=instruction, server_names=server_names or []
),
context=context,
)
@@
else:
# Handle Agent case
agent_obj = agent
+
+ if agent_obj is None:
+ raise ValueError("create_llm requires either 'agent' (Agent/AgentSpec/str) or 'agent_name'.")
Also applies to: 90-117
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/factory.py around lines 78-87 (and also apply the
same fix to 90-117), the create_llm overloads accept agent as either an Agent
instance or a str, but the implementation only checks agent_name for str and may
forward a raw string into provider_cls(agent=...), causing a runtime crash.
Normalize inputs up-front: if agent is a str, treat it as agent_name and set
agent=None and agent_name=that string; if both agent and agent_name are
supplied, raise/handle the conflict. Then use the normalized agent/agent_name
when constructing provider_cls and guard provider_cls(...) calls so they never
receive a plain str for the agent parameter.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/mcp_agent/server/app_server.py (1)
736-755
: Ensure Temporal client is initialized before signaling and fix the misleading comment. Avoid a None client, and update the comment to reflect workflow_id + run_id usage.

- # Signal the workflow with the result using method-specific signal
+ # Signal the workflow with the result using method-specific signal
  try:
      # Try to get Temporal client from the app context
      app = _get_attached_app(mcp_server)
      if app and app.context and hasattr(app.context, 'executor'):
          executor = app.context.executor
-         if hasattr(executor, 'client'):
-             client = executor.client
-             # Find the workflow using execution_id as both workflow_id and run_id
-             try:
-                 workflow_handle = client.get_workflow_handle(
-                     workflow_id=workflow_id,
-                     run_id=execution_id
-                 )
-
-                 await workflow_handle.signal(signal_name, result)
-                 logger.info(f"[async-request] signaled workflow {execution_id} "
-                             f"with {method} result using signal")
-             except Exception as signal_error:
-                 logger.warning(f"[async-request] failed to signal workflow {execution_id}:"
-                                f" {signal_error}")
+         # Ensure client is ready
+         if hasattr(executor, 'ensure_client'):
+             await executor.ensure_client()
+         client = getattr(executor, 'client', None)
+         # Find the workflow using workflow_id + run_id (execution_id)
+         if client is not None:
+             try:
+                 workflow_handle = client.get_workflow_handle(
+                     workflow_id=workflow_id,
+                     run_id=execution_id
+                 )
+                 await workflow_handle.signal(signal_name, result)
+                 logger.info(f"[async-request] signaled workflow {execution_id} "
+                             f"with {method} result using signal")
+             except Exception as signal_error:
+                 logger.warning(
+                     f"[async-request] failed to signal workflow {execution_id}: {signal_error}"
+                 )
+         else:
+             logger.warning("[async-request] Temporal client not available; cannot signal workflow")
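For context, targeting a specific run and signaling it with the Temporal Python SDK looks roughly like this — the address and names are placeholders, and the server reuses the executor's already-connected client rather than connecting fresh as shown here:

from temporalio.client import Client

async def signal_run(workflow_id: str, run_id: str, signal_name: str, payload: dict) -> None:
    # In app_server this client comes from the executor; standalone connect shown for illustration
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id, run_id=run_id)
    await handle.signal(signal_name, payload)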
🧹 Nitpick comments (3)
src/mcp_agent/mcp/client_proxy.py (2)
178-183
: Sanitize signal names to avoid forbidden characters. Method names may include slashes/spaces; normalize to a safe subset for Temporal signal names.

+import re
@@
-        signal_name = f"mcp_rpc_{method}_{uuid.uuid4().hex}"
+        method_safe = re.sub(r'[^a-zA-Z0-9_.-]+', '_', str(method))
+        signal_name = f"mcp_rpc_{method_safe}_{uuid.uuid4().hex}"
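As a quick illustration of the proposed sanitization (the method name below is just an example):

import re
import uuid

method = "sampling/createMessage"
method_safe = re.sub(r'[^a-zA-Z0-9_.-]+', '_', str(method))
signal_name = f"mcp_rpc_{method_safe}_{uuid.uuid4().hex}"
# -> "mcp_rpc_sampling_createMessage_<hex>" with no slashes or spaces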
189-205
: Honor "no timeout" when MCP_GATEWAY_REQUEST_TIMEOUT <= 0. Current logic only treats unset as "no timeout"; values like 0 or negative should also disable timeouts per the comment.

-    timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
-    timeout_float: float | None
-    if timeout_str is None:
-        timeout_float = None
-    else:
-        try:
-            timeout_float = float(str(timeout_str).strip())
-        except Exception:
-            timeout_float = None
+    timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
+    timeout_float: float | None
+    try:
+        timeout_float = None if timeout_str is None else float(str(timeout_str).strip())
+        if timeout_float is not None and timeout_float <= 0:
+            timeout_float = None
+    except Exception:
+        timeout_float = None
@@
-    if timeout_float is None:
+    if timeout_float is None:
         timeout = httpx.Timeout(None)
     else:
         timeout = timeout_float

Apply the same change to the synchronous branch for consistency.
Also applies to: 230-245
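To make the intended mapping concrete, here is a small sketch of how env var values would resolve under the suggested parsing (the helper name is illustrative):

import httpx

def resolve_timeout(raw: str | None) -> httpx.Timeout | float:
    # Unset, empty, zero, and negative values all mean "no timeout"
    try:
        value = None if raw is None else float(raw.strip())
        if value is not None and value <= 0:
            value = None
    except ValueError:
        value = None
    return httpx.Timeout(None) if value is None else value

assert isinstance(resolve_timeout(None), httpx.Timeout)  # unset -> no timeout
assert isinstance(resolve_timeout("-1"), httpx.Timeout)  # negative -> no timeout
assert resolve_timeout("30") == 30.0                     # positive -> seconds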
src/mcp_agent/server/app_server.py (1)
377-385
: Idempotency key registry can grow unbounded; add TTL/eviction.
_IDEMPOTENCY_KEYS_SEEN is never pruned. Add a TTL (e.g., timestamped keys plus a periodic sweep) or an LRU per execution_id to prevent unbounded memory growth. If helpful, I can draft a lightweight TTL sweep using an asyncio.Task plus a max set size per execution_id. Want a patch?
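One possible shape for the eviction, sketched under the assumption that the set is replaced with a timestamped mapping — the names, TTL, and size limit here are illustrative, not the module's current API:

import time

_IDEMPOTENCY_TTL_SECONDS = 15 * 60
_IDEMPOTENCY_MAX_PER_EXECUTION = 1024

# execution_id -> {idempotency_key: first_seen_timestamp}
_IDEMPOTENCY_KEYS_SEEN: dict[str, dict[str, float]] = {}

def seen_before(execution_id: str, key: str) -> bool:
    # Record the key and report whether it was already seen; evict stale entries as we go
    now = time.monotonic()
    keys = _IDEMPOTENCY_KEYS_SEEN.setdefault(execution_id, {})

    # Drop expired keys for this execution
    for k, ts in list(keys.items()):
        if now - ts > _IDEMPOTENCY_TTL_SECONDS:
            del keys[k]

    if key in keys:
        return True

    # Bound per-execution growth by evicting the oldest entry
    if len(keys) >= _IDEMPOTENCY_MAX_PER_EXECUTION:
        oldest = min(keys, key=keys.get)
        del keys[oldest]

    keys[key] = now
    return False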
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/mcp_agent/app.py (3 hunks)
- src/mcp_agent/executor/temporal/session_proxy.py (8 hunks)
- src/mcp_agent/mcp/client_proxy.py (5 hunks)
- src/mcp_agent/server/app_server.py (21 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/mcp_agent/app.py
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
🧬 Code graph analysis (3)
src/mcp_agent/executor/temporal/session_proxy.py (2)
- src/mcp_agent/executor/temporal/__init__.py (1): execute (212-228)
- src/mcp_agent/executor/temporal/system_activities.py (1): relay_request (92-105)
src/mcp_agent/server/app_server.py (4)
- src/mcp_agent/executor/workflow.py (3): id (126-130), run_id (133-138), executor (118-123)
- src/mcp_agent/executor/temporal/session_proxy.py (3): request (108-157), request (356-359), send_request (179-200)
- src/mcp_agent/app.py (4): logger (205-222), context (161-166), executor (177-178), server_registry (173-174)
- src/mcp_agent/executor/temporal/workflow_signal.py (1): signal (145-194)
src/mcp_agent/mcp/client_proxy.py (1)
- tests/executor/temporal/test_execution_id_and_interceptor.py (2): post (75-79), json (62-63)
Looks awesome! Just a few minor comments
        if in_temporal:
            workflow_id = workflow.info().workflow_id
        else:
            in_temporal = activity.in_activity()
            if in_temporal:
                workflow_id = activity.info().workflow_id
    except ImportError:
The code doesn't initialize workflow_id when neither workflow.in_workflow() nor activity.in_activity() returns true, which will raise an unbound-variable error (UnboundLocalError) when the URL is constructed later in the function. Consider adding a default value for workflow_id before the try block, or handling the case where in_temporal is false after the exception handling.
Suggested change (adds a default so workflow_id is always bound):

+        workflow_id = None
         if in_temporal:
             workflow_id = workflow.info().workflow_id
         else:
             in_temporal = activity.in_activity()
             if in_temporal:
                 workflow_id = activity.info().workflow_id
     except ImportError:
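Putting the suggestion together, the detection logic might be factored roughly as below — a sketch only; the real function in client_proxy.py carries additional context not shown here:

def _current_temporal_workflow_id() -> str | None:
    # Best-effort lookup of the Temporal workflow id; returns None outside Temporal
    workflow_id = None
    try:
        from temporalio import activity, workflow

        if workflow.in_workflow():
            workflow_id = workflow.info().workflow_id
        elif activity.in_activity():
            workflow_id = activity.info().workflow_id
    except ImportError:
        pass  # temporalio not installed; treat as a non-Temporal context
    return workflow_id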
Spotted by Diamond
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/mcp_agent/workflows/factory.py (1)
90-117
: Fix create_llm: missing handling for agent as str causes a runtime error
The implementation doesn't handle the case where agent is passed as a string, which will crash at runtime when the value reaches provider_cls(agent=...) on line 125. The function should normalize inputs upfront to prevent this issue. Apply this diff to fix the input handling:
 def create_llm(
     agent: Agent | AgentSpec | None = None,
     agent_name: str | None = None,
     server_names: List[str] | None = None,
     instruction: str | None = None,
     provider: str = "openai",
     model: str | ModelPreferences | None = None,
     request_params: RequestParams | None = None,
     context: Context | None = None,
 ) -> AugmentedLLM:
     """
     Create an Augmented LLM from an agent, agent spec, or agent name.
     """
+    # Handle the case where agent is passed as a string
+    if isinstance(agent, str):
+        if agent_name is not None:
+            raise ValueError("Cannot specify both 'agent' as string and 'agent_name'")
+        agent_name = agent
+        agent = None
+
     if isinstance(agent_name, str):
         # Handle the case where first argument is agent_name (string)
         agent_obj = agent_from_spec(
             AgentSpec(
                 name=agent_name, instruction=instruction, server_names=server_names or []
             ),
             context=context,
         )
     elif isinstance(agent, AgentSpec):
         # Handle AgentSpec case
         agent_obj = agent_from_spec(agent, context=context)
     else:
         # Handle Agent case
         agent_obj = agent
+
+    if agent_obj is None:
+        raise ValueError("Must provide either 'agent' (Agent/AgentSpec/str) or 'agent_name'")
🧹 Nitpick comments (5)
src/mcp_agent/executor/temporal/session_proxy.py (3)
129-135
: Consider adding more context to error messages. When returning errors, include the execution context to aid debugging, especially for the "no signal name" case.

     if not signal_name:
-        return {"error": "no_signal_name_returned_from_activity"}
+        return {"error": "no_signal_name_returned_from_activity", "execution_id": exec_id, "method": method}
215-233
: Consider adding retry logic for log forwarding. Since logging is important for observability, consider adding a retry mechanism for the activity-based log forwarding in case of transient failures.

 if _in_workflow_runtime() and exec_id:
+    max_retries = 3
+    for attempt in range(max_retries):
         try:
             act = self._context.task_registry.get_activity("mcp_forward_log")
             namespace = (
                 (data or {}).get("namespace")
                 if isinstance(data, dict)
                 else (logger or "mcp_agent")
             )
             message = (data or {}).get("message") if isinstance(data, dict) else ""
             await self._executor.execute(
                 act,
                 exec_id,
                 str(level),
                 namespace or (logger or "mcp_agent"),
                 message or "",
                 (data or {}),
             )
             return
         except Exception:
+            if attempt < max_retries - 1:
+                await _twf.sleep(0.1 * (attempt + 1))  # Exponential backoff
+            else:
                 # Fall back to notify path below
                 pass
324-329
: Style: Consider consistent parameter formatting. The indentation changes appear to be formatter-driven but create inconsistent style across methods. Consider standardizing the parameter formatting across all method signatures in the file.
src/mcp_agent/workflows/factory.py (2)
68-88
: Update overload signatures to match implementation
The overload signatures don't accurately reflect the implementation's ability to accept agent as a string; the first overload should be updated to include str in the union type. Apply this diff to fix the overload signatures:

 @overload
 def create_llm(
-    agent: Agent | AgentSpec,
+    agent: Agent | AgentSpec | str,
     provider: str | None = "openai",
     model: str | ModelPreferences | None = None,
     request_params: RequestParams | None = None,
     context: Context | None = None,
 ) -> AugmentedLLM: ...

 @overload
 def create_llm(
     agent_name: str,
     server_names: List[str] | None = None,
     instruction: str | None = None,
     provider: str = "openai",
     model: str | ModelPreferences | None = None,
     request_params: RequestParams | None = None,
     context: Context | None = None,
 ) -> AugmentedLLM: ...
100-102
: Enhance docstring with usage examples and parameter details. The docstring should be expanded to clearly document all supported input modes and provide usage examples for better developer experience.
Apply this diff to improve the documentation:
""" Create an Augmented LLM from an agent, agent spec, or agent name. + + This function supports multiple ways to create an AugmentedLLM: + 1. From an existing Agent or AgentSpec instance + 2. From an agent name (string) with optional server_names and instruction + 3. Using positional string argument for agent name + + Args: + agent: An Agent, AgentSpec, or string (agent name) + agent_name: Alternative way to specify agent name when not using positional arg + server_names: List of MCP server names (only used with agent_name) + instruction: Agent instruction (only used with agent_name) + provider: LLM provider (default: "openai") + model: Model identifier or preferences + request_params: Default request parameters + context: Execution context + + Returns: + AugmentedLLM instance configured with the specified agent + + Examples: + >>> # From existing agent + >>> llm = create_llm(my_agent) + >>> + >>> # From agent name with servers + >>> llm = create_llm("assistant", server_names=["server1", "server2"]) + >>> + >>> # Using agent_name parameter + >>> llm = create_llm(agent_name="helper", instruction="Be helpful") """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/mcp_agent/executor/temporal/session_proxy.py (4 hunks)
- src/mcp_agent/workflows/factory.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/workflows/factory.py
🧬 Code graph analysis (2)
src/mcp_agent/executor/temporal/session_proxy.py (3)
- src/mcp_agent/executor/temporal/__init__.py (1): execute (212-228)
- src/mcp_agent/executor/temporal/workflow_signal.py (1): wait_for_signal (70-126)
- src/mcp_agent/executor/temporal/system_activities.py (1): relay_request (92-105)
src/mcp_agent/workflows/factory.py (4)
- src/mcp_agent/workflows/llm/augmented_llm.py (2): AugmentedLLM (251-845), RequestParams (126-196)
- src/mcp_agent/agents/agent.py (1): Agent (62-1162)
- src/mcp_agent/agents/agent_spec.py (1): AgentSpec (8-43)
- src/mcp_agent/core/context.py (1): Context (57-103)
🔇 Additional comments (3)
src/mcp_agent/executor/temporal/session_proxy.py (3)
119-127
: LGTM! Async workflow signaling path correctly implemented. The implementation properly uses the async API with signaling for workflow-based requests. The True flag clearly indicates async mode, and the execution pattern aligns with the Temporal workflow requirements.
138-153
: Robust payload handling with proper type checking. The implementation correctly handles both Temporal payload wrappers and plain dicts, providing good flexibility for different response types from the signaling system.
156-161
: Clear synchronous/asynchronous path distinction. The use of False to indicate synchronous mode provides a clear separation between workflow and non-workflow execution paths, maintaining determinism.
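To make the async path concrete, the workflow side of the pattern looks roughly like this with the Temporal Python SDK — the signal name, fields, and timeout are placeholders, and the real workflows also register per-request signal names dynamically (the mcp_rpc_* signals discussed above):

from datetime import timedelta

from temporalio import workflow

@workflow.defn
class ElicitationWorkflow:
    def __init__(self) -> None:
        self._response: dict | None = None

    @workflow.signal
    def _user_response(self, payload: dict) -> None:
        self._response = payload

    @workflow.run
    async def run(self) -> dict:
        # The request itself would be relayed via an activity (omitted here);
        # the workflow then parks until the server signals the result back.
        await workflow.wait_condition(
            lambda: self._response is not None,
            timeout=timedelta(minutes=10),
        )
        return self._response or {}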
- _user_response signal to (auto) workflows
- session_proxy: if we're running in a temporal workflow, opt for the async request, and wait for the signal to arrive
- client_proxy: call either the new async request (if requested), or the existing synchronous request
- app_server: add a new asynchronous endpoint for sampling and elicitation. When called, it returns immediately after starting a task that executes the request, then signals the result to the workflow
- internal endpoint

Other things:
- sampling_handler: add more details on the sampling request/response for the user to make an informed decision

Summary by CodeRabbit
New Features
Bug Fixes / Improvements
Refactor
Documentation
Chores