Get elicitation and sampling to work with signal mailbox #514
Conversation
Walkthrough

Adds an asynchronous request flow across the Temporal workflow, activities, gateway proxy, and app server: activities can dispatch async gateway requests with a generated signal name; workflows await a matching signal for the response. Introduces a new internal async-request route, centralized auth checks, helper routing, and updated signatures for relay and proxy functions. The synchronous flow remains unchanged.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant WF as Temporal Workflow (SessionProxy)
    participant ACT as SystemActivities.relay_request
    participant GW as App Server (internal)
    participant CL as Connected Client
    rect rgb(240,248,255)
        note over WF: Async request path
        WF->>ACT: execute(make_async_call=True, method, params)
        ACT->>GW: async-request(workflow_id, execution_id, method, params, signal_name)
        GW-->>ACT: 202 Accepted (ack)
        ACT-->>WF: signal_name
        par Client handling
            GW->>CL: RPC async method/params
            CL-->>GW: Result or error
            GW-->>WF: Signal signal_name with payload
        end
        WF-->>WF: wait_for_signal(signal_name)
        WF-->>WF: Resume with payload
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant CALLER as Non-workflow Caller
    participant ACT as SystemActivities.relay_request
    participant GW as App Server (internal)
    participant CL as Connected Client
    rect rgb(245,245,245)
        note over CALLER: Synchronous request path (unchanged)
        CALLER->>ACT: relay_request(make_async_call=False, method, params)
        ACT->>GW: request(execution_id, method, params)
        GW->>CL: RPC method/params
        CL-->>GW: Result
        GW-->>ACT: Result JSON
        ACT-->>CALLER: Result JSON
    end
```
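The async path above hinges on the activity handing the workflow a per-request signal name while the gateway does the slow work. A minimal sketch of how such a dispatch payload and internal route might be assembled (the helper name `build_async_dispatch` is hypothetical; the real logic lives in `SystemActivities.relay_request` and the app server route):

```python
import uuid


def build_async_dispatch(workflow_id: str, execution_id: str, method: str, params: dict):
    """Build the signal name, internal route, and body for an async gateway request.

    Hypothetical illustration of the flow in the diagram, not the real implementation.
    """
    # Per-request signal name so the workflow can await exactly this response
    signal_name = f"mcp_rpc_{method}_{uuid.uuid4().hex}"
    route = f"/internal/session/by-run/{workflow_id}/{execution_id}/async-request"
    body = {"method": method, "params": params, "signal_name": signal_name}
    return route, body


route, body = build_async_dispatch("wf-1", "run-1", "sampling", {"x": 1})
```

The workflow then waits on `signal_name`; the gateway acks immediately and signals the workflow later with the client's result.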
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/mcp_agent/executor/temporal/system_activities.py (1)
Lines 92-133: Don't ignore enqueue failures in the async path. If `request_via_proxy` returns `{"error": ...}`, the workflow will hang forever waiting for a signal that will never arrive. Fail fast.

```diff
 if make_async_call:
     # Create a unique signal name for this request
     signal_name = f"mcp_rpc_{method}_{uuid.uuid4().hex}"
-    await request_via_proxy(
+    ack = await request_via_proxy(
         make_async_call=True,
         execution_id=execution_id,
         method=method,
         params=params or {},
         signal_name=signal_name,
         gateway_url=gateway_url,
         gateway_token=gateway_token,
     )
+    if isinstance(ack, dict) and ack.get("error"):
+        raise RuntimeError(f"async-request enqueue failed: {ack.get('error')}")
     return signal_name
```
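The fail-fast idea reduces to a tiny guard. This standalone version (the helper name `check_enqueue_ack` is hypothetical; the review suggests the check inline) shows the intended behavior:

```python
def check_enqueue_ack(ack):
    """Raise if the gateway ack reports an error, so the workflow never
    waits on a signal that will not arrive."""
    if isinstance(ack, dict) and ack.get("error"):
        raise RuntimeError(f"async-request enqueue failed: {ack['error']}")
    return ack
```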
🧹 Nitpick comments (6)
src/mcp_agent/server/app_server.py (3)
Lines 392-395: Standardize the unauthorized payload shape. Other internal endpoints use `{"ok": False, "error": "..."}`; keep it consistent to avoid client regressions.
Use the diff above which already aligns the payload.
Lines 604-636: Session binding logic LGTM. Rebinding on success is a nice touch; small nit: consider logging when `register_session=False` for parity.
Lines 662-666: Unify auth checks across all internal endpoints. `/internal/workflows/log` and `/internal/human/prompts` still implement bespoke auth. Please switch them to `_check_gateway_auth` for consistency.
Apply:

```diff
@@ async def _internal_workflows_log(request: Request):
-    # Optional shared-secret auth
-    gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
-    if gw_token:
-        bearer = request.headers.get("Authorization", "")
-        ...
-        return JSONResponse(
-            {"ok": False, "error": "unauthorized"}, status_code=401
-        )
+    auth_err = _check_gateway_auth(request)
+    if auth_err:
+        return auth_err
@@ async def _internal_human_prompts(request: Request):
-    # Optional shared-secret auth
-    gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
-    if gw_token:
-        bearer = request.headers.get("Authorization", "")
-        ...
-        return JSONResponse({"error": "unauthorized"}, status_code=401)
+    auth_err = _check_gateway_auth(request)
+    if auth_err:
+        return auth_err
```

src/mcp_agent/mcp/client_proxy.py (2)
Lines 195-220: Honor "no timeout" when `MCP_GATEWAY_REQUEST_TIMEOUT` <= 0. Currently only parse errors become no-timeout; zero and negative values should too.

```diff
-    timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
-    if timeout_str is None:
-        timeout = httpx.Timeout(None)
-    else:
-        try:
-            timeout = float(str(timeout_str).strip())
-        except Exception:
-            timeout = httpx.Timeout(None)
+    timeout_str = os.environ.get("MCP_GATEWAY_REQUEST_TIMEOUT")
+    timeout_val = None
+    if timeout_str is not None:
+        try:
+            timeout_val = float(str(timeout_str).strip())
+        except Exception:
+            timeout_val = None
+    timeout = httpx.Timeout(None) if (timeout_val is None or timeout_val <= 0) else timeout_val
```

Also consider aligning with the `MCP_GATEWAY_TIMEOUT` naming used elsewhere, or documenting both.
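The intended semantics can be isolated in a small parser. A sketch (the helper name `parse_gateway_timeout` is hypothetical; the review suggests the logic inline): unset, unparsable, zero, and negative values all mean "no timeout":

```python
def parse_gateway_timeout(raw):
    """Return a positive float timeout, or None meaning 'no timeout'.

    Unset, unparsable, zero, and negative values all disable the timeout.
    """
    if raw is None:
        return None
    try:
        val = float(str(raw).strip())
    except Exception:
        return None
    return val if val > 0 else None
```

The result maps directly onto `httpx.Timeout(None)` for the `None` case, or the float otherwise.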
Lines 221-252: Same <=0 timeout handling for the sync path. Mirror the logic to avoid accidental immediate timeouts.

```diff
-    if timeout_str is None:
-        timeout_float = None  # no timeout by default; activity timeouts still apply
-    else:
-        try:
-            timeout_float = float(str(timeout_str).strip())
-        except Exception:
-            timeout_float = None
+    timeout_float = None
+    if timeout_str is not None:
+        try:
+            timeout_float = float(str(timeout_str).strip())
+        except Exception:
+            timeout_float = None
@@
-    if timeout_float is None:
+    if timeout_float is None or timeout_float <= 0:
         timeout = httpx.Timeout(None)
     else:
         timeout = timeout_float
```

src/mcp_agent/executor/temporal/session_proxy.py (1)
Lines 118-123: Remove the duplicate import of the workflow alias. The module-level import exists at line 12; drop the inner import.

```diff
-    from temporalio import workflow as _twf  # type: ignore
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/mcp_agent/executor/temporal/session_proxy.py (1 hunks)
- src/mcp_agent/executor/temporal/system_activities.py (2 hunks)
- src/mcp_agent/mcp/client_proxy.py (2 hunks)
- src/mcp_agent/server/app_server.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/mcp_agent/executor/temporal/system_activities.py (1)
- src/mcp_agent/mcp/client_proxy.py (1): request_via_proxy (255-277)

src/mcp_agent/server/app_server.py (3)
- src/mcp_agent/executor/temporal/session_proxy.py (3): request (108-148), request (347-350), send_request (170-191)
- src/mcp_agent/app.py (3): logger (204-221), context (160-165), executor (176-177)
- src/mcp_agent/executor/temporal/workflow_signal.py (1): signal (146-195)

src/mcp_agent/executor/temporal/session_proxy.py (6)
- src/mcp_agent/app.py (2): workflow (433-466), executor (176-177)
- src/mcp_agent/executor/task_registry.py (1): get_activity (25-28)
- src/mcp_agent/executor/temporal/__init__.py (1): execute (212-228)
- src/mcp_agent/executor/workflow.py (1): executor (118-123)
- src/mcp_agent/executor/temporal/workflow_signal.py (1): wait_for_signal (71-127)
- src/mcp_agent/executor/temporal/system_activities.py (1): relay_request (93-132)
🔇 Additional comments (8)
src/mcp_agent/server/app_server.py (2)
Lines 535-550: Helper looks good. Good RPC-first approach with clean logging; clear return semantics.

Lines 551-603: Typed fallback mapping LGTM. The controlled allow-list for typed methods is clear and safe.
src/mcp_agent/mcp/client_proxy.py (3)
Lines 151-168: Wrapper and docstring LGTM. Clear async vs sync behavior and return type contract.

Lines 175-185: Activity-context workflow_id inference is correct. Good guardrail against misuse outside an activity.

Lines 254-278: Backward-compat wrapper LGTM. Signature preservation for positional args is helpful.
src/mcp_agent/executor/temporal/session_proxy.py (2)
Lines 144-148: Sync fallback call LGTM. The parameter order matches `SystemActivities.relay_request(make_async_call=False, ...)`.

Lines 133-141: Confirm that `executor.wait_for_signal` accepts `run_id` and `signal_description`. Found async `wait_for_signal` implementations in src/mcp_agent/executor/executor.py (around lines ~186 and ~403), and the Signal object exposes `.description` (src/mcp_agent/executor/workflow_signal.py:225). Verify `executor.wait_for_signal` accepts a `signal_description` kwarg (or else construct and pass a Signal with a description); call site: src/mcp_agent/executor/temporal/session_proxy.py:135.
src/mcp_agent/executor/temporal/system_activities.py (1)
Line 4: `uuid` import LGTM.
```python
signal_name = await self._executor.execute(
    act,
    True,  # make_async_call
    exec_id,
    method,
    params or {},
)
```
Propagate activity errors returned by `executor.execute`.
`TemporalExecutor.execute` may return a BaseException; handle it to avoid waiting on a bogus `signal_name`.

```diff
-    signal_name = await self._executor.execute(
+    result = await self._executor.execute(
         act,
         True,  # make_async_call
         exec_id,
         method,
         params or {},
     )
+    if isinstance(result, BaseException):
+        raise result
+    signal_name = result  # expected to be str
```
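The pattern can be phrased as a one-line guard (the helper name `unwrap_activity_result` is hypothetical; the suggested fix inlines the same check):

```python
def unwrap_activity_result(result):
    """Re-raise exception objects returned (not raised) by an executor,
    so callers never treat a BaseException as a signal name."""
    if isinstance(result, BaseException):
        raise result
    return result
```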
```python
# Helper function for shared authentication across internal endpoints
def _check_gateway_auth(request: Request) -> JSONResponse | None:
    gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
    if not gw_token:
        return None
    bearer = request.headers.get("Authorization", "")
    bearer_token = (
        bearer.split(" ", 1)[1]
        if bearer.lower().startswith("bearer ")
        else ""
    )
    header_tok = request.headers.get("X-MCP-Gateway-Token", "")
    if not (
        secrets.compare_digest(header_tok, gw_token)
        or secrets.compare_digest(bearer_token, gw_token)
    ):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    return None
```
Make gateway auth mandatory (current logic leaves routes unauthenticated if MCP_GATEWAY_TOKEN is unset).
As written, internal endpoints are publicly callable whenever MCP_GATEWAY_TOKEN isn’t set. That’s a security risk for signal injection and request relaying.
Apply:
```diff
 def _check_gateway_auth(request: Request) -> JSONResponse | None:
     gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
-    if not gw_token:
-        return None
+    if not gw_token:
+        # Require explicit opt-out (dev only) to run without auth
+        if os.environ.get("MCP_GATEWAY_AUTH_OPTIONAL", "").lower() not in ("1", "true", "yes"):
+            return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
+        return None
@@
     if not (
         secrets.compare_digest(header_tok, gw_token)
         or secrets.compare_digest(bearer_token, gw_token)
     ):
-        return JSONResponse({"error": "unauthorized"}, status_code=401)
+        return JSONResponse({"ok": False, "error": "unauthorized"}, status_code=401)
     return None
```
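The comparison logic itself is framework-independent. A minimal sketch of the token check with constant-time comparison (the function `gateway_auth_ok` is hypothetical and takes plain strings instead of a Request; an empty configured token rejects, per the mandatory-auth suggestion):

```python
import secrets


def gateway_auth_ok(gw_token: str, authorization: str = "", header_tok: str = "") -> bool:
    """Constant-time token check mirroring the shape of _check_gateway_auth.

    Accepts either an 'Authorization: Bearer <token>' value or the
    X-MCP-Gateway-Token header value; a missing configured token rejects.
    """
    if not gw_token:
        return False
    bearer_token = (
        authorization.split(" ", 1)[1]
        if authorization.lower().startswith("bearer ")
        else ""
    )
    return secrets.compare_digest(header_tok, gw_token) or secrets.compare_digest(
        bearer_token, gw_token
    )
```

`secrets.compare_digest` avoids timing side channels that a plain `==` comparison would leak.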
```python
@mcp_server.custom_route(
    "/internal/session/by-run/{workflow_id}/{execution_id}/async-request",
    methods=["POST"],
    include_in_schema=False,
)
async def _relay_async_request(request: Request):
    """Start an async RPC to the connected client and signal the workflow with the result.

    Body: { method: str, params: dict, signal_name: str }
    Path: workflow_id, execution_id (run_id)
    """
    body = await request.json()
    execution_id = request.path_params.get("execution_id")
    workflow_id = request.path_params.get("workflow_id")
    method = body.get("method")
    params = body.get("params") or {}
    signal_name = body.get("signal_name")

    # Auth
    auth_err = _check_gateway_auth(request)
    if auth_err:
        return auth_err

    if not signal_name:
        return JSONResponse({"error": "missing_signal_name"}, status_code=400)

    async def _do_async():
        result: Dict[str, Any] | None = None
        error: str | None = None
        try:
            # Try latest session first
            latest_session = _get_fallback_upstream_session()
            if latest_session is not None:
                try:
                    result = await _try_session_request(
                        latest_session,
                        method,
                        params,
                        execution_id,
                        log_prefix="async-request",
                        register_session=True,
                    )
                except Exception as e_latest:
                    try:
                        logger.warning(
                            f"[async-request] latest session failed for execution_id={execution_id} method={method}: {e_latest}"
                        )
                    except Exception:
                        pass

            # Fallback to mapped session
            if result is None:
                session = await _get_session(execution_id)
                if not session:
                    error = "session_not_available"
                else:
                    try:
                        result = await _try_session_request(
                            session,
                            method,
                            params,
                            execution_id,
                            log_prefix="async-request",
                            register_session=False,
                        )
                    except Exception as e_sess:
                        error = str(e_sess)
        except Exception as e:
            error = str(e)

        # Signal the workflow with the result or error
        try:
            app = _get_attached_app(mcp_server)
            if app and app.context and getattr(app.context, "executor", None):
                executor = app.context.executor
                client = getattr(executor, "client", None)
                if client and workflow_id and execution_id:
                    handle = client.get_workflow_handle(
                        workflow_id=workflow_id, run_id=execution_id
                    )
                    payload = result if error is None else {"error": error}
                    await handle.signal(signal_name, payload)
        except Exception as se:
            try:
                logger.error(f"[async-request] failed to signal workflow: {se}")
            except Exception:
                pass

    asyncio.create_task(_do_async())
    return JSONResponse(
        {"status": "received", "execution_id": execution_id, "signal_name": signal_name}
    )
```
Ensure a Temporal client exists before signaling; validate signal names.
- `executor.client` may be None; call `ensure_client()` before `get_workflow_handle` to avoid runtime failures.
- Basic `signal_name` validation avoids accidental whitespace/newline issues.

```diff
 async def _relay_async_request(request: Request):
@@
     async def _do_async():
         result: Dict[str, Any] | None = None
         error: str | None = None
@@
         # Signal the workflow with the result or error
         try:
             app = _get_attached_app(mcp_server)
             if app and app.context and getattr(app.context, "executor", None):
                 executor = app.context.executor
-                client = getattr(executor, "client", None)
+                await executor.ensure_client()
+                client = getattr(executor, "client", None)
                 if client and workflow_id and execution_id:
                     handle = client.get_workflow_handle(
                         workflow_id=workflow_id, run_id=execution_id
                     )
                     payload = result if error is None else {"error": error}
                     await handle.signal(signal_name, payload)
@@
-    asyncio.create_task(_do_async())
+    # Validate signal name quickly
+    if not isinstance(signal_name, str) or not signal_name.strip() or any(c in signal_name for c in ("\r", "\n")):
+        return JSONResponse({"error": "invalid_signal_name"}, status_code=400)
+    asyncio.create_task(_do_async())
```
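The validation rule is easy to state on its own (the helper name `is_valid_signal_name` is hypothetical; the suggested fix inlines the same predicate): only non-empty strings without CR/LF pass.

```python
def is_valid_signal_name(name) -> bool:
    """True only for non-empty, non-whitespace strings without CR/LF."""
    return (
        isinstance(name, str)
        and bool(name.strip())
        and not any(c in name for c in ("\r", "\n"))
    )
```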
@saqadri was this intentional to close? It got my hopes up that this might fix the resume signal issue noticed in the mcp_agent_server temporal example

Haha, @rholinshead this is just some work I was collaborating with @roman-van-der-krogt on — his PRs, #507 and #502 have applied these changes. I still haven't looked at the resume issue

Ah, ok -- was thinking that the proxying might fix it as well since it seems the mailbox version isn't getting bumped (at least for the mailbox instance that the run is waiting on). I can retest once it gets merged & deployed just to see

@roman-van-der-krogt did you try out the PauseResumeWorkflow in the temporal example during your testing? Probably can keep those separate issues, but I think Ryan found that the resume signal wasn't working.

I did not, but I can have a look at it after cleaning up #507
Summary
- workflows can truly pause and resume later.
- deterministic state (SignalMailbox) and per-request signal names.
- backward compatible.

Problems Identified
- workflow context is not persisted/replayed, and breaks determinism.
- deterministic; awaiting arbitrary async calls is risky.
- replayable.
- timeouts/retries; we want an async server->client flow that pauses the workflow.

What Changed
- payload.
- then signals the workflow with the result.
- signature and defaults to synchronous behavior, unless `make_async_call=True` is passed.
Code Pointers (What to Improve / Review)
- `mcp_relay_request` with `make_async_call=True`, gets `signal_name`, then waits via `context.executor.wait_for_signal(signal_name, workflow_id, run_id)`.
- result.
- method, params).
- `mcp_rpc_{method}_{uuid}`), call the async server endpoint, return `signal_name`.
- `_request_via_proxy_impl` with full kwargs. The `request_via_proxy(*args, **kwargs)` wrapper preserves positional calling (`request_via_proxy("run", "m", {})`) to keep tests working.
- `/internal/session/by-run/{workflow_id}/{execution_id}/async-request` (determines `workflow_id` from Temporal activity context), returns immediately (None), signal carries the result later.
- `/internal/session/by-run/{execution_id}/request`), returns JSON result.
- `POST /internal/session/by-run/{workflow_id}/{execution_id}/async-request`: (prefers latest > mapped), then `signal` the workflow with `signal_name` and the result payload.
- `_check_gateway_auth`, `_handle_request_via_rpc`, `_handle_specific_request`, `_try_session_request` (used by both sync/async routes).
- `/request` and `/notify` endpoints preserved and refactored for clarity.
- and SignalMailbox for deterministic signal handling (no awaits, no globals).
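The positional-compat wrapper mentioned in these pointers can be sketched as follows (a hypothetical simplified version; the real `_request_via_proxy_impl` performs HTTP I/O, which is stubbed out here just to show the routing of arguments):

```python
import asyncio


async def _request_via_proxy_impl(execution_id, method, params, *, make_async_call=False, signal_name=None):
    # Hypothetical stand-in for the real implementation, which performs the
    # HTTP call; here we just echo the routing decision.
    return {
        "execution_id": execution_id,
        "method": method,
        "async": make_async_call,
        "signal_name": signal_name,
    }


async def request_via_proxy(*args, **kwargs):
    """Backward-compat wrapper: the old positional form
    request_via_proxy("run", "m", {}) keeps working, while new keyword-only
    async options pass through unchanged."""
    return await _request_via_proxy_impl(*args, **kwargs)


result = asyncio.run(request_via_proxy("run", "m", {}))
```

Because the wrapper forwards `*args, **kwargs` verbatim, existing positional call sites and tests remain valid while `make_async_call=True, signal_name=...` stays keyword-only.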
Recommendations / TODOs
- workflows. Use the workflow's SignalMailbox.
- values into the mailbox; wait_condition observes mailbox version changes (deterministic).
- like `mcp_rpc_{method}_{uuid}`.
- async behavior when actually inside a Temporal workflow.
- `_check_gateway_auth` usage).
- via signal.
Compatibility Notes
- `request_via_proxy(..., make_async_call=True, signal_name=...)` from activity context.
- detects workflow runtime; no changes needed at call sites in workflows.
Why This Is Temporal-Compliant
- by Temporal.
- (mailbox version), not module-level globals.
- deterministically.
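Assuming the mailbox works the way these notes describe (a version counter bumped on push, observed by a deterministic wait condition), a minimal in-memory sketch might look like this. This is an illustration of the described design, not the real `SignalMailbox` class:

```python
class SignalMailbox:
    """Minimal sketch: the signal handler pushes values and bumps a version;
    workflow code waits on the version change (e.g. via a wait_condition
    predicate) instead of awaiting I/O directly."""

    def __init__(self):
        self._values = {}
        self._version = 0

    def push(self, signal_name, payload):
        # Called from the signal handler; no awaits, no globals.
        self._values[signal_name] = payload
        self._version += 1

    @property
    def version(self):
        return self._version

    def take(self, signal_name):
        return self._values.pop(signal_name)


mailbox = SignalMailbox()
seen = mailbox.version                               # workflow records the version...
mailbox.push("mcp_rpc_sampling_abc", {"ok": True})   # ...a signal arrives...
assert mailbox.version > seen                        # ...the wait predicate fires
payload = mailbox.take("mcp_rpc_sampling_abc")
```

On replay, Temporal redelivers the same signals in the same order, so the mailbox contents and version sequence are reproduced deterministically.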
Potential Follow-ups
- handle) and per-request signal correlation.
- roll out gradually.

If you want, I can put this into a branch and push a minimal PR body with these bullets and references to the updated functions so your teammate has clear pointers.