Temporal MCP upstream-session proxy #414
Conversation
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
Adds Temporal execution-id propagation, a SessionProxy and SystemActivities for in-workflow gateway calls, internal HTTP proxy routes and session registry on the server, per-run workflow memo support for gateway overrides, Temporal-aware logging/transport changes, gateway HTTP proxy utilities, tests, and assorted CLI/formatting tweaks.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Client
participant MCPServer as MCP Server
participant Executor as Temporal Executor
participant Workflow
participant SysAct as SystemActivities
participant Gateway
participant Upstream as Upstream Session
Client->>MCPServer: run_workflow(...)
MCPServer->>Executor: start_workflow(type, memo={execution_id,gateway_*})
Executor-->>MCPServer: start result (workflow_id, run_id)
MCPServer->>MCPServer: register execution_id -> session
MCPServer-->>Client: {workflow_id, run_id, execution_id}
Workflow->>SysAct: mcp_forward_log(execution_id, level, ns, msg, data)
SysAct->>Gateway: POST /internal/workflows/log
Gateway->>Upstream: deliver log to upstream session
Upstream-->>Gateway: ack
Gateway-->>SysAct: {ok:true}
SysAct-->>Workflow: true
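A rough sketch of the activity-side HTTP call implied by the log-forwarding diagram above. The /internal/workflows/log route and the X-MCP-Gateway-Token header come from this PR; the helper name, payload field names, and timeout are assumptions rather than the PR's actual implementation:

import os

import httpx


async def forward_log_to_gateway(
    gateway_url: str,
    execution_id: str,
    level: str,
    namespace: str,
    message: str,
    data: dict | None = None,
) -> bool:
    # Shared-secret header, matching the gateway's optional token check
    headers = {}
    token = os.environ.get("MCP_GATEWAY_TOKEN")
    if token:
        headers["X-MCP-Gateway-Token"] = token
    payload = {
        "execution_id": execution_id,
        "level": level,
        "namespace": namespace,
        "message": message,
        "data": data or {},
    }
    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.post(
            f"{gateway_url.rstrip('/')}/internal/workflows/log",
            json=payload,
            headers=headers,
        )
        return resp.status_code == 200 and bool(resp.json().get("ok"))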
sequenceDiagram
autonumber
participant Workflow
participant SysAct as SystemActivities
participant Gateway
participant MCPServer as MCP Server
participant Human
Workflow->>SysAct: mcp_request_user_input(session_id, workflow_id, execution_id, prompt)
SysAct->>Gateway: POST /internal/human/prompts
Gateway->>MCPServer: enqueue prompt + correlation
MCPServer-->>Human: notify prompt
Human-->>MCPServer: human_input.submit(request_id, text)
MCPServer-->>Gateway: forward response
Gateway-->>SysAct: {text: ...}
SysAct-->>Workflow: {text: ...}
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
await self.sys_acts.relay_notify(
    get_execution_id(), "notifications/message", params
)
The get_execution_id() function can return None, but this value is passed directly to relay_notify() without any null checking. This could lead to activity failures when attempting to look up sessions by execution ID. Consider adding a null check before calling relay_notify() or providing a fallback execution ID value to ensure the activity can properly route messages.
Suggested change:
-await self.sys_acts.relay_notify(
-    get_execution_id(), "notifications/message", params
-)
+execution_id = get_execution_id()
+if execution_id is not None:
+    await self.sys_acts.relay_notify(
+        execution_id, "notifications/message", params
+    )
Spotted by Diamond
handle = client.get_workflow_handle(
    workflow_id=info.get("workflow_id"), run_id=info.get("run_id")
)
The code retrieves workflow_id and run_id from the info dictionary without validating their presence. Since dict.get() returns None for missing keys, passing these potentially None values to client.get_workflow_handle() could cause runtime errors.
Consider adding validation before creating the workflow handle:
workflow_id = info.get("workflow_id")
run_id = info.get("run_id")
if not workflow_id or not run_id:
    return {"ok": False, "error": "missing workflow identification"}
handle = client.get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
This ensures the Temporal client only receives valid identifiers.
Suggested change:
-handle = client.get_workflow_handle(
-    workflow_id=info.get("workflow_id"), run_id=info.get("run_id")
-)
+workflow_id = info.get("workflow_id")
+run_id = info.get("run_id")
+if not workflow_id or not run_id:
+    return {"ok": False, "error": "missing workflow identification"}
+handle = client.get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
Spotted by Diamond
src/mcp_agent/logging/logger.py
Outdated
if isinstance(
    loop, temporalio.worker._workflow_instance._WorkflowInstanceImpl
):
This type check relies on the private internal class temporalio.worker._workflow_instance._WorkflowInstanceImpl, which creates fragility as internal implementation details may change between versions. Consider using a more robust approach:
# More robust approach
try:
    # Check if this is a Temporal workflow environment
    is_temporal_workflow = hasattr(loop, "__temporal_workflow_info")
    # Or use a public API if available
    # is_temporal_workflow = temporalio.workflow.in_workflow()
    if is_temporal_workflow:
        self.event_bus.emit_with_stderr_transport(event)
    else:
        loop.run_until_complete(self.event_bus.emit(event))
except Exception:
    # Fallback to stderr in case of any issues
    self.event_bus.emit_with_stderr_transport(event)
This approach would be more resilient to Temporal SDK changes and provide better error handling.
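A minimal sketch of the public-API variant mentioned above, assuming the surrounding Logger exposes the same event_bus methods used in the snippet (temporalio.workflow.in_workflow() is the public check; the rest mirrors the existing fallback logic):

import asyncio

import temporalio.workflow


def emit_event_safely(event_bus, event, loop: asyncio.AbstractEventLoop) -> None:
    # True only when called from workflow code running under the Temporal sandbox
    try:
        in_workflow = temporalio.workflow.in_workflow()
    except Exception:
        in_workflow = False

    if in_workflow:
        # The sandbox forbids driving the event loop, so use the synchronous stderr path
        event_bus.emit_with_stderr_transport(event)
    else:
        loop.run_until_complete(event_bus.emit(event))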
Spotted by Diamond
src/mcp_agent/executor/workflow.py
Outdated
from pydantic import BaseModel, ConfigDict, Field
from mcp_agent.core.context_dependent import ContextDependent
from mcp_agent.executor.workflow_signal import Signal, SignalMailbox
from mcp_agent.executor.temporal import TemporalExecutor
This one is probably a merge conflict resolution issue... this should be only in TYPE_CHECKING so that workflow.py doesn't have a direct Temporal dependency. Otherwise, even if a user just wants asyncio, they still need to import the temporal package.
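For reference, a small sketch of the TYPE_CHECKING guard being requested (the module path follows the import shown above; the function is illustrative only):

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Evaluated only by type checkers, so asyncio-only users never import temporalio
    from mcp_agent.executor.temporal import TemporalExecutor


def describe_executor(executor: "TemporalExecutor | Any") -> str:
    # Runtime code relies on duck typing; the Temporal type stays annotation-only
    return type(executor).__name__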
return cfg.url.rstrip("/")

# Fallback: default local server
return "http://127.0.0.1:8000"
Should we be doing this?
Actionable comments posted: 21
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
src/mcp_agent/core/context.py (2)
10-10
: Mask secret in Context to avoid accidental logging/serialization leaks. gateway_token is sensitive. Hide it from repr and typical dumps.
Apply:
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field
@@
-    gateway_url: str | None = None
-    gateway_token: str | None = None
+    gateway_url: str | None = None
+    gateway_token: str | None = Field(default=None, repr=False)
Also applies to: 92-95
74-74
: Fix mutable default list. loaded_subagents shares a single list across Context instances.
Apply:
-    loaded_subagents: List["AgentSpec"] = []
+    loaded_subagents: List["AgentSpec"] = Field(default_factory=list)
src/mcp_agent/executor/temporal/temporal_context.py (1)
1-15
: Use ContextVar instead of a module-level global to prevent cross-task/thread leaks. Temporal + async can run on multiple tasks/threads; a plain global will bleed IDs.
Apply:
-from typing import Optional
+from typing import Optional
+from contextvars import ContextVar
@@
 EXECUTION_ID_KEY = "__execution_id"
-_execution_id: Optional[str] = None
+_execution_id: ContextVar[Optional[str]] = ContextVar("_execution_id", default=None)
@@
-def set_execution_id(execution_id: str) -> None:
-    global _execution_id
-    _execution_id = execution_id
+def set_execution_id(execution_id: Optional[str]) -> None:
+    _execution_id.set(execution_id)
@@
-def get_execution_id() -> Optional[str]:
-    return _execution_id
+def get_execution_id() -> Optional[str]:
+    return _execution_id.get()
src/mcp_agent/server/app_server.py (2)
819-835
: Missing return value in workflows-resume. Function promises bool but returns None.
 if result:
     logger.debug(
         f"Signaled workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'"
     )
 else:
     logger.error(
         f"Failed to signal workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'"
     )
+    return bool(result)
862-870
: Missing return value in workflows-cancel. Function promises bool but returns None.
 if result:
     logger.debug(f"Cancelled workflow {workflow_name} with ID {run_id}")
 else:
     logger.error(f"Failed to cancel workflow {workflow_name} with ID {run_id}")
+    return bool(result)
🧹 Nitpick comments (16)
examples/mcp_agent_server/temporal/client.py (2)
45-65
: Prefer structured logger over print in server log callback.Keeps logs consistent and machine-parsable.
Apply:
 async def on_server_log(params: LoggingMessageNotificationParams) -> None:
     # Pretty-print server logs locally for demonstration
     level = params.level.upper()
     name = params.logger or "server"
     # params.data can be any JSON-serializable data
-    print(f"[SERVER LOG] [{level}] [{name}] {params.data}")
+    logger.info(f"[SERVER LOG] [{level}] [{name}]", data=params.data)
88-97
: Remove redundant status fetch and duplicate “Started” log.This block re-parses run_result and logs the same message twice; it adds no value and can confuse users.
Apply:
-    get_status_result = await server.call_tool(
-        "workflows-BasicAgentWorkflow-get_status",
-        arguments={"run_id": run_id},
-    )
-
-    execution = WorkflowExecution(**json.loads(run_result.content[0].text))
-    run_id = execution.run_id
-    logger.info(
-        f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
-    )
tests/executor/test_workflow.py (1)
349-353
: Also cover the non-None memo passthrough.Good assertion for the new
workflow_memo=None
kwarg. Please add a companion test whereworkflow_memo={"k": "v"}
is passed torun_async(...)
and verify it’s forwarded toexecutor.start_workflow(..., workflow_memo={"k": "v"})
. This guards against regressions in the memo plumbing.examples/mcp_agent_server/temporal/basic_agent_server.py (1)
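A sketch of the companion test being requested, assuming a fixture that mirrors the existing workflow_memo=None assertion (the fixture name and the run_async kwarg shape are assumptions based on this PR's description):

import pytest


@pytest.mark.asyncio
async def test_run_async_forwards_workflow_memo(workflow_with_mock_executor):
    workflow, executor = workflow_with_mock_executor  # executor.start_workflow is an AsyncMock
    await workflow.run_async("hello", __mcp_agent_workflow_memo={"k": "v"})
    _, kwargs = executor.start_workflow.call_args
    assert kwargs.get("workflow_memo") == {"k": "v"}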
62-76
: Prefer structured logging over embedding large strings.Use logger data fields instead of interpolating the full LLM output into the message. This reduces log noise/PII risk and plays nicer with downstream transports.
Apply:
- app_logger.info("Starting finder agent") + app_logger.info("Starting finder agent", input=input) @@ - app_logger.info(f"Finder agent completed with result {result}") + app_logger.info("Finder agent completed", result=result)src/mcp_agent/logging/transport.py (1)
436-451
: Non-blocking stderr fallback looks good; add a brief docstring.This is a sensible fallback for restricted loops. Add a short docstring for clarity.
-    def emit_with_stderr_transport(self, event: Event):
+    def emit_with_stderr_transport(self, event: Event):
+        """Print to stderr immediately and enqueue for listeners without awaiting transport."""
         print(
             f"[{event.type}] {event.namespace}: {event.message}",
             file=sys.stderr,
         )
src/mcp_agent/executor/temporal/session_proxy.py (2)
33-41
: Avoid sending null logger fieldSkip including logger when None to reduce payload noise.
Apply this diff:
-        params: Dict[str, Any] = {
-            "level": level,
-            "data": data,
-            "logger": logger,
-        }
+        params: Dict[str, Any] = {"level": level, "data": data}
+        if logger is not None:
+            params["logger"] = logger
28-30
: Tighten type: data: Any is sufficientThe union Dict[str, Any] | Any is redundant.
Apply this diff:
-        level: str,
-        data: Dict[str, Any] | Any,
+        level: str,
+        data: Any,
src/mcp_agent/executor/workflow.py (4)
256-270
: Don’t swallow exceptions when binding upstream_sessionAt least debug-log the failure; duplication with initialize() is fine but silent pass makes issues hard to diagnose.
Apply this diff:
-        except Exception:
-            pass
+        except Exception as e:
+            self._logger.debug(f"Unable to bind upstream_session/_temporal_run_id: {e}")
384-416
: Normalize level and honor namespace in asyncio path
- Make level case-insensitive.
- Use the provided namespace to fetch a logger in asyncio mode.
Apply this diff:
 async def log_upstream(
     self,
     level: str,
     namespace: str,
     message: str,
     data: Dict[str, Any] | None = None,
 ):
-    if self.context.config.execution_engine == "temporal":
+    lvl = (level or "info").lower()
+    if self.context.config.execution_engine == "temporal":
@@
-    # asyncio: use local logger
-    if level == "debug":
-        self._logger.debug(message, **(data or {}))
-    elif level == "warning":
-        self._logger.warning(message, **(data or {}))
-    elif level == "error":
-        self._logger.error(message, **(data or {}))
-    else:
-        self._logger.info(message, **(data or {}))
+    # asyncio: use namespaced logger
+    logger = get_logger(namespace or f"workflow.{self.name}", context=self.context)
+    if lvl not in ("debug", "warning", "error", "info"):
+        lvl = "info"
+    getattr(logger, lvl)(message, **(data or {}))
417-440
: ask_user ignores metadata beyond signal_nameOnly signal_name from metadata is forwarded; consider passing the entire metadata dict through the activity and to ask_via_proxy.
I can propose a small change set across Workflow.ask_user and SystemActivities.request_user_input to include and forward metadata if you want.
817-854
: Storing gateway_token in Temporal memo has persistence/security implicationsTemporal memo is durable history. Prefer a short-lived reference (e.g., an ID into your secrets store) or encrypt memo fields with a data converter. Consider rotating tokens per run.
If desired, I can draft an encrypted data converter setup for Temporal (AES-GCM, key from KMS) and wire it into your client/worker.
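For what it's worth, a minimal sketch of such a codec, modeled on Temporal's Python encryption sample; sourcing the key from KMS and wiring the codec into the client/worker DataConverter (and confirming memo fields pass through it) are left as assumptions:

import os
from typing import List, Sequence

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec


class AESGCMCodec(PayloadCodec):
    """Encrypt/decrypt Temporal payloads with AES-GCM."""

    def __init__(self, key: bytes) -> None:
        self._aead = AESGCM(key)  # 32-byte key, e.g. fetched from KMS at startup

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        out: List[Payload] = []
        for p in payloads:
            nonce = os.urandom(12)
            out.append(
                Payload(
                    metadata={"encoding": b"binary/encrypted"},
                    data=nonce + self._aead.encrypt(nonce, p.SerializeToString(), None),
                )
            )
        return out

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        out: List[Payload] = []
        for p in payloads:
            if p.metadata.get("encoding") != b"binary/encrypted":
                out.append(p)
                continue
            nonce, data = p.data[:12], p.data[12:]
            out.append(Payload.FromString(self._aead.decrypt(nonce, data, None)))
        return out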
src/mcp_agent/executor/temporal/__init__.py (1)
284-286
: Document the new workflow_memo parameterUpdate start_workflow docstring/args section to mention workflow_memo and its use.
Example addition to the docstring “Args” block:
- workflow_memo: Optional dict stored in the workflow’s memo (e.g., gateway_url, gateway_token).
src/mcp_agent/server/app_server.py (2)
746-751
: Ensure upstream session setup failures don’t fail tool invocation.Minor: You already wrap in try/except above; consistent here too.
- try: - _set_upstream_from_request_ctx_if_available(ctx) - except Exception: - pass + try: + _set_upstream_from_request_ctx_if_available(ctx) + except Exception: + pass(Apply same defensive pattern to all tool entrypoints for consistency.)
1287-1301
: Guard upstream session attachment for schema-only tools.These tool wrappers call _set_upstream_from_request_ctx_if_available without a try/except; retain symmetry with other entrypoints.
-    _set_upstream_from_request_ctx_if_available(ctx)
+    try:
+        _set_upstream_from_request_ctx_if_available(ctx)
+    except Exception:
+        pass
(Apply to both run and get_status.)
src/mcp_agent/executor/temporal/interceptor.py (1)
72-73
: Reduce noisy “ZZ …” info logs or demote to debug.These logs will spam output in production.
- logger.info("ZZ Creating interceptor with payload converter") + logger.debug("Creating interceptor with payload converter") ... - logger.info("ZZ Creating client interceptor class") + logger.debug("Creating client interceptor class") ... - logger.info("ZZ Creating activity interceptor class") + logger.debug("Creating activity interceptor class") ... - logger.info("ZZ Creating workflow interceptor class") + logger.debug("Creating workflow interceptor class") ... - logger.info("ZZ Creating client outbound interceptor") + logger.debug("Creating client outbound interceptor") ... - logger.info("ZZ Creating worker inbound interceptor") + logger.debug("Creating worker inbound interceptor")Also applies to: 78-81, 86-88, 92-94, 105-107, 145-147
src/mcp_agent/mcp/client_proxy.py (1)
45-67
: DRY up headers and timeout retrievalMinor duplication: token header construction and timeout env parsing repeat across functions. Consider small helpers to centralize this logic.
Also applies to: 80-99, 112-128, 140-153
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (17)
- examples/mcp_agent_server/asyncio/basic_agent_server.py (0 hunks)
- examples/mcp_agent_server/temporal/basic_agent_server.py (1 hunks)
- examples/mcp_agent_server/temporal/client.py (3 hunks)
- src/mcp_agent/app.py (2 hunks)
- src/mcp_agent/cli/cloud/main.py (1 hunks)
- src/mcp_agent/core/context.py (1 hunks)
- src/mcp_agent/executor/temporal/__init__.py (6 hunks)
- src/mcp_agent/executor/temporal/interceptor.py (1 hunks)
- src/mcp_agent/executor/temporal/session_proxy.py (1 hunks)
- src/mcp_agent/executor/temporal/system_activities.py (1 hunks)
- src/mcp_agent/executor/temporal/temporal_context.py (1 hunks)
- src/mcp_agent/executor/workflow.py (6 hunks)
- src/mcp_agent/logging/logger.py (4 hunks)
- src/mcp_agent/logging/transport.py (2 hunks)
- src/mcp_agent/mcp/client_proxy.py (1 hunks)
- src/mcp_agent/server/app_server.py (19 hunks)
- tests/executor/test_workflow.py (1 hunks)
💤 Files with no reviewable changes (1)
- examples/mcp_agent_server/asyncio/basic_agent_server.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
examples/mcp_agent_server/temporal/client.py
src/mcp_agent/server/app_server.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
examples/mcp_agent_server/temporal/client.py
🧬 Code graph analysis (12)
src/mcp_agent/app.py (1)
src/mcp_agent/logging/logger.py (1)
debug
(139-147)
src/mcp_agent/logging/transport.py (1)
src/mcp_agent/logging/events.py (1)
Event
(39-61)
examples/mcp_agent_server/temporal/basic_agent_server.py (3)
src/mcp_agent/app.py (1)
logger
(189-206)src/mcp_agent/logging/logger.py (1)
info
(149-157)src/mcp_agent/agents/agent.py (1)
attach_llm
(156-192)
src/mcp_agent/executor/temporal/session_proxy.py (3)
src/mcp_agent/core/context.py (1)
Context
(57-103)src/mcp_agent/executor/temporal/system_activities.py (3)
SystemActivities
(14-96)relay_notify
(67-80)relay_request
(83-96)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(13-14)
src/mcp_agent/executor/temporal/interceptor.py (3)
src/mcp_agent/logging/logger.py (1)
get_logger
(390-417)src/mcp_agent/executor/temporal/temporal_context.py (2)
get_execution_id
(13-14)set_execution_id
(8-10)src/mcp_agent/executor/temporal/__init__.py (1)
start_workflow
(277-392)
src/mcp_agent/logging/logger.py (2)
src/mcp_agent/logging/transport.py (2)
emit_with_stderr_transport
(436-450)emit
(410-434)src/mcp_agent/app.py (3)
logger
(189-206)session_id
(185-186)context
(145-150)
src/mcp_agent/executor/temporal/__init__.py (3)
src/mcp_agent/app.py (4)
executor
(161-162)logger
(189-206)context
(145-150)workflow_task
(796-872)src/mcp_agent/executor/temporal/system_activities.py (5)
SystemActivities
(14-96)forward_log
(18-38)request_user_input
(41-64)relay_notify
(67-80)relay_request
(83-96)src/mcp_agent/executor/temporal/interceptor.py (1)
ContextPropagationInterceptor
(54-93)
examples/mcp_agent_server/temporal/client.py (5)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
MCPAgentClientSession
(72-445)src/mcp_agent/config.py (1)
MCPServerSettings
(51-108)examples/mcp_agent_server/asyncio/client.py (2)
on_server_log
(59-64)make_session
(67-77)src/mcp_agent/mcp/gen_client.py (1)
gen_client
(16-41)src/mcp_agent/executor/workflow.py (2)
run_id
(136-141)WorkflowExecution
(67-74)
src/mcp_agent/executor/workflow.py (5)
src/mcp_agent/app.py (7)
executor
(161-162)workflow_signal
(447-495)logger
(189-206)context
(145-150)config
(153-154)session_id
(185-186)workflow
(412-445)src/mcp_agent/executor/temporal/__init__.py (1)
TemporalExecutor
(58-475)src/mcp_agent/executor/temporal/session_proxy.py (1)
SessionProxy
(8-58)src/mcp_agent/logging/logger.py (1)
get_logger
(390-417)src/mcp_agent/executor/task_registry.py (1)
get_activity
(25-28)
src/mcp_agent/executor/temporal/system_activities.py (2)
src/mcp_agent/mcp/client_proxy.py (4)
log_via_proxy
(33-67)ask_via_proxy
(70-99)notify_via_proxy
(102-127)request_via_proxy
(130-153)src/mcp_agent/core/context_dependent.py (1)
ContextDependent
(9-48)
src/mcp_agent/mcp/client_proxy.py (3)
src/mcp_agent/mcp/mcp_server_registry.py (1)
ServerRegistry
(51-385)src/mcp_agent/app.py (1)
server_registry
(157-158)src/mcp_agent/cli/core/api_client.py (1)
post
(75-87)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/workflow.py (3)
executor
(121-126)run_id
(136-141)run_async
(196-382)src/mcp_agent/executor/temporal/temporal_context.py (1)
set_execution_id
(8-10)src/mcp_agent/executor/temporal/session_proxy.py (3)
request
(55-58)send_log_message
(25-47)notify
(49-53)
🔇 Additional comments (14)
src/mcp_agent/cli/cloud/main.py (1)
16-22
: LGTM: import reformat only.Style-only change; no behavioral impact.
examples/mcp_agent_server/temporal/client.py (2)
9-14
: LGTM: imports extended for client-session customization and logging.
66-71
: LGTM: custom ClientSession factory is correctly wired into gen_client.Signature matches (read_stream, write_stream, read_timeout_seconds) -> ClientSession.
src/mcp_agent/app.py (1)
188-206
: LGTM on logger context binding.The bound-context update on reuse is correct and keeps upstream forwarding in sync.
src/mcp_agent/logging/logger.py (1)
407-409
: LGTM on context binding updates. Passing the bound context positionally and refreshing _bound_context on reuse is consistent with the Logger constructor and avoids try/except noise.
Also applies to: 415-416
src/mcp_agent/executor/workflow.py (3)
224-225
: Good: support for workflow memo plumbed through run_asyncExtraction of __mcp_agent_workflow_memo looks correct.
242-246
: Memo forwarded to Temporal start_workflowPlumbing is correct and aligns with TemporalExecutor.start_workflow signature.
246-247
: No action needed: WorkflowHandle.result_run_id is available in the Temporal Python SDK
The WorkflowHandle returned by Client.start_workflow exposes both result_run_id and run_id, so the existing fallback is covered.
src/mcp_agent/executor/temporal/__init__.py (3)
269-272
: Interceptor wiring looks goodIncludes ContextPropagationInterceptor always; adds TracingInterceptor when tracing is enabled.
376-386
: Memo forwarded to Temporal start_workflowBoth code paths (with/without input_arg) correctly pass memo=workflow_memo or {}.
506-512
: System activities registered with stable namesGood addition; ensures workflows can execute logging, user-input, and relay activities.
src/mcp_agent/executor/temporal/interceptor.py (1)
49-51
: Type mismatch: set_execution_id(None).set_execution_id is typed to accept str; here it may receive None. Update temporal_context.set_execution_id to Optional[str] or guard before setting.
We can send a patch in temporal_context to accept Optional[str] if you confirm this is the intended contract.
src/mcp_agent/mcp/client_proxy.py (2)
14-23
: Precedence + token handling look goodExplicit override → env var → registry → default is clear; token header resolution is straightforward.
Also applies to: 48-51
29-31
: Confirm default gateway URL. Is http://127.0.0.1:8000 the intended dev default for the gateway in this repo? If not, update or source from settings.
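A sketch of the resolution order described above (the env var name and registry key are assumptions; only the default URL comes from the code under review):

import os


def resolve_gateway_url(explicit: str | None = None, registry=None) -> str:
    # 1. Explicit override
    if explicit:
        return explicit.rstrip("/")
    # 2. Environment variable
    env = os.environ.get("MCP_GATEWAY_URL")
    if env:
        return env.rstrip("/")
    # 3. Server registry entry, if one is configured
    cfg = getattr(registry, "registry", {}).get("gateway") if registry else None
    if cfg is not None and getattr(cfg, "url", None):
        return cfg.url.rstrip("/")
    # 4. Fallback: default local server
    return "http://127.0.0.1:8000"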
# Override the server configuration to point to our local script
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
    name="basic_agent_server",
    description="Local workflow server running the basic agent example",
    transport="sse",
    url="http://0.0.0.0:8000/sse",
)
Fix client URL: 0.0.0.0 is a bind address, not a connect target.
Use localhost/127.0.0.1; current value will often fail to connect.
Apply:
context.server_registry.registry["basic_agent_server"] = MCPServerSettings(
name="basic_agent_server",
description="Local workflow server running the basic agent example",
transport="sse",
- url="http://0.0.0.0:8000/sse",
+ url="http://127.0.0.1:8000/sse",
)
🤖 Prompt for AI Agents
In examples/mcp_agent_server/temporal/client.py around lines 36 to 43, the
client URL is using the bind address "0.0.0.0" which cannot be used as a connect
target; change the URL to a loopback address such as "http://127.0.0.1:8000/sse"
(or "http://localhost:8000/sse") so the client can successfully connect to the
local server, replacing the existing url value only.
async def request_user_input(
    self,
    session_id: str,
    workflow_id: str,
    execution_id: str,
    prompt: str,
    signal_name: str = "human_input",
) -> Dict[str, Any]:
    # Reuse proxy ask API; returns {result} or {error}
    registry = self.context.server_registry
    gateway_url = getattr(self.context, "gateway_url", None)
    gateway_token = getattr(self.context, "gateway_token", None)
    return await ask_via_proxy(
        registry,
        execution_id=execution_id,
        prompt=prompt,
        metadata={
            "session_id": session_id,
            "workflow_id": workflow_id,
            "signal_name": signal_name,
        },
        gateway_url=gateway_url,
        gateway_token=gateway_token,
    )
🛠️ Refactor suggestion
Include run_id in user-input correlation metadata.
Without run_id, the gateway cannot deterministically address the correct run when signaling.
Add run_id to the activity signature and metadata, then propagate from the workflow.
- async def request_user_input(
- self,
- session_id: str,
- workflow_id: str,
- execution_id: str,
- prompt: str,
- signal_name: str = "human_input",
- ) -> Dict[str, Any]:
+ async def request_user_input(
+ self,
+ session_id: str,
+ workflow_id: str,
+ run_id: str,
+ execution_id: str,
+ prompt: str,
+ signal_name: str = "human_input",
+ ) -> Dict[str, Any]:
# Reuse proxy ask API; returns {result} or {error}
registry = self.context.server_registry
gateway_url = getattr(self.context, "gateway_url", None)
gateway_token = getattr(self.context, "gateway_token", None)
return await ask_via_proxy(
registry,
execution_id=execution_id,
prompt=prompt,
metadata={
"session_id": session_id,
"workflow_id": workflow_id,
+ "run_id": run_id,
"signal_name": signal_name,
},
gateway_url=gateway_url,
gateway_token=gateway_token,
)
Coordinate call sites accordingly (e.g., pass workflow.info().run_id from the workflow).
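A hedged sketch of the workflow-side call site after the signature change (the activity name comes from the sequence diagram earlier in this thread; the timeout and argument order are assumptions):

from datetime import timedelta

from temporalio import workflow


async def prompt_user(session_id: str, execution_id: str, prompt: str) -> dict:
    # workflow.info() supplies both IDs deterministically from inside the workflow
    return await workflow.execute_activity(
        "mcp_request_user_input",
        args=[
            session_id,
            workflow.info().workflow_id,
            workflow.info().run_id,  # the newly propagated run_id
            execution_id,
            prompt,
        ],
        start_to_close_timeout=timedelta(minutes=30),
    )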
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/system_activities.py around lines 41 to 64,
the request_user_input activity does not include run_id in its signature or the
metadata sent to ask_via_proxy, so the gateway cannot deterministically address
the run; update the activity signature to accept run_id: str, add "run_id":
run_id into the metadata dict passed to ask_via_proxy, and then update all
workflow call sites to pass workflow.info().run_id (or equivalent) when invoking
this activity so the run_id is propagated end-to-end.
async with _PENDING_PROMPTS_LOCK:
    _PENDING_PROMPTS[request_id] = {
        "workflow_id": metadata.get("workflow_id"),
        "execution_id": execution_id,
        "signal_name": metadata.get("signal_name", "human_input"),
        "session_id": metadata.get("session_id"),
    }
await session.send_log_message(
🛠️ Refactor suggestion
Human prompt correlation misses run_id; signaling may target the wrong execution.
Stored metadata lacks run_id, but human_input.submit tries to use it. Without run_id, Temporal may select an unintended run.
Include run_id in metadata, store it, and use it on signal.
_PENDING_PROMPTS[request_id] = {
- "workflow_id": metadata.get("workflow_id"),
+ "workflow_id": metadata.get("workflow_id"),
+ "run_id": metadata.get("run_id"),
"execution_id": execution_id,
"signal_name": metadata.get("signal_name", "human_input"),
"session_id": metadata.get("session_id"),
}
Coordinate with SystemActivities.request_user_input to pass run_id (see comment in system_activities.py).
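And the corresponding submit path, assuming run_id is now stored with the pending prompt (names mirror the surrounding snippets; this is a sketch, not the PR's code):

async def submit_human_input(client, info: dict, text: str) -> None:
    # info is the stored pending-prompt record; run_id pins the exact run instead of "latest"
    handle = client.get_workflow_handle(
        workflow_id=info["workflow_id"],
        run_id=info["run_id"],
    )
    await handle.signal(info.get("signal_name", "human_input"), {"text": text})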
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 554 to 561, the pending
prompt metadata is missing run_id which causes human_input.submit to potentially
signal the wrong Temporal run; update the metadata stored in _PENDING_PROMPTS to
include "run_id": metadata.get("run_id") (or the appropriate run_id variable),
and ensure the value originates from SystemActivities.request_user_input (update
that call to pass run_id if not already done). Store run_id alongside
workflow_id/execution_id/signal_name/session_id and ensure human_input.submit
uses this run_id when sending the signal so the correct run is targeted.
workflow_memo = {
    "gateway_url": gateway_url,
    "gateway_token": gateway_token,
    "execution_id": execution_id,
}
except Exception:
💡 Verification agent
❓ Verification inconclusive
Avoid storing gateway_token in Temporal memo (plaintext).
Temporal memos are not encrypted by default and are visible via workflow introspection. Persisting tokens here is a secret leakage risk.
- Do not include gateway_token in workflow_memo.
- Prefer per-run rpc_metadata/headers or KMS-encrypted payloads. If unavoidable, store a short-lived opaque reference and resolve server-side.
- workflow_memo = {
- "gateway_url": gateway_url,
- "gateway_token": gateway_token,
- "execution_id": execution_id,
- }
+ workflow_memo = {
+ "gateway_url": gateway_url,
+ "execution_id": execution_id,
+ }
If per-run headers are needed, we can extend TemporalExecutor.start_workflow to accept rpc_metadata and plumb it from here.
Remove gateway_token from Temporal memo
Memos are stored unencrypted and exposed via workflow introspection (docs.temporal.io).
- Remove gateway_token from workflow_memo.
- Pass secrets via per-run rpc_metadata/headers or wrap them in KMS-encrypted payloads (e.g. extend TemporalExecutor.start_workflow to accept and forward rpc_metadata).
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 1437-1442, remove the
gateway_token field from the workflow_memo dict so secrets are not stored in the
unencrypted Temporal memo; instead, extend the workflow start path to pass
gateway_token via per-run rpc_metadata/headers or an encrypted payload (e.g.,
add an optional rpc_metadata parameter to TemporalExecutor.start_workflow and
forward it to the Temporal client, or wrap gateway_token with KMS before
including it), and update any callers of start_workflow to supply the token via
the new rpc_metadata/encrypted payload rather than placing it into
workflow_memo.
try:
    state = str(status.get("status", "")).lower()
    if state in ("completed", "error", "cancelled"):
        # await _unregister_session(run_id)
        pass
except Exception:
    pass
🛠️ Refactor suggestion
Unregister sessions and idempotency keys on terminal states.
Currently commented out, causing session and idempotency set leaks.
- if state in ("completed", "error", "cancelled"):
- # await _unregister_session(run_id)
- pass
+ if state in ("completed", "error", "cancelled"):
+ try:
+ await _unregister_session(run_id)
+ except Exception:
+ pass
+ # Purge idempotency keys for this execution_id
+ try:
+ exec_id = _RUN_EXECUTION_ID_REGISTRY.get(run_id)
+ if exec_id is not None:
+ async with _lock_idem():
+ _IDEMPOTENCY_KEYS_SEEN.pop(exec_id, None)
+ except Exception:
+ pass
_EXECUTION_ID: str | None = None


def set_execution_id(execution_id: Optional[str]) -> None:
    global _EXECUTION_ID
    _EXECUTION_ID = execution_id


def get_execution_id() -> Optional[str]:
    return _EXECUTION_ID
The global variable _EXECUTION_ID creates a potential thread-safety issue. Since Temporal workers operate in a multi-threaded environment, concurrent calls to set_execution_id() and get_execution_id() could lead to race conditions where execution IDs are overwritten between threads, causing incorrect context propagation.
Consider using thread-local storage instead:
import threading

_EXECUTION_ID = threading.local()


def set_execution_id(execution_id: Optional[str]) -> None:
    _EXECUTION_ID.value = execution_id


def get_execution_id() -> Optional[str]:
    return getattr(_EXECUTION_ID, "value", None)
This would ensure each thread maintains its own isolated execution ID context.
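One caveat worth noting: threading.local isolates OS threads but not asyncio tasks sharing a thread, so a ContextVar (as also suggested elsewhere in this review) covers both cases. A minimal sketch:

from contextvars import ContextVar
from typing import Optional

_EXECUTION_ID: ContextVar[Optional[str]] = ContextVar("_execution_id", default=None)


def set_execution_id(execution_id: Optional[str]) -> None:
    _EXECUTION_ID.set(execution_id)


def get_execution_id() -> Optional[str]:
    return _EXECUTION_ID.get()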
Spotted by Diamond
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mcp_agent/server/app_server.py (2)
762-819
: resume_workflow and cancel_workflow don’t return valuesBoth tools promise a bool but return None.
result = await workflow_registry.resume_workflow( run_id=run_id, workflow_id=workflow_name, signal_name=signal_name, payload=payload, ) @@ else: logger.error( f"Failed to signal workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'" ) + return bool(result)
819-853
: Return the result of cancel_workflowMatch the tool’s documented return type.
result = await workflow_registry.cancel_workflow( run_id=run_id, workflow_id=workflow_name ) @@ else: logger.error(f"Failed to cancel workflow {workflow_name} with ID {run_id}") + return bool(result)
♻️ Duplicate comments (11)
src/mcp_agent/executor/temporal/interceptor.py (1)
39-52
: Restore and always reset execution_id to prevent context bleedSave previous execution_id and restore it in a finally block so unrelated tasks don’t inherit stale IDs.
@contextmanager def context_from_header( input: _InputWithHeaders, payload_converter: temporalio.converter.PayloadConverter ): - execution_id_payload = input.headers.get(EXECUTION_ID_KEY) + prev = get_execution_id() + execution_id_payload = input.headers.get(EXECUTION_ID_KEY) execution_id_from_header = ( payload_converter.from_payload(execution_id_payload, str) if execution_id_payload else None ) set_execution_id(execution_id_from_header if execution_id_from_header else None) - - yield + try: + yield + finally: + set_execution_id(prev)src/mcp_agent/server/app_server.py (10)
39-47
: Avoid module-level asyncio.Lock and unbounded registries; lazy-init locks and plan cleanupCreating locks at import time can bind to the wrong loop; registries can grow indefinitely.
-_RUN_SESSION_REGISTRY: Dict[str, Any] = {} -_RUN_EXECUTION_ID_REGISTRY: Dict[str, str] = {} -_RUN_SESSION_LOCK = asyncio.Lock() -_PENDING_PROMPTS: Dict[str, Dict[str, Any]] = {} -_PENDING_PROMPTS_LOCK = asyncio.Lock() -_IDEMPOTENCY_KEYS_SEEN: Dict[str, Set[str]] = {} -_IDEMPOTENCY_KEYS_LOCK = asyncio.Lock() +_RUN_SESSION_REGISTRY: Dict[str, Any] = {} +_RUN_EXECUTION_ID_REGISTRY: Dict[str, str] = {} +_RUN_SESSION_LOCK: asyncio.Lock | None = None +_PENDING_PROMPTS: Dict[str, Dict[str, Any]] = {} +_PENDING_PROMPTS_LOCK: asyncio.Lock | None = None +_IDEMPOTENCY_KEYS_SEEN: Dict[str, Set[str]] = {} +_IDEMPOTENCY_KEYS_LOCK: asyncio.Lock | None = None + +def _lock_run() -> asyncio.Lock: + global _RUN_SESSION_LOCK + if _RUN_SESSION_LOCK is None: + _RUN_SESSION_LOCK = asyncio.Lock() + return _RUN_SESSION_LOCK + +def _lock_prompts() -> asyncio.Lock: + global _PENDING_PROMPTS_LOCK + if _PENDING_PROMPTS_LOCK is None: + _PENDING_PROMPTS_LOCK = asyncio.Lock() + return _PENDING_PROMPTS_LOCK + +def _lock_idem() -> asyncio.Lock: + global _IDEMPOTENCY_KEYS_LOCK + if _IDEMPOTENCY_KEYS_LOCK is None: + _IDEMPOTENCY_KEYS_LOCK = asyncio.Lock() + return _IDEMPOTENCY_KEYS_LOCK
338-344
: Default-deny auth for internal routesEndpoints are effectively unauthenticated if MCP_GATEWAY_TOKEN is unset. Require a token or an explicit insecure opt-in (or loopback-only).
- gw_token = os.environ.get("MCP_GATEWAY_TOKEN") - if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: + gw_token = os.environ.get("MCP_GATEWAY_TOKEN") + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): return JSONResponse( {"ok": False, "error": "unauthorized"}, status_code=401 )
346-353
: Idempotency keys can grow unbounded; cap or TTL themBound memory per execution_id.
async with _IDEMPOTENCY_KEYS_LOCK: seen = _IDEMPOTENCY_KEYS_SEEN.setdefault(execution_id or "", set()) if idempotency_key in seen: return JSONResponse({"ok": True, "idempotent": True}) seen.add(idempotency_key) + # Optional cap to last N keys + if len(seen) > 1000: + # remove arbitrary extra keys + for _ in range(len(seen) - 1000): + try: + seen.pop() + except KeyError: + break
500-506
: Default-deny auth for workflow logsSame security posture as notify.
- gw_token = os.environ.get("MCP_GATEWAY_TOKEN") - if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: + gw_token = os.environ.get("MCP_GATEWAY_TOKEN") + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): return JSONResponse( {"ok": False, "error": "unauthorized"}, status_code=401 )
537-541
: Default-deny auth for human promptsHarden this internal route as well.
- gw_token = os.environ.get("MCP_GATEWAY_TOKEN") - if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: + gw_token = os.environ.get("MCP_GATEWAY_TOKEN") + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): return JSONResponse({"error": "unauthorized"}, status_code=401)
554-563
: Store run_id with pending prompt metadataWithout run_id, human_input.submit may target the wrong run or fail.
async with _PENDING_PROMPTS_LOCK: _PENDING_PROMPTS[request_id] = { "workflow_id": metadata.get("workflow_id"), + "run_id": metadata.get("run_id"), "execution_id": execution_id, "signal_name": metadata.get("signal_name", "human_input"), "session_id": metadata.get("session_id"), }
Also update SystemActivities.request_user_input to include run_id in metadata it sends. I can prepare that patch if desired.
858-884
: Validate workflow/run IDs before creating Temporal handleAvoid passing None to get_workflow_handle and return clear error.
async def human_input_submit(request_id: str, text: str) -> Dict[str, Any]: @@ async with _PENDING_PROMPTS_LOCK: info = _PENDING_PROMPTS.pop(request_id, None) if not info: return {"ok": False, "error": "unknown request_id"} + workflow_id = info.get("workflow_id") + run_id = info.get("run_id") + if not workflow_id or not run_id: + return {"ok": False, "error": "missing workflow identification"} try: @@ - handle = client.get_workflow_handle( - workflow_id=info.get("workflow_id"), run_id=info.get("run_id") - ) + handle = client.get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
1391-1408
: Don’t treat X-Forwarded-Proto as a full URLAvoid assigning bare “http/https” to gateway_url; reconstruct only with host present.
- gateway_url = ( - h.get("X-MCP-Gateway-URL") - or h.get("X-Forwarded-Url") - or h.get("X-Forwarded-Proto") - ) - # Best-effort reconstruction if only proto/host provided - if gateway_url is None: + gateway_url = h.get("X-MCP-Gateway-URL") or h.get("X-Forwarded-Url") + # Best-effort reconstruction if only proto/host provided + if not gateway_url: proto = h.get("X-Forwarded-Proto") or "http" host = h.get("X-Forwarded-Host") or h.get("Host") if host: gateway_url = f"{proto}://{host}"
1419-1425
: Don’t store gateway_token in Temporal memo (plaintext secret)Memos are visible via introspection; remove token from memo and pass via headers/metadata instead.
- if gateway_url or gateway_token: + if gateway_url or gateway_token: workflow_memo = { "gateway_url": gateway_url, - "gateway_token": gateway_token, "execution_id": execution_id, }Follow-up: plumb gateway_token via per-run RPC metadata or authorized internal calls; I can draft that change.
1480-1490
: Also purge idempotency keys on terminal workflow statesPrevent per-run memory leaks after completion.
if state in ("completed", "error", "cancelled"): try: await _unregister_session(run_id) except Exception: pass + # Purge idempotency keys for this execution_id + try: + exec_id = _RUN_EXECUTION_ID_REGISTRY.get(run_id) + if exec_id: + async with _lock_idem(): + _IDEMPOTENCY_KEYS_SEEN.pop(exec_id, None) + except Exception: + pass
🧹 Nitpick comments (8)
src/mcp_agent/executor/temporal/interceptor.py (2)
20-22
: Use MutableMapping and in-place update for headersTyping as Mapping while reassigning the attribute is misleading. Prefer MutableMapping (or dict) and update in place.
-from typing import Any, Mapping, Protocol, Type +from typing import Any, MutableMapping, Protocol, Type ... class _InputWithHeaders(Protocol): - headers: Mapping[str, temporalio.api.common.v1.Payload] + headers: MutableMapping[str, temporalio.api.common.v1.Payload] ... - if execution_id_val: - input.headers = { - **input.headers, - EXECUTION_ID_KEY: payload_converter.to_payload(execution_id_val), - } + if execution_id_val: + input.headers[EXECUTION_ID_KEY] = payload_converter.to_payload(execution_id_val)Also applies to: 27-36
57-66
: Docstring mismatch (“user ID” vs execution ID)Update wording to “execution ID” to match actual behavior and key name.
src/mcp_agent/executor/temporal/session_proxy.py (3)
246-271
: Serialize include_context if it’s a Pydantic modelAvoid passing model instances directly; dump to JSON-compatible dict.
- if include_context is not None: - params["includeContext"] = include_context + if include_context is not None: + try: + params["includeContext"] = include_context.model_dump( + by_alias=True, mode="json" + ) # type: ignore[attr-defined] + except Exception: + params["includeContext"] = include_context
272-286
: Serialize requestedSchema for elicitationDump model to JSON-compatible dict before sending.
- params: Dict[str, Any] = { - "message": message, - "requestedSchema": requestedSchema, - } + try: + schema_json = requestedSchema.model_dump(by_alias=True, mode="json") # type: ignore[attr-defined] + except Exception: + schema_json = requestedSchema + params: Dict[str, Any] = { + "message": message, + "requestedSchema": schema_json, + }
288-294
: Private runtime check API may change across Temporal versions_workflow._Runtime.current() is private; consider guarding with a fallback or centralizing this behind a single helper to ease future changes.
Would you like me to add a compatibility helper that tries multiple strategies (private API, then feature-detection) and unit tests around it?
src/mcp_agent/server/app_server.py (3)
50-66
: Switch to lazy locks in helpersUse the new lock helpers to prevent loop affinity issues.
- async with _RUN_SESSION_LOCK: + async with _lock_run(): _RUN_SESSION_REGISTRY[execution_id] = session _RUN_EXECUTION_ID_REGISTRY[run_id] = execution_id ... - async with _RUN_SESSION_LOCK: + async with _lock_run(): execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None) if execution_id: _RUN_SESSION_REGISTRY.pop(execution_id, None) ... - async with _RUN_SESSION_LOCK: + async with _lock_run(): return _RUN_SESSION_REGISTRY.get(execution_id)
545-545
: Remove duplicate uuid importuuid is already imported at module top.
- import uuid
1338-1342
: Consider contextvars for execution_idThe global in temporal_context is process-wide and unsafe under concurrency. Switching to contextvars will remove many restore/bleed concerns.
I can provide a small patch to replace the global with a ContextVar and adapt get/set helpers.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- src/mcp_agent/executor/temporal/interceptor.py (1 hunks)
- src/mcp_agent/executor/temporal/session_proxy.py (1 hunks)
- src/mcp_agent/executor/temporal/temporal_context.py (1 hunks)
- src/mcp_agent/server/app_server.py (19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/mcp_agent/executor/temporal/temporal_context.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/server/app_server.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/server/app_server.py
🧬 Code graph analysis (3)
src/mcp_agent/executor/temporal/session_proxy.py (5)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/executor/temporal/system_activities.py (3)
SystemActivities
(14-96)relay_notify
(67-80)relay_request
(83-96)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(13-14)src/mcp_agent/executor/task_registry.py (1)
get_activity
(25-28)src/mcp_agent/executor/temporal/__init__.py (1)
execute
(193-209)
src/mcp_agent/server/app_server.py (5)
src/mcp_agent/executor/temporal/__init__.py (3)
uuid
(453-460)TemporalExecutor
(58-475)ensure_client
(260-275)src/mcp_agent/core/context.py (3)
mcp
(102-103)Context
(57-103)run_async
(282-285)src/mcp_agent/executor/workflow.py (3)
executor
(121-126)run_id
(136-141)run_async
(196-382)src/mcp_agent/executor/temporal/temporal_context.py (1)
set_execution_id
(8-10)src/mcp_agent/executor/temporal/session_proxy.py (7)
request
(96-109)request
(305-308)send_log_message
(157-192)send_progress_notification
(194-212)notify
(77-94)notify
(302-303)send_request
(134-155)
src/mcp_agent/executor/temporal/interceptor.py (2)
src/mcp_agent/logging/logger.py (1)
get_logger
(390-417)src/mcp_agent/executor/temporal/temporal_context.py (2)
get_execution_id
(13-14)set_execution_id
(8-10)
@mcp_server.custom_route(
    "/internal/session/by-run/{execution_id}/request",
    methods=["POST"],
    include_in_schema=False,
)
async def _relay_request(request: Request):
    from mcp.types import (
        CreateMessageRequest,
        CreateMessageRequestParams,
        CreateMessageResult,
        ElicitRequest,
        ElicitRequestParams,
        ElicitResult,
        ListRootsRequest,
        ListRootsResult,
        PingRequest,
        EmptyResult,
        ServerRequest,
    )

    body = await request.json()
    execution_id = request.path_params.get("execution_id")
    method = body.get("method")
    params = body.get("params") or {}

    session = await _get_session(execution_id)
    if not session:
        return JSONResponse({"error": "session_not_available"}, status_code=503)

    try:
        # Prefer generic request passthrough if available
        rpc = getattr(session, "rpc", None)
        if rpc and hasattr(rpc, "request"):
            result = await rpc.request(method, params)
            return JSONResponse(result)
        # Fallback: Map a small set of supported server->client requests
        if method == "sampling/createMessage":
            req = ServerRequest(
                CreateMessageRequest(
                    method="sampling/createMessage",
                    params=CreateMessageRequestParams(**params),
                )
            )
            result = await session.send_request(  # type: ignore[attr-defined]
                request=req,
                result_type=CreateMessageResult,
            )
            return JSONResponse(
                result.model_dump(by_alias=True, mode="json", exclude_none=True)
            )
        elif method == "elicitation/create":
            req = ServerRequest(
                ElicitRequest(
                    method="elicitation/create",
                    params=ElicitRequestParams(**params),
                )
            )
            result = await session.send_request(  # type: ignore[attr-defined]
                request=req,
                result_type=ElicitResult,
            )
            return JSONResponse(
                result.model_dump(by_alias=True, mode="json", exclude_none=True)
            )
        elif method == "roots/list":
            req = ServerRequest(ListRootsRequest(method="roots/list"))
            result = await session.send_request(  # type: ignore[attr-defined]
                request=req,
                result_type=ListRootsResult,
            )
            return JSONResponse(
                result.model_dump(by_alias=True, mode="json", exclude_none=True)
            )
        elif method == "ping":
            req = ServerRequest(PingRequest(method="ping"))
            result = await session.send_request(  # type: ignore[attr-defined]
                request=req,
                result_type=EmptyResult,
            )
            return JSONResponse(
                result.model_dump(by_alias=True, mode="json", exclude_none=True)
            )
        else:
            return JSONResponse(
                {"error": f"unsupported method: {method}"}, status_code=400
            )
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
Missing auth on /internal/session/.../request
Add the same token validation as for notify/log routes.
async def _relay_request(request: Request):
+ # Optional shared-secret auth
+ gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
+ allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true"
+ header_token = request.headers.get("X-MCP-Gateway-Token")
+ is_loopback = request.client and request.client.host in ("127.0.0.1", "::1")
+ if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)):
+ return JSONResponse({"error": "unauthorized"}, status_code=401)
📝 Committable suggestion
@mcp_server.custom_route( | |
"/internal/session/by-run/{execution_id}/request", | |
methods=["POST"], | |
include_in_schema=False, | |
) | |
async def _relay_request(request: Request): | |
from mcp.types import ( | |
CreateMessageRequest, | |
CreateMessageRequestParams, | |
CreateMessageResult, | |
ElicitRequest, | |
ElicitRequestParams, | |
ElicitResult, | |
ListRootsRequest, | |
ListRootsResult, | |
PingRequest, | |
EmptyResult, | |
ServerRequest, | |
) | |
body = await request.json() | |
execution_id = request.path_params.get("execution_id") | |
method = body.get("method") | |
params = body.get("params") or {} | |
session = await _get_session(execution_id) | |
if not session: | |
return JSONResponse({"error": "session_not_available"}, status_code=503) | |
try: | |
# Prefer generic request passthrough if available | |
rpc = getattr(session, "rpc", None) | |
if rpc and hasattr(rpc, "request"): | |
result = await rpc.request(method, params) | |
return JSONResponse(result) | |
# Fallback: Map a small set of supported server->client requests | |
if method == "sampling/createMessage": | |
req = ServerRequest( | |
CreateMessageRequest( | |
method="sampling/createMessage", | |
params=CreateMessageRequestParams(**params), | |
) | |
) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=CreateMessageResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
elif method == "elicitation/create": | |
req = ServerRequest( | |
ElicitRequest( | |
method="elicitation/create", | |
params=ElicitRequestParams(**params), | |
) | |
) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=ElicitResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
elif method == "roots/list": | |
req = ServerRequest(ListRootsRequest(method="roots/list")) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=ListRootsResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
elif method == "ping": | |
req = ServerRequest(PingRequest(method="ping")) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=EmptyResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
else: | |
return JSONResponse( | |
{"error": f"unsupported method: {method}"}, status_code=400 | |
) | |
except Exception as e: | |
return JSONResponse({"error": str(e)}, status_code=500) | |
@mcp_server.custom_route( | |
"/internal/session/by-run/{execution_id}/request", | |
methods=["POST"], | |
include_in_schema=False, | |
) | |
async def _relay_request(request: Request): | |
# Optional shared-secret auth | |
gw_token = os.environ.get("MCP_GATEWAY_TOKEN") | |
allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" | |
header_token = request.headers.get("X-MCP-Gateway-Token") | |
is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") | |
if (gw_token and header_token != gw_token) or ( | |
not gw_token and not (allow_insecure or is_loopback) | |
): | |
return JSONResponse({"error": "unauthorized"}, status_code=401) | |
from mcp.types import ( | |
CreateMessageRequest, | |
CreateMessageRequestParams, | |
CreateMessageResult, | |
ElicitRequest, | |
ElicitRequestParams, | |
ElicitResult, | |
ListRootsRequest, | |
ListRootsResult, | |
PingRequest, | |
EmptyResult, | |
ServerRequest, | |
) | |
body = await request.json() | |
execution_id = request.path_params.get("execution_id") | |
method = body.get("method") | |
params = body.get("params") or {} | |
session = await _get_session(execution_id) | |
if not session: | |
return JSONResponse({"error": "session_not_available"}, status_code=503) | |
try: | |
# Prefer generic request passthrough if available | |
rpc = getattr(session, "rpc", None) | |
if rpc and hasattr(rpc, "request"): | |
result = await rpc.request(method, params) | |
return JSONResponse(result) | |
# Fallback: Map a small set of supported server->client requests | |
if method == "sampling/createMessage": | |
req = ServerRequest( | |
CreateMessageRequest( | |
method="sampling/createMessage", | |
params=CreateMessageRequestParams(**params), | |
) | |
) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=CreateMessageResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
elif method == "elicitation/create": | |
req = ServerRequest( | |
ElicitRequest( | |
method="elicitation/create", | |
params=ElicitRequestParams(**params), | |
) | |
) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=ElicitResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
elif method == "roots/list": | |
req = ServerRequest(ListRootsRequest(method="roots/list")) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=ListRootsResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
elif method == "ping": | |
req = ServerRequest(PingRequest(method="ping")) | |
result = await session.send_request( # type: ignore[attr-defined] | |
request=req, | |
result_type=EmptyResult, | |
) | |
return JSONResponse( | |
result.model_dump(by_alias=True, mode="json", exclude_none=True) | |
) | |
else: | |
return JSONResponse( | |
{"error": f"unsupported method: {method}"}, status_code=400 | |
) | |
except Exception as e: | |
return JSONResponse({"error": str(e)}, status_code=500) |
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 400 to 488, the internal
route /internal/session/by-run/{execution_id}/request is missing the same token
authentication used by the notify/log routes; add the same token validation call
used by those routes at the top of _relay_request (before fetching the session),
reject requests with a 401/403 JSONResponse when validation fails, and reuse the
existing validation helper or decorator (e.g., call the same function that
notify/log use or apply the same token check logic) so behavior and error
responses match the other internal endpoints.
# Token counting and cost tracking
token_counter: Optional[TokenCounter] = None

# Dynamic gateway configuration (per-run overrides via Temporal memo)
I think this might make more sense to be done in TemporalSettings instead of the root context
@rholinshead I think you're thinking of the config, not context. This does need to be a runtime value, though possibly the value could be loaded from a config in the future. IMO this is fine as is.
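For reference, a rough sketch of the kind of per-run override fields being discussed; the field names are assumptions drawn from the gateway_url/gateway_token memo keys used elsewhere in this PR, and the class is a toy stand-in for the real Context, not the actual diff:

from typing import Optional
from pydantic import BaseModel

class Context(BaseModel):
    # ... existing fields such as token_counter elided ...

    # Dynamic gateway configuration (per-run overrides via Temporal memo).
    # Runtime values today; they could later be seeded from TemporalSettings.
    gateway_url: Optional[str] = None
    gateway_token: Optional[str] = None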
src/mcp_agent/executor/workflow.py
Outdated
self._logger.debug(
    f"Proxy parameters: gateway_url={gw}, gateway_token={gt}"
)

if gw:
+1
get_status_result = await server.call_tool(
    "workflows-BasicAgentWorkflow-get_status",
    arguments={"run_id": run_id},
)

execution = WorkflowExecution(**json.loads(run_result.content[0].text))
run_id = execution.run_id
logger.info(
    f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)
There appears to be a duplicate code block at lines 105-114 that repeats the same workflow execution processing. The code retrieves get_status_result
, creates a WorkflowExecution
object, and logs the same message twice in succession. This duplication likely resulted from a copy-paste error during development.
The second block (starting at line 115) already handles the workflow status polling correctly, so the first duplicate block should be removed to prevent:
- Unnecessary API calls
- Variable reassignment that has no effect
- Redundant logging
Removing lines 105-114 would resolve this issue while preserving the intended functionality.
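A minimal sketch of the de-duplicated flow (tool names follow the example; the run arguments and the status-polling loop are elided, so treat this as an illustration rather than the file's actual code):

import json
from mcp_agent.executor.workflow import WorkflowExecution

async def start_and_get_status(server, logger):
    # Parse the run result exactly once, then poll status by run_id.
    run_result = await server.call_tool("workflows-BasicAgentWorkflow-run", arguments={})
    execution = WorkflowExecution(**json.loads(run_result.content[0].text))
    logger.info(
        f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={execution.run_id}"
    )
    return await server.call_tool(
        "workflows-BasicAgentWorkflow-get_status",
        arguments={"run_id": execution.run_id},
    )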
Spotted by Diamond
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/mcp_agent/cli/cloud/commands/logger/tail/main.py (1)
176-182
: Preserve CLIError exit codes (don’t downgrade to 5).Catching all Exceptions collapses CLIError.exit_code to 5. Handle CLIError explicitly before the broad except.
- except KeyboardInterrupt: + except KeyboardInterrupt: console.print("\n[yellow]Interrupted by user[/yellow]") sys.exit(0) + except CLIError as e: + console.print(f"[red]{e}[/red]") + raise typer.Exit(e.exit_code) except Exception as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(5)src/mcp_agent/app.py (1)
921-933
: Also guard against re-decoration when registering global workflow tasks.- if task_defn: # Engine-specific decorator available - if isinstance(target, MethodType): + if task_defn: # Engine-specific decorator available + if hasattr(target, "__temporal_activity_definition"): + task_callable = target + elif isinstance(target, MethodType): self_ref = target.__self__ @@ - else: - task_callable = task_defn(func, name=activity_name) + else: + task_callable = task_defn(func, name=activity_name)src/mcp_agent/logging/logger.py (1)
72-91
: Brittle Temporal detection and use of internals; simplify with public API and safe fallback.
- Don’t inspect
temporalio.worker._workflow_instance._WorkflowInstanceImpl
.- Detect workflow via
temporalio.workflow.in_workflow()
.- In workflow context, schedule forwarding via
workflow.create_task()
and fall back to stderr; outside workflow, safelyrun_until_complete
.@@ - else: - # If no loop is running, run it until the emit completes - if isinstance( - loop, temporalio.worker._workflow_instance._WorkflowInstanceImpl - ): - # Handle Temporal workflow environment where run_until_complete() is not implemented - # Prefer forwarding via the upstream session proxy using a workflow task, if available. - try: - from temporalio import workflow as _wf # type: ignore - from mcp_agent.executor.temporal.temporal_context import ( - get_execution_id as _get_exec_id, - ) - - upstream = getattr(event, "upstream_session", None) - if ( - upstream is None - and getattr(self, "_bound_context", None) is not None - ): - try: - upstream = getattr( - self._bound_context, "upstream_session", None - ) - except Exception: - upstream = None - - # Construct payload - async def _forward_via_proxy(): - # If we have an upstream session, use it first - if upstream is not None: - try: - level_map = { - "debug": "debug", - "info": "info", - "warning": "warning", - "error": "error", - "progress": "info", - } - level = level_map.get(event.type, "info") - logger_name = ( - event.namespace - if not event.name - else f"{event.namespace}.{event.name}" - ) - data = { - "message": event.message, - "namespace": event.namespace, - "name": event.name, - "timestamp": event.timestamp.isoformat(), - } - if event.data: - data["data"] = event.data - if event.trace_id or event.span_id: - data["trace"] = { - "trace_id": event.trace_id, - "span_id": event.span_id, - } - if event.context is not None: - try: - data["context"] = event.context.dict() - except Exception: - pass - - await upstream.send_log_message( # type: ignore[attr-defined] - level=level, data=data, logger=logger_name - ) - return - except Exception: - pass - - # Fallback: use activity gateway directly if execution_id is available - try: - exec_id = _get_exec_id() - if exec_id: - level = { - "debug": "debug", - "info": "info", - "warning": "warning", - "error": "error", - "progress": "info", - }.get(event.type, "info") - ns = event.namespace - msg = event.message - data = event.data or {} - # Call by activity name to align with worker registration - await _wf.execute_activity( - "mcp_forward_log", - exec_id, - level, - ns, - msg, - data, - schedule_to_close_timeout=5, - ) - return - except Exception as _e: - pass - - # If all else fails, fall back to stderr transport - self.event_bus.emit_with_stderr_transport(event) - - try: - _wf.create_task(_forward_via_proxy()) - return - except Exception: - # Could not create workflow task, fall through to stderr transport - pass - except Exception: - # If Temporal workflow module unavailable or any error occurs, fall through - pass - - # As a last resort, log to stdout/stderr as a fallback - self.event_bus.emit_with_stderr_transport(event) - else: - try: - loop.run_until_complete(self.event_bus.emit(event)) - except NotImplementedError: - pass + else: + # If no loop is running, prefer Temporal workflow-safe path when applicable. 
+ try: + from temporalio import workflow as _wf # type: ignore + if _wf.in_workflow(): + from mcp_agent.executor.temporal.temporal_context import ( + get_execution_id as _get_exec_id, + ) + + upstream = getattr(event, "upstream_session", None) + if upstream is None and getattr(self, "_bound_context", None) is not None: + try: + upstream = getattr(self._bound_context, "upstream_session", None) + except Exception: + upstream = None + + async def _forward_via_proxy(): + # Prefer upstream session proxy + if upstream is not None: + try: + level_map = { + "debug": "debug", + "info": "info", + "warning": "warning", + "error": "error", + "progress": "info", + } + level = level_map.get(event.type, "info") + logger_name = ( + event.namespace + if not event.name + else f"{event.namespace}.{event.name}" + ) + data = { + "message": event.message, + "namespace": event.namespace, + "name": event.name, + "timestamp": event.timestamp.isoformat(), + } + if event.data: + data["data"] = event.data + if event.trace_id or event.span_id: + data["trace"] = { + "trace_id": event.trace_id, + "span_id": event.span_id, + } + if event.context is not None: + try: + data["context"] = event.context.dict() + except Exception: + pass + await upstream.send_log_message( # type: ignore[attr-defined] + level=level, data=data, logger=logger_name + ) + return + except Exception: + pass + # Fallback to activity gateway + try: + exec_id = _get_exec_id() + if exec_id: + level = { + "debug": "debug", + "info": "info", + "warning": "warning", + "error": "error", + "progress": "info", + }.get(event.type, "info") + await _wf.execute_activity( + "mcp_forward_log", + exec_id, + level, + event.namespace, + event.message, + event.data or {}, + schedule_to_close_timeout=5, + ) + except Exception: + pass + + # Best-effort: schedule forwarding; also emit to stderr immediately. + try: + _wf.create_task(_forward_via_proxy()) + except Exception: + pass + self.event_bus.emit_with_stderr_transport(event) + return + except Exception: + pass + + # Non-Temporal or unknown: block until emit completes, with safe fallback. + try: + loop.run_until_complete(self.event_bus.emit(event)) + except NotImplementedError: + self.event_bus.emit_with_stderr_transport(event)Also applies to: 77-191
♻️ Duplicate comments (15)
src/mcp_agent/app.py (1)
852-858
: Early return skips activity registration (still broken).Returning here prevents the activity from being registered in the task registry. Keep the original target but continue to registration.
- # prevent trying to decorate an already decorated function - if hasattr(target, "__temporal_activity_definition"): - self.logger.debug( - f"target {name} has __temporal_activity_definition" - ) - return target # Already decorated with @activity + # Prevent re-decoration, but still register the original activity + if hasattr(target, "__temporal_activity_definition"): + self.logger.debug( + "Skipping re-decoration for already-temporal activity", + data={"activity_name": activity_name}, + ) + task_callable = target + elif isinstance(target, MethodType): + self_ref = target.__self__ + @functools.wraps(func) + async def _bound_adapter(*a, **k): + return await func(self_ref, *a, **k) + _bound_adapter.__annotations__ = func.__annotations__.copy() + task_callable = task_defn(_bound_adapter, name=activity_name) + else: + task_callable = task_defn(func, name=activity_name)src/mcp_agent/executor/temporal/temporal_context.py (1)
5-8
: Replace process-global with ContextVar for safe fallback storage.Prevents accidental execution_id leakage across threads/tasks; matches the docstring.
@@ -from typing import Optional +from typing import Optional +from contextvars import ContextVar @@ -# Fallback global for non-Temporal contexts. This is best-effort only and -# used when neither workflow nor activity runtime is available. -_EXECUTION_ID: Optional[str] = None +# Fallback ContextVar for non-Temporal contexts/tests when neither workflow nor +# activity runtime is available. +_EXECUTION_ID_CV: ContextVar[Optional[str]] = ContextVar("execution_id", default=None) @@ def set_execution_id(execution_id: Optional[str]) -> None: - global _EXECUTION_ID - _EXECUTION_ID = execution_id + _EXECUTION_ID_CV.set(execution_id) @@ - # Fallback to module-global (primarily for non-Temporal contexts) - return _EXECUTION_ID + # Fallback to ContextVar (primarily for non-Temporal contexts) + return _EXECUTION_ID_CV.get()Also applies to: 10-13, 48-49
examples/mcp_agent_server/temporal/client.py (1)
50-51
: Fix client URL: 0.0.0.0 is a bind address, not a connect target.Use 127.0.0.1/localhost so the client can connect.
- url="http://0.0.0.0:8000/sse", + url="http://127.0.0.1:8000/sse",src/mcp_agent/logging/logger.py (1)
18-19
: Remove top-level Temporal import; make it optional and lazy.Avoid making
temporalio
a hard dependency for all logger users.-import temporalio +# Deliberately avoid importing temporalio at module import time.src/mcp_agent/server/app_server.py (11)
40-44
: Avoid module-level asyncio locks; lazy-init and prevent loop-binding bugsCreate locks on first use to bind to the active loop (prevents issues under multi-loop/hot-reload) and pave the way for safe cleanup.
-_RUN_SESSION_LOCK = asyncio.Lock() -_PENDING_PROMPTS_LOCK = asyncio.Lock() -_IDEMPOTENCY_KEYS_LOCK = asyncio.Lock() +_RUN_SESSION_LOCK: asyncio.Lock | None = None +_PENDING_PROMPTS_LOCK: asyncio.Lock | None = None +_IDEMPOTENCY_KEYS_LOCK: asyncio.Lock | None = NoneAdd after Line 46:
+def _lock_run() -> asyncio.Lock: + global _RUN_SESSION_LOCK + if _RUN_SESSION_LOCK is None: + _RUN_SESSION_LOCK = asyncio.Lock() + return _RUN_SESSION_LOCK + +def _lock_prompts() -> asyncio.Lock: + global _PENDING_PROMPTS_LOCK + if _PENDING_PROMPTS_LOCK is None: + _PENDING_PROMPTS_LOCK = asyncio.Lock() + return _PENDING_PROMPTS_LOCK + +def _lock_idem() -> asyncio.Lock: + global _IDEMPOTENCY_KEYS_LOCK + if _IDEMPOTENCY_KEYS_LOCK is None: + _IDEMPOTENCY_KEYS_LOCK = asyncio.Lock() + return _IDEMPOTENCY_KEYS_LOCKAnd replace usages:
-async with _RUN_SESSION_LOCK: +async with _lock_run():-async with _PENDING_PROMPTS_LOCK: +async with _lock_prompts():-async with _IDEMPOTENCY_KEYS_LOCK: +async with _lock_idem():
335-341
: Default-deny auth for internal routes; allow only with token, loopback, or explicit dev flagCurrent logic allows unauthenticated access when MCP_GATEWAY_TOKEN is unset.
- gw_token = os.environ.get("MCP_GATEWAY_TOKEN") - if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: + gw_token = os.environ.get("MCP_GATEWAY_TOKEN") + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): return JSONResponse( {"ok": False, "error": "unauthorized"}, status_code=401 )Apply the same pattern at Lines 497-503 and 536-537, and add it to the request route (see next comment).
343-350
: Bound idempotency memory growthKeep only the last N keys per execution_id to avoid unbounded growth.
if idempotency_key: - async with _IDEMPOTENCY_KEYS_LOCK: + async with _IDEMPOTENCY_KEYS_LOCK: seen = _IDEMPOTENCY_KEYS_SEEN.setdefault(execution_id or "", set()) if idempotency_key in seen: return JSONResponse({"ok": True, "idempotent": True}) seen.add(idempotency_key) + # Bound memory: keep only last 1000 keys + if len(seen) > 1000: + while len(seen) > 1000: + seen.pop()
402-485
: Add auth to /internal/session/.../request routeThis route currently has no auth check, unlike notify/log.
async def _relay_request(request: Request): + # Optional shared-secret auth (default deny) + gw_token = os.environ.get("MCP_GATEWAY_TOKEN") + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): + return JSONResponse({"error": "unauthorized"}, status_code=401)
497-503
: Harden auth on /internal/workflows/logMatch default-deny behavior for consistency and safety.
- gw_token = os.environ.get("MCP_GATEWAY_TOKEN") - if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: + gw_token = os.environ.get("MCP_GATEWAY_TOKEN") + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): return JSONResponse( {"ok": False, "error": "unauthorized"}, status_code=401 )
536-537
: Harden auth on /internal/human/promptsMake this route default-deny as well.
- if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: + allow_insecure = os.environ.get("MCP_GATEWAY_ALLOW_INSECURE") == "true" + header_token = request.headers.get("X-MCP-Gateway-Token") + is_loopback = request.client and request.client.host in ("127.0.0.1", "::1") + if (gw_token and header_token != gw_token) or (not gw_token and not (allow_insecure or is_loopback)): return JSONResponse({"error": "unauthorized"}, status_code=401)
554-559
: Store run_id in pending prompt metadataPrevents signaling the wrong run.
async with _PENDING_PROMPTS_LOCK: _PENDING_PROMPTS[request_id] = { "workflow_id": metadata.get("workflow_id"), + "run_id": metadata.get("run_id"), "execution_id": execution_id, "signal_name": metadata.get("signal_name", "human_input"), "session_id": metadata.get("session_id"), }
Ensure the upstream SystemActivities.request_user_input includes run_id in metadata.
868-875
: Validate workflow identifiers before creating Temporal handleAvoids None being passed to get_workflow_handle.
client = await executor.ensure_client() - handle = client.get_workflow_handle( - workflow_id=info.get("workflow_id"), run_id=info.get("run_id") - ) + workflow_id = info.get("workflow_id") + run_id = info.get("run_id") + if not workflow_id or not run_id: + return {"ok": False, "error": "missing workflow identification"} + handle = client.get_workflow_handle(workflow_id=workflow_id, run_id=run_id)
1393-1403
: Bug: X-Forwarded-Proto is not a URL; fix gateway_url reconstructionPrevent setting gateway_url to bare "http"/"https".
- gateway_url = ( - h.get("X-MCP-Gateway-URL") - or h.get("X-Forwarded-Url") - or h.get("X-Forwarded-Proto") - ) + gateway_url = h.get("X-MCP-Gateway-URL") or h.get("X-Forwarded-Url") # Best-effort reconstruction if only proto/host provided - if gateway_url is None: + if not gateway_url: proto = h.get("X-Forwarded-Proto") or "http" host = h.get("X-Forwarded-Host") or h.get("Host") if host: gateway_url = f"{proto}://{host}"
1414-1419
: Secret leakage: remove gateway_token from Temporal memoTemporal memos are not encrypted; do not persist secrets there.
- if gateway_url or gateway_token: + if gateway_url: workflow_memo = { "gateway_url": gateway_url, - "gateway_token": gateway_token, }Pass the token via per-run rpc metadata/headers instead of memo (plumb through TemporalExecutor.start_workflow).
1475-1485
: Also purge idempotency keys on terminal statesAvoid lingering per-execution memory after a run completes.
if state in ("completed", "error", "cancelled"): try: await _unregister_session(run_id) except Exception: pass + # Drop idempotency keys for this execution + try: + exec_id = _RUN_EXECUTION_ID_REGISTRY.get(run_id) + if exec_id is not None: + async with _IDEMPOTENCY_KEYS_LOCK: + _IDEMPOTENCY_KEYS_SEEN.pop(exec_id, None) + except Exception: + pass
🧹 Nitpick comments (6)
src/mcp_agent/cli/cloud/commands/logger/tail/main.py (2)
337-339
: Be tolerant of “data: ” with a space.- data_content = line.removeprefix("data:") + data_content = line.removeprefix("data:").lstrip()
72-77
: Avoid shadowing built-in “format”.Consider renaming the option/param to output_format for clarity and to avoid shadowing.
tests/executor/temporal/test_execution_id_and_interceptor.py (1)
35-41
: Avoid class-level mutable state in tests.-class Input: - headers = {} +class Input: + def __init__(self): + self.headers = {}examples/mcp_agent_server/temporal/client.py (1)
81-89
: Optional: validate/normalize server log level input.Consider mapping synonyms (warn→warning, etc.) and validating against allowed enum before calling
set_logging_level
.tests/mcp/test_mcp_aggregator.py (1)
915-1277
: Solid coverage of allowed_tools filtering across scenarios; consider small DRY tweaks.The suite exercises: allow-list, None, empty-set, missing registry, multi-server, exact-match. You can reduce duplication by extracting
mock_fetch_capabilities
and tool lists into helpers/fixtures or parameterizing cases.src/mcp_agent/server/app_server.py (1)
322-568
: Optional: validate request bodies and bound payload sizes. Return 400 on missing/invalid fields and consider a small size cap to avoid large-body abuse.
- Validate that method is a non-empty string; params are dicts.
- Consider rejecting bodies > e.g., 256KB for these internal routes (a minimal sketch of both checks follows below).
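A minimal Starlette-style sketch of both checks; the 256KB cap and the helper name are illustrative assumptions, not part of the PR:

import json
from starlette.requests import Request
from starlette.responses import JSONResponse

MAX_BODY_BYTES = 256 * 1024  # illustrative cap for the internal relay routes

async def read_validated_body(request: Request) -> dict | JSONResponse:
    raw = await request.body()
    if len(raw) > MAX_BODY_BYTES:
        return JSONResponse({"error": "payload too large"}, status_code=413)
    try:
        body = json.loads(raw)
    except ValueError:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    method = body.get("method")
    params = body.get("params") or {}
    if not isinstance(method, str) or not method:
        return JSONResponse({"error": "method must be a non-empty string"}, status_code=400)
    if not isinstance(params, dict):
        return JSONResponse({"error": "params must be an object"}, status_code=400)
    return {"method": method, "params": params}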
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
uv.lock
is excluded by!**/*.lock
📒 Files selected for processing (22)
examples/mcp_agent_server/temporal/client.py
(3 hunks)pyproject.toml
(1 hunks)src/mcp_agent/app.py
(4 hunks)src/mcp_agent/cli/cloud/commands/auth/whoami/main.py
(0 hunks)src/mcp_agent/cli/cloud/commands/logger/__init__.py
(1 hunks)src/mcp_agent/cli/cloud/commands/logger/configure/__init__.py
(1 hunks)src/mcp_agent/cli/cloud/commands/logger/configure/main.py
(4 hunks)src/mcp_agent/cli/cloud/commands/logger/tail/__init__.py
(1 hunks)src/mcp_agent/cli/cloud/commands/logger/tail/main.py
(7 hunks)src/mcp_agent/cli/cloud/commands/logger/utils.py
(2 hunks)src/mcp_agent/cli/cloud/main.py
(2 hunks)src/mcp_agent/executor/temporal/__init__.py
(7 hunks)src/mcp_agent/executor/temporal/interceptor.py
(1 hunks)src/mcp_agent/executor/temporal/session_proxy.py
(1 hunks)src/mcp_agent/executor/temporal/temporal_context.py
(1 hunks)src/mcp_agent/logging/listeners.py
(1 hunks)src/mcp_agent/logging/logger.py
(6 hunks)src/mcp_agent/mcp/mcp_aggregator.py
(1 hunks)src/mcp_agent/server/app_server.py
(19 hunks)src/mcp_agent/workflows/deep_orchestrator/README.md
(1 hunks)tests/executor/temporal/test_execution_id_and_interceptor.py
(1 hunks)tests/mcp/test_mcp_aggregator.py
(7 hunks)
💤 Files with no reviewable changes (1)
- src/mcp_agent/cli/cloud/commands/auth/whoami/main.py
✅ Files skipped from review due to trivial changes (9)
- src/mcp_agent/mcp/mcp_aggregator.py
- pyproject.toml
- src/mcp_agent/cli/cloud/commands/logger/tail/init.py
- src/mcp_agent/workflows/deep_orchestrator/README.md
- src/mcp_agent/cli/cloud/commands/logger/configure/init.py
- src/mcp_agent/cli/cloud/commands/logger/init.py
- src/mcp_agent/logging/listeners.py
- src/mcp_agent/cli/cloud/commands/logger/utils.py
- src/mcp_agent/cli/cloud/commands/logger/configure/main.py
🚧 Files skipped from review as they are similar to previous changes (3)
- src/mcp_agent/cli/cloud/main.py
- src/mcp_agent/executor/temporal/interceptor.py
- src/mcp_agent/executor/temporal/init.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/server/app_server.py
examples/mcp_agent_server/temporal/client.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/server/app_server.py
examples/mcp_agent_server/temporal/client.py
📚 Learning: 2025-09-05T14:31:48.115Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.115Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/server/app_server.py
examples/mcp_agent_server/temporal/client.py
src/mcp_agent/logging/logger.py
src/mcp_agent/app.py
🧬 Code graph analysis (9)
tests/executor/temporal/test_execution_id_and_interceptor.py (3)
src/mcp_agent/executor/temporal/temporal_context.py (2)
get_execution_id
(15-49)set_execution_id
(10-12)src/mcp_agent/executor/temporal/interceptor.py (1)
context_from_header
(40-55)src/mcp_agent/mcp/client_proxy.py (3)
log_via_proxy
(33-67)notify_via_proxy
(102-127)request_via_proxy
(130-153)
src/mcp_agent/executor/temporal/temporal_context.py (2)
src/mcp_agent/app.py (1)
workflow
(419-452)src/mcp_agent/executor/workflow.py (1)
run_id
(136-141)
src/mcp_agent/server/app_server.py (5)
src/mcp_agent/core/context.py (3)
mcp
(102-103)Context
(57-103)run_async
(282-285)src/mcp_agent/executor/workflow.py (3)
run_id
(136-141)executor
(121-126)run_async
(196-382)src/mcp_agent/executor/temporal/session_proxy.py (6)
request
(96-109)request
(302-305)send_log_message
(154-189)send_progress_notification
(191-209)notify
(77-94)notify
(299-300)src/mcp_agent/app.py (5)
logger
(190-207)tool
(712-758)context
(146-151)executor
(162-163)workflow
(419-452)src/mcp_agent/executor/temporal/__init__.py (3)
uuid
(453-460)TemporalExecutor
(58-475)ensure_client
(260-275)
src/mcp_agent/cli/cloud/commands/logger/tail/main.py (4)
src/mcp_agent/cli/cloud/commands/logger/utils.py (2)
parse_app_identifier
(12-30)resolve_server_url
(33-100)src/mcp_agent/cli/auth/main.py (1)
load_credentials
(32-46)src/mcp_agent/cli/exceptions.py (1)
CLIError
(4-9)src/mcp_agent/cli/auth/models.py (1)
UserCredentials
(10-64)
examples/mcp_agent_server/temporal/client.py (5)
src/mcp_agent/config.py (1)
MCPServerSettings
(51-112)src/mcp_agent/executor/workflow.py (4)
executor
(121-126)WorkflowExecution
(67-74)run
(166-177)run_id
(136-141)src/mcp_agent/mcp/gen_client.py (1)
gen_client
(16-41)src/mcp_agent/mcp/mcp_agent_client_session.py (1)
MCPAgentClientSession
(72-445)examples/mcp_agent_server/asyncio/client.py (2)
on_server_log
(59-64)make_session
(67-77)
src/mcp_agent/executor/temporal/session_proxy.py (5)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/executor/temporal/system_activities.py (3)
SystemActivities
(14-96)relay_notify
(67-80)relay_request
(83-96)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(15-49)src/mcp_agent/executor/task_registry.py (1)
get_activity
(25-28)src/mcp_agent/executor/temporal/__init__.py (1)
execute
(193-209)
src/mcp_agent/logging/logger.py (4)
src/mcp_agent/app.py (5)
workflow
(419-452)executor
(162-163)context
(146-151)logger
(190-207)session_id
(186-187)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(15-49)src/mcp_agent/logging/transport.py (3)
get
(309-316)emit_with_stderr_transport
(436-450)emit
(410-434)src/mcp_agent/executor/temporal/session_proxy.py (1)
send_log_message
(154-189)
tests/mcp/test_mcp_aggregator.py (2)
tests/mcp/test_mcp_connection_manager.py (2)
DummySession
(15-32)initialize
(22-26)src/mcp_agent/mcp/mcp_aggregator.py (2)
initialize
(150-196)load_server
(329-458)
src/mcp_agent/app.py (1)
src/mcp_agent/logging/logger.py (2)
set_default_bound_context
(538-540)debug
(255-263)
🔇 Additional comments (8)
src/mcp_agent/executor/temporal/session_proxy.py (2)
164-181
: Confirm expected logging level format at the gateway.If the gateway expects “INFO/ERROR/…” or lowercased strings, using str(level) could produce “LoggingLevel.INFO”. Consider level.name if it’s an Enum, or a mapping.
- str(level), + getattr(level, "name", str(level)),
106-110
: Propagate request errors if execute() returns an exception.- return await self._executor.execute(act, exec_id, method, params or {}) + res = await self._executor.execute(act, exec_id, method, params or {}) + if isinstance(res, BaseException): + return {"error": str(res)} + return resLikely an incorrect or invalid review comment.
src/mcp_agent/app.py (2)
196-206
: LGTM: bind context into logger.Binding context and providing a default bound context will help upstream log routing.
237-241
: LGTM: default bound context initialization.This aligns with the logging module’s default-context support and is safe behind try/except.
tests/executor/temporal/test_execution_id_and_interceptor.py (1)
1-113
: Solid coverage for Temporal context and proxy helpers.The tests exercise workflow/activity IDs, header propagation, and happy/error paths for the proxy helpers.
src/mcp_agent/executor/temporal/temporal_context.py (1)
35-46
: Activity path LGTM.Covers activity runtime via public
activity.info()
, with safe exception guards.examples/mcp_agent_server/temporal/client.py (1)
55-74
: Nice: server log callback wired via custom ClientSession factory.Clean demonstration hook; matches gen_client’s factory signature.
tests/mcp/test_mcp_aggregator.py (1)
871-910
: Good scaffolding for per-server filtering tests.
MockServerConfig
andDummyContextWithServerRegistry
make the tests clear and deterministic.
get_status_result = await server.call_tool(
    "workflows-BasicAgentWorkflow-get_status",
    arguments={"run_id": run_id},
)

execution = WorkflowExecution(**json.loads(run_result.content[0].text))
run_id = execution.run_id
logger.info(
    f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)
Duplicate parsing/log line; likely copy/paste error.
You re-parse run_result and log “Started …” a second time right after get_status. Remove the duplicate block.
- get_status_result = await server.call_tool(
- "workflows-BasicAgentWorkflow-get_status",
- arguments={"run_id": run_id},
- )
-
- execution = WorkflowExecution(**json.loads(run_result.content[0].text))
- run_id = execution.run_id
- logger.info(
- f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
- )
+ get_status_result = await server.call_tool(
+ "workflows-BasicAgentWorkflow-get_status",
+ arguments={"run_id": run_id},
+ )
📝 Committable suggestion
get_status_result = await server.call_tool( | |
"workflows-BasicAgentWorkflow-get_status", | |
arguments={"run_id": run_id}, | |
) | |
execution = WorkflowExecution(**json.loads(run_result.content[0].text)) | |
run_id = execution.run_id | |
logger.info( | |
f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}" | |
) | |
get_status_result = await server.call_tool( | |
"workflows-BasicAgentWorkflow-get_status", | |
arguments={"run_id": run_id}, | |
) |
🤖 Prompt for AI Agents
In examples/mcp_agent_server/temporal/client.py around lines 105 to 114, there
is a duplicate block that re-parses run_result into execution and logs "Started
BasicAgentWorkflow..." immediately after calling get_status; remove that
duplicated parsing/logging block (delete the re-instantiation of
WorkflowExecution from run_result and the subsequent logger.info) and, if you
need workflow status information, parse and/or log from get_status_result
instead so the original start log remains only once.
buffer = ""
async for chunk in response.aiter_text():
    buffer += chunk
-   lines = buffer.split('\n')
+   lines = buffer.split("\n")

    for line in lines[:-1]:
Streaming memory leak: buffer never trimmed.
After processing lines[:-1], buffer should be reset to the trailing partial line. Otherwise buffer grows unbounded.
async for chunk in response.aiter_text():
buffer += chunk
lines = buffer.split("\n")
for line in lines[:-1]:
if line.startswith("data:"):
data_content = line.removeprefix("data:")
+ # keep only the unfinished tail
+ buffer = lines[-1]
📝 Committable suggestion
buffer = "" | |
async for chunk in response.aiter_text(): | |
buffer += chunk | |
lines = buffer.split('\n') | |
lines = buffer.split("\n") | |
for line in lines[:-1]: | |
buffer = "" | |
async for chunk in response.aiter_text(): | |
buffer += chunk | |
lines = buffer.split("\n") | |
for line in lines[:-1]: | |
if line.startswith("data:"): | |
data_content = line.removeprefix("data:") | |
# keep only the unfinished tail | |
buffer = lines[-1] |
🤖 Prompt for AI Agents
In src/mcp_agent/cli/cloud/commands/logger/tail/main.py around lines 331 to 336,
the streaming buffer is never trimmed so it can grow unbounded; after iterating
and processing lines[:-1], set buffer to the trailing partial line (lines[-1])
so only the unfinished fragment is retained, and after the async for completes,
if buffer is non-empty emit/process it as the final line to avoid losing the
last partial payload.
exec_id = get_execution_id()
if not exec_id:
    return False

if _in_workflow_runtime():
    try:
        act = self._context.task_registry.get_activity("mcp_relay_notify")
        await self._executor.execute(act, exec_id, method, params or {})
        return True
    except Exception:
        return False
# Non-workflow (activity/asyncio)
return bool(await self._sys_acts.relay_notify(exec_id, method, params or {}))
🛠️ Refactor suggestion
Handle activity-returned exceptions from executor.execute().
TemporalExecutor.execute may surface errors by returning a BaseException. Treat that as failure, not success.
- await self._executor.execute(act, exec_id, method, params or {})
- return True
+ res = await self._executor.execute(act, exec_id, method, params or {})
+ return False if isinstance(res, BaseException) else True
📝 Committable suggestion
exec_id = get_execution_id() | |
if not exec_id: | |
return False | |
if _in_workflow_runtime(): | |
try: | |
act = self._context.task_registry.get_activity("mcp_relay_notify") | |
await self._executor.execute(act, exec_id, method, params or {}) | |
return True | |
except Exception: | |
return False | |
# Non-workflow (activity/asyncio) | |
return bool(await self._sys_acts.relay_notify(exec_id, method, params or {})) | |
exec_id = get_execution_id() | |
if not exec_id: | |
return False | |
if _in_workflow_runtime(): | |
try: | |
act = self._context.task_registry.get_activity("mcp_relay_notify") | |
res = await self._executor.execute(act, exec_id, method, params or {}) | |
return False if isinstance(res, BaseException) else True | |
except Exception: | |
return False | |
# Non-workflow (activity/asyncio) | |
return bool(await self._sys_acts.relay_notify(exec_id, method, params or {})) |
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/session_proxy.py around lines 82 to 95, the
call to self._executor.execute(...) currently treats any non-exceptional return
as success, but TemporalExecutor.execute can return a BaseException instance to
indicate failure; update the try block to capture the result of await
self._executor.execute(...), check if isinstance(result, BaseException) and
return False in that case, otherwise return True, while still catching and
returning False on raised exceptions.
"messages": [m.model_dump(by_alias=True, mode="json") for m in messages], | ||
"maxTokens": max_tokens, | ||
} | ||
if system_prompt is not None: | ||
params["systemPrompt"] = system_prompt | ||
if include_context is not None: | ||
params["includeContext"] = include_context | ||
if temperature is not None: | ||
params["temperature"] = temperature | ||
if stop_sequences is not None: | ||
params["stopSequences"] = stop_sequences | ||
if metadata is not None: | ||
params["metadata"] = metadata | ||
if model_preferences is not None: | ||
params["modelPreferences"] = model_preferences.model_dump( | ||
by_alias=True, mode="json" | ||
) | ||
if related_request_id is not None: | ||
# Threading ID through JSON-RPC metadata is handled by gateway; include for completeness | ||
params["related_request_id"] = related_request_id | ||
|
||
result = await self.request("sampling/createMessage", params) | ||
return types.CreateMessageResult.model_validate(result) |
🛠️ Refactor suggestion
Ensure Pydantic payloads are JSON-serializable (include_context).
- if include_context is not None:
- params["includeContext"] = include_context
+ if include_context is not None:
+ try:
+ params["includeContext"] = include_context.model_dump(by_alias=True, mode="json")
+ except Exception:
+ params["includeContext"] = include_context
📝 Committable suggestion
"messages": [m.model_dump(by_alias=True, mode="json") for m in messages], | |
"maxTokens": max_tokens, | |
} | |
if system_prompt is not None: | |
params["systemPrompt"] = system_prompt | |
if include_context is not None: | |
params["includeContext"] = include_context | |
if temperature is not None: | |
params["temperature"] = temperature | |
if stop_sequences is not None: | |
params["stopSequences"] = stop_sequences | |
if metadata is not None: | |
params["metadata"] = metadata | |
if model_preferences is not None: | |
params["modelPreferences"] = model_preferences.model_dump( | |
by_alias=True, mode="json" | |
) | |
if related_request_id is not None: | |
# Threading ID through JSON-RPC metadata is handled by gateway; include for completeness | |
params["related_request_id"] = related_request_id | |
result = await self.request("sampling/createMessage", params) | |
return types.CreateMessageResult.model_validate(result) | |
"messages": [m.model_dump(by_alias=True, mode="json") for m in messages], | |
"maxTokens": max_tokens, | |
} | |
if system_prompt is not None: | |
params["systemPrompt"] = system_prompt | |
if include_context is not None: | |
try: | |
params["includeContext"] = include_context.model_dump(by_alias=True, mode="json") | |
except Exception: | |
params["includeContext"] = include_context | |
if temperature is not None: | |
params["temperature"] = temperature | |
if stop_sequences is not None: | |
params["stopSequences"] = stop_sequences | |
if metadata is not None: | |
params["metadata"] = metadata | |
if model_preferences is not None: | |
params["modelPreferences"] = model_preferences.model_dump( | |
by_alias=True, mode="json" | |
) | |
if related_request_id is not None: | |
# Threading ID through JSON-RPC metadata is handled by gateway; include for completeness | |
params["related_request_id"] = related_request_id | |
result = await self.request("sampling/createMessage", params) | |
return types.CreateMessageResult.model_validate(result) |
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/session_proxy.py around lines 245 to 267, the
code adds include_context directly to the params which may be a Pydantic model
and not JSON-serializable; detect if include_context has model_dump (or is a
BaseModel) and replace params["includeContext"] =
include_context.model_dump(by_alias=True, mode="json") (or otherwise serialize
it to a plain dict/JSON-friendly structure) before calling self.request so the
payload is fully JSON-serializable.
async def elicit(
    self,
    message: str,
    requestedSchema: types.ElicitRequestedSchema,
    related_request_id: types.RequestId | None = None,
) -> types.ElicitResult:
    params: Dict[str, Any] = {
        "message": message,
        "requestedSchema": requestedSchema,
    }
    if related_request_id is not None:
        params["related_request_id"] = related_request_id
    result = await self.request("elicitation/create", params)
    return types.ElicitResult.model_validate(result)
🛠️ Refactor suggestion
Ensure Pydantic payloads are JSON-serializable (requestedSchema).
- params: Dict[str, Any] = {
- "message": message,
- "requestedSchema": requestedSchema,
- }
+ params: Dict[str, Any] = {"message": message}
+ try:
+ params["requestedSchema"] = requestedSchema.model_dump(by_alias=True, mode="json")
+ except Exception:
+ params["requestedSchema"] = requestedSchema
📝 Committable suggestion
async def elicit( | |
self, | |
message: str, | |
requestedSchema: types.ElicitRequestedSchema, | |
related_request_id: types.RequestId | None = None, | |
) -> types.ElicitResult: | |
params: Dict[str, Any] = { | |
"message": message, | |
"requestedSchema": requestedSchema, | |
} | |
if related_request_id is not None: | |
params["related_request_id"] = related_request_id | |
result = await self.request("elicitation/create", params) | |
return types.ElicitResult.model_validate(result) | |
async def elicit( | |
self, | |
message: str, | |
requestedSchema: types.ElicitRequestedSchema, | |
related_request_id: types.RequestId | None = None, | |
) -> types.ElicitResult: | |
- params: Dict[str, Any] = { | |
- "message": message, | |
- "requestedSchema": requestedSchema, | |
params: Dict[str, Any] = {"message": message} | |
try: | |
params["requestedSchema"] = requestedSchema.model_dump(by_alias=True, mode="json") | |
except Exception: | |
params["requestedSchema"] = requestedSchema | |
if related_request_id is not None: | |
params["related_request_id"] = related_request_id | |
result = await self.request("elicitation/create", params) | |
return types.ElicitResult.model_validate(result) |
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/session_proxy.py around lines 269-283, the
requestedSchema passed into the request may be a Pydantic model (or other
non-JSON-serializable object); convert it to plain JSON-serializable primitives
before adding to params. Detect Pydantic v2 models via hasattr(requestedSchema,
"model_dump") and call requestedSchema.model_dump() (fallback to .dict() for
v1), otherwise if it's already a dict/list leave it, then set
params["requestedSchema"] to that primitive value so the payload is
JSON-serializable before calling self.request.
async def _unregister_session(run_id: str) -> None:
    async with _RUN_SESSION_LOCK:
        execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None)
        if execution_id:
            _RUN_SESSION_REGISTRY.pop(execution_id, None)
🛠️ Refactor suggestion
Also purge idempotency keys when unregistering a session
Avoid leaks after a run is cleaned up.
async def _unregister_session(run_id: str) -> None:
- async with _RUN_SESSION_LOCK:
+ async with _RUN_SESSION_LOCK:
execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None)
if execution_id:
_RUN_SESSION_REGISTRY.pop(execution_id, None)
+ # Also drop idempotency keys for this execution
+ try:
+ async with _IDEMPOTENCY_KEYS_LOCK:
+ _IDEMPOTENCY_KEYS_SEEN.pop(execution_id, None)
+ except Exception:
+ pass
📝 Committable suggestion
async def _unregister_session(run_id: str) -> None: | |
async with _RUN_SESSION_LOCK: | |
execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None) | |
if execution_id: | |
_RUN_SESSION_REGISTRY.pop(execution_id, None) | |
async def _unregister_session(run_id: str) -> None: | |
async with _RUN_SESSION_LOCK: | |
execution_id = _RUN_EXECUTION_ID_REGISTRY.pop(run_id, None) | |
if execution_id: | |
_RUN_SESSION_REGISTRY.pop(execution_id, None) | |
# Also drop idempotency keys for this execution | |
try: | |
async with _IDEMPOTENCY_KEYS_LOCK: | |
_IDEMPOTENCY_KEYS_SEEN.pop(execution_id, None) | |
except Exception: | |
pass |
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 53 to 58, when unregistering
a session the code pops the execution_id and session registry but does not
remove any idempotency keys, which can leak state; after retrieving execution_id
(inside the same _RUN_SESSION_LOCK block) also remove any idempotency entries
associated with that run: look up and pop the idempotency entry/entries from the
idempotency registry (using execution_id and/or run_id as appropriate for your
storage key), ensuring you handle None safely and keep the operation under the
same lock so cleanup is atomic.
def _in_workflow_runtime() -> bool:
    """Return True if currently executing inside a Temporal workflow sandbox."""
    try:
        return _twf._Runtime.current() is not None  # type: ignore[attr-defined]
    except Exception:
        return False
The code accesses the private Temporal API _twf._Runtime.current()
(note the leading underscore). While this works currently, it relies on implementation details that could change in future Temporal releases without notice.
Consider:
- Checking if there's a public API alternative for detecting workflow context
- Adding a comment acknowledging the risk and explaining why this approach was chosen
- Wrapping this in a version-specific adapter that can be updated if the API changes
This pattern appears in several places throughout the PR, so addressing it consistently would improve long-term maintainability.
def _in_workflow_runtime() -> bool: | |
"""Return True if currently executing inside a Temporal workflow sandbox.""" | |
try: | |
return _twf._Runtime.current() is not None # type: ignore[attr-defined] | |
except Exception: | |
return False | |
def _in_workflow_runtime() -> bool: | |
"""Return True if currently executing inside a Temporal workflow sandbox.""" | |
try: | |
# NOTE: This uses Temporal's private API which may change in future releases. | |
# We use this approach because there's no public API available for detecting | |
# if code is running inside a workflow sandbox as of this implementation. | |
# If this breaks in a future Temporal SDK update, we'll need to update this check. | |
return _twf._Runtime.current() is not None # type: ignore[attr-defined] | |
except Exception: | |
return False | |
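For reference, a small sketch using the public helper temporalio.workflow.in_workflow() instead of the private runtime class; the lazy import is an assumption so that temporalio stays an optional dependency:

def _in_workflow_runtime() -> bool:
    """Best-effort check for Temporal workflow context via the public API."""
    try:
        from temporalio import workflow  # lazy import; temporalio may be absent
        return workflow.in_workflow()
    except Exception:
        return False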
Spotted by Diamond
if getattr(_wf, "_Runtime").current() is not None:  # type: ignore[attr-defined]
    return _wf.info().run_id
This code accesses the private Temporal API _Runtime.current()
which isn't part of the public interface. While this works today, it creates a dependency on implementation details that could change in future Temporal SDK versions. Consider:
- Using a documented public API if available
- Adding a comment acknowledging the risk
- Wrapping this in a try/except to gracefully handle API changes
If this approach is necessary, consider reaching out to the Temporal team for guidance on a more stable detection method for workflow runtime contexts.
if getattr(_wf, "_Runtime").current() is not None: # type: ignore[attr-defined] | |
return _wf.info().run_id | |
# Using private Temporal API (_Runtime) to detect if we're in a workflow context | |
# This is currently the only reliable way to detect this, but may break in future SDK versions | |
try: | |
if getattr(_wf, "_Runtime").current() is not None: # type: ignore[attr-defined] | |
return _wf.info().run_id | |
except (AttributeError, TypeError): | |
# Gracefully handle case where the private API changes in future Temporal SDK versions | |
pass |
src/mcp_agent/logging/logger.py
Outdated
if getattr(_wf, "_Runtime").current() is not None: # type: ignore[attr-defined] | ||
in_temporal_workflow = True |
The code accesses Temporal's private API through getattr(_wf, "_Runtime").current()
with a type ignore comment. This approach appears in multiple places and creates a dependency on Temporal's internal implementation details. Consider using a more stable detection method or working with the Temporal team to establish a supported public API for this functionality. If the private API changes in future Temporal releases, this code will likely break.
if getattr(_wf, "_Runtime").current() is not None: # type: ignore[attr-defined] | |
in_temporal_workflow = True | |
try: | |
import temporalio.workflow | |
in_temporal_workflow = temporalio.workflow.in_workflow() | |
except (ImportError, AttributeError): | |
in_temporal_workflow = False |
gw_token = os.environ.get("MCP_GATEWAY_TOKEN") | ||
if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: | ||
return JSONResponse( | ||
{"ok": False, "error": "unauthorized"}, status_code=401 | ||
) |
Security Recommendation: The token comparison in this code is vulnerable to timing attacks. When comparing security tokens, it's important to use constant-time comparison to prevent attackers from inferring token values by measuring response times.
Consider replacing:
if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token:
With Python's built-in constant-time comparison:
import secrets
if gw_token and not secrets.compare_digest(request.headers.get("X-MCP-Gateway-Token", ""), gw_token):
This ensures that the time taken to compare tokens doesn't leak information about how close an attacker's guess is to the actual token value.
gw_token = os.environ.get("MCP_GATEWAY_TOKEN") | |
if gw_token and request.headers.get("X-MCP-Gateway-Token") != gw_token: | |
return JSONResponse( | |
{"ok": False, "error": "unauthorized"}, status_code=401 | |
) | |
gw_token = os.environ.get("MCP_GATEWAY_TOKEN") | |
if gw_token and not secrets.compare_digest(request.headers.get("X-MCP-Gateway-Token", ""), gw_token): | |
return JSONResponse( | |
{"ok": False, "error": "unauthorized"}, status_code=401 | |
) |
Actionable comments posted: 0
♻️ Duplicate comments (4)
src/mcp_agent/executor/temporal/session_proxy.py (4)
108-112: Defensive handling for executor returning an exception-like value. While decorated activities usually raise on failure, guard against execute() returning a BaseException to keep request()'s contract predictable.

-        act = self._context.task_registry.get_activity("mcp_relay_request")
-        return await self._executor.execute(act, exec_id, method, params or {})
+        act = self._context.task_registry.get_activity("mcp_relay_request")
+        res = await self._executor.execute(act, exec_id, method, params or {})
+        if isinstance(res, BaseException):
+            raise res
+        return res

86-96: Propagate activity return value; don't always return True. Inside workflow, notify() ignores the activity's boolean result and always returns True. Return the actual bool and treat BaseException (if ever returned) as failure.

-        try:
-            act = self._context.task_registry.get_activity("mcp_relay_notify")
-            await self._executor.execute(act, exec_id, method, params or {})
-            return True
-        except Exception:
-            return False
+        try:
+            act = self._context.task_registry.get_activity("mcp_relay_notify")
+            res = await self._executor.execute(act, exec_id, method, params or {})
+            return False if isinstance(res, BaseException) else bool(res)
+        except Exception:
+            return False

254-256: Ensure include_context is JSON-serializable. include_context may be a Pydantic model. Serialize before sending to the gateway to avoid runtime serialization errors.

-        if include_context is not None:
-            params["includeContext"] = include_context
+        if include_context is not None:
+            try:
+                params["includeContext"] = include_context.model_dump(by_alias=True, mode="json")  # type: ignore[attr-defined]
+            except Exception:
+                params["includeContext"] = include_context

279-283: Serialize requestedSchema for elicit(). requestedSchema may be a Pydantic model. Convert to plain JSON-friendly primitives first.

-        params: Dict[str, Any] = {
-            "message": message,
-            "requestedSchema": requestedSchema,
-        }
+        params: Dict[str, Any] = {"message": message}
+        try:
+            params["requestedSchema"] = requestedSchema.model_dump(by_alias=True, mode="json")  # type: ignore[attr-defined]
+        except Exception:
+            params["requestedSchema"] = requestedSchema
🧹 Nitpick comments (6)
src/mcp_agent/executor/temporal/session_proxy.py (3)
171-185
: Normalize log payload (level, data) before forwarding
- Use enum.value when available for level.
- Ensure data is a dict to match proxy expectations.
- await self._executor.execute( + await self._executor.execute( act, exec_id, - str(level), + getattr(level, "value", str(level)), namespace or (logger or "mcp_agent"), - message or "", - (data or {}), + message or "", + (data if isinstance(data, dict) else {"message": str(data)}), )
190-194
: Normalize non-workflow log notify payload. Match the normalization above for consistency and fewer surprises at the gateway.
- params: Dict[str, Any] = {"level": str(level), "data": data, "logger": logger} + level_str = getattr(level, "value", str(level)) + payload = data if isinstance(data, dict) else {"message": str(data)} + params: Dict[str, Any] = {"level": level_str, "data": payload, "logger": logger}
289-294
: Use public API to detect workflow runtime. Avoid relying on private _Runtime; use temporalio.workflow.in_workflow().
 def _in_workflow_runtime() -> bool:
     """Return True if currently executing inside a Temporal workflow sandbox."""
-    try:
-        return _twf._Runtime.current() is not None  # type: ignore[attr-defined]
-    except Exception:
-        return False
+    try:
+        return _twf.in_workflow()
+    except Exception:
+        return False

src/mcp_agent/executor/temporal/__init__.py (2)
526-527
: Add tracing interceptor to Worker when tracing is enabled. Parity with client setup; enables end-to-end spans at the worker.
- interceptors=[ContextPropagationInterceptor()], + interceptors=[TracingInterceptor(), ContextPropagationInterceptor()] + if running_app.context.tracing_enabled + else [ContextPropagationInterceptor()],
284-286
: Document workflow_memo parameter in start_workflow(). Add to the docstring so callers know how to pass per-run gateway overrides (a usage sketch follows after this list).
Example docstring addition:
    Args:
        ...
        workflow_memo (Optional[Dict[str, Any]]): Values to store in the workflow memo.
            Used to pass per-run configuration (e.g., gateway_url/token) that workers
            and activities can read.

src/mcp_agent/executor/temporal/interceptor.py (1)
123-128
: Minor consistency: use super() consistently for client outbound calls. Other methods call super(); use the same style here for uniformity.
-        return await self.next.start_workflow_update(input)
+        return await super().start_workflow_update(input)
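As referenced in the workflow_memo nitpick above, a hedged usage sketch of passing per-run gateway overrides when starting a workflow. Only the workflow_memo keyword and the key names gateway_url/gateway_token come from this PR; the other arguments are illustrative:

    # Usage sketch; argument order is illustrative, not the exact signature.
    handle = await executor.start_workflow(
        "MyWorkflow",
        run_args,
        workflow_memo={
            "gateway_url": "http://127.0.0.1:8000",
            "gateway_token": gateway_token,
        },
    )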
📒 Files selected for processing (5)
src/mcp_agent/executor/temporal/__init__.py
(7 hunks)src/mcp_agent/executor/temporal/interceptor.py
(1 hunks)src/mcp_agent/executor/temporal/session_proxy.py
(1 hunks)src/mcp_agent/executor/workflow.py
(5 hunks)src/mcp_agent/logging/logger.py
(6 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/mcp_agent/executor/workflow.py
- src/mcp_agent/logging/logger.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-05T14:31:48.115Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.115Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/executor/temporal/interceptor.py
src/mcp_agent/executor/temporal/__init__.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/executor/temporal/__init__.py
🧬 Code graph analysis (3)
src/mcp_agent/executor/temporal/interceptor.py (2)
src/mcp_agent/executor/temporal/temporal_context.py (2)
get_execution_id
(15-49)set_execution_id
(10-12)src/mcp_agent/executor/temporal/__init__.py (1)
start_workflow
(277-392)
src/mcp_agent/executor/temporal/session_proxy.py (6)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/executor/temporal/system_activities.py (3)
SystemActivities
(14-96)relay_notify
(67-80)relay_request
(83-96)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(15-49)src/mcp_agent/executor/task_registry.py (1)
get_activity
(25-28)src/mcp_agent/executor/temporal/__init__.py (1)
execute
(193-209)src/mcp_agent/logging/listeners.py (1)
send_log_message
(20-26)
src/mcp_agent/executor/temporal/__init__.py (3)
src/mcp_agent/app.py (4)
executor
(162-163)logger
(190-207)context
(146-151)workflow_task
(803-879)src/mcp_agent/executor/temporal/interceptor.py (1)
ContextPropagationInterceptor
(58-93)src/mcp_agent/executor/temporal/system_activities.py (5)
SystemActivities
(14-96)forward_log
(18-38)request_user_input
(41-64)relay_notify
(67-80)relay_request
(83-96)
🔇 Additional comments (2)
src/mcp_agent/executor/temporal/__init__.py (1)
269-272
: Client interceptors wiring looks good. Context propagation is always enabled; tracing is conditionally added. LGTM.
src/mcp_agent/executor/temporal/interceptor.py (1)
39-56
: Good fix: restore previous execution_id after inbound handling. Prevents context bleed across tasks. LGTM.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mcp_agent/executor/temporal/workflow_signal.py (2)
1-6
: Add asyncio/time imports for non-Temporal fallback (see next comment). Required for the proposed fallback path in wait_for_signal.
Apply:
+import asyncio
+import time
 from contextvars import ContextVar
 from datetime import timedelta
 from typing import Any, Callable, Optional, TYPE_CHECKING
94-127
: Fix test failure: provide non-Temporal fallback in wait_for_signal. CI fails with RuntimeError from this guard when tests call wait_for_signal outside a workflow. Offer an asyncio-based fallback so unit tests (and activity/asyncio callers) can wait on the mailbox deterministically with a timeout.
Apply:
-        if not workflow.in_workflow():
-            raise RuntimeError("wait_for_signal must be called from within a workflow")
+        if not workflow.in_workflow():
+            # Asyncio fallback for tests and non-workflow contexts
+            mailbox = self._mailbox_ref.get()
+            if mailbox is None:
+                raise RuntimeError(
+                    "Signal mailbox not initialized for this context. Please call attach_to_workflow first."
+                )
+            current_ver = (
+                min_version if min_version is not None else mailbox.version(signal.name)
+            )
+            deadline = (
+                (time.monotonic() + float(timeout_seconds))
+                if timeout_seconds is not None
+                else None
+            )
+            # Poll with small sleep to avoid busy-wait
+            while True:
+                if mailbox.version(signal.name) > current_ver:
+                    return mailbox.value(signal.name)
+                if deadline is not None and time.monotonic() >= deadline:
+                    raise TimeoutError(f"Timeout waiting for signal {signal.name}")
+                await asyncio.sleep(0.05)
♻️ Duplicate comments (5)
src/mcp_agent/executor/workflow.py (1)
762-765
: Redact gateway_token in logs (credential leak). Debug log prints the token in cleartext. Please redact before logging.
Apply:
-        self._logger.debug(
-            f"Proxy parameters: gateway_url={gateway_url}, gateway_token={gateway_token}"
-        )
+        redacted = (
+            (gateway_token[:4] + "…") if isinstance(gateway_token, str) and len(gateway_token) >= 4 else "(redacted)"
+        )
+        self._logger.debug(
+            f"Proxy parameters: gateway_url={gateway_url}, gateway_token={redacted}"
+        )

src/mcp_agent/app.py (3)
237-242
: Good: set a default bound context post-initialize. Establishing a safe default for loggers created after init is the right move and matches prior guidance to avoid unsafe globals.
851-871
: Fixed: avoid early return while still registering pre-decorated activities. The double-decoration guard now keeps already-decorated Temporal activities without skipping registration. This resolves the discoverability issue.
922-943
: Mirrored guard for global workflow tasks. Same fix applied for globally registered tasks; prevents redecorating while ensuring registry consistency.
Also applies to: 945-949
src/mcp_agent/executor/temporal/session_proxy.py (1)
82-96
: Report activity execution failures correctly. TemporalExecutor.execute() may return a BaseException instance. Treat that as failure instead of returning True.

-            await self._executor.execute(act, exec_id, method, params or {})
-            return True
+            res = await self._executor.execute(act, exec_id, method, params or {})
+            return False if isinstance(res, BaseException) else True
🧹 Nitpick comments (7)
src/mcp_agent/executor/temporal/__init__.py (2)
125-133
: Be careful executing sync callables inside workflows. Calling sync functions directly in a workflow can introduce non-determinism if they touch time, randomness, I/O, env, etc. Consider wrapping truly non-deterministic portions with workflow.side_effect or move them into activities.
269-272
: ContextPropagationInterceptor wired on both client and worker. Good—this ensures execution-id flow across boundaries. Optional: if tracing is enabled, consider also adding TracingInterceptor to Worker interceptors for symmetry.
Also applies to: 526-527
src/mcp_agent/executor/temporal/workflow_signal.py (1)
179-192
: Avoid private Temporal exception type. Catching workflow._NotInWorkflowEventLoopError relies on a private symbol. You can use an in_workflow() split to choose between external handle APIs without the private exception.
Apply:
- try: - # First try the in-workflow path - wf_handle = workflow.get_external_workflow_handle( - workflow_id=signal.workflow_id, run_id=signal.run_id - ) - except workflow._NotInWorkflowEventLoopError: - # We're on a worker thread / activity - if not self._executor: - raise RuntimeError("TemporalExecutor reference needed to emit signals") - await self._executor.ensure_client() - wf_handle = self._executor.client.get_workflow_handle( - workflow_id=signal.workflow_id, run_id=signal.run_id - ) + if workflow.in_workflow(): + wf_handle = workflow.get_external_workflow_handle( + workflow_id=signal.workflow_id, run_id=signal.run_id + ) + else: + if not self._executor: + raise RuntimeError("TemporalExecutor reference needed to emit signals") + await self._executor.ensure_client() + wf_handle = self._executor.client.get_workflow_handle( + workflow_id=signal.workflow_id, run_id=signal.run_id + )src/mcp_agent/app.py (1)
364-372
: Typo: “Updatre” → “Update”. Minor comment spelling.
- # Updatre progress display before logging is shut down + # Update progress display before logging is shut downsrc/mcp_agent/logging/logger.py (1)
196-199
: Don’t drop logs on NotImplementedError; fall back to stderr transportIf
run_until_complete
isn’t supported (rare, non-Temporal), the event is lost. Fallback keeps logs visible.- except NotImplementedError: - pass + except NotImplementedError: + self.event_bus.emit_with_stderr_transport(event)src/mcp_agent/executor/temporal/session_proxy.py (2)
254-256
: Ensure include_context is JSON-serializable. Pydantic models should be dumped before sending across the gateway.
- if include_context is not None: - params["includeContext"] = include_context + if include_context is not None: + try: + params["includeContext"] = include_context.model_dump(by_alias=True, mode="json") + except Exception: + params["includeContext"] = include_context
279-283
: Serialize requestedSchema to primitives. Same rationale: gateway expects JSON-serializable payloads.
- params: Dict[str, Any] = { - "message": message, - "requestedSchema": requestedSchema, - } + params: Dict[str, Any] = {"message": message} + try: + params["requestedSchema"] = requestedSchema.model_dump(by_alias=True, mode="json") + except Exception: + params["requestedSchema"] = requestedSchema
📒 Files selected for processing (10)
src/mcp_agent/app.py
(5 hunks)src/mcp_agent/executor/temporal/__init__.py
(11 hunks)src/mcp_agent/executor/temporal/session_proxy.py
(1 hunks)src/mcp_agent/executor/temporal/temporal_context.py
(1 hunks)src/mcp_agent/executor/temporal/workflow_signal.py
(2 hunks)src/mcp_agent/executor/workflow.py
(4 hunks)src/mcp_agent/logging/logger.py
(6 hunks)src/mcp_agent/mcp/client_proxy.py
(1 hunks)src/mcp_agent/tracing/token_counter.py
(1 hunks)src/mcp_agent/tracing/token_tracking_decorator.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/mcp_agent/executor/temporal/temporal_context.py
- src/mcp_agent/mcp/client_proxy.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-05T14:31:48.115Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.115Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/tracing/token_tracking_decorator.py
src/mcp_agent/tracing/token_counter.py
src/mcp_agent/executor/temporal/workflow_signal.py
src/mcp_agent/executor/temporal/__init__.py
src/mcp_agent/executor/temporal/session_proxy.py
src/mcp_agent/app.py
src/mcp_agent/executor/workflow.py
src/mcp_agent/logging/logger.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/executor/temporal/__init__.py
src/mcp_agent/executor/workflow.py
🧬 Code graph analysis (5)
src/mcp_agent/executor/temporal/__init__.py (4)
src/mcp_agent/app.py (5)
executor
(162-163)workflow_signal
(454-502)workflow
(419-452)context
(146-151)workflow_task
(803-880)src/mcp_agent/executor/temporal/workflow_signal.py (1)
TemporalSignalHandler
(22-202)src/mcp_agent/executor/temporal/interceptor.py (1)
ContextPropagationInterceptor
(58-93)src/mcp_agent/executor/temporal/system_activities.py (5)
SystemActivities
(14-96)forward_log
(18-38)request_user_input
(41-64)relay_notify
(67-80)relay_request
(83-96)
src/mcp_agent/executor/temporal/session_proxy.py (5)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/executor/temporal/system_activities.py (3)
SystemActivities
(14-96)relay_notify
(67-80)relay_request
(83-96)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(15-49)src/mcp_agent/executor/task_registry.py (1)
get_activity
(25-28)src/mcp_agent/executor/temporal/__init__.py (1)
execute
(193-209)
src/mcp_agent/app.py (1)
src/mcp_agent/logging/logger.py (1)
set_default_bound_context
(546-548)
src/mcp_agent/executor/workflow.py (2)
src/mcp_agent/app.py (5)
executor
(162-163)workflow
(419-452)context
(146-151)upstream_session
(170-171)upstream_session
(174-175)src/mcp_agent/executor/temporal/session_proxy.py (1)
SessionProxy
(22-286)
src/mcp_agent/logging/logger.py (4)
src/mcp_agent/app.py (5)
workflow
(419-452)executor
(162-163)context
(146-151)logger
(190-207)session_id
(186-187)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(15-49)src/mcp_agent/logging/transport.py (3)
get
(309-316)emit_with_stderr_transport
(436-450)emit
(410-434)src/mcp_agent/executor/temporal/session_proxy.py (1)
send_log_message
(158-193)
🪛 GitHub Actions: Pull Request Checks
src/mcp_agent/executor/temporal/workflow_signal.py
[error] 95-95: Step 'make coverage' failed during pytest run. RuntimeError: wait_for_signal must be called from within a workflow (raised during test_wait_for_signal in tests/executor/temporal/test_signal_handler.py). Command: uv run coverage run -m pytest tests -m 'not integration'.
🔇 Additional comments (10)
src/mcp_agent/tracing/token_counter.py (1)
830-835
: Good switch to public Temporal API. Using workflow.in_workflow() avoids private internals and matches our lazy-import guidance. The replay guard remains intact.
src/mcp_agent/tracing/token_tracking_decorator.py (1)
41-44
: in_workflow() check looks correct and minimal. Nice replacement of the private runtime probe; replay detection remains behind the Temporal gate.
src/mcp_agent/executor/temporal/__init__.py (3)
203-206
: Workflow-only guards are correct. execute/execute_many/execute_streaming now enforce workflow.in_workflow(). This prevents misuse from activities/asyncio paths.
Also applies to: 221-224, 238-242
284-285
: Memo pass-through added. workflow_memo plumbing to TemporalClient.start_workflow looks correct and backward compatible.
Also applies to: 376-386
506-514
: Registering SystemActivities at worker startup. Forward-log, user-input, and relay activities are registered clearly; aligns with gateway proxying design.
src/mcp_agent/executor/workflow.py (2)
221-241
: Memo plumbing to Temporal start is correct. __mcp_agent_workflow_memo extraction and pass-through to executor.start_workflow looks good.
783-801
: SessionProxy binding on initialize. Providing a virtual upstream_session when absent is a solid default; also binding app logger context is helpful for log forwarding.
src/mcp_agent/app.py (1)
191-207
: Logger bound-context handling looks good. Binding/updating
_bound_context
on first access and later updates is correct and aligns with the new default-bound-context mechanism.src/mcp_agent/logging/logger.py (2)
76-90
: Nice: lazy Temporal detection and workflow-safe forwarding path. Lazy-import + workflow forwarding via activity/notify avoids a hard dependency and preserves determinism.
Also applies to: 96-195
224-245
: Default-bound-context fallback is clean and thread-safe. Using
_default_bound_context
in event construction and exposingset_default_bound_context()
looks solid.Also applies to: 510-549
    MemoryObjectReceiveStream,
    MemoryObjectSendStream,
)
from temporalio import workflow as _twf
Avoid hard dependency on Temporal in module scope
Top-level from temporalio import workflow as _twf
makes this module import fail when Temporal isn’t installed. Lazy import inside _in_workflow_runtime()
(and anywhere else needed).
-from temporalio import workflow as _twf
@@
-def _in_workflow_runtime() -> bool:
+def _in_workflow_runtime() -> bool:
"""Return True if currently executing inside a Temporal workflow sandbox."""
try:
- return _twf.in_workflow()
+ from temporalio import workflow as _twf # lazy, optional
+ return _twf.in_workflow()
except Exception:
return False
Also applies to: 289-295
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/session_proxy.py around line 11 (and
similarly lines ~289-295), there's a top-level import "from temporalio import
workflow as _twf" which causes import failure if Temporal isn't installed;
change this to perform a lazy import inside the helper/function(s) that need it
(e.g., _in_workflow_runtime() and any other call sites), by importing
temporalio.workflow locally within those functions, handling ImportError
appropriately (return False or raise a clear error) so module import no longer
hard-depends on Temporal being present.
        if _in_workflow_runtime():
            act = self._context.task_registry.get_activity("mcp_relay_request")
            return await self._executor.execute(act, exec_id, method, params or {})
        return await self._system_activities.relay_request(
            exec_id, method, params or {}
        )
🛠️ Refactor suggestion
Request path: normalize executor errors to a response payload
When execute()
returns a BaseException
, return a structured error dict instead of passing the exception object through.
- act = self._context.task_registry.get_activity("mcp_relay_request")
- return await self._executor.execute(act, exec_id, method, params or {})
+ act = self._context.task_registry.get_activity("mcp_relay_request")
+ res = await self._executor.execute(act, exec_id, method, params or {})
+ if isinstance(res, BaseException):
+ return {"error": str(res)}
+ return res
📝 Committable suggestion
        if _in_workflow_runtime():
            act = self._context.task_registry.get_activity("mcp_relay_request")
            res = await self._executor.execute(act, exec_id, method, params or {})
            if isinstance(res, BaseException):
                return {"error": str(res)}
            return res
        return await self._system_activities.relay_request(
            exec_id, method, params or {}
        )
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/session_proxy.py around lines 108 to 113,
normalize executor errors returned from self._executor.execute: if execute(...)
returns an instance of BaseException, do not return the exception object
directly; instead convert it into a structured response dict (for example
include keys like error.type, error.message, and optionally error.trace or
error.args) and return that dict. Ensure non-exception successful returns are
returned unchanged and preserve existing call paths for
_system_activities.relay_request.
        self.event_bus.emit_with_stderr_transport(event)

        try:
            _wf.create_task(_forward_via_proxy())
The create_task()
call creates a fire-and-forget task without awaiting it or handling exceptions. If the task fails, the exception will be silently dropped, potentially causing lost log messages in Temporal workflows.
Consider either:
- Awaiting the task directly:
await _forward_via_proxy()
- Adding proper exception handling:
task = _wf.create_task(_forward_via_proxy())
task.add_done_callback(lambda t: t.exception()) # Prevents unobserved exceptions
This is particularly important in Temporal workflows where determinism is required and lost logs can be difficult to debug.
task = _wf.create_task(_forward_via_proxy())
task.add_done_callback(lambda t: t.exception())  # Prevents unobserved exceptions
Actionable comments posted: 0
🧹 Nitpick comments (2)
tests/executor/temporal/test_signal_handler.py (2)
56-66
: Stubbing in_workflow is good; also reset the ContextVar and assert wait_condition awaited.Avoid leaking ContextVar state across tests and make the wait explicit.
@@ - # Patch the handler's ContextVar to point to the mock_workflow's mailbox - handler._mailbox_ref.set(mock_workflow._signal_mailbox) - signal = Signal(name="test_signal", payload="test_value") - mock_workflow._signal_mailbox.push(signal.name, signal.payload) - with patch("temporalio.workflow.wait_condition", AsyncMock()): - result = await handler.wait_for_signal(signal) - assert result == "test_value" + # Patch the handler's ContextVar to point to the mock_workflow's mailbox + token = handler._mailbox_ref.set(mock_workflow._signal_mailbox) + try: + signal = Signal(name="test_signal", payload="test_value") + mock_workflow._signal_mailbox.push(signal.name, signal.payload) + with patch( + "temporalio.workflow.wait_condition", + new=AsyncMock(return_value=None), + ) as wait_mock: + result = await handler.wait_for_signal(signal) + assert result == "test_value" + wait_mock.assert_awaited() + finally: + handler._mailbox_ref.reset(token)
69-96
: Make intent explicit: ensure external handle path isn’t touched when not in-workflow.Since in_workflow=False should bypass workflow.get_external_workflow_handle, assert it explicitly.
@@ async def test_signal_outside_workflow( mock_get_external, _mock_in_wf, handler, mock_executor ): @@ mock_executor.client.get_workflow_handle = MagicMock(return_value=mock_handle) await handler.signal(signal) + mock_get_external.assert_not_called() mock_executor.ensure_client.assert_awaited_once() mock_executor.client.get_workflow_handle.assert_called_once_with( workflow_id="workflow-id", run_id="run-id" ) mock_handle.signal.assert_awaited_once_with("test_signal", "test_value")
📒 Files selected for processing (2)
tests/executor/temporal/test_execution_id_and_interceptor.py
(1 hunks)tests/executor/temporal/test_signal_handler.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/executor/temporal/test_execution_id_and_interceptor.py
        handle = client.get_workflow_handle(
            workflow_id=info.get("workflow_id"), run_id=info.get("run_id")
        )
There appears to be a mismatch between the keys stored in the _PENDING_PROMPTS
dictionary and the keys being accessed. The code stores execution_id
in the dictionary:
_PENDING_PROMPTS[request_id] = {
"workflow_id": metadata.get("workflow_id"),
"execution_id": execution_id,
"signal_name": metadata.get("signal_name", "human_input"),
"session_id": metadata.get("session_id"),
}
But then attempts to access run_id
which doesn't exist:
handle = client.get_workflow_handle(
workflow_id=info.get("workflow_id"), run_id=info.get("run_id")
)
This will cause the workflow handle lookup to fail. The code should use info.get("execution_id")
instead of info.get("run_id")
to correctly retrieve the stored execution ID.
        handle = client.get_workflow_handle(
            workflow_id=info.get("workflow_id"), run_id=info.get("execution_id")
        )
            try:
                data["context"] = event.context.dict()
The code calls event.context.dict()
which is a Pydantic v1 method. In Pydantic v2, this method was renamed to model_dump()
. This will cause an AttributeError
at runtime when using Pydantic v2.
Consider updating this line to:
data["context"] = event.context.model_dump()
Or make it version-compatible with a try/except:
try:
# Pydantic v2
data["context"] = event.context.model_dump()
except AttributeError:
# Pydantic v1 fallback
data["context"] = event.context.dict()
            try:
                # Pydantic v2
                data["context"] = event.context.model_dump()
            except AttributeError:
                # Pydantic v1 fallback
                data["context"] = event.context.dict()
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/mcp_agent/logging/logger.py (1)
62-76
: Detect Temporal before touching asyncio loop to avoid non-determinism in workflows.Currently, if loop.is_running() returns True inside a workflow, asyncio.create_task() is used before the Temporal check, risking non-deterministic behavior. Detect in_workflow() up-front and short-circuit to the Temporal-safe path.
Apply:
def _emit_event(self, event: Event): """Emit an event by running it in the event loop.""" - loop = self._ensure_event_loop() - try: - is_running = loop.is_running() - except NotImplementedError: - # Handle Temporal workflow environment where is_running() is not implemented - # Default to assuming the loop is not running - is_running = False - - if is_running: - # If we're in a thread with a running loop, schedule the coroutine - asyncio.create_task(self.event_bus.emit(event)) - else: - # If no loop is running, run it until the emit completes - # Detect Temporal workflow runtime without hard dependency - # If inside Temporal workflow sandbox, avoid run_until_complete and use workflow-safe forwarding - in_temporal_workflow = False - try: - from temporalio import workflow as _wf # type: ignore - try: - in_temporal_workflow = bool(_wf.in_workflow()) - except Exception: - in_temporal_workflow = False - except Exception: - in_temporal_workflow = False + loop = self._ensure_event_loop() + # Detect Temporal workflow first to avoid any direct asyncio scheduling + in_temporal_workflow = False + try: + from temporalio import workflow as _wf # type: ignore + try: + in_temporal_workflow = bool(_wf.in_workflow()) + except Exception: + in_temporal_workflow = False + except Exception: + in_temporal_workflow = False + + if not in_temporal_workflow: + try: + is_running = loop.is_running() + except NotImplementedError: + is_running = False + + if is_running: + asyncio.create_task(self.event_bus.emit(event)) + return + # Non-Temporal, no running loop: safe to block + try: + loop.run_until_complete(self.event_bus.emit(event)) + except NotImplementedError: + # Fallback to stderr if loop forbids blocking + self.event_bus.emit_with_stderr_transport(event) + returnAlso applies to: 90-196
♻️ Duplicate comments (3)
src/mcp_agent/logging/logger.py (3)
11-11
: Good fix: timedelta for Temporal activity timeout.Using timedelta resolves the SDK type requirement for schedule_to_close_timeout.
184-189
: Observe background task failures in workflows.Unobserved exceptions in the created workflow task can be dropped. Add a done callback or wrap the coroutine to surface errors without breaking determinism.
- try: - _wf.create_task(_forward_via_proxy()) - return + try: + t = _wf.create_task(_forward_via_proxy()) + # Surface exceptions to workflow logs without raising + try: + t.add_done_callback(lambda tt: tt.exception()) + except Exception: + pass + return
197-200
: Don’t drop logs on NotImplementedError; fallback to stderr.If run_until_complete isn’t supported, we currently pass silently. Emit to stderr to avoid losing the event.
- except NotImplementedError: - pass + except NotImplementedError: + self.event_bus.emit_with_stderr_transport(event)
🧹 Nitpick comments (6)
src/mcp_agent/logging/logger.py (6)
153-178
: Consider explicit start_to_close timeout (and retry policy) for activity call.Only schedule_to_close_timeout is set. Adding start_to_close (and, if appropriate, a retry policy) avoids indefinite execution if an activity starts but hangs.
- await _wf.execute_activity( + from datetime import timedelta as _td + await _wf.execute_activity( "mcp_forward_log", exec_id, level, ns, msg, data, - schedule_to_close_timeout=timedelta(seconds=5), + schedule_to_close_timeout=_td(seconds=5), + start_to_close_timeout=_td(seconds=5), )
114-121
: Deduplicate level mapping.The same mapping from EventType to string appears twice. Hoist to a module-constant to reduce drift.
+LEVEL_MAP = { + "debug": "debug", + "info": "info", + "warning": "warning", + "error": "error", + "progress": "info", +} ... - level_map = { - "debug": "debug", - "info": "info", - "warning": "warning", - "error": "error", - "progress": "info", - } - level = level_map.get(event.type, "info") + level = LEVEL_MAP.get(event.type, "info") ... - level = { - "debug": "debug", - "info": "info", - "warning": "warning", - "error": "error", - "progress": "info", - }.get(event.type, "info") + level = LEVEL_MAP.get(event.type, "info")Also applies to: 157-164
264-314
: Type hints: make context Optional for public methods.Use EventContext | None for accuracy and consistency with event().
- def debug( + def debug( self, message: str, - name: str | None = None, - context: EventContext = None, + name: str | None = None, + context: EventContext | None = None, **data, ): ... - def info( + def info( self, message: str, - name: str | None = None, - context: EventContext = None, + name: str | None = None, + context: EventContext | None = None, **data, ): ... - def warning( + def warning( self, message: str, - name: str | None = None, - context: EventContext = None, + name: str | None = None, + context: EventContext | None = None, **data, ): ... - def error( + def error( self, message: str, - name: str | None = None, - context: EventContext = None, + name: str | None = None, + context: EventContext | None = None, **data, ): ... - def progress( + def progress( self, message: str, name: str | None = None, percentage: float = None, - context: EventContext = None, + context: EventContext | None = None, **data, ):
513-550
: Default-bound context API: solid addition; update get_logger docstring.The global + setter are straightforward. Minor nit: get_logger’s docstring says “context: Deprecated/ignored” but you still use it to bind; update the doc to reflect actual behavior.
def get_logger(namespace: str, session_id: str | None = None, context=None) -> Logger: @@ - context: Deprecated/ignored. Present for backwards compatibility. + context: Optional bound context (e.g., app/session object). If omitted, + falls back to the module-level default-bound context.
41-50
: Constructor typing: annotate bound_context for clarity.Add an explicit type for bound_context to align with downstream usage.
- def __init__( - self, namespace: str, session_id: str | None = None, bound_context=None - ): + def __init__( + self, namespace: str, session_id: str | None = None, bound_context: Any | None = None + ):
181-196
: Workflow fallback should avoid AsyncEventBus queueing.emit_with_stderr_transport prints and also initializes/queues via asyncio.create_task, which may be disallowed inside workflows. For the Temporal path’s last-resort fallback, print directly to stderr instead of calling into the bus.
- # As a last resort, log to stdout/stderr as a fallback - self.event_bus.emit_with_stderr_transport(event) + # As a last resort in workflows, print only (avoid asyncio tasks) + import sys as _sys + print(f"[{event.type}] {event.namespace}: {event.message}", file=_sys.stderr)
📒 Files selected for processing (2)
src/mcp_agent/logging/logger.py
(6 hunks)src/mcp_agent/server/app_server.py
(19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/mcp_agent/server/app_server.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-05T14:31:48.115Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.115Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/logging/logger.py
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/logging/logger.py
🧬 Code graph analysis (1)
src/mcp_agent/logging/logger.py (6)
src/mcp_agent/app.py (5)
workflow
(419-452)executor
(162-163)context
(146-151)logger
(190-207)session_id
(186-187)src/mcp_agent/executor/temporal/temporal_context.py (1)
get_execution_id
(15-49)src/mcp_agent/logging/transport.py (3)
get
(309-316)emit_with_stderr_transport
(436-450)emit
(410-434)src/mcp_agent/executor/temporal/session_proxy.py (1)
send_log_message
(158-193)src/mcp_agent/logging/listeners.py (1)
send_log_message
(20-26)tests/logging/test_upstream_logging.py (1)
send_log_message
(15-23)
🔇 Additional comments (3)
src/mcp_agent/logging/logger.py (3)
225-246
: Nice: safe default-bound-context fallback without self-imports.Cleanly prefers logger-bound context and then _default_bound_context; avoids brittle global lookups.
371-421
: Idempotent listener registration on reconfigure looks good.Re-checking for an existing MCPUpstreamLoggingListener before adding avoids duplication when configure() is called multiple times.
444-461
: Listener setup is robust.Instance-based duplication guard + try/except around optional import is a good balance between safety and flexibility.
Temporal workers run outside the original MCP request context; they need a reliable way to reach the upstream MCP client session (to forward logs and notifications, and to issue requests such as human-input prompts).

Overview of changes

Core idea is the introduction of a SessionProxy, which derives from mcp.server.session.ServerSession and for all intents and purposes appears as a server session inside a Temporal context, but in reality proxies requests through the MCPApp that actually has the upstream session to the MCP client. There are a lot of moving parts to get this to work, including:
Execution ID and routing
- get_execution_id() now derives from the Temporal runtime: workflow.info().run_id inside workflows, activity.info().workflow_run_id inside activities.
- ContextInterceptor carries the __execution_id header from the current context.
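A rough sketch of how that derivation can work (simplified; the real implementation also consults the propagated header/context variable, which is omitted here):

    from temporalio import activity, workflow

    def get_execution_id() -> str | None:
        # Inside a workflow: the run id identifies this execution
        try:
            if workflow.in_workflow():
                return workflow.info().run_id
        except Exception:
            pass
        # Inside an activity: use the owning workflow's run id
        try:
            return activity.info().workflow_run_id
        except Exception:
            return None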
Session proxying
- SessionProxy subclasses mcp.server.session.ServerSession.
- Log forwarding uses the "mcp_forward_log" activity (deterministic).
- rpc facade provided for request/notify parity.
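Conceptually, each proxy call dispatches differently depending on where it runs. A simplified composite sketch (method and attribute names taken from the review snippets above; the structure is assumed, not the exact implementation):

    async def notify(self, method: str, params: dict | None = None) -> bool:
        exec_id = get_execution_id()
        if _in_workflow_runtime():
            # Inside the workflow sandbox: go through a registered activity so the
            # call is recorded in history and stays deterministic on replay.
            act = self._context.task_registry.get_activity("mcp_relay_notify")
            res = await self._executor.execute(act, exec_id, method, params or {})
            return not isinstance(res, BaseException)
        # On the worker/asyncio side: call the system activity implementation
        # directly, which POSTs to the gateway's /internal/... route.
        return await self._system_activities.relay_notify(exec_id, method, params or {})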
Server gateway endpoints on App server
- /internal/workflows/log, /internal/session/by-run/{run_id}/notify, /internal/session/by-run/{run_id}/request, /internal/human/prompts.
- X-MCP-Gateway-Token authentication.
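For a sense of the wire format, a hedged example of calling the log route directly. The endpoint path and header name come from this PR; the JSON field names are assumptions based on the forward-log activity's (execution_id, level, namespace, message, data) parameters, and httpx is just one possible client:

    import httpx

    async def forward_log_example(gateway_url: str, gateway_token: str, execution_id: str) -> None:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{gateway_url}/internal/workflows/log",
                headers={"X-MCP-Gateway-Token": gateway_token},
                json={
                    "execution_id": execution_id,
                    "level": "info",
                    "namespace": "mcp_agent.example",
                    "message": "hello from a Temporal worker",
                    "data": {},
                },
            )
            resp.raise_for_status()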
Logging improvements
- Loggers bind the upstream_session automatically.
- get_current_context fallback.
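Inside a workflow the logger cannot block on the asyncio loop, so the workflow-safe path schedules the forward-log activity instead. A simplified sketch, assuming the activity name and argument order shown in the review snippets above (the real code also falls back to stderr if scheduling fails):

    from datetime import timedelta
    from temporalio import workflow

    async def _forward_via_proxy(exec_id, level, ns, msg, data):
        # Delegate to the registered activity; the HTTP call to the gateway
        # happens inside the activity, never inside the workflow sandbox.
        await workflow.execute_activity(
            "mcp_forward_log",
            args=[exec_id, level, ns, msg, data],
            schedule_to_close_timeout=timedelta(seconds=5),
        )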
Tests

Security
- Internal proxy endpoints are protected with X-MCP-Gateway-Token.

To Test
uv run examples/mcp_agent_server/temporal/client.py --server-log-level debug
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests
Chores