diff --git a/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py b/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py index de2935c30..2f7fa10f6 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py @@ -7,18 +7,89 @@ import yaml from rich.table import Table +from mcp_agent.app import MCPApp from mcp_agent.cli.core.utils import run_async -from mcp_agent.cli.mcp_app.api_client import WorkflowExecutionStatus +from mcp_agent.cli.exceptions import CLIError +from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings +from mcp_agent.mcp.gen_client import gen_client from ...utils import ( setup_authenticated_client, validate_output_format, - handle_server_api_errors, resolve_server, ) from mcp_agent.cli.utils.ux import console, print_info -@handle_server_api_errors +async def _list_workflow_runs_async( + server_id_or_url: str, limit: Optional[int], status: Optional[str], format: str +) -> None: + """List workflow runs using MCP tool calls to a deployed server.""" + if server_id_or_url.startswith(("http://", "https://")): + server_url = server_id_or_url + else: + client = setup_authenticated_client() + server = resolve_server(client, server_id_or_url) + + if hasattr(server, "appServerInfo") and server.appServerInfo: + server_url = server.appServerInfo.serverUrl + else: + raise CLIError( + f"Server '{server_id_or_url}' is not deployed or has no server URL" + ) + + if not server_url: + raise CLIError(f"No server URL found for server '{server_id_or_url}'") + + quiet_settings = Settings(logger=LoggerSettings(level="error")) + app = MCPApp(name="workflows_cli", settings=quiet_settings) + + try: + async with app.run() as workflow_app: + context = workflow_app.context + + sse_url = ( + f"{server_url}/sse" if not server_url.endswith("/sse") else server_url + ) + context.server_registry.registry["workflow_server"] = MCPServerSettings( + name="workflow_server", + description=f"Deployed MCP server {server_url}", + url=sse_url, + transport="sse", + ) + + async with gen_client( + "workflow_server", server_registry=context.server_registry + ) as client: + result = await client.call_tool("workflows-runs-list", {}) + + workflows_data = result.content[0].text if result.content else [] + if isinstance(workflows_data, str): + workflows_data = json.loads(workflows_data) + + if not workflows_data: + workflows_data = [] + + workflows = workflows_data + if status: + status_filter = _get_status_filter(status) + workflows = [w for w in workflows if _matches_status(w, status_filter)] + + if limit: + workflows = workflows[:limit] + + if format == "json": + _print_workflows_json(workflows) + elif format == "yaml": + _print_workflows_yaml(workflows) + else: + _print_workflows_text(workflows, status, server_id_or_url) + + except Exception as e: + raise CLIError( + f"Error listing workflow runs for server {server_id_or_url}: {str(e)}" + ) from e + + def list_workflow_runs( server_id_or_url: str = typer.Argument( ..., help="Server ID, app config ID, or server URL to list workflow runs for" @@ -46,62 +117,44 @@ def list_workflow_runs( mcp-agent cloud workflows runs apcnf_xyz789 --limit 10 --format json """ validate_output_format(format) - client = setup_authenticated_client() + run_async(_list_workflow_runs_async(server_id_or_url, limit, status, format)) - if server_id_or_url.startswith(("http://", "https://")): - resolved_server = resolve_server(client, server_id_or_url) - - if hasattr(resolved_server, "appId"): - app_id_or_config_id = resolved_server.appId - elif hasattr(resolved_server, "appConfigurationId"): - app_id_or_config_id = resolved_server.appConfigurationId - else: - raise ValueError( - f"Could not extract app ID or config ID from server: {server_id_or_url}" - ) - else: - app_id_or_config_id = server_id_or_url - - max_results = limit or 100 - - status_filter = None - if status: - status_map = { - "running": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING, - "failed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED, - "timed_out": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, - "timeout": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, # alias - "canceled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, - "cancelled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, # alias - "terminated": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED, - "completed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED, - "continued": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, - "continued_as_new": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, - } - status_filter = status_map.get(status.lower()) - if not status_filter: - valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new" - raise typer.BadParameter( - f"Invalid status '{status}'. Valid options: {valid_statuses}" - ) - async def list_workflows_async(): - return await client.list_workflows( - app_id_or_config_id=app_id_or_config_id, max_results=max_results +def _get_status_filter(status: str) -> str: + """Convert status string to normalized status.""" + status_map = { + "running": "running", + "failed": "failed", + "timed_out": "timed_out", + "timeout": "timed_out", # alias + "canceled": "canceled", + "cancelled": "canceled", # alias + "terminated": "terminated", + "completed": "completed", + "continued": "continued", + "continued_as_new": "continued", + } + normalized_status = status_map.get(status.lower()) + if not normalized_status: + valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new" + raise typer.BadParameter( + f"Invalid status '{status}'. Valid options: {valid_statuses}" ) + return normalized_status - response = run_async(list_workflows_async()) - workflows = response.workflows or [] - - if status_filter: - workflows = [w for w in workflows if w.execution_status == status_filter] - if format == "json": - _print_workflows_json(workflows) - elif format == "yaml": - _print_workflows_yaml(workflows) - else: - _print_workflows_text(workflows, status, server_id_or_url) +def _matches_status(workflow: dict, status_filter: str) -> bool: + """Check if workflow matches the status filter. + + Note: We use string-based matching instead of protobuf enum values because + the MCP tool response format returns status as strings, not enum objects. + This approach is more flexible and doesn't require maintaining sync with + the protobuf definitions. + """ + workflow_status = workflow.get("execution_status", "") + if isinstance(workflow_status, str): + return status_filter.lower() in workflow_status.lower() + return False def _print_workflows_text(workflows, status_filter, server_id_or_url): @@ -124,24 +177,44 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url): table.add_column("Status", style="yellow", width=15) table.add_column("Run ID", style="dim", width=15) table.add_column("Created", style="dim", width=20) - table.add_column("Principal", style="dim", width=15) for workflow in workflows: - status_display = _get_status_display(workflow.execution_status) - created_display = ( - workflow.created_at.strftime("%Y-%m-%d %H:%M:%S") - if workflow.created_at - else "N/A" - ) - run_id_display = _truncate_string(workflow.run_id or "N/A", 15) + if isinstance(workflow, dict): + workflow_id = workflow.get("workflow_id", "N/A") + name = workflow.get("name", "N/A") + execution_status = workflow.get("execution_status", "N/A") + run_id = workflow.get("run_id", "N/A") + created_at = workflow.get("created_at", "N/A") + else: + workflow_id = getattr(workflow, "workflow_id", "N/A") + name = getattr(workflow, "name", "N/A") + execution_status = getattr(workflow, "execution_status", "N/A") + run_id = getattr(workflow, "run_id", "N/A") + created_at = getattr(workflow, "created_at", "N/A") + + status_display = _get_status_display(execution_status) + + if created_at and created_at != "N/A": + if hasattr(created_at, "strftime"): + created_display = created_at.strftime("%Y-%m-%d %H:%M:%S") + else: + try: + from datetime import datetime + dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00")) + created_display = dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError): + created_display = str(created_at) + else: + created_display = "N/A" + + run_id_display = _truncate_string(str(run_id) if run_id else "N/A", 15) table.add_row( - _truncate_string(workflow.workflow_id, 20), - _truncate_string(workflow.name, 20), + _truncate_string(str(workflow_id) if workflow_id else "N/A", 20), + _truncate_string(str(name) if name else "N/A", 20), status_display, run_id_display, created_display, - _truncate_string(workflow.principal_id, 15), ) console.print(table) @@ -163,16 +236,16 @@ def _print_workflows_yaml(workflows): def _workflow_to_dict(workflow): - """Convert WorkflowInfo to dictionary.""" + """Convert workflow dict to standardized dictionary format.""" + if isinstance(workflow, dict): + return workflow + return { - "workflow_id": workflow.workflow_id, - "run_id": workflow.run_id, - "name": workflow.name, - "created_at": workflow.created_at.isoformat() if workflow.created_at else None, - "principal_id": workflow.principal_id, - "execution_status": workflow.execution_status.value - if workflow.execution_status - else None, + "workflow_id": getattr(workflow, "workflow_id", None), + "run_id": getattr(workflow, "run_id", None), + "name": getattr(workflow, "name", None), + "created_at": getattr(workflow, "created_at", None).isoformat() if getattr(workflow, "created_at", None) else None, + "execution_status": getattr(workflow, "execution_status", None).value if getattr(workflow, "execution_status", None) else None, } @@ -184,18 +257,25 @@ def _truncate_string(text: str, max_length: int) -> str: def _get_status_display(status): - """Convert WorkflowExecutionStatus to display string with emoji.""" + """Convert status to display string with emoji.""" if not status: return "❓ Unknown" - - status_map = { - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING: "[green]🟢 Running[/green]", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED: "[blue]✅ Completed[/blue]", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "[red]❌ Failed[/red]", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "[yellow]🟡 Canceled[/yellow]", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "[red]🔴 Terminated[/red]", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: "[orange]⏰ Timed Out[/orange]", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: "[purple]🔄 Continued[/purple]", - } - - return status_map.get(status, "❓ Unknown") + + status_str = str(status).lower() + + if "running" in status_str: + return "[green]🟢 Running[/green]" + elif "completed" in status_str: + return "[blue]✅ Completed[/blue]" + elif "failed" in status_str or "error" in status_str: + return "[red]❌ Failed[/red]" + elif "cancel" in status_str: + return "[yellow]🟡 Canceled[/yellow]" + elif "terminat" in status_str: + return "[red]🔴 Terminated[/red]" + elif "timeout" in status_str or "timed_out" in status_str: + return "[orange]⏰ Timed Out[/orange]" + elif "continued" in status_str: + return "[purple]🔄 Continued[/purple]" + else: + return f"❓ {status}" diff --git a/src/mcp_agent/cli/mcp_app/api_client.py b/src/mcp_agent/cli/mcp_app/api_client.py index d3156cd17..48e289635 100644 --- a/src/mcp_agent/cli/mcp_app/api_client.py +++ b/src/mcp_agent/cli/mcp_app/api_client.py @@ -3,7 +3,6 @@ from datetime import datetime from typing import Any, Dict, List, Literal, Optional, Union from urllib.parse import urlparse -from enum import Enum from pydantic import BaseModel @@ -64,39 +63,6 @@ class CanDoActionsResponse(BaseModel): canDoActions: Optional[List[CanDoActionCheck]] = [] -class WorkflowExecutionStatus(Enum): - """From temporal.api.enums.v1.WorkflowExecutionStatus""" - - WORKFLOW_EXECUTION_STATUS_UNSPECIFIED = "WORKFLOW_EXECUTION_STATUS_UNSPECIFIED" - WORKFLOW_EXECUTION_STATUS_RUNNING = "WORKFLOW_EXECUTION_STATUS_RUNNING" - WORKFLOW_EXECUTION_STATUS_FAILED = "WORKFLOW_EXECUTION_STATUS_FAILED" - WORKFLOW_EXECUTION_STATUS_TIMED_OUT = "WORKFLOW_EXECUTION_STATUS_TIMED_OUT" - WORKFLOW_EXECUTION_STATUS_CANCELED = "WORKFLOW_EXECUTION_STATUS_CANCELED" - WORKFLOW_EXECUTION_STATUS_TERMINATED = "WORKFLOW_EXECUTION_STATUS_TERMINATED" - WORKFLOW_EXECUTION_STATUS_COMPLETED = "WORKFLOW_EXECUTION_STATUS_COMPLETED" - WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW = ( - "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW" - ) - - -class WorkflowInfo(BaseModel): - """Information about a workflow execution instance""" - - workflow_id: str - run_id: Optional[str] = None - name: str - created_at: datetime - principal_id: str - execution_status: Optional[WorkflowExecutionStatus] = None - - -class ListWorkflowsResponse(BaseModel): - """Response for listing workflows""" - - workflows: Optional[List[WorkflowInfo]] = [] - next_page_token: Optional[str] = None - total_count: Optional[int] = 0 - APP_ID_PREFIX = "app_" APP_CONFIG_ID_PREFIX = "apcnf_" @@ -501,48 +467,6 @@ async def list_app_configurations( response = await self.post("/mcp_app/list_app_configurations", payload) return ListAppConfigurationsResponse(**response.json()) - async def list_workflows( - self, - app_id_or_config_id: str, - name_filter: Optional[str] = None, - max_results: int = 100, - page_token: Optional[str] = None, - ) -> ListWorkflowsResponse: - """List workflows for a specific app or app configuration. - - Args: - app_id_or_config_id: The app ID (e.g. app_abc123) or app config ID (e.g. apcnf_xyz789) - name_filter: Optional workflow name filter - max_results: Maximum number of results to return - page_token: Pagination token - - Returns: - ListWorkflowsResponse: The list of workflows - - Raises: - ValueError: If the app_id_or_config_id is invalid - httpx.HTTPError: If the API request fails - """ - payload: Dict[str, Any] = { - "max_results": max_results, - } - - if is_valid_app_id_format(app_id_or_config_id): - payload["app_specifier"] = {"app_id": app_id_or_config_id} - elif is_valid_app_config_id_format(app_id_or_config_id): - payload["app_specifier"] = {"app_config_id": app_id_or_config_id} - else: - raise ValueError( - f"Invalid app ID or app config ID format: {app_id_or_config_id}. Expected format: app_xxx or apcnf_xxx" - ) - - if name_filter: - payload["name"] = name_filter - if page_token: - payload["page_token"] = page_token - - response = await self.post("/workflow/list", payload) - return ListWorkflowsResponse(**response.json()) async def delete_app(self, app_id: str) -> str: """Delete an MCP App via the API.