Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 39 additions & 34 deletions LLMS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2311,16 +2311,21 @@ This implementation queries Temporal for workflow status and manages workflows.
**Function:** `TemporalWorkflowRegistry.unregister(self, run_id: str, workflow_id: str | None = None) -> None`


**Function:** `TemporalWorkflowRegistry.get_workflow(self, run_id: str, workflow_id: str | None = None) -> Optional['Workflow']`
**Function:** `TemporalWorkflowRegistry.get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']`

- **Description**: Get a workflow instance by run ID or workflow ID. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, returns the latest run for that workflow.

**Function:** `TemporalWorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
**Function:** `TemporalWorkflowRegistry.resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`

- **Description**: Resume a paused workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, resumes the latest run for that workflow.

**Function:** `TemporalWorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
**Function:** `TemporalWorkflowRegistry.cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool`

- **Description**: Cancel a running workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, cancels the latest run for that workflow.

**Function:** `TemporalWorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
**Function:** `TemporalWorkflowRegistry.get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`

- **Description**: Get the status of a workflow run. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, returns status for the latest run for that workflow.


**Function:** `TemporalWorkflowRegistry.list_workflow_statuses(self) -> List[Dict[str, Any]]`
Expand Down Expand Up @@ -2656,45 +2661,45 @@ Provides a central place to register, look up, and manage workflow instances.
- **Returns**
- `None`: Return value

**Function:** `WorkflowRegistry.get_workflow(self, run_id: str, workflow_id: str | None = None) -> Optional['Workflow']`
**Function:** `WorkflowRegistry.get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']`

- **Description**: Get a workflow instance by run ID. Args: run_id: The unique ID for this specific workflow run. workflow_id: The ID of the workflow to retrieve Returns: The workflow instance, or None if not found
- **Description**: Get a workflow instance by run ID or workflow ID. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, returns the latest run for that workflow.
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run.
- `workflow_id` (str | None, optional): The ID of the workflow to retrieve
- `run_id` (str | None, optional): The unique ID for a specific workflow run to retrieve.
- `workflow_id` (str | None, optional): The ID of the workflow to retrieve.
- **Returns**
- `Optional['Workflow']`: The workflow instance, or None if not found

**Function:** `WorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
**Function:** `WorkflowRegistry.resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`

- **Description**: Resume a paused workflow. Args: run_id: The unique ID for this specific workflow run workflow_id: The ID of the workflow to resume signal_name: Name of the signal to send to the workflow (default is "resume") payload: Payload to send with the signal Returns: True if the resume signal was sent successfully, False otherwise
- **Description**: Resume a paused workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, resumes the latest run for that workflow.
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run
- `run_id` (str | None, optional): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to resume
- `signal_name` (str | None, optional): Name of the signal to send to the workflow (default is "resume")
- `payload` (Any | None, optional): Payload to send with the signal
- **Returns**
- `bool`: True if the resume signal was sent successfully, False otherwise

**Function:** `WorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
**Function:** `WorkflowRegistry.cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool`

- **Description**: Cancel (terminate) a running workflow. Args: run_id: The unique ID for this specific workflow run workflow_id: The ID of the workflow to cancel Returns: True if the cancel signal was sent successfully, False otherwise
- **Description**: Cancel (terminate) a running workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, cancels the latest run for that workflow.
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run
- `run_id` (str | None, optional): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to cancel
- **Returns**
- `bool`: True if the cancel signal was sent successfully, False otherwise

**Function:** `WorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
**Function:** `WorkflowRegistry.get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`

- **Description**: Get the status of a workflow run. Args: run_id: The unique ID for this specific workflow run workflow_id: The ID of the workflow to cancel Returns: The last available workflow status if found, None otherwise
- **Description**: Get the status of a workflow run. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, returns status for the latest run for that workflow.
- **Parameters**
- `self`
- `run_id` (str): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to cancel
- `run_id` (str | None, optional): The unique ID for this specific workflow run
- `workflow_id` (str | None, optional): The ID of the workflow to get status for
- **Returns**
- `Optional[Dict[str, Any]]`: The last available workflow status if found, None otherwise

Expand Down Expand Up @@ -2723,16 +2728,16 @@ Provides a central place to register, look up, and manage workflow instances.
**Function:** `InMemoryWorkflowRegistry.unregister(self, run_id: str, workflow_id: str | None = None) -> None`


**Function:** `InMemoryWorkflowRegistry.get_workflow(self, run_id: str, workflow_id: str | None = None) -> Optional['Workflow']`
**Function:** `InMemoryWorkflowRegistry.get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']`


**Function:** `InMemoryWorkflowRegistry.resume_workflow(self, run_id: str, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`
**Function:** `InMemoryWorkflowRegistry.resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool`


**Function:** `InMemoryWorkflowRegistry.cancel_workflow(self, run_id: str, workflow_id: str | None = None) -> bool`
**Function:** `InMemoryWorkflowRegistry.cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool`


**Function:** `InMemoryWorkflowRegistry.get_workflow_status(self, run_id: str, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`
**Function:** `InMemoryWorkflowRegistry.get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]`

Comment on lines +2731 to 2741
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

InMemory registry: implement deterministic latest-run and thread-safety.

Ensure latest-run selection follows the documented order and is deterministic; guard internal maps with a lock to avoid races under asyncio.

- def get_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
+ def get_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional['Workflow']:
    ...
- def resume_workflow(self, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
+ def resume_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: Any | None = None) -> bool:
    ...
- def cancel_workflow(self, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ def cancel_workflow(self, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:
    ...
- def get_workflow_status(self, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
+ def get_workflow_status(self, *, run_id: str | None = None, workflow_id: str | None = None) -> Optional[Dict[str, Any]]:
    ...
+ # Example selection helper
+ def _latest_run_id_for(self, workflow_id: str) -> str | None:
+   with self._lock:
+     runs = self._index.get(workflow_id, [])
+     if not runs:
+       return None
+     runs = sorted(
+       runs,
+       key=lambda r: (r.status.get("active", False), r.status.get("updated_at") or 0, r.run_id),
+       reverse=True,
+     )
+     return runs[0].run_id

Committable suggestion skipped: line range outside the PR's diff.


**Function:** `InMemoryWorkflowRegistry.list_workflow_statuses(self) -> List[Dict[str, Any]]`
Expand Down Expand Up @@ -4479,35 +4484,35 @@ Attributes:
- **Returns**
- `str`: The run ID of the started workflow run, which can be passed to workflows/get_status, workflows/resume, and workflows/cancel.

**Function:** `get_workflow_status(ctx: MCPContext, workflow_name: str, run_id: str) -> Dict[str, Any]`
**Function:** `get_workflow_status(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]`

- **Description**: Get the status of a running workflow. Provides detailed information about a workflow instance including its current state, whether it's running or completed, and any results or errors encountered. Args: workflow_name: The name of the workflow to check. run_id: The ID of the workflow instance to check, received from workflows/run or workflows/runs/list. Returns: A dictionary with comprehensive information about the workflow status.
- **Description**: Get the status of a running workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, returns status for the latest run for that workflow. Provides detailed information about a workflow instance including its current state, whether it's running or completed, and any results or errors encountered.
- **Parameters**
- `ctx` (MCPContext)
- `workflow_name` (str): The name of the workflow to check.
- `run_id` (str): The ID of the workflow instance to check, received from workflows/run or workflows/runs/list.
- `run_id` (str | None, optional): Optional run ID of the workflow to check. If omitted, the server will use the latest run for the workflow_id provided.
- `workflow_id` (str | None, optional): Optional workflow identifier (usually the tool/workflow name). If omitted, the server will infer it from the run metadata when possible.
- **Returns**
- `Dict[str, Any]`: A dictionary with comprehensive information about the workflow status.

**Function:** `resume_workflow(ctx: MCPContext, run_id: str, workflow_name: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool`
**Function:** `resume_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool`

- **Description**: Resume a paused workflow. Args: run_id: The ID of the workflow to resume, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to resume. signal_name: Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal. payload: Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input. Returns: True if the workflow was resumed, False otherwise.
- **Description**: Resume a paused workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, resumes the latest run for that workflow.
- **Parameters**
- `ctx` (MCPContext)
- `run_id` (str): The ID of the workflow to resume, received from workflows/run or workflows/runs/list.
- `workflow_name` (str | None, optional): The name of the workflow to resume.
- `run_id` (str | None, optional): The ID of the workflow to resume, received from workflows/run or workflows/runs/list. If not specified, the latest run for the workflow_id will be used.
- `workflow_id` (str | None, optional): The ID of the workflow to resume, received from workflows/run or workflows/runs/list.
- `signal_name` (str | None, optional): Optional name of the signal to send to resume the workflow. This will default to "resume", but can be a custom signal name if the workflow was paused on a specific signal.
- `payload` (str | None, optional): Optional payload to provide the workflow upon resumption. For example, if a workflow is waiting for human input, this can be the human input.
- **Returns**
- `bool`: True if the workflow was resumed, False otherwise.

**Function:** `cancel_workflow(ctx: MCPContext, run_id: str, workflow_name: str | None = None) -> bool`
**Function:** `cancel_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> bool`

- **Description**: Cancel a running workflow. Args: run_id: The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list. workflow_name: The name of the workflow to cancel. Returns: True if the workflow was cancelled, False otherwise.
- **Description**: Cancel a running workflow. Either run_id or workflow_id must be provided. If workflow_id is provided without run_id, cancels the latest run for that workflow.
- **Parameters**
- `ctx` (MCPContext)
- `run_id` (str): The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list.
- `workflow_name` (str | None, optional): The name of the workflow to cancel.
- `run_id` (str | None, optional): The ID of the workflow instance to cancel, received from workflows/run or workflows/runs/list. If not provided, will attempt to cancel the latest run for the provided workflow ID.
- `workflow_id` (str | None, optional): The ID of the workflow to cancel, received from workflows/run or workflows/runs/list.
- **Returns**
Comment on lines +4487 to 4516
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Server endpoints: validate inputs, document precedence, and use keyword-only params

  • Enforce that at least one ID is provided; 400 on neither.
  • If both provided and conflict, 400 with message; otherwise prefer run_id.
  • Make params keyword-only to match registry.
  • Response should include which IDs were used and whether selection was “latest”.

Proposed signatures:

- async def get_workflow_status(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]:
+ async def get_workflow_status(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None) -> Dict[str, Any]:

- async def resume_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool:
+ async def resume_workflow(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None, signal_name: str | None = 'resume', payload: str | None = None) -> bool:

- async def cancel_workflow(ctx: MCPContext, run_id: str | None = None, workflow_id: str | None = None) -> bool:
+ async def cancel_workflow(ctx: MCPContext, *, run_id: str | None = None, workflow_id: str | None = None) -> bool:

Also clarify in docs that workflow_id refers to Workflow.id (workflow type instance ID), not a tool name.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In LLMS.txt around lines 4487-4516, the server endpoints for
get_workflow_status/resume_workflow/cancel_workflow need input validation and
clearer parameter semantics: make run_id and workflow_id keyword-only, return
400 if neither is provided, if both are provided validate they refer to the same
instance and return 400 on conflict (clear error message), otherwise prefer
run_id when both are present; when workflow_id is used without run_id resolve
the latest run and mark selection="latest" in the response; include in the
response which identifier was used (run_id or workflow_id) and document that
workflow_id refers to Workflow.id (the workflow type/instance ID), not a tool
name.

- `bool`: True if the workflow was cancelled, False otherwise.

Expand Down
10 changes: 3 additions & 7 deletions examples/mcp_agent_server/asyncio/mcp_agent.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@ logger:
mcp:
servers:
fetch:
command: "/Users/saqadri/.local/bin/uvx"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you!

command: "uvx"
args: ["mcp-server-fetch"]
description: "Fetch content at URLs from the world wide web"
filesystem:
command: "/Users/saqadri/.nvm/versions/node/v20.3.0/bin/npx"
args: [
"-y",
"@modelcontextprotocol/server-filesystem",
# Current directory will be added by the code
]
command: "npx"
args: ["-y", "@modelcontextprotocol/server-filesystem"]
description: "Read and write files on the filesystem"

openai:
Expand Down
7 changes: 3 additions & 4 deletions src/mcp_agent/cli/cloud/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ def invoke(self, ctx):


# Deployment command
app.command(
name="deploy",
help="Deploy an MCP agent (alias for 'cloud deploy')"
)(deploy_config)
app.command(name="deploy", help="Deploy an MCP agent (alias for 'cloud deploy')")(
deploy_config
)


# Sub-typer for `mcp-agent apps` commands
Expand Down
Loading
Loading