
Conversation

@rholinshead (Member) commented Sep 8, 2025

Summary by CodeRabbit

  • New Features

    • Workflow actions now accept either run_id or workflow_id; when only workflow_id is provided, operations target the latest run.
    • Unified status endpoint supports lookups by run_id or workflow_id.
    • In-memory registry can list all workflows.
  • Refactor

    • Simplified public tools: removed per-workflow get_status; use the unified status tool instead.
    • IDs are used consistently across run, status, resume, and cancel flows.
  • Documentation

    • Updated help and docs to describe optional IDs and “latest run” behavior.
  • Tests

    • Expanded coverage for ID-based lookups and latest-run semantics.
  • Chores

    • Example configs use environment-resolved binaries.

Testing

make lint
make format
make tests
  • Ran all examples in examples/mcp_agent_server/asyncio and examples/mcp_agent_server/temporal, including running and connecting to the SSE server in MCP Inspector to manually call the workflow tools with run_id and with workflow_id, verifying they work with either ID
  • Ran all examples in examples/temporal; a couple needed fixes. Note: the router.py example fails with NotFoundError both on main and on this branch


coderabbitai bot commented Sep 8, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Across registries and server endpoints, run_id and workflow_id parameters are made optional with “latest run” resolution when only workflow_id is given. Temporal and InMemory implementations, core interfaces, and server tools are updated accordingly. Tests reflect new semantics; per-workflow get_status tools are removed. Examples and minor CLI formatting are adjusted.
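As a quick illustration of the unified semantics (a sketch only; the registry method name and keyword arguments follow this walkthrough, the surrounding setup is assumed):

async def show_status(registry, run_id=None, workflow_id=None):
    # run_id takes precedence; with only workflow_id, the latest run is used
    status = await registry.get_workflow_status(run_id=run_id, workflow_id=workflow_id)
    print(status)

# await show_status(registry, run_id="abc-123")          # a specific run
# await show_status(registry, workflow_id="MyWorkflow")  # latest run of that workflow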

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Core registry interface**<br>`src/mcp_agent/executor/workflow_registry.py` | Public methods now accept optional run_id/workflow_id; lookups resolve latest run by workflow_id; improved logging; list_workflows added; minor registration/unregistration adjustments. |
| **Temporal registry implementation**<br>`src/mcp_agent/executor/temporal/workflow_registry.py` | Methods accept optional run_id/workflow_id; derive/validate identifiers; support latest-run resolution; merge local and Temporal status; updated register/unregister mapping behavior. |
| **InMemory registry tests**<br>`tests/executor/test_inmemory_workflow_registry.py` | New suite covering registration, latest-run retrieval by workflow_id, resume/cancel/status by either ID, error cases, listing, and unregister semantics. |
| **Temporal registry tests**<br>`tests/executor/temporal/test_workflow_registry.py` | Updated calls to keyword args; new cases for workflow_id-based operations, latest-run selection, missing/bad IDs, and status retrieval paths. |
| **Server app endpoints and flow**<br>`src/mcp_agent/server/app_server.py` | get_workflow_status/resume/cancel accept optional run_id/workflow_id; internal resolution updated; per-workflow get_status removed; run tool returns IDs; wait/status helpers accept workflow_id. |
| **Server tests**<br>`tests/server/test_app_server.py`, `tests/server/test_tool_decorators.py` | Expectations updated to remove per-workflow get_status; status polling switched to registry; tool counts reflect only run tools; comments/assertions adjusted. |
| **Examples**<br>`examples/temporal/router.py`, `examples/mcp_agent_server/asyncio/mcp_agent.config.yaml` | Router workflow run signature simplified (no input); LLM injection via llm_factory; start call updated; config uses generic binaries and simplified args. |
| **Workflow rating enum**<br>`src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py` | QualityRating base changed from str to int; numeric values unchanged (see the sketch after this table). |
| **CLI/client formatting**<br>`src/mcp_agent/cli/cloud/commands/workflows/runs/main.py`, `src/mcp_agent/cli/mcp_app/api_client.py` | Whitespace/formatting only; no functional changes. |
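To make the rating-enum row concrete, a sketch of the base-class change; the member names here are hypothetical, only the str-to-int swap and unchanged numeric values come from this summary:

from enum import IntEnum

class QualityRating(IntEnum):  # previously declared with a str base
    POOR = 0       # hypothetical member names
    FAIR = 1
    GOOD = 2
    EXCELLENT = 3

# IntEnum members order numerically:
assert QualityRating.GOOD > QualityRating.FAIR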

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor Client
    participant Server as app_server
    participant Registry as WorkflowRegistry
    participant Backend as Backend Impl<br/>(InMemory/Temporal)

    Client->>Server: get_workflow_status(run_id?, workflow_id?)
    Server->>Registry: get_workflow(run_id?, workflow_id?)
    alt run_id provided
        Registry-->>Server: workflow (by run_id)
    else workflow_id only
        Registry->>Registry: resolve latest run_id for workflow_id
        Registry-->>Server: workflow (latest)
    else neither provided
        Registry-->>Server: ValueError / None
    end
    Server->>Backend: fetch status using resolved IDs
    Backend-->>Server: status
    Server-->>Client: merged status
    Note over Server,Registry: Same resolution used for resume/cancel

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • petersonbill64
  • roman-van-der-krogt
  • saqadri

Poem

I hop through runs where IDs glide,
If run’s unknown, by flow I ride.
Latest tracks I sniff and find,
Cancel, resume—both now aligned.
Tools pared down, the path made clear—
Workflows whisper, “Status here.”
Thump-thump—ship it, carrots near! 🥕🐇


📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 979c064 and 1384124.

📒 Files selected for processing (5)
  • src/mcp_agent/cli/cloud/commands/workflows/runs/main.py (6 hunks)
  • src/mcp_agent/cli/mcp_app/api_client.py (0 hunks)
  • src/mcp_agent/executor/temporal/workflow_registry.py (5 hunks)
  • src/mcp_agent/server/app_server.py (13 hunks)
  • tests/executor/temporal/test_workflow_registry.py (6 hunks)

@coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
src/mcp_agent/cli/cloud/main.py (1)

167-169: Bug: duplicate "apps" subcommand collides with existing apps group.

You register an apps group at Line 123 and again here as an alias for servers. This creates a name collision/override in Typer, breaking one of the groups.

Apply this diff to remove the conflicting alias:

-# Alias for servers - apps should behave identically
-app.add_typer(app_cmd_servers, name="apps", help="Manage MCP Apps (alias for servers)")
src/mcp_agent/executor/temporal/workflow_registry.py (2)

34-47: Validate identifiers on register; fix workflow_id derivation in unregister.

  • register() writes None keys into maps if run_id/workflow_id aren’t provided; later lookups will fail.
  • unregister() uses workflow.name; prefer workflow.id (and fall back to name) for consistency with other code.

Apply this diff:

@@
     async def register(
         self,
         workflow: "Workflow",
         run_id: str | None = None,
         workflow_id: str | None = None,
         task: Optional["asyncio.Task"] = None,
     ) -> None:
-        self._local_workflows[run_id] = workflow
-
-        # Add run_id to the list for this workflow_id
-        if workflow_id not in self._workflow_ids:
-            self._workflow_ids[workflow_id] = []
-        self._workflow_ids[workflow_id].append(run_id)
+        # Derive IDs if not provided
+        if run_id is None:
+            run_id = getattr(workflow, "run_id", None)
+            if callable(run_id):
+                run_id = run_id()
+        if workflow_id is None:
+            workflow_id = getattr(workflow, "id", None) or getattr(workflow, "name", None)
+            if callable(workflow_id):
+                workflow_id = workflow_id()
+        if not run_id or not workflow_id:
+            raise ValueError("Both run_id and workflow_id must be provided or derivable from workflow.")
+
+        self._local_workflows[run_id] = workflow
+        # Track latest run per workflow_id
+        self._workflow_ids.setdefault(workflow_id, []).append(run_id)
@@
-            workflow = self._local_workflows[run_id]
-            workflow_id = workflow.name if workflow_id is None else workflow_id
+            workflow = self._local_workflows[run_id]
+            if workflow_id is None:
+                workflow_id = getattr(workflow, "id", None) or getattr(workflow, "name", None)

Also applies to: 48-53


172-201: Return type mismatch: get_workflow_status() returns False but is annotated Optional[Dict].

Return None on error, not False, to match the signature and InMemory behavior.

Apply this diff:

@@
-        if not (run_id or workflow_id):
-            raise ValueError("Either run_id or workflow_id must be provided.")
+        if not (run_id or workflow_id):
+            raise ValueError("Either run_id or workflow_id must be provided.")
@@
-        if not workflow_id:
+        if not workflow_id:
             logger.error(
                 f"Cannot get status: workflow_id not found for run_id {run_id or 'unknown'}"
             )
-            return False
+            return None
@@
-        if not run_id:
+        if not run_id:
             # Get the run_id from the workflow_ids dict if we have a workflow_id
             run_ids = self._workflow_ids.get(workflow_id, [])
             if run_ids:
                 run_id = run_ids[-1]  # Use the latest run
@@
-        if not run_id:
+        if not run_id:
             logger.error(
                 f"Cannot get status: run_id not found for workflow_id {workflow_id}"
             )
-            return False
+            return None
src/mcp_agent/server/app_server.py (5)

774-836: resume_workflow never returns; also use registry resolver and widen payload type.

  • Function returns None today despite -> bool.
  • Using ctx.request_context.lifespan_context breaks when running with externally provided FastMCP; use _resolve_workflow_registry(ctx).
  • payload should be Any | None to align with registries and allow dict payloads.

Apply:

@@
-    async def resume_workflow(
-        ctx: MCPContext,
-        run_id: str | None = None,
-        workflow_id: str | None = None,
-        signal_name: str | None = "resume",
-        payload: str | None = None,
-    ) -> bool:
+    async def resume_workflow(
+        ctx: MCPContext,
+        run_id: str | None = None,
+        workflow_id: str | None = None,
+        signal_name: str | None = "resume",
+        payload: Any | None = None,
+    ) -> bool:
@@
-        server_context: ServerContext = ctx.request_context.lifespan_context
-        workflow_registry = server_context.workflow_registry
+        workflow_registry = _resolve_workflow_registry(ctx)
@@
         else:
             logger.error(
                 f"Failed to signal workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload '{payload}'"
             )
+        return result

839-887: cancel_workflow never returns; also use registry resolver.

Same issues as resume_workflow: returns None today and couples to lifespan_context.

Apply:

@@
-    async def cancel_workflow(
-        ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None
-    ) -> bool:
+    async def cancel_workflow(
+        ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None
+    ) -> bool:
@@
-        server_context: ServerContext = ctx.request_context.lifespan_context
-        workflow_registry = server_context.workflow_registry
+        workflow_registry = _resolve_workflow_registry(ctx)
@@
         else:
             logger.error(
                 f"Failed to cancel workflow {workflow_id or 'unknown'} with ID {run_id or 'unknown'}"
             )
+        return result

1272-1283: Broken call: _workflow_status no longer accepts workflow_name kwarg.

This will raise TypeError at runtime. Pass workflow_id instead.

Apply:

-    async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
-        _set_upstream_from_request_ctx_if_available(ctx)
-        return await _workflow_status(ctx, run_id=run_id, workflow_name=workflow_name)
+    async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
+        _set_upstream_from_request_ctx_if_available(ctx)
+        return await _workflow_status(ctx, run_id=run_id, workflow_id=workflow_name)

1393-1404: Gateway URL reconstruction can capture only a scheme (“http”/“https”).

Including X-Forwarded-Proto in the OR chain prevents proper host reconstruction, yielding gateway_url="https" when only proto is present.

Apply:

-                        gateway_url = (
-                            h.get("X-MCP-Gateway-URL")
-                            or h.get("X-Forwarded-Url")
-                            or h.get("X-Forwarded-Proto")
-                        )
-                        # Best-effort reconstruction if only proto/host provided
-                        if gateway_url is None:
-                            proto = h.get("X-Forwarded-Proto") or "http"
-                            host = h.get("X-Forwarded-Host") or h.get("Host")
-                            if host:
-                                gateway_url = f"{proto}://{host}"
+                        gateway_url = (
+                            h.get("X-MCP-Gateway-URL") or h.get("X-Forwarded-Url")
+                        )
+                        # Best-effort reconstruction from proto/host if needed
+                        if not gateway_url:
+                            proto = h.get("X-Forwarded-Proto") or "http"
+                            host = h.get("X-Forwarded-Host") or h.get("Host")
+                            if host:
+                                gateway_url = f"{proto}://{host}"

401-435: Missing shared-secret auth on /internal/session/by-run/{execution_id}/request.

Other internal routes gate on MCP_GATEWAY_TOKEN; this one does not. Anyone who guesses an execution_id could drive client RPCs.

Apply:

         async def _relay_request(request: Request):
@@
             body = await request.json()
             execution_id = request.path_params.get("execution_id")
             method = body.get("method")
             params = body.get("params") or {}
 
+            # Optional shared-secret auth
+            gw_token = os.environ.get("MCP_GATEWAY_TOKEN")
+            if gw_token and not secrets.compare_digest(
+                request.headers.get("X-MCP-Gateway-Token", ""), gw_token
+            ):
+                return JSONResponse({"error": "unauthorized"}, status_code=401)
+
             session = await _get_session(execution_id)
🧹 Nitpick comments (11)
src/mcp_agent/executor/temporal/workflow_registry.py (1)

20-33: Optional: add an asyncio.Lock to guard local maps.

Temporal registry mutates _local_workflows/_workflow_ids without a lock; in mixed-task contexts, minimal locking like InMemory improves consistency.

Apply this diff:

@@
 class TemporalWorkflowRegistry(WorkflowRegistry):
@@
     def __init__(self, executor: "TemporalExecutor"):
         super().__init__()
         self._executor = executor
         # We still keep a local cache for fast lookups, but the source of truth is Temporal
         self._local_workflows: Dict[str, "Workflow"] = {}  # run_id -> workflow
         self._workflow_ids: Dict[str, List[str]] = {}  # workflow_id -> list of run_ids
+        self._lock = asyncio.Lock()

And wrap the register/unregister mutations in `async with self._lock:` blocks, for example:
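A minimal sketch of that wrapping, assuming the _lock field added in the diff above (method body abridged):

async def register(self, workflow, run_id=None, workflow_id=None, task=None):
    async with self._lock:  # serialize mutations of the local maps
        self._local_workflows[run_id] = workflow
        self._workflow_ids.setdefault(workflow_id, []).append(run_id)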

src/mcp_agent/executor/workflow_registry.py (2)

60-69: Doc nits: align wording with behavior (status vs cancel).

  • get_workflow(): docs updated—looks good.
  • cancel_workflow(): docs OK.
  • get_workflow_status(): “workflow_id: The ID of the workflow to cancel” should say “to query.”

Apply this diff:

@@
-            workflow_id: The ID of the workflow to cancel
+            workflow_id: The ID of the workflow to query

Also applies to: 98-101, 114-117


224-239: Consistent, minimal implementations for resume/cancel/status/list; consider small ergonomics.

  • Behavior is consistent and concise.
  • Minor: optionally factor shared ID-resolution into a helper to avoid duplication across these methods and the Temporal registry.

Also applies to: 240-251, 252-263, 266-266

src/mcp_agent/server/app_server.py (2)

1046-1050: Optional: pass workflow_id to _wait_for_completion for faster resolution.

You already have it in result_ids; passing it can avoid an extra registry lookup hop.

-                    result = await _wait_for_completion(ctx, run_id)
+                    result = await _wait_for_completion(ctx, run_id, workflow_id=result_ids.get("workflow_id"))

1469-1491: Status fallback may return non-dict (False/None).

Downstream you handle via try/except, but returning a consistent dict (or raising ToolError) would simplify callers and types.

Consider normalizing non-success paths to a dict like {"status": "ERROR", "error": "..."} rather than False/None.
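A sketch of such a normalization helper (the dict shape and keys are suggestions, not existing code):

def _normalize_status(raw) -> dict:
    # Coerce False/None fallbacks into a consistent, error-shaped dict
    if isinstance(raw, dict):
        return raw
    return {"status": "ERROR", "error": "status unavailable for the given IDs"}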

tests/executor/temporal/test_workflow_registry.py (2)

101-105: Test intent/comment mismatch.

This test passes a run_id="nonexistent", so the failure is due to unknown run_id, not “workflow_id missing”.

-# Should return error status if workflow_id is missing
+# Should return False when the run_id does not exist

146-174: Consider adding latest-run tests for resume/cancel by workflow_id.

You test get_workflow(workflow_id) latest-run, but not that resume/cancel pick the latest run when multiple exist.

Would you like me to add parametrized tests that register two runs and assert get_workflow_handle is called with the last run_id?
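For reference, such a test might look like this (a sketch; the pytest import and the registry/mock_workflow fixtures are assumed from this suite):

@pytest.mark.asyncio
async def test_resume_by_workflow_id_targets_latest_run(registry, mock_workflow):
    # Two runs under the same workflow_id; the second is the latest
    await registry.register(mock_workflow, run_id="run-1", workflow_id="wf-1")
    await registry.register(mock_workflow, run_id="run-2", workflow_id="wf-1")

    await registry.resume_workflow(workflow_id="wf-1")

    registry._executor.client.get_workflow_handle.assert_called_once_with(
        workflow_id="wf-1", run_id="run-2"
    )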

tests/executor/test_inmemory_workflow_registry.py (1)

33-46: Optional: add latest-run behavior tests for resume/cancel by workflow_id.

Mirror Temporal tests to ensure the in-memory registry chooses the newest run for control ops too.

I can add tests that register two runs under the same workflow_id, call resume/cancel with only workflow_id, and assert the last run’s workflow gets the call.

LLMS.txt (3)

2664-2705: Specify precedence and error semantics; adopt keyword-only parameters

Base registry docs mirror Temporal changes but still leave gaps:

  • Precedence when both IDs are given.
  • Error type when neither is provided (raise ValueError vs. return None).
  • Return type for resume/cancel: bool hides failure causes.

Recommendations:

  • Document: if both provided, prefer run_id; raise ValueError on mismatch.
  • Raise ValueError when both IDs are None.
  • Consider returning a Result object with reason (not_found, invalid_state, signaled), or log a structured error alongside False.

Also update signatures to keyword-only:

- def get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
+ def get_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:

- def resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
+ def resume_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:

- def cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ def cancel_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:

- def get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
+ def get_workflow_status(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:

2731-2741: In-memory “latest” selection and async-safety

For InMemoryWorkflowRegistry, define “latest” consistently with Temporal (e.g., by workflow.state.updated_at; prefer active). Guard internal maps with an asyncio.Lock to avoid races when listing/looking up while registering/unregistering.
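One way to pick "latest" deterministically, sketched with the attribute names used in this comment (the actual Workflow model may differ):

def _latest(runs):
    # Prefer active runs, then the most recently updated; run_id breaks ties
    return max(
        runs,
        key=lambda w: (w.state.status == "running", w.state.updated_at, w.run_id),
    )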


2734-2738: Return richer errors for resume/cancel

Returning bool loses context (not found vs. invalid state). Either raise a specific exception or return a small dict: {ok: bool, reason: 'not_found'|'invalid_state'}.
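The small-dict option could also be typed, for example (sketch only):

from typing import Literal, Optional, TypedDict

class OpResult(TypedDict):
    ok: bool
    reason: Optional[Literal["not_found", "invalid_state", "signaled"]]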

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9ca52f2 and b4c1e52.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
  • LLMS.txt (4 hunks)
  • src/mcp_agent/cli/cloud/main.py (1 hunks)
  • src/mcp_agent/executor/temporal/workflow_registry.py (3 hunks)
  • src/mcp_agent/executor/workflow_registry.py (5 hunks)
  • src/mcp_agent/server/app_server.py (12 hunks)
  • tests/executor/temporal/test_workflow_registry.py (5 hunks)
  • tests/executor/test_inmemory_workflow_registry.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
tests/executor/temporal/test_workflow_registry.py (2)
src/mcp_agent/executor/temporal/workflow_registry.py (6)
  • get_workflow (63-74)
  • cancel_workflow (128-170)
  • register (34-46)
  • resume_workflow (76-126)
  • _get_temporal_workflow_status (244-301)
  • get_workflow_status (172-215)
tests/executor/temporal/test_signal_handler.py (2)
  • mock_workflow (41-44)
  • mock_executor (31-32)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
src/mcp_agent/executor/workflow.py (2)
  • run_id (133-138)
  • id (126-130)
src/mcp_agent/server/app_server.py (1)
  • resume_workflow (775-835)
src/mcp_agent/executor/temporal/__init__.py (1)
  • ensure_client (260-275)
tests/executor/test_inmemory_workflow_registry.py (1)
src/mcp_agent/executor/workflow_registry.py (17)
  • InMemoryWorkflowRegistry (150-274)
  • register (30-46)
  • register (162-187)
  • get_workflow (60-73)
  • get_workflow (211-222)
  • resume_workflow (76-95)
  • resume_workflow (224-238)
  • cancel_workflow (98-111)
  • cancel_workflow (240-250)
  • get_workflow_status (114-127)
  • get_workflow_status (252-262)
  • unregister (49-57)
  • unregister (189-209)
  • list_workflow_statuses (130-137)
  • list_workflow_statuses (264-271)
  • list_workflows (140-147)
  • list_workflows (273-274)
src/mcp_agent/server/app_server.py (2)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
  • resume_workflow (76-126)
  • cancel_workflow (128-170)
  • get_workflow (63-74)
src/mcp_agent/executor/workflow_registry.py (6)
  • resume_workflow (76-95)
  • resume_workflow (224-238)
  • cancel_workflow (98-111)
  • cancel_workflow (240-250)
  • get_workflow (60-73)
  • get_workflow (211-222)
src/mcp_agent/executor/workflow_registry.py (2)
src/mcp_agent/executor/workflow.py (2)
  • run_id (133-138)
  • cancel (403-424)
src/mcp_agent/server/app_server.py (3)
  • resume_workflow (775-835)
  • cancel_workflow (838-886)
  • get_workflow_status (741-772)
🔇 Additional comments (14)
src/mcp_agent/cli/cloud/main.py (1)

111-113: LGTM: style-only change to deploy command registration.

No functional differences; keeps the same handler and help.

src/mcp_agent/executor/temporal/workflow_registry.py (1)

63-75: LGTM: identifier resolution logic for get_workflow.

Prefers run_id, otherwise latest run for workflow_id; matches PR intent.

src/mcp_agent/executor/workflow_registry.py (2)

211-223: LGTM: get_workflow resolves by run_id or latest run for workflow_id.

Matches abstract contract and server expectations.


162-178: Ignore attribute access suggestion
workflow.id and workflow.run_id are both decorated with @property, so attribute access correctly invokes their getters. The callable-safety diff is unnecessary.

Likely an incorrect or invalid review comment.

src/mcp_agent/server/app_server.py (3)

742-745: API surface update looks good.

Allowing either run_id or workflow_id at the tool boundary is consistent with registry APIs.


753-759: Doc tweaks read clearly.

The “latest run for workflow_id” behavior is spelled out and matches registry semantics.


769-773: Good guardrail for missing identifiers.

Raising ToolError when both IDs are absent avoids ambiguous routing.

tests/executor/temporal/test_workflow_registry.py (3)

48-57: Temporal handle signaling path verified.

Good use of AsyncMock; asserts ensure correct signal name and payload.


117-176: Nice coverage for workflow_id-based resolution.

Covers latest-run lookup and handle invocation with both IDs.


214-238: Status merge shape looks good.

Asserting the merged local status plus temporal sub-doc prevents regressions.

tests/executor/test_inmemory_workflow_registry.py (4)

21-31: In-memory registry: workflow_id lookup tests look solid.

Latest-run selection is exercised and matches implementation.


56-82: Good resume path checks.

Asserting await arguments keeps the contract tight.


118-124: Error handling contract verified.

ValueError on missing identifiers matches registry behavior.


198-207: List APIs covered well.

Both statuses and instances are validated succinctly.

Comment on lines +4487 to 4516
+**Function:** `get_workflow_status(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]`

-- **Description**: Get the status of a running workflow. Provides detailed information about a workflow instance including its current state, whether it's running or completed, and any results or errors encountered. Args: workflow_name: The name of the workflow to check. run_id: The ID of the workflow instance to check, received from workflows/run or workflows/runs/list. Returns: A dictionary with comprehensive information about the workflow status.
+- **Description**: Get the status of a running workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, returns status for the latest run for that workflow. Provides detailed information about a workflow instance including its current state, whether it's running or completed, and any results or errors encountered.
 - **Parameters**
   - `ctx` (MCPContext)
-  - `workflow_name` (str): The name of the workflow to check.
-  - `run_id` (str): The ID of the workflow instance to check, received from workflows/run or workflows/runs/list.
+  - `run_id` (str | None, optional): Optional run ID of the workflow to check. If omitted, the server will use the latest run for the workflow_id provided.
+  - `workflow_id` (str | None, optional): Optional workflow identifier (usually the tool/workflow name). If omitted, the server will infer it from the run metadata when possible.
 - **Returns**
   - `Dict[str, Any]`: A dictionary with comprehensive information about the workflow status.

-**Function:** `resume_workflow(ctx: MCPContext, run_id: str, workflow_name: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool`
+**Function:** `resume_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool`

-- **Description**: Resume a paused workflow. Args: run_id: The ID of the workflow to resume, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to resume. signal_name: Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal. payload: Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input. Returns: True if the workflow was resumed, False otherwise.
+- **Description**: Resume a paused workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, resumes the latest run for that workflow.
 - **Parameters**
   - `ctx` (MCPContext)
-  - `run_id` (str): The ID of the workflow to resume, received from workflows/run or workflows/runs/list.
-  - `workflow_name` (str | None, optional): The name of the workflow to resume.
+  - `run_id` (str | None, optional): The ID of the workflow to resume, received from workflows/run or workflows/runs/list. If not specified, the latest run for the workflow_id will be used.
+  - `workflow_id` (str | None, optional): The ID of the workflow to resume, received from workflows/run or workflows/runs/list.
   - `signal_name` (str | None, optional): Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal.
   - `payload` (str | None, optional): Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input.
 - **Returns**
   - `bool`: True if the workflow was resumed, False otherwise.

-**Function:** `cancel_workflow(ctx: MCPContext, run_id: str, workflow_name: str | None = None) -> bool`
+**Function:** `cancel_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> bool`

-- **Description**: Cancel a running workflow. Args: run_id: The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to cancel. Returns: True if the workflow was cancelled, False otherwise.
+- **Description**: Cancel a running workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, cancels the latest run for that workflow.
 - **Parameters**
   - `ctx` (MCPContext)
-  - `run_id` (str): The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list.
-  - `workflow_name` (str | None, optional): The name of the workflow to cancel.
+  - `run_id` (str | None, optional): The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list. If not provided, will attempt to cancel the latest run for the provided workflow ID.
+  - `workflow_id` (str | None, optional): The ID of the workflow to cancel, received from workflows/run or workflows/runs/list.
 - **Returns**

🛠️ Refactor suggestion

Server endpoints: validate inputs, document precedence, and use keyword-only params

  • Enforce that at least one ID is provided; 400 on neither.
  • If both provided and conflict, 400 with message; otherwise prefer run_id.
  • Make params keyword-only to match registry.
  • Response should include which IDs were used and whether selection was “latest”.

Proposed signatures:

- async def get_workflow_status(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]:
+ async def get_workflow_status(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]:

- async def resume_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool:
+ async def resume_workflow(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool:

- async def cancel_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ async def cancel_workflow(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:

Also clarify in docs that workflow_id refers to Workflow.id (workflow type instance ID), not a tool name.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In LLMS.txt around lines 4487-4516, the server endpoints for
get_workflow_status/resume_workflow/cancel_workflow need input validation and
clearer parameter semantics: make run_id and workflow_id keyword-only, return
400 if neither is provided, if both are provided validate they refer to the same
instance and return 400 on conflict (clear error message), otherwise prefer
run_id when both are present; when workflow_id is used without run_id resolve
the latest run and mark selection="latest" in the response; include in the
response which identifier was used (run_id or workflow_id) and document that
workflow_id refers to Workflow.id (the workflow type/instance ID), not a tool
name.

Comment on lines 129 to 155
        self, run_id: str | None = None, workflow_id: str | None = None
    ) -> bool:
        if not (run_id or workflow_id):
            raise ValueError("Either run_id or workflow_id must be provided.")

        # Ensure the Temporal client is connected
        await self._executor.ensure_client()

        try:
            # Get the workflow from local registry
-            workflow = await self.get_workflow(run_id)
-            workflow_id = (
-                workflow.name if workflow and workflow_id is None else workflow_id
-            )
+            workflow = await self.get_workflow(run_id, workflow_id)
+            if workflow and not workflow_id:
+                workflow_id = workflow.name

-            # For temporal operations, we need to have both workflow_id and run_id
            if not workflow_id:
+                # In Temporal, we need both workflow_id and run_id to target a specific run
                logger.error(
-                    f"Workflow with run_id {run_id} not found in local registry and workflow_id not provided"
+                    f"Cannot cancel workflow: workflow_id not found for run_id {run_id or 'unknown'}"
                )
                return False

+            if not run_id:
+                # Get the run_id from the workflow_ids dict if we have a workflow_id
+                run_ids = self._workflow_ids.get(workflow_id, [])
+                if run_ids:
+                    run_id = run_ids[-1]  # Use the latest run
+
+            if not run_id:

🛠️ Refactor suggestion

Mirror the same id inference fix in cancel().

Avoid workflow.name; prefer workflow.id (fallback to name). Keep latest-run logic identical to resume().

Apply this diff:

@@
-            workflow = await self.get_workflow(run_id, workflow_id)
-            if workflow and not workflow_id:
-                workflow_id = workflow.name
+            workflow = await self.get_workflow(run_id, workflow_id)
+            if workflow and not workflow_id:
+                workflow_id = getattr(workflow, "id", None) or getattr(workflow, "name", None)
@@
-            if not run_id:
-                # Get the run_id from the workflow_ids dict if we have a workflow_id
-                run_ids = self._workflow_ids.get(workflow_id, [])
-                if run_ids:
-                    run_id = run_ids[-1]  # Use the latest run
+            if not run_id:
+                run_ids = self._workflow_ids.get(workflow_id, [])
+                if run_ids:
+                    run_id = run_ids[-1]

Also applies to: 156-171

🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/workflow_registry.py around lines 129-155
(and likewise 156-171), the cancel() method should mirror the id inference used
by resume(): when get_workflow returns a workflow prefer workflow.id and fall
back to workflow.name, and when run_id is missing use the latest run from
self._workflow_ids[workflow_id] (run_ids[-1]). Update the logic so workflow_id
is assigned from workflow.id if present, otherwise workflow.name, and ensure the
run_id lookup and fallback behavior exactly match resume()’s latest-run logic.

@coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
src/mcp_agent/server/app_server.py (4)

776-786: resume_workflow does not return a value (declared -> bool).

The function logs success/failure but never returns the result, so callers get None.

Apply this diff:

@@
 async def resume_workflow(
     ctx: MCPContext,
     run_id: str | None = None,
     workflow_id: str | None = None,
     signal_name: str | None = "resume",
-    payload: str | None = None,
+    payload: Any | None = None,
 ) -> bool:
@@
     if result:
         logger.debug(
             f"Signaled workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload '{payload}'"
         )
     else:
         logger.error(
             f"Failed to signal workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload '{payload}'"
         )
+    return bool(result)

Also widened payload type to Any to match registry and Temporal signal expectations.

Also applies to: 807-836


1396-1405: gateway_url fallback can capture only a proto (e.g., "https").

Using X-Forwarded-Proto in the primary chain prevents proper URL reconstruction below. Remove it from the initial pick and use it only during reconstruction.

Apply this diff:

-                        gateway_url = (
-                            h.get("X-MCP-Gateway-URL")
-                            or h.get("X-Forwarded-Url")
-                            or h.get("X-Forwarded-Proto")
-                        )
+                        gateway_url = (
+                            h.get("X-MCP-Gateway-URL")
+                            or h.get("X-Forwarded-Url")
+                        )
                         # Best-effort reconstruction if only proto/host provided
                         if gateway_url is None:
                             proto = h.get("X-Forwarded-Proto") or "http"
                             host = h.get("X-Forwarded-Host") or h.get("Host")
                             if host:
                                 gateway_url = f"{proto}://{host}"

1282-1285: Per-workflow get_status passes unsupported parameter name.

_workflow_status no longer accepts workflow_name. This will raise TypeError.

Apply this diff:

-    async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
-        _set_upstream_from_request_ctx_if_available(ctx)
-        return await _workflow_status(ctx, run_id=run_id, workflow_name=workflow_name)
+    async def get_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
+        _set_upstream_from_request_ctx_if_available(ctx)
+        return await _workflow_status(ctx, run_id=run_id)

1459-1476: _workflow_status uses workflow.id as an attribute, not a method.

Workflow.id is a method (per executor base), so workflow_id gets a bound method object, breaking downstream lookups.

Apply this diff:

-    if not workflow_id:
-        workflow = await workflow_registry.get_workflow(
-            run_id=run_id, workflow_id=workflow_id
-        )
-        workflow_id = workflow.id if workflow else None
+    if not workflow_id:
+        workflow = await workflow_registry.get_workflow(
+            run_id=run_id, workflow_id=workflow_id
+        )
+        if workflow:
+            # Prefer method if available; fall back to known attrs
+            wid = getattr(workflow, "id", None)
+            workflow_id = wid() if callable(wid) else getattr(workflow, "workflow_id", None)
🧹 Nitpick comments (9)
examples/mcp_agent_server/asyncio/mcp_agent.config.yaml (2)

10-12: Relying on PATH for uvx—document prerequisite or add a fallback.

Switching to "uvx" is fine, but it assumes uv is installed on PATH. Consider documenting this in the example’s README or supporting a fallback (e.g., pipx) to reduce setup friction.


14-16: Pin the server-filesystem version to avoid surprise upgrades.

Unpinned "@modelcontextprotocol/server-filesystem" can pick up breaking changes. Recommend pinning a semver range.
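For example, a pinned invocation might look like this (a sketch; the surrounding config keys and the version placeholder are illustrative, not copied from the example file):

filesystem:
  command: "npx"
  args: ["-y", "@modelcontextprotocol/server-filesystem@<pinned-version>", "."]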

src/mcp_agent/server/app_server.py (7)

753-759: Clarify behavior when both run_id and workflow_id are provided.

Docstring explains each param well; add one line on precedence/mismatch handling for clarity.


816-818: Avoid logging full payloads at info; downgrade or redact.

Signal payloads may include sensitive data. Log at debug and/or log only type/size.

Apply this diff:

-        logger.info(
-            f"Resuming workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload '{payload}'"
-        )
+        logger.debug(
+            f"Resuming workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' "
+            f"(payload_type={type(payload).__name__})"
+        )
@@
-            logger.debug(
-                f"Signaled workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}' and payload '{payload}'"
-            )
+            logger.debug(
+                f"Signaled workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'} with signal '{signal_name}'"
+            )

Also applies to: 830-835


839-889: Cancel tool: return handling is correct; minor log consistency.

Consider making success/error messages parallel (same ID ordering/phrasing) for grep-ability.

Apply this diff:

-            logger.error(
-                f"Failed to cancel workflow {workflow_id or 'unknown'} with ID {run_id or 'unknown'}"
-            )
+            logger.error(
+                f"Failed to cancel workflow ID {workflow_id or 'unknown'}, run ID {run_id or 'unknown'}"
+            )

1047-1052: Thread workflow_id through to _wait_for_completion for robustness.

You already return workflow_id from _workflow_run; pass it here to disambiguate if run_id collisions ever arise across engines.

Apply this diff:

-                    result_ids = await _workflow_run(ctx, bound_wname, kwargs)
-                    run_id = result_ids["run_id"]
-                    result = await _wait_for_completion(ctx, run_id)
+                    result_ids = await _workflow_run(ctx, bound_wname, kwargs)
+                    run_id = result_ids["run_id"]
+                    wf_id = result_ids.get("workflow_id")
+                    result = await _wait_for_completion(ctx, run_id, workflow_id=wf_id)

1371-1380: Avoid mutating caller-supplied run_parameters in-place.

Copy before injecting internal keys to prevent side-effects for callers reusing the dict.

Apply this diff:

-        run_parameters = run_parameters or {}
+        run_parameters = dict(run_parameters or {})

1417-1422: Storing gateway_token in Temporal memo—assess secret exposure.

Temporal memos are durable and broadly visible; persisting tokens there increases blast radius. Prefer an opaque, short-lived capability token or an execution_id that the gateway maps to a token in memory/DB with TTL.

I can propose a minimal design for ephemeral tokens if helpful.
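A minimal sketch of that idea (all names hypothetical): the gateway keeps short-lived opaque tokens in memory, keyed by execution_id, so the memo never stores a secret.

import secrets
import time

_TOKENS: dict[str, tuple[str, float]] = {}  # execution_id -> (token, expiry)

def issue_token(execution_id: str, ttl_s: float = 300.0) -> str:
    token = secrets.token_urlsafe(32)
    _TOKENS[execution_id] = (token, time.monotonic() + ttl_s)
    return token

def check_token(execution_id: str, presented: str) -> bool:
    entry = _TOKENS.get(execution_id)
    if entry is None:
        return False
    token, expiry = entry
    if time.monotonic() > expiry:
        _TOKENS.pop(execution_id, None)  # lazy expiry
        return False
    return secrets.compare_digest(token, presented)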


810-815: Use the common resolver for the registry to match other tools.

Accessing ctx.request_context.lifespan_context directly can fail under attached-server mode. Reuse _resolve_workflow_registry.

Apply this diff:

-        server_context: ServerContext = ctx.request_context.lifespan_context
-        workflow_registry = server_context.workflow_registry
+        workflow_registry = _resolve_workflow_registry(ctx)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b4c1e52 and 9eefba1.

📒 Files selected for processing (3)
  • examples/mcp_agent_server/asyncio/mcp_agent.config.yaml (1 hunks)
  • src/mcp_agent/executor/workflow_registry.py (5 hunks)
  • src/mcp_agent/server/app_server.py (12 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/mcp_agent/executor/workflow_registry.py
🧰 Additional context used
🧬 Code graph analysis (1)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/workflow.py (2)
  • run_id (133-138)
  • id (126-130)
src/mcp_agent/executor/workflow_registry.py (6)
  • resume_workflow (76-95)
  • resume_workflow (224-240)
  • cancel_workflow (98-111)
  • cancel_workflow (242-254)
  • get_workflow (60-73)
  • get_workflow (211-222)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
  • resume_workflow (76-126)
  • cancel_workflow (128-170)
  • get_workflow (63-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks / test
🔇 Additional comments (3)
src/mcp_agent/server/app_server.py (3)

742-745: API accepts either run_id or workflow_id—LGTM.

Signature change aligns with the registry semantics and improves ergonomics.


769-773: Good guard for missing identifiers.

Raising ToolError when both IDs are None is correct.


963-968: _wait_for_completion now supports workflow_id—LGTM.

Passing workflow_id to registry and status polling improves disambiguation across multiple runs.

Also applies to: 985-990, 1014-1016

@coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/mcp_agent/executor/temporal/workflow_registry.py (3)

149-167: Allow cancellation by workflow_id alone (Temporal should resolve the active/latest run)

Avoid failing when run_id isn’t locally known; build the handle with only workflow_id when necessary.

Apply this diff:

-            if not run_id:
-                logger.error(
-                    f"Cannot cancel workflow: run_id not found for workflow_id {workflow_id}"
-                )
-                return False
-
-            # Get the handle and cancel the workflow
-            handle = self._executor.client.get_workflow_handle(
-                workflow_id=workflow_id, run_id=run_id
-            )
+            # Get the handle and cancel the workflow (prefer specific run; otherwise latest)
+            if run_id:
+                handle = self._executor.client.get_workflow_handle(
+                    workflow_id=workflow_id, run_id=run_id
+                )
+            else:
+                handle = self._executor.client.get_workflow_handle(
+                    workflow_id=workflow_id
+                )
             await handle.cancel()

173-201: Return type mismatch (bool) and brittle run_id resolution — return None and fall back to Temporal describe()

get_workflow_status() is annotated to return Optional[Dict[str, Any]], but returns False on error. Also, requiring run_id from the local cache defeats the workflow_id-only API after restarts.

Apply this diff:

-        if not (run_id or workflow_id):
-            raise ValueError("Either run_id or workflow_id must be provided.")
+        if not (run_id or workflow_id):
+            raise ValueError("Either run_id or workflow_id must be provided.")
@@
-        if not workflow_id:
+        if not workflow_id:
             logger.error(
-                f"Cannot get status: workflow_id not found for run_id {run_id or 'unknown'}"
+                f"Cannot get status: workflow_id not found for run_id {run_id or 'unknown'}"
             )
-            return False
+            return None
@@
-        if not run_id:
-            # Get the run_id from the workflow_ids dict if we have a workflow_id
-            run_ids = self._workflow_ids.get(workflow_id, [])
-            if run_ids:
-                run_id = run_ids[-1]  # Use the latest run
+        if not run_id:
+            # Try local cache first
+            run_ids = self._workflow_ids.get(workflow_id, [])
+            if run_ids:
+                run_id = run_ids[-1]
+            else:
+                # Fall back to Temporal to resolve the latest run for this workflow_id
+                try:
+                    await self._executor.ensure_client()
+                    handle = self._executor.client.get_workflow_handle(workflow_id=workflow_id)
+                    desc = await handle.describe()
+                    # Temporal's response shape differs by version; try common attributes
+                    run_id = getattr(desc, "run_id", getattr(getattr(desc, "execution", None), "run_id", None))
+                except Exception as e:
+                    logger.error(f"Cannot get status: failed to resolve run_id for workflow_id {workflow_id}: {e}")
+                    return None
@@
-        if not run_id:
+        if not run_id:
             logger.error(
-                f"Cannot get status: run_id not found for workflow_id {workflow_id}"
+                f"Cannot get status: run_id not found for workflow_id {workflow_id}"
             )
-            return False
+            return None

101-118: Use Temporal’s native latest-run resolution by omitting run_id
Temporal’s Client.get_workflow_handle(workflow_id) without run_id automatically targets the latest run, removing the need for local cache fallback and error handling (docs.temporal.io, python.temporal.io).
Apply this diff:

-            if not run_id:
-                logger.error(
-                    f"Cannot resume workflow: run_id not found for workflow_id {workflow_id}"
-                )
-                return False
-
-            # Get the handle and send the signal
-            handle = self._executor.client.get_workflow_handle(
-                workflow_id=workflow_id, run_id=run_id
-            )
+            # Get the handle (prefer specific run when available; fall back to latest)
+            if run_id:
+                handle = self._executor.client.get_workflow_handle(
+                    workflow_id=workflow_id, run_id=run_id
+                )
+            else:
+                handle = self._executor.client.get_workflow_handle(
+                    workflow_id=workflow_id
+                )
             await handle.signal(signal_name, payload)
♻️ Duplicate comments (1)
src/mcp_agent/executor/temporal/workflow_registry.py (1)

138-141: Mirror the workflow.id callable fix in cancel()

Same issue as resume(): avoid storing a bound method as workflow_id.

Apply this diff:

-            if workflow and not workflow_id:
-                workflow_id = workflow.id or workflow.name
+            if workflow and not workflow_id:
+                wid = getattr(workflow, "id", None)
+                if callable(wid):
+                    wid = wid()
+                workflow_id = wid or getattr(workflow, "name", None)
🧹 Nitpick comments (2)
src/mcp_agent/executor/temporal/workflow_registry.py (2)

221-223: Unify workflow.id access in list_workflow_statuses()

Same callable-vs-attribute issue; make it robust.

Apply this diff:

-            workflow_id = workflow.id or workflow.name
+            wid = getattr(workflow, "id", None)
+            if callable(wid):
+                wid = wid()
+            workflow_id = wid or getattr(workflow, "name", None)

41-47: Guard against None identifiers during register/unregister to avoid None keys and cache corruption

Storing None in _local_workflows/_workflow_ids will collide/overwrite entries across runs.

Apply this diff:

     async def register(
         self,
         workflow: "Workflow",
         run_id: str | None = None,
         workflow_id: str | None = None,
         task: Optional["asyncio.Task"] = None,
     ) -> None:
-        self._local_workflows[run_id] = workflow
+        if run_id is None or workflow_id is None:
+            logger.warning("register called without run_id or workflow_id; skipping local indexing")
+            return
+        self._local_workflows[run_id] = workflow
@@
-        if workflow_id not in self._workflow_ids:
+        if workflow_id not in self._workflow_ids:
             self._workflow_ids[workflow_id] = []
         self._workflow_ids[workflow_id].append(run_id)

And in unregister() prefer id() when available:

-            workflow_id = workflow.name if workflow_id is None else workflow_id
+            if workflow_id is None:
+                wid = getattr(workflow, "id", None)
+                if callable(wid):
+                    wid = wid()
+                workflow_id = wid or getattr(workflow, "name", None)

Also applies to: 48-58

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9eefba1 and 4130341.

📒 Files selected for processing (1)
  • src/mcp_agent/executor/temporal/workflow_registry.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
src/mcp_agent/executor/workflow.py (2)
  • run_id (133-138)
  • id (126-130)
src/mcp_agent/server/app_server.py (1)
  • resume_workflow (775-835)
src/mcp_agent/executor/temporal/__init__.py (1)
  • ensure_client (260-275)
🔇 Additional comments (2)
src/mcp_agent/executor/temporal/workflow_registry.py (2)

64-74: LGTM: cache-backed lookup for latest run by workflow_id

Validation + latest-run resolution via local cache looks correct for the intended behavior.


90-93: No change needed: workflow.id is a property
The Workflow class defines id with @property (src/mcp_agent/executor/workflow.py:125–127), so workflow.id correctly returns a str | None and isn’t a bound method.

Likely an incorrect or invalid review comment.

@coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/mcp_agent/server/app_server.py (1)

101-110: Bug: membership check uses context.workflows (likely a MagicMock) instead of the real workflows dict.

This will short-circuit registration outside the managed lifespan path. Use self.workflows for both the check and the write.

Here’s the corrected snippet (outside the changed hunk):

def register_workflow(self, workflow_name: str, workflow_cls: Type[Workflow]):
    """Register a workflow class."""
    if workflow_name not in self.workflows:
        self.workflows[workflow_name] = workflow_cls
        registered_workflow_tools = _get_registered_workflow_tools(self.mcp)
        if workflow_name not in registered_workflow_tools:
            create_workflow_specific_tools(self.mcp, workflow_name, workflow_cls)
            registered_workflow_tools.add(workflow_name)
♻️ Duplicate comments (3)
LLMS.txt (3)

2314-2328: Make run_id/workflow_id keyword-only and add deterministic selection + conflict checks (Temporal registry).

Current signatures accept positional args, “latest run” isn’t defined, and conflicts aren’t handled when both IDs are provided. Enforce keyword-only, centralize resolution, define “latest” deterministically, and 400/raise on conflicts. This mirrors prior feedback.

- def get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
+ def get_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:

- def resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
+ def resume_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:

- def cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ def cancel_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:

- def get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
+ def get_workflow_status(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:

Add a shared resolver used by all four:

+ def _resolve_target(self, *, run_id: str | None, workflow_id: str | None) -> tuple[str, str]:
+   if not run_id and not workflow_id:
+     raise ValueError("Either run_id or workflow_id is required.")
+   # If both provided, verify they refer to the same run (prefer run_id but error on mismatch)
+   if run_id and workflow_id:
+     resolved = self._lookup_by_run_id(run_id)
+     if not resolved or resolved.id() != workflow_id:
+       raise ValueError("run_id and workflow_id refer to different runs.")
+     return workflow_id, run_id
+   if workflow_id and not run_id:
+     run_id = self._latest_run_id_for(workflow_id)
+     if not run_id:
+       raise ValueError(f"No runs found for workflow_id={workflow_id}.")
+   return (workflow_id or self._lookup_by_run_id(run_id).id(), run_id)
+
+ def _latest_run_id_for(self, workflow_id: str) -> str | None:
+   """
+   Deterministically pick latest run:
+   1) latest active by updated_at desc
+   2) else most recent start_time
+   3) tie-break by run_id lexicographically
+   """
+   ...

Also update docstrings to spell out the above semantics and the conflict behavior.
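
For instance, a docstring along these lines would pin down the semantics (illustrative wording, not the actual source):

```python
def get_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None):
    """Fetch a workflow run by ID.

    Resolution rules:
      - run_id only: return that exact run.
      - workflow_id only: return the latest run for that workflow
        (active runs first, then most recent, run_id as tie-break).
      - both: must refer to the same run; otherwise raise ValueError.
      - neither: raise ValueError.
    """
```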


2664-2705: Propagate keyword-only + helper to abstract base; document/validate contracts.

Base API should require keyword-only params, define the contract for “latest”, and mandate raising on (run_id, workflow_id) mismatch. Provide a protected helper so all registries are consistent.

- def get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
+ def get_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:

- def resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
+ def resume_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:

- def cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ def cancel_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:

- def get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
+ def get_workflow_status(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
+ def _resolve_target(self, *, run_id: str | None, workflow_id: str | None) -> tuple[str, str]:
+   """Common resolution + validation. See subclass overrides for data sources."""
+   raise NotImplementedError

Verification script to ensure no positional callers remain:

#!/bin/bash
# Flag call sites whose first argument is positional (skip empty calls, `self`, and keyword args).
rg -nP '\b(get_workflow|resume_workflow|cancel_workflow|get_workflow_status)\s*\(\s*(?!\)|self\b|\w+\s*=)' -g "src/**/*.py" -C2

4487-4516: Server endpoints: require keyword-only, validate inputs, prefer run_id, and surface selection metadata.

Return 400 if neither ID is provided; if both and mismatch, 400 with clear message; otherwise prefer run_id. Include selection="latest" and id_used in responses. Also make params keyword-only. This aligns with earlier feedback.

- async def get_workflow_status(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]:
+ async def get_workflow_status(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]:

- async def resume_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool:
+ async def resume_workflow(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool:

- async def cancel_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ async def cancel_workflow(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ def _validate_ids_or_400(run_id: str | None, workflow_id: str | None) -> tuple[str | None, str | None]:
+   if not run_id and not workflow_id:
+     # Surfaced to the client as an invalid-params (400-style) tool error
+     raise ToolError("Either run_id or workflow_id is required.")
+   return run_id, workflow_id
+
+ # Example inside get_workflow_status:
+ run_id, workflow_id = _validate_ids_or_400(run_id, workflow_id)
+ wf = await registry.get_workflow(run_id=run_id, workflow_id=workflow_id)
+ if not wf:
+   raise ToolError("Workflow not found.")
+ status = await wf.get_status()
+ status["selection"] = "latest" if workflow_id and not run_id else "explicit"
+ status["id_used"] = {"run_id": wf.run_id, "workflow_id": wf.id}
+ return status

Also clarify in endpoint docs that workflow_id is the Workflow.id (the workflow instance ID), not a tool name.
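
Under those rules a status response might look like the following (shape only; the values are illustrative):

```python
{
    "status": "running",
    "selection": "latest",  # "explicit" when run_id was passed
    "id_used": {
        "run_id": "5f0c9f2a-1b7e-4c3d-9a2b-7d4e8c1f6a90",  # illustrative
        "workflow_id": "RouterWorkflow",                   # illustrative
    },
}
```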

🧹 Nitpick comments (1)
src/mcp_agent/server/app_server.py (1)

922-974: Minor: pass workflow_id during grace-loop lookup.

You pass workflow_id on the fast path; do the same in the grace loop for consistency.

-                wf = await registry.get_workflow(run_id)
+                wf = await registry.get_workflow(run_id=run_id, workflow_id=workflow_id)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4130341 and da7ebdd.

📒 Files selected for processing (3)
  • LLMS.txt (4 hunks)
  • src/mcp_agent/server/app_server.py (15 hunks)
  • tests/server/test_app_server.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/workflow.py (5)
  • run_id (133-138)
  • get_status (580-621)
  • resume (366-401)
  • cancel (403-424)
  • id (126-130)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
  • resume_workflow (76-126)
  • cancel_workflow (128-170)
  • get_workflow (63-74)
src/mcp_agent/executor/workflow_registry.py (7)
  • resume_workflow (76-95)
  • resume_workflow (224-240)
  • cancel_workflow (98-111)
  • cancel_workflow (242-254)
  • get_workflow (60-73)
  • get_workflow (211-222)
  • WorkflowRegistry (20-147)
🔇 Additional comments (8)
tests/server/test_app_server.py (3)

311-317: Good update: expect the new per-workflow tools and count.

The assertions reflect the added resume/cancel tools and verify all 8 registrations. LGTM.

Also applies to: 319-321


347-351: Idempotency on dynamic registration looks correct.

The expectation of 4 tools for workflow3 (run, get_status, resume, cancel) matches the new behavior.


406-411: Persistent registration across SSE requests validated.

Solid coverage to ensure resume/cancel persist too.

src/mcp_agent/server/app_server.py (5)

671-673: Expose resume/cancel in discovery.

Adding per-workflow endpoints and capabilities is correct and aligns with the new API surface.

Also applies to: 676-681


744-775: Top-level get_status dual-ID contract is sound.

Accepting either run_id or workflow_id with an explicit ToolError on neither is the right UX.


822-848: Top-level cancel dual-ID contract is sound.

Validation mirrors get_status. Looks good.


1223-1227: Run tool return description is correct.

Docstring now clearly states the returned IDs. LGTM.


1489-1510: Status helper: dual-ID validation and backfill are correct.

Gracefully resolving workflow_id from run_id before delegating to the registry is good.

Comment on lines +2731 to 2741
**Function:** `InMemoryWorkflowRegistry.get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']`

**Function (before):** `InMemoryWorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
**Function (after):** `InMemoryWorkflowRegistry.resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`

**Function (before):** `InMemoryWorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
**Function (after):** `InMemoryWorkflowRegistry.cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool`

**Function (before):** `InMemoryWorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
**Function (after):** `InMemoryWorkflowRegistry.get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`

🛠️ Refactor suggestion

InMemory registry: implement deterministic latest-run and thread-safety.

Ensure latest-run selection follows the documented order and is deterministic; guard internal maps with a lock to avoid races under asyncio.

- def get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
+ def get_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
    ...
- def resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
+ def resume_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
    ...
- def cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ def cancel_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:
    ...
- def get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
+ def get_workflow_status(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
    ...
+ # Example selection helper
+ def _latest_run_id_for(self, workflow_id: str) -> str | None:
+   with self._lock:
+     runs = self._index.get(workflow_id, [])
+     if not runs:
+       return None
+     runs = sorted(
+       runs,
+       key=lambda r: (bool(r.status.get("active")), r.status.get("updated_at") or r.status.get("start_time") or 0, r.run_id),
+       reverse=True,
+     )
+     return runs[0].run_id

Committable suggestion skipped: line range outside the PR's diff.
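
For the locking half, a minimal sketch (the `_index` name and run-record shape are assumptions, not the actual registry internals):

```python
import threading
from collections import defaultdict

class InMemoryWorkflowRegistry:
    def __init__(self) -> None:
        # A plain threading.Lock suffices: the guarded sections are synchronous
        # and never await while holding it, so it is safe under asyncio too.
        self._lock = threading.Lock()
        self._index: dict[str, list] = defaultdict(list)  # workflow_id -> run records

    def register(self, workflow_id: str, run) -> None:
        with self._lock:
            self._index[workflow_id].append(run)

    def unregister(self, workflow_id: str, run) -> None:
        with self._lock:
            runs = self._index.get(workflow_id)
            if runs and run in runs:
                runs.remove(run)
```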

Comment on lines 1258 to 1294
@mcp.tool(
    name=f"workflows-{workflow_name}-resume",
    description=f"""
    Resume a paused {workflow_name} workflow.
    Args:
        run_id: The run ID of the running workflow, received from workflows/{workflow_name}/run.
            If not provided, will use the latest run for the provided workflow_id.
        workflow_id: The workflow ID of the running workflow, received from workflows/{workflow_name}/run.
            If not provided, will use the specific run ID requested.
        signal_name: Optional name of the signal to send to resume the workflow.
            This will default to "resume", but can be a custom signal name
            if the workflow was paused on a specific signal.
        payload: Optional payload to provide the workflow upon resumption.
            For example, if a workflow is waiting for human input,
            this can be the human input.
    Returns:
        True if the workflow was resumed, False otherwise.
    """,
)
async def resume(
    ctx: MCPContext,
    run_id: str | None = None,
    workflow_id: str | None = None,
    signal_name: str | None = "resume",
    payload: str | None = None,
) -> Dict[str, Any]:
    _set_upstream_from_request_ctx_if_available(ctx)
    return await _workflow_resume(
        ctx,
        run_id=run_id,
        workflow_id=workflow_id,
        signal_name=signal_name,
        payload=payload,
    )

🛠️ Refactor suggestion

Per-workflow resume: wrong return type, narrow payload type, and missing validation.

  • Function annotates Dict[str, Any] but returns bool.
  • payload should be Any | None.
  • Add ToolError when both IDs are missing (mirror top-level tool).
-    async def resume(
+    async def resume(
         ctx: MCPContext,
         run_id: str | None = None,
         workflow_id: str | None = None,
         signal_name: str | None = "resume",
-        payload: str | None = None,
-    ) -> Dict[str, Any]:
+        payload: Any | None = None,
+    ) -> bool:
         _set_upstream_from_request_ctx_if_available(ctx)
+        if run_id is None and workflow_id is None:
+            raise ToolError("Either run_id or workflow_id must be provided.")
         return await _workflow_resume(
             ctx,
             run_id=run_id,
             workflow_id=workflow_id,
             signal_name=signal_name,
             payload=payload,
         )

Also update the docstring occurrences of “workflows/{workflow_name}/run” to “workflows-{workflow_name}-run”.

@rholinshead rholinshead changed the title [WIP] Update Workflow Tool Calls to Work with workflow_id Feature: Update Workflow Tool Calls to Work with workflow_id Sep 9, 2025

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (6)
src/mcp_agent/server/app_server.py (6)

1243-1250: Per-workflow get_status docs: fix endpoint names and Returns text

Use hyphenated endpoint names and describe status payload, not run IDs.

Apply:

-        Args:
-            run_id: The run ID of the running workflow, received from workflows/{workflow_name}/run.
+        Args:
+            run_id: The run ID of the running workflow, received from workflows-{workflow_name}-run.
                 If not provided, will use the latest run for the provided workflow_id.
-            workflow_id: The workflow ID of the running workflow, received from workflows/{workflow_name}/run.
+            workflow_id: The workflow ID of the running workflow, received from workflows-{workflow_name}-run.
                 If not provided, will use the specific run ID requested.
-
-        Returns:
-            A dict with workflow_id and run_id for the started workflow run, can be passed to
-            workflows/get_status, workflows/resume, and workflows/cancel.
+        Returns:
+            A dictionary containing detailed status (e.g., status, running, completed, result, error).
#!/bin/bash
rg -n "workflows/\{workflow_name\}/run" src/mcp_agent/server/app_server.py

Also applies to: 1252-1257


1295-1315: Per-workflow cancel: fix endpoint names and add guard

Mirror top-level validation to fail fast at the tool boundary.

Apply:

@@
-            run_id: The run ID of the running workflow, received from workflows/{workflow_name}/run.
+            run_id: The run ID of the running workflow, received from workflows-{workflow_name}-run.
@@
-            workflow_id: The workflow ID of the running workflow, received from workflows/{workflow_name}/run.
+            workflow_id: The workflow ID of the running workflow, received from workflows-{workflow_name}-run.
@@
     async def cancel(
         ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None
     ) -> bool:
         _set_upstream_from_request_ctx_if_available(ctx)
+        if run_id is None and workflow_id is None:
+            raise ToolError("Either run_id or workflow_id must be provided.")
         return await _workflow_cancel(ctx, run_id=run_id, workflow_id=workflow_id)

779-783: Broaden resume payload type to Any | None (matches registry and JSON payloads)

Current annotation is str | None; payloads may be dict/JSON. Align with WorkflowRegistry.resume_workflow.

Apply:

@@
-        payload: str | None = None,
+        payload: Any | None = None,
@@
-        payload: str | None = None,
+        payload: Any | None = None,

Run to confirm no other str-typed payloads remain:

#!/bin/bash
rg -nP 'payload:\s*str\s*\|\s*None' src/mcp_agent/server/app_server.py

Also applies to: 1279-1285


1259-1278: Per-workflow resume: fix endpoint names, broaden payload type, add guard

  • Update doc endpoints to hyphenated style.
  • payload should be Any | None.
  • Add ToolError if both IDs are missing (mirrors top-level).

Apply:

@@
-            run_id: The run ID of the running workflow, received from workflows/{workflow_name}/run.
+            run_id: The run ID of the running workflow, received from workflows-{workflow_name}-run.
@@
-            workflow_id: The workflow ID of the running workflow, received from workflows/{workflow_name}/run.
+            workflow_id: The workflow ID of the running workflow, received from workflows-{workflow_name}-run.
@@
-        payload: str | None = None,
+        payload: Any | None = None,
     ) -> bool:
         _set_upstream_from_request_ctx_if_available(ctx)
+        if run_id is None and workflow_id is None:
+            raise ToolError("Either run_id or workflow_id must be provided.")
         return await _workflow_resume(

Also applies to: 1279-1285


1529-1573: _workflow_resume: context-agnostic registry resolution, ToolError, and Any payload

  • Use _resolve_workflow_registry (works for attached or lifespan contexts).
  • Raise ToolError for invalid inputs/missing registry.
  • payload should be Any | None, not str | None.

Apply:

-async def _workflow_resume(
+async def _workflow_resume(
     ctx: MCPContext,
     run_id: str | None = None,
     workflow_id: str | None = None,
     signal_name: str | None = "resume",
-    payload: str | None = None,
+    payload: Any | None = None,
 ) -> bool:
@@
-    if run_id is None and workflow_id is None:
-        raise ValueError("Either run_id or workflow_id must be provided.")
-
-    server_context: ServerContext = ctx.request_context.lifespan_context
-    workflow_registry = server_context.workflow_registry
-
-    if not workflow_registry:
-        raise ValueError("Workflow registry not found for MCPApp Server.")
+    if run_id is None and workflow_id is None:
+        raise ToolError("Either run_id or workflow_id must be provided.")
+    workflow_registry = _resolve_workflow_registry(ctx)
+    if not workflow_registry:
+        raise ToolError("Workflow registry not found for MCPApp Server.")

1575-1612: _workflow_cancel: same fixes as resume (registry resolution and ToolError)

Mirror resume changes for consistent behavior across helpers.

Apply:

-    if run_id is None and workflow_id is None:
-        raise ValueError("Either run_id or workflow_id must be provided.")
-
-    server_context: ServerContext = ctx.request_context.lifespan_context
-    workflow_registry = server_context.workflow_registry
-
-    if not workflow_registry:
-        raise ValueError("Workflow registry not found for MCPApp Server.")
+    if run_id is None and workflow_id is None:
+        raise ToolError("Either run_id or workflow_id must be provided.")
+    workflow_registry = _resolve_workflow_registry(ctx)
+    if not workflow_registry:
+        raise ToolError("Workflow registry not found for MCPApp Server.")
🧹 Nitpick comments (5)
src/mcp_agent/server/app_server.py (5)

1006-1011: Pass workflow_id to _wait_for_completion for faster/safer resolution

The workflow_id parameter is already plumbed through _wait_for_completion; pass it from the run result to avoid ambiguity when multiple runs exist.

Apply:

-                    result_ids = await _workflow_run(ctx, bound_wname, kwargs)
-                    run_id = result_ids["run_id"]
-                    result = await _wait_for_completion(ctx, run_id)
+                    result_ids = await _workflow_run(ctx, bound_wname, kwargs)
+                    run_id = result_ids["run_id"]
+                    workflow_id = result_ids.get("workflow_id")
+                    result = await _wait_for_completion(ctx, run_id, workflow_id=workflow_id)

731-734: Update run_workflow Returns doc to include execution_id and hyphenated endpoints

The function returns execution_id as well.

Apply:

-        Returns:
-            A dict with workflow_id and run_id for the started workflow run, can be passed to
-            workflows/get_status, workflows/resume, and workflows/cancel.
+        Returns:
+            A dict with workflow_id, run_id, and execution_id for the started workflow run.
+            Use with workflows-get_status, workflows-resume, and workflows-cancel.

694-696: Fix Returns doc: this returns a list, not a dict

Minor clarity nit.

Apply:

-        Returns:
-            A dictionary mapping workflow instance IDs to their detailed status information.
+        Returns:
+            A list of dictionaries with workflow status information for each registered run.

1223-1227: Per-workflow run tool: include execution_id and hyphenated endpoint names

Return docs should mention execution_id and use workflows-{workflow_name}-* names.

Apply:

-        Returns:
-            A dict with workflow_id and run_id for the started workflow run, can be passed to
-            workflows/get_status, workflows/resume, and workflows/cancel.
+        Returns:
+            A dict with workflow_id, run_id, and execution_id for the started workflow run.
+            Use with workflows-{workflow_name}-get_status, workflows-{workflow_name}-resume, and workflows-{workflow_name}-cancel.

1516-1524: Unregister session using resolved run_id from status

If only workflow_id was provided, run_id may be None. Use status["id"] fallback.

Apply:

-        state = str(status.get("status", "")).lower()
-        if state in ("completed", "error", "cancelled"):
-            try:
-                await _unregister_session(run_id)
+        state = str(status.get("status", "")).lower()
+        if state in ("completed", "error", "cancelled"):
+            try:
+                rid = run_id or status.get("id")
+                if rid:
+                    await _unregister_session(rid)
             except Exception:
                 pass
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between da7ebdd and 7459ef3.

📒 Files selected for processing (1)
  • src/mcp_agent/server/app_server.py (15 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/workflow.py (5)
  • run_id (133-138)
  • get_status (580-621)
  • resume (366-401)
  • cancel (403-424)
  • id (126-130)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
  • resume_workflow (76-126)
  • cancel_workflow (128-170)
  • get_workflow (63-74)
src/mcp_agent/executor/workflow_registry.py (7)
  • resume_workflow (76-95)
  • resume_workflow (224-240)
  • cancel_workflow (98-111)
  • cancel_workflow (242-254)
  • get_workflow (60-73)
  • get_workflow (211-222)
  • WorkflowRegistry (20-147)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks / test
🔇 Additional comments (6)
src/mcp_agent/server/app_server.py (6)

671-673: Per-workflow endpoints exposed in list_workflows — LGTM

Exposing per-workflow cancel/resume alongside run/get_status looks consistent with the rest of the surface.


744-747: Dual-ID support in get_workflow_status signature — LGTM

Accepting run_id | workflow_id at the tool surface aligns with the registry.


755-761: Docs read clearly for dual-ID semantics — LGTM

Arguments describe latest-run behavior when only workflow_id is provided.


771-775: Good guardrail: require at least one ID

Raising ToolError keeps tool-surface semantics consistent.


809-812: Validation parity — LGTM

Top-level resume enforces “either run_id or workflow_id” via ToolError.


822-823: Cancel tool: dual-ID + validation — LGTM

Return type bool and ToolError guard match expectations.

Also applies to: 844-848

@rholinshead rholinshead requested a review from saqadri September 9, 2025 18:06

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (1)

439-445: Fix: string-evaluator path crashes — use evaluator_llm.instruction

self.evaluator can be a str (per init), so self.evaluator.instruction raises AttributeError. Use the LLM’s instruction instead.

-        Evaluate the following response based on these criteria:
-        {self.evaluator.instruction}
+        Evaluate the following response based on these criteria:
+        {self.evaluator_llm.instruction}
src/mcp_agent/server/app_server.py (2)

668-676: Fix list_workflows tool_endpoints: advertise existing top-level tools, not per-workflow get_status/cancel/resume.

The server only registers per-workflow "run" endpoints. Listing non-existent per-workflow get_status/cancel/resume is misleading.

Apply:

-                endpoints = [
-                    f"workflows-{workflow_name}-run",
-                    f"workflows-{workflow_name}-get_status",
-                    f"workflows-{workflow_name}-cancel",
-                    f"workflows-{workflow_name}-resume",
-                ]
+                endpoints = [
+                    f"workflows-{workflow_name}-run",
+                    "workflows-get_status",
+                    "workflows-cancel",
+                    "workflows-resume",
+                ]

1387-1396: Don’t treat X-Forwarded-Proto as a URL.

Setting gateway_url to “https” blocks the later proto+host reconstruction.

Apply:

-                        gateway_url = (
-                            h.get("X-MCP-Gateway-URL")
-                            or h.get("X-Forwarded-Url")
-                            or h.get("X-Forwarded-Proto")
-                        )
+                        gateway_url = (
+                            h.get("X-MCP-Gateway-URL") or h.get("X-Forwarded-Url")
+                        )
+                        # If not explicitly provided, reconstruct from proto/host below
♻️ Duplicate comments (3)
src/mcp_agent/server/app_server.py (3)

1458-1460: Raise ToolError (not ValueError) for missing IDs in _workflow_status.

Keeps tool-surface error semantics consistent with other endpoints.

Apply:

-    if not (run_id or workflow_id):
-        raise ValueError("Either run_id or workflow_id must be provided.")
+    if not (run_id or workflow_id):
+        raise ToolError("Either run_id or workflow_id must be provided.")

775-779: Broaden resume payload type to Any | None (aligns with registry).

Payloads can be dict/JSON; typing as str is limiting and inconsistent with WorkflowRegistry.

Apply:

-        payload: str | None = None,
+        payload: Any | None = None,

805-813: Use context-agnostic registry resolution in workflows-resume.

Directly reaching ctx.request_context.lifespan_context breaks with externally attached FastMCP. Use _resolve_workflow_registry for both modes.

Apply:

-        server_context: ServerContext = ctx.request_context.lifespan_context
-        workflow_registry = server_context.workflow_registry
+        workflow_registry = _resolve_workflow_registry(ctx)
🧹 Nitpick comments (13)
src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (4)

247-257: Normalize telemetry: store both numeric and label for rating

Emit a stable int for metrics and a human-readable label for logs. Current code passes the enum object.

-                    span.add_event(
+                    span.add_event(
                         f"refinement.{refinement_count}.evaluation_result",
                         {
                             "attempt": refinement_count + 1,
-                            "rating": evaluation_result.rating,
+                            "rating": evaluation_result.rating.value,
+                            "rating_label": evaluation_result.rating.name,
                             "feedback": evaluation_result.feedback,
                             "needs_improvement": evaluation_result.needs_improvement,
                             "focus_areas": evaluation_result.focus_areas,
                             **eval_response_attributes,
                         },
                     )

269-275: Normalize “new_best_response” event attributes

Same consistency as above: record value and label.

-                    span.add_event(
+                    span.add_event(
                         "new_best_response",
                         {
-                            "rating": best_rating,
+                            "rating": best_rating.value,
+                            "rating_label": best_rating.name,
                             "refinement": refinement_count,
                         },
                     )

473-476: Prompt clarity: prefer rating name over enum repr

The current f-string will render QualityRating.GOOD. Use the label to reduce token noise.

-        Quality Rating: {feedback.rating}
+        Quality Rating: {feedback.rating.name}

2-2: Prefer IntEnum and verify JSON compatibility

Minor API hygiene: subclass IntEnum for integer semantics. Also, switching enum “value” to ints can change JSON (pydantic serializes enums by value). Verify no callers depended on string values.

-from enum import Enum
+from enum import IntEnum
@@
-class QualityRating(int, Enum):
+class QualityRating(IntEnum):
     """Enum for evaluation quality ratings"""

If downstream expects string ratings, consider emitting rating_label (see telemetry suggestions) or customizing serialization.

Also applies to: 25-31
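
To see the JSON impact concretely, a minimal sketch (assuming pydantic v2 semantics, where enums serialize by value):

```python
from enum import Enum, IntEnum
from pydantic import BaseModel

class StrRating(str, Enum):
    GOOD = "GOOD"

class IntRating(IntEnum):
    GOOD = 3

class A(BaseModel):
    rating: StrRating

class B(BaseModel):
    rating: IntRating

print(A(rating=StrRating.GOOD).model_dump_json())  # {"rating":"GOOD"}
print(B(rating=IntRating.GOOD).model_dump_json())  # {"rating":3}
```

Any consumer still parsing the old string values would need the rating_label fallback suggested above.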

src/mcp_agent/workflows/llm/augmented_llm_openai.py (2)

504-505: Avoid clobbering core payload keys when merging metadata.

Blindly update-ing can override model, messages, response_format, or token limits, causing hard-to-debug 400s or behavior drift. Filter or namespace metadata.

Apply this diff here:

-            if params.metadata:
-                payload.update(params.metadata)
+            if params.metadata:
+                reserved = {
+                    "model",
+                    "messages",
+                    "response_format",
+                    "max_tokens",
+                    "max_completion_tokens",
+                    "stop",
+                    "user",
+                }
+                collisions = reserved & set(params.metadata.keys())
+                if collisions:
+                    self.logger.warning(
+                        "Ignoring metadata keys that collide with reserved payload keys",
+                        extra={"collisions": sorted(collisions)},
+                    )
+                payload.update({k: v for k, v in params.metadata.items() if k not in reserved})

If you intentionally want to allow overrides, invert the precedence: payload = {**params.metadata, **payload} and document it explicitly.


490-506: Deduplicate payload assembly across generate/generate_structured.

The token/user/stop/metadata logic is now duplicated in both paths and can drift. Extract into a small helper used by both.

Example:

-            # Use max_completion_tokens for reasoning models, max_tokens for others
-            if self._reasoning(model):
-                ...
-            else:
-                ...
-            user = ...
-            if user:
-                ...
-            if params.stopSequences is not None:
-                ...
-            if params.metadata:
-                ...
+            payload = self._apply_common_openai_args(payload, params, model)

Outside this hunk:

def _apply_common_openai_args(self, payload: dict, params: RequestParams, model: str) -> dict:
    if self._reasoning(model):
        payload["max_completion_tokens"] = params.maxTokens
        payload["reasoning_effort"] = self._reasoning_effort
    else:
        payload["max_tokens"] = params.maxTokens
    user = params.user or getattr(self.context.config.openai, "user", None)
    if user:
        payload["user"] = user
    if params.stopSequences is not None:
        payload["stop"] = params.stopSequences
    # apply filtered metadata as suggested above
    if params.metadata:
        reserved = {"model","messages","response_format","max_tokens","max_completion_tokens","stop","user"}
        payload.update({k: v for k, v in params.metadata.items() if k not in reserved})
    return payload
examples/temporal/router.py (4)

41-50: Update run() docstring to match the new no-arg signature.

Doc still references “input” and an Args/Returns section that no longer apply.

-        """
-        Run the workflow, processing the input data.
-
-        Args:
-            input_data: The data to process
-
-        Returns:
-            A WorkflowResult containing the processed data
-        """
+        """Run the workflow and produce a WorkflowResult."""

52-55: Make filesystem root injection idempotent.

Avoid appending duplicate CWD entries on repeated runs.

-        context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])
+        roots = context.config.mcp.servers["filesystem"].args
+        cwd = os.getcwd()
+        if cwd not in roots:
+            roots.append(cwd)

95-95: Log serializable payloads.

If results are Pydantic models, convert to dicts to avoid logger incompatibilities.

-        logger.info("Router Results:", data=results)
+        logger.info("Router Results:", data=[r.model_dump() if hasattr(r, "model_dump") else r for r in results])

Apply similarly to the other three “Router Results” logs.

Also applies to: 124-124, 134-134, 142-142


103-110: Guard read_file against missing config file
Add at top of examples/temporal/router.py:

from pathlib import Path

Then replace lines 103–110 with:

-            result = await agent.call_tool(
-                name="read_file",
-                arguments={
-                    "path": str(os.path.join(os.getcwd(), "mcp_agent.config.yaml"))
-                },
-            )
-            logger.info("read_file result:", data=result.model_dump())
+            cfg_path = Path(os.getcwd()) / "mcp_agent.config.yaml"
+            if not cfg_path.exists():
+                logger.warning("Config file not found in CWD; skipping read_file", cwd=os.getcwd())
+            else:
+                result = await agent.call_tool(
+                    name="read_file",
+                    arguments={"path": str(cfg_path)},
+                )
+                logger.info("read_file result:", data=result.model_dump())
src/mcp_agent/server/app_server.py (3)

731-734: Docstring endpoint names: use hyphenated tool names, not slashes.

Several docstrings still reference workflows/...; public tools are workflows-...

Apply:

@@
-            A dict with workflow_id and run_id for the started workflow run, can be passed to
-            workflows/get_status, workflows/resume, and workflows/cancel.
+            A dict with workflow_id and run_id for the started workflow run; pass to
+            workflows-get_status, workflows-resume, and workflows-cancel.
@@
-            run_id: Optional run ID of the workflow to check.
-                If omitted, the server will use the latest run for the workflow_id provided.
-                Received from workflows/run or workflows/runs/list.
+            run_id: Optional run ID of the workflow to check.
+                If omitted, the server will use the latest run for the workflow_id provided.
+                Received from workflows-run or workflows-runs-list.
@@
-            workflow_id: Optional workflow identifier (usually the tool/workflow name).
-                If omitted, the server will infer it from the run metadata when possible.
-                Received from workflows/run or workflows/runs/list.
+            workflow_id: Optional workflow identifier (usually the tool/workflow name).
+                If omitted, the server will infer it from the run metadata when possible.
+                Received from workflows-run or workflows-runs-list.
@@
-        Returns:
-            A dict with workflow_id and run_id for the started workflow run, can be passed to
-            workflows/get_status, workflows/resume, and workflows/cancel.
+        Returns:
+            A dict with workflow_id and run_id for the started workflow run; pass to
+            workflows-get_status, workflows-resume, and workflows-cancel.

Also applies to: 754-761, 1265-1267


694-696: Fix return description for workflows-runs-list.

Function returns a list, not a dict mapping.

Apply:

-        Returns:
-            A dictionary mapping workflow instance IDs to their detailed status information.
+        Returns:
+            A list of workflow run status payloads.

932-936: Docstring: async tool registration details are outdated.

We register a single alias tool (not -run) and no per-tool get_status alias. Update to prevent confusion.

Apply:

-    - @app.async_tool registers alias tools <name>-run and <name>-get_status
-      that proxy to the workflow run/status utilities.
+    - @app.async_tool registers an alias tool <name> that starts the workflow
+      asynchronously and returns IDs; use workflows-get_status to query status.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7459ef3 and 979c064.

📒 Files selected for processing (6)
  • examples/temporal/router.py (2 hunks)
  • src/mcp_agent/server/app_server.py (13 hunks)
  • src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (1 hunks)
  • src/mcp_agent/workflows/llm/augmented_llm_openai.py (1 hunks)
  • tests/server/test_app_server.py (3 hunks)
  • tests/server/test_tool_decorators.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
tests/server/test_tool_decorators.py (2)
src/mcp_agent/server/app_server.py (2)
  • app (112-114)
  • workflow_registry (122-124)
src/mcp_agent/app.py (1)
  • context (146-151)
examples/temporal/router.py (1)
src/mcp_agent/executor/workflow.py (1)
  • WorkflowResult (55-61)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/workflow.py (2)
  • run_id (133-138)
  • id (126-130)
src/mcp_agent/executor/temporal/workflow_registry.py (3)
  • resume_workflow (76-126)
  • cancel_workflow (128-170)
  • get_workflow (63-74)
src/mcp_agent/executor/workflow_registry.py (7)
  • resume_workflow (76-95)
  • resume_workflow (224-240)
  • cancel_workflow (98-111)
  • cancel_workflow (242-254)
  • get_workflow (60-73)
  • get_workflow (211-222)
  • WorkflowRegistry (20-147)
🔇 Additional comments (10)
src/mcp_agent/workflows/llm/augmented_llm_openai.py (2)

499-503: LGTM: user and stop propagation.

Propagating user and stop keeps parity with the non-structured path and is consistent with OpenAI API usage.


491-499: Parameters confirmed as correct. As of September 2025, the O-series reasoning models (o1, o3, o4/o4-mini) in the Chat Completions API use max_completion_tokens (not max_tokens) and the reasoning_effort parameter; no changes needed.

examples/temporal/router.py (2)

84-88: llm_factory migration looks good.

Providing a shared instance via lambda is fine for the example; switch to per-agent instantiation later if agent-specific context is needed.


151-153: start_workflow() call aligns with no-input run(). LGTM.

tests/server/test_tool_decorators.py (3)

81-83: LGTM: sync tools shouldn’t get workflows- aliases.*

Assertion matches the new contract (only the sync tool is added).


97-107: LGTM: registry-based status polling.

Using app.context.workflow_registry.get_workflow_status(run_id) aligns with the public API and removes the internal helper dependency.


159-173: LGTM: wrap path validates WorkflowResult unwrapping.

Polling via registry and checking both dict/value shapes is solid.

tests/server/test_app_server.py (3)

313-316: LGTM: only one run tool per workflow.

Count expectations reflect removal of per-workflow get_status aliases.


341-343: LGTM: new workflow registers a single run tool.

Matches idempotent registration logic.


397-399: LGTM: persistence across SSE requests.

Re-registration is idempotent for the same FastMCP instance.

mcp:
  servers:
    fetch:
      command: "/Users/saqadri/.local/bin/uvx"
Collaborator

thank you!

Collaborator

@saqadri saqadri left a comment

Approving to unblock, but worried about the change in behavior where we use the name vs. id.

)
workflow = await self.get_workflow(run_id, workflow_id)
if workflow and not workflow_id:
    workflow_id = workflow.id or workflow.name
Collaborator

the previous behavior was effectively workflow.name or workflow.id... IIRC that was intentional but I don't remember why anymore. LMK if you noticed any issues

Member Author

I haven't noticed any issues. Looking at register/unregister, it's already inconsistent in how workflow_id and name are used: registration happens with workflow_id (which is optional, so there's potential for None errors), but unregister will prioritize name if no workflow_id is given.

My understanding is it's intended to just be a reference that maps all the run_ids for the workflow, so realistically using one or the other won't matter as long as it's consistent. The default asyncio workflow id is created as self._workflow_id = provided_workflow_id or self.name, so it uses what's provided at the run call or defaults to the name. So id and name are synonymous in the asyncio case unless workflow_id is provided, in which case we do want to prioritize id over name.
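
To illustrate the rule (a sketch of the behavior described above, not the actual class):

```python
class Workflow:
    def __init__(self, name: str, provided_workflow_id: str | None = None):
        self.name = name
        # Falls back to the class/tool name, so id == name unless a caller
        # passes an explicit workflow_id at the run call.
        self._workflow_id = provided_workflow_id or self.name

assert Workflow("RouterWorkflow")._workflow_id == "RouterWorkflow"
assert Workflow("RouterWorkflow", "wf-123")._workflow_id == "wf-123"
```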

Member Author

I can fix the register issue / consistency here

Comment on lines 671 to 672
f"workflows-{workflow_name}-cancel",
f"workflows-{workflow_name}-resume",
Collaborator

Let's remove these

workflow = await workflow_registry.get_workflow(
    run_id=run_id, workflow_id=workflow_id
)
workflow_id = workflow.id if workflow else None
Collaborator

This is also changing the behavior. Should it still default to workflow_name?

Suggested change
- workflow_id = workflow.id if workflow else None
+ workflow_id = workflow.id if workflow and workflow.id else workflow_name



- class QualityRating(str, Enum):
+ class QualityRating(int, Enum):
Collaborator

what's the reason for this change?

Member Author

On main, evaluator_optimizer is broken because the LLM returns a str (e.g. "GOOD") and then, I think, the updated openai structured serialization errors because we expect a number for the enum value and get back a string. Changing this to int results in proper serialization.

Collaborator

ah I see

Member Author

@rholinshead rholinshead Sep 9, 2025

I can undo and leave as failing for now, just wanted to make sure the example worked

Collaborator

No let's go with it, that's totally fine

async def test_get_workflow_status_by_workflow_id(registry, mock_executor):
    mock_workflow = MagicMock(name="test_workflow")
    mock_workflow.id = "workflow-id"
    mock_workflow.name = "workflow-id"
Collaborator

The tests should actually have a different value for workflow.name and .id, because I am worried about inconsistent use. This is a tricky part of the codebase. Workflow Name would be the class name or tool name, and workflow id would be a uuid
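
E.g., something like this in the fixture (the uuid is illustrative):

```python
import uuid
from unittest.mock import MagicMock

mock_workflow = MagicMock()
mock_workflow.name = "test_workflow"  # class/tool name
mock_workflow.id = str(uuid.uuid4())  # instance id, deliberately != name
```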

@rholinshead rholinshead merged commit ae27182 into main Sep 9, 2025
7 of 8 checks passed
andrew-lastmile added a commit that referenced this pull request Sep 11, 2025
* Temporarily exclude CLI from test coverage (#429)

### TL;DR

Exclude CLI code from test coverage metrics for now. Will add tests when we're done sprinting 10000 mph 

![Added via Giphy](https://media4.giphy.com/media/v1.Y2lkPWM5NDg3NzQzOTNudmtpNXcyazNnZWo2enIzem5neXR2a3l0cGx5aWFlbDB6ZTA1dyZlcD12MV9naWZzX3NlYXJjaCZjdD1n/sRKg9r2YWeCTG5JTTo/giphy.gif)

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Tests**
  * Adjusted test coverage collection to exclude non-critical CLI components, resulting in more accurate coverage metrics for core functionality.

* **Chores**
  * Updated coverage reporting configuration to align with the new exclusion rules, ensuring consistent results across local and CI runs.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* Add workflow commands to CLI (#424)

### TL;DR

Added workflow management commands to the MCP Agent CLI, including describe, suspend, resume, and cancel operations.

### What changed?

- Added four new workflow management commands:
    - `describe_workflow`: Shows detailed information about a workflow execution
    - `suspend_workflow`: Pauses a running workflow execution
    - `resume_workflow`: Resumes a previously suspended workflow
    - `cancel_workflow`: Permanently stops a workflow execution
- Implemented corresponding API client methods in `WorkflowAPIClient`:
    - `suspend_workflow`
    - `resume_workflow`
    - `cancel_workflow`
- Updated the CLI structure to expose these commands under `mcp-agent cloud workflows`
- Added an alias for `describe_workflow` as `status` for backward compatibility

### How to test?

Test the new workflow commands with a running workflow:

```
# Get workflow details
mcp-agent cloud workflows describe run_abc123
mcp-agent cloud workflows status run_abc123  # alias

# Suspend a workflow
mcp-agent cloud workflows suspend run_abc123

# Resume a workflow (with optional payload)
mcp-agent cloud workflows resume run_abc123
mcp-agent cloud workflows resume run_abc123 --payload '{"data": "value"}'

# Cancel a workflow (with optional reason)
mcp-agent cloud workflows cancel run_abc123
mcp-agent cloud workflows cancel run_abc123 --reason "User requested cancellation"
```

### Why make this change?

These commands provide essential workflow lifecycle management capabilities to users, allowing them to monitor and control workflow executions through the CLI. The ability to suspend, resume, and cancel workflows gives users more control over long-running operations and helps manage resources more efficiently.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- New Features
  - Introduced “workflows” CLI group with commands: describe (alias: status), resume, suspend, and cancel.
  - Describe supports text, JSON, and YAML output; all commands work with server ID or URL and include improved error messages.

- Refactor
  - Renamed CLI group from “workflow” to “workflows” and reorganized command registrations.
  - Consolidated internal utility imports (no behavior change).

- Chores
  - Updated module descriptions.
  - Removed legacy workflow status package/exports in favor of the new workflows commands.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* add servers workflow subcommand (#428)

# Add servers workflows subcommand

This PR adds a new `workflows` subcommand to the `mcp-agent cloud servers` command that allows users to list workflows associated with a specific server. The command supports:

- Filtering by workflow status
- Limiting the number of results
- Multiple output formats (text, JSON, YAML)
- Accepting server IDs, app config IDs, or server URLs as input

Examples:
```
mcp-agent cloud servers workflows app_abc123
mcp-agent cloud servers workflows https://server.example.com --status running
mcp-agent cloud servers workflows apcnf_xyz789 --limit 10 --format json
```

The PR also cleans up the examples in the existing workflow commands and adds the necessary API client support for listing workflows.

* add workflow list and runs (#430)

### TL;DR

Reorganized workflow commands

`mcp-agent cloud workflows runs`
`mcp-agent cloud workflows list`
`mcp-agent cloud server workflows` (alias of workflows list)

### What changed?

- Moved `list_workflows_for_server` from the servers module to the workflows module as `list_workflow_runs`
- Added new workflow commands: `list_workflows` and `list_workflow_runs`
- Updated CLI command structure to make workflows commands more intuitive
- Applied consistent code formatting with black across all server and workflow related files

### How to test?

Test the new and reorganized workflow commands:

```bash
# List available workflow definitions
mcp-agent cloud workflows list app_abc123

# List workflow runs (previously under servers workflows)
mcp-agent cloud workflows runs app_abc123

# Test with different output formats
mcp-agent cloud workflows list app_abc123 --format json
mcp-agent cloud workflows runs app_abc123 --format yaml

# Verify existing commands still work
mcp-agent cloud servers list
mcp-agent cloud workflows describe app_abc123 run_xyz789
```

* [ez] Move deploy command to cloud namespace (#431)

### TL;DR

Added `cloud deploy` command as an alias for the existing `deploy` command.

* First pass at implementing the mcp-agent CLI (#409)

* Initial scaffolding

* initial CLI

* checkpoint

* checkpoint 2

* various updates to cli

* fix lint and format

* fix: should load secrets.yaml template instead when running init cli command

* fix: prevent None values in either mcp-agent secrets and config yaml files from overwriting one another when merging both

* fix: when running config check, use get_settings() instead of Settings() to ensure settings are loaded.

* fix: handle None values for servers in MCPSettings so it defaults to empty dict and update secrets.yaml template so it does not overwrite mcp servers in config

* Inform users to save and close editor to continue when running config edit command

* fix: Update openai, anthropic and azure regex for keys cli command

* Sort model list by provider and model name

* Add filtering support for models list cli command

* disable untested commands

* lint, format, gen_schema

* get rid of accidental otlp exporter changes from another branch

* get rid of accidental commit from other branch

---------

Co-authored-by: StreetLamb <[email protected]>

* Docs MVP (#436)

* Initial scaffolding

* initial CLI

* checkpoint

* checkpoint 2

* various updates to cli

* fix lint and format

* fix: should load secrets.yaml template instead when running init cli command

* fix: prevent None values in either mcp-agent secrets and config yaml files from overwriting one another when merging both

* fix: when running config check, use get_settings() instead of Settings() to ensure settings are loaded.

* fix: handle None values for servers in MCPSettings so it defaults to empty dict and update secrets.yaml template so it does not overwrite mcp servers in config

* Inform users to save and close editor to continue when running config edit command

* fix: Update openai, anthropic and azure regex for keys cli command

* Sort model list by provider and model name

* Add filtering support for models list cli command

* disable untested commands

* Fixes to docs

* Updating the main.py and !developer_secrets for secrets

* updating python entry files to main.py

* Fix tracer.py

---------

Co-authored-by: StreetLamb <[email protected]>
Co-authored-by: Andrew Hoh <[email protected]>

* fix: max complete token for openai gen structured (#438)

* Fix regression in CLI ("cloud cloud")

* docs fixes

* Fix top-level cli cloud commands (deploy, login, etc)

* Add eager tool validation to ensure json serializability of input params/result types

* More docs updates

* Refactor workflow runs list to use MCP tool calls (#439)

### TL;DR

Refactored the workflow runs listing command to use MCP tool calls instead of direct API client calls.

### What changed?

- Replaced the direct API client approach with MCP tool calls to retrieve workflow runs
- Added a new `_list_workflow_runs_async` function that uses the MCP App and gen_client to communicate with the server
- Improved status filtering and display logic to work with both object and dictionary response formats
- Enhanced error handling and formatting of workflow run information
- Updated the workflow data processing to handle different response formats more robustly

### How to test?

```bash
# List workflow runs from a server
mcp-agent cloud workflows runs <server_id_or_url>

# Filter by status
mcp-agent cloud workflows runs <server_id_or_url> --status running

# Limit results
mcp-agent cloud workflows runs <server_id_or_url> --limit 10

# Change output format
mcp-agent cloud workflows runs <server_id_or_url> --format json
```

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- New Features
  - Add status filtering for workflow runs, with common aliases (e.g., timeout → timed_out).
  - Add an optional limit to constrain the number of results.
  - Allow server selection via direct URL or config-based server ID.

- Refactor
  - Update text output: columns now show Workflow ID, Name, Status, Run ID, Created; Principal removed.
  - Improve date formatting and consistent JSON/YAML/Text rendering.

- Bug Fixes
  - Clearer error messages and safer handling when server info is missing or no data is returned.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

* Update workflows commands UI to be more consistant with the rest of the CLI (#432)

### TL;DR

Improved CLI workflow command output formatting with better visual indicators and consistent styling.

### How to test?

```
mcp-agent cloud workflows cancel <run-id>
mcp-agent cloud workflows describe <run-id>
mcp-agent cloud workflows resume <run-id>
```

## Summary by CodeRabbit

* **Style**
  * Cancel workflow: added a blank line before the status and changed the success icon to 🚫 (yellow).
  * Describe workflow: replaced panel UI with a clean, header-based text layout (“🔍 Workflow Details”), showing name with colorized status and fields for Workflow ID, Run ID, and Created. Updated status indicators with emojis and colors; timestamp is now plain text on its own line.
  * Resume workflow: success message now applies consistent coloring to the entire line for improved readability.

* Feature: Update Workflow Tool Calls to Work with workflow_id (#435)

* Support for workflow_id and run_id (sketched after this list)

* Update temporal workflow registry

* tests

* Update LLMS.txt

* Fix config

* Return bool for cancel result

* Validate ids provided

* Fix cancel workflow id

* Fix workflows-resume response

* Add workflow-name specific resume and cancel tools

* Fix return type

* Fix examples

* Remove redundant workflows-{name}-tool tool calls

* Add _workflow_status back

* Use registry helper

* Changes from review

* Add back evaluator_optimizer enum fix
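
The core idea behind these commits, sketched with an illustrative in-memory registry (the class, field, and method names here are assumptions, not the exact registry API): accept either ID, and resolve a bare workflow_id to its most recent run.

```python
# Illustrative "latest run" resolution: callers may pass run_id or
# workflow_id; with only workflow_id, the most recent run is targeted.
from dataclasses import dataclass


@dataclass
class Run:
    run_id: str
    workflow_id: str
    created_at: float  # epoch seconds; newest run wins


class InMemoryRegistry:
    def __init__(self) -> None:
        self._runs: dict[str, Run] = {}

    def register(self, run: Run) -> None:
        self._runs[run.run_id] = run

    def resolve(self, run_id: str | None = None, workflow_id: str | None = None) -> Run:
        if run_id:
            return self._runs[run_id]
        if workflow_id:
            candidates = [r for r in self._runs.values() if r.workflow_id == workflow_id]
            if not candidates:
                raise KeyError(f"no runs for workflow {workflow_id}")
            return max(candidates, key=lambda r: r.created_at)
        raise ValueError("provide run_id or workflow_id")
```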

* Fix a hang that can happen at shutdown (#440)

* Fix a shutdown hang

* Fix tests

* fix "taskgroup closed in a different context than when it was started in" error

* some PR feedback fixes

* PR feedback

* Fix random failures of server aggregator not found for agent in temporal (#441)

* Fix a shutdown hang

* Fix tests

* fix "taskgroup closed in a different context than when it was started in" error

* some PR feedback fixes

* Fix random "server aggregator not found for agent" failures in the Temporal environment

* Bump pyproject version

* Fix gateway URL resolution (#443)

* Fix gateway URL resolution

Removed incorrect dependence on ServerRegistry for gateway URLs; the gateway is not an MCP server.
App server (src/mcp_agent/server/app_server.py) builds workflow memo with:
- gateway_url precedence (sketched below): X-MCP-Gateway-URL or X-Forwarded-Url → reconstruct from X-Forwarded-Proto/Host/Prefix → request.base_url → MCP_GATEWAY_URL env.
- gateway_token precedence: X-MCP-Gateway-Token → MCP_GATEWAY_TOKEN env.
Worker-side (SystemActivities/SessionProxy) uses memo.gateway_url and gateway_token; falls back to worker env.
Client proxy helpers (src/mcp_agent/mcp/client_proxy.py):
- _resolve_gateway_url: explicit param → context → env → local default.
- Updated public signatures to drop server_registry parameter.
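
A hedged sketch of the precedence chains above; the header and environment-variable names come from this commit message, while the helpers themselves are illustrative rather than the shipped implementation:

```python
# Resolve the gateway URL/token with the precedence described above.
import os


def resolve_gateway_url(headers: dict[str, str], request_base_url: str | None = None) -> str | None:
    # 1) Explicit gateway headers win.
    explicit = headers.get("X-MCP-Gateway-URL") or headers.get("X-Forwarded-Url")
    if explicit:
        return explicit
    # 2) Reconstruct from forwarding headers when present.
    proto, host = headers.get("X-Forwarded-Proto"), headers.get("X-Forwarded-Host")
    if proto and host:
        return f"{proto}://{host}{headers.get('X-Forwarded-Prefix', '')}"
    # 3) Fall back to the request's own base URL.
    if request_base_url:
        return request_base_url
    # 4) Finally, the MCP_GATEWAY_URL environment variable.
    return os.environ.get("MCP_GATEWAY_URL")


def resolve_gateway_token(headers: dict[str, str]) -> str | None:
    return headers.get("X-MCP-Gateway-Token") or os.environ.get("MCP_GATEWAY_TOKEN")
```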

* Cloud/deployable temporal example (#395)

* Move workflows to workflows.py file

* Fix router example

* Add remaining dependencies

* Update orchestrator to @app.async_tool example

* Changes from review

* Fix interactive_workflow to be runnable via tool

* Fix resume tool params

* Fix: Use helpful typer and invoke for root cli commands (#444)

* Use helpful typer and invoke for root cli commands

* Fix lint

* Fix enum check (#445)

* Fix/swap relative mcp agent dependency on deploy (#446)

* Update wrangler wrapper to handle requirements.txt processing

* Fix backup handling

* pass api key to workflow (#447)

* pass api key to workflow

* guard against settings not existing

---------

Co-authored-by: John Corbett <[email protected]>
Co-authored-by: Sarmad Qadri <[email protected]>
Co-authored-by: StreetLamb <[email protected]>
Co-authored-by: Yi <[email protected]>
Co-authored-by: Ryan Holinshead <[email protected]>
Co-authored-by: roman-van-der-krogt <[email protected]>