diff --git a/src/mcp_agent/cli/cloud/commands/servers/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/__init__.py index 907613d43..40bd4cf1f 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/__init__.py +++ b/src/mcp_agent/cli/cloud/commands/servers/__init__.py @@ -3,9 +3,11 @@ from .list.main import list_servers from .describe.main import describe_server from .delete.main import delete_server +from .workflows.main import list_workflows_for_server __all__ = [ "list_servers", "describe_server", "delete_server", + "list_workflows_for_server", ] \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/servers/list/main.py b/src/mcp_agent/cli/cloud/commands/servers/list/main.py index a184375c8..daf39a4ef 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/list/main.py +++ b/src/mcp_agent/cli/cloud/commands/servers/list/main.py @@ -28,16 +28,13 @@ def list_servers( """List MCP Servers with optional filtering and sorting. Examples: - # Filter servers containing 'api' + mcp-agent cloud servers list --filter api - # Sort by creation date (newest first) mcp-agent cloud servers list --sort-by -created - # Filter active servers and sort by name mcp-agent cloud servers list --filter active --sort-by name - # Get JSON output with filtering mcp-agent cloud servers list --filter production --format json """ validate_output_format(format) diff --git a/src/mcp_agent/cli/cloud/commands/servers/workflows/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/workflows/__init__.py new file mode 100644 index 000000000..1c9260a18 --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/workflows/__init__.py @@ -0,0 +1,7 @@ +"""Workflows subcommand for servers.""" + +from .main import list_workflows_for_server + +__all__ = [ + "list_workflows_for_server", +] \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/servers/workflows/main.py b/src/mcp_agent/cli/cloud/commands/servers/workflows/main.py new file mode 100644 index 000000000..b133a881a --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/workflows/main.py @@ -0,0 +1,180 @@ +"""Server workflows command implementation.""" + +import json +from typing import Optional + +import typer +import yaml +from rich.table import Table + +from mcp_agent.cli.core.utils import run_async +from mcp_agent.cli.mcp_app.api_client import WorkflowExecutionStatus +from ...utils import ( + setup_authenticated_client, + validate_output_format, + handle_server_api_errors, + resolve_server, +) +from mcp_agent.cli.utils.ux import console, print_info + + +@handle_server_api_errors +def list_workflows_for_server( + server_id_or_url: str = typer.Argument(..., help="Server ID, app config ID, or server URL to list workflows for"), + limit: Optional[int] = typer.Option(None, "--limit", help="Maximum number of results to return"), + status: Optional[str] = typer.Option(None, "--status", help="Filter by status: running|failed|timed_out|canceled|terminated|completed|continued"), + format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), +) -> None: + """List workflows for an MCP Server. + + Examples: + + mcp-agent cloud servers workflows app_abc123 + + mcp-agent cloud servers workflows https://server.example.com --status running + + mcp-agent cloud servers workflows apcnf_xyz789 --limit 10 --format json + """ + validate_output_format(format) + client = setup_authenticated_client() + + if server_id_or_url.startswith(('http://', 'https://')): + resolved_server = resolve_server(client, server_id_or_url) + + if hasattr(resolved_server, 'appId'): + app_id_or_config_id = resolved_server.appId + elif hasattr(resolved_server, 'appConfigurationId'): + app_id_or_config_id = resolved_server.appConfigurationId + else: + raise ValueError(f"Could not extract app ID or config ID from server: {server_id_or_url}") + else: + app_id_or_config_id = server_id_or_url + + max_results = limit or 100 + + status_filter = None + if status: + status_map = { + "running": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING, + "failed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED, + "timed_out": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, + "timeout": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, # alias + "canceled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, + "cancelled": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, # alias + "terminated": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED, + "completed": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED, + "continued": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, + "continued_as_new": WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, + } + status_filter = status_map.get(status.lower()) + if not status_filter: + valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new" + raise typer.BadParameter(f"Invalid status '{status}'. Valid options: {valid_statuses}") + + async def list_workflows_async(): + return await client.list_workflows( + app_id_or_config_id=app_id_or_config_id, + max_results=max_results + ) + + response = run_async(list_workflows_async()) + workflows = response.workflows or [] + + if status_filter: + workflows = [w for w in workflows if w.execution_status == status_filter] + + if format == "json": + _print_workflows_json(workflows) + elif format == "yaml": + _print_workflows_yaml(workflows) + else: + _print_workflows_text(workflows, status, server_id_or_url) + + +def _print_workflows_text(workflows, status_filter, server_id_or_url): + """Print workflows in text format.""" + server_name = server_id_or_url + + console.print(f"\n[bold blue]📊 Workflows for Server: {server_name}[/bold blue]") + + if not workflows: + print_info("No workflows found for this server.") + return + + console.print(f"\nFound {len(workflows)} workflow(s):") + + table = Table(show_header=True, header_style="bold blue") + table.add_column("Workflow ID", style="cyan", width=20) + table.add_column("Name", style="green", width=20) + table.add_column("Status", style="yellow", width=15) + table.add_column("Run ID", style="dim", width=15) + table.add_column("Created", style="dim", width=20) + table.add_column("Principal", style="dim", width=15) + + for workflow in workflows: + status_display = _get_status_display(workflow.execution_status) + created_display = workflow.created_at.strftime('%Y-%m-%d %H:%M:%S') if workflow.created_at else "N/A" + run_id_display = _truncate_string(workflow.run_id or "N/A", 15) + + table.add_row( + _truncate_string(workflow.workflow_id, 20), + _truncate_string(workflow.name, 20), + status_display, + run_id_display, + created_display, + _truncate_string(workflow.principal_id, 15), + ) + + console.print(table) + + if status_filter: + console.print(f"\n[dim]Filtered by status: {status_filter}[/dim]") + + +def _print_workflows_json(workflows): + """Print workflows in JSON format.""" + workflows_data = [_workflow_to_dict(workflow) for workflow in workflows] + print(json.dumps({"workflows": workflows_data}, indent=2, default=str)) + + +def _print_workflows_yaml(workflows): + """Print workflows in YAML format.""" + workflows_data = [_workflow_to_dict(workflow) for workflow in workflows] + print(yaml.dump({"workflows": workflows_data}, default_flow_style=False)) + + +def _workflow_to_dict(workflow): + """Convert WorkflowInfo to dictionary.""" + return { + "workflow_id": workflow.workflow_id, + "run_id": workflow.run_id, + "name": workflow.name, + "created_at": workflow.created_at.isoformat() if workflow.created_at else None, + "principal_id": workflow.principal_id, + "execution_status": workflow.execution_status.value if workflow.execution_status else None, + } + + +def _truncate_string(text: str, max_length: int) -> str: + """Truncate string to max_length, adding ellipsis if truncated.""" + if len(text) <= max_length: + return text + return text[:max_length-3] + "..." + + +def _get_status_display(status): + """Convert WorkflowExecutionStatus to display string with emoji.""" + if not status: + return "❓ Unknown" + + status_map = { + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING: "[green]🟢 Running[/green]", + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED: "[blue]✅ Completed[/blue]", + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "[red]❌ Failed[/red]", + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "[yellow]🟡 Canceled[/yellow]", + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "[red]🔴 Terminated[/red]", + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: "[orange]⏰ Timed Out[/orange]", + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: "[purple]🔄 Continued[/purple]", + } + + return status_map.get(status, "❓ Unknown") \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py b/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py index 5131c6b58..fcc5524e7 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py @@ -81,7 +81,9 @@ def cancel_workflow( cannot be resumed and will be marked as cancelled. Examples: + mcp-agent cloud workflows cancel app_abc123 run_xyz789 - mcp-agent cloud workflows cancel https://server.example.com run_xyz789 --reason "User requested cancellation" + + mcp-agent cloud workflows cancel app_abc123 run_xyz789 --reason "User requested" """ run_async(_cancel_workflow_async(server_id_or_url, run_id, reason)) diff --git a/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py b/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py index 4e7a9f388..b06db73cf 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py @@ -86,8 +86,10 @@ def describe_workflow( creation time, and other metadata. Examples: + mcp-agent cloud workflows describe app_abc123 run_xyz789 - mcp-agent cloud workflows describe https://server.example.com run_xyz789 --format json + + mcp-agent cloud workflows describe app_abc123 run_xyz789 --format json """ if format not in ["text", "json", "yaml"]: console.print("[red]Error: --format must be 'text', 'json', or 'yaml'[/red]") diff --git a/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py b/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py index 4fea41fdd..19e479900 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py @@ -86,8 +86,11 @@ def resume_workflow( a payload (JSON or text) to pass data to the resumed workflow. Examples: + mcp-agent cloud workflows resume app_abc123 run_xyz789 - mcp-agent cloud workflows resume https://server.example.com run_xyz789 --payload '{"data": "value"}' + + mcp-agent cloud workflows resume app_abc123 run_xyz789 --payload '{"data": "value"}' + mcp-agent cloud workflows resume app_abc123 run_xyz789 --payload "simple text" """ if payload: diff --git a/src/mcp_agent/cli/cloud/main.py b/src/mcp_agent/cli/cloud/main.py index 8de0dfe42..ac18d3086 100644 --- a/src/mcp_agent/cli/cloud/main.py +++ b/src/mcp_agent/cli/cloud/main.py @@ -37,6 +37,7 @@ list_servers, describe_server, delete_server, + list_workflows_for_server, ) from mcp_agent.cli.exceptions import CLIError from mcp_agent.cli.utils.ux import print_error @@ -163,6 +164,7 @@ def invoke(self, ctx): app_cmd_servers.command(name="list")(list_servers) app_cmd_servers.command(name="describe")(describe_server) app_cmd_servers.command(name="delete")(delete_server) +app_cmd_servers.command(name="workflows")(list_workflows_for_server) app.add_typer(app_cmd_servers, name="servers", help="Manage MCP Servers") # Alias for servers - apps should behave identically diff --git a/src/mcp_agent/cli/mcp_app/api_client.py b/src/mcp_agent/cli/mcp_app/api_client.py index cace3cb8b..892cf3258 100644 --- a/src/mcp_agent/cli/mcp_app/api_client.py +++ b/src/mcp_agent/cli/mcp_app/api_client.py @@ -3,6 +3,7 @@ from datetime import datetime from typing import Any, Dict, List, Literal, Optional, Union from urllib.parse import urlparse +from enum import Enum from pydantic import BaseModel @@ -63,6 +64,35 @@ class CanDoActionsResponse(BaseModel): canDoActions: Optional[List[CanDoActionCheck]] = [] +class WorkflowExecutionStatus(Enum): + """From temporal.api.enums.v1.WorkflowExecutionStatus""" + WORKFLOW_EXECUTION_STATUS_UNSPECIFIED = "WORKFLOW_EXECUTION_STATUS_UNSPECIFIED" + WORKFLOW_EXECUTION_STATUS_RUNNING = "WORKFLOW_EXECUTION_STATUS_RUNNING" + WORKFLOW_EXECUTION_STATUS_FAILED = "WORKFLOW_EXECUTION_STATUS_FAILED" + WORKFLOW_EXECUTION_STATUS_TIMED_OUT = "WORKFLOW_EXECUTION_STATUS_TIMED_OUT" + WORKFLOW_EXECUTION_STATUS_CANCELED = "WORKFLOW_EXECUTION_STATUS_CANCELED" + WORKFLOW_EXECUTION_STATUS_TERMINATED = "WORKFLOW_EXECUTION_STATUS_TERMINATED" + WORKFLOW_EXECUTION_STATUS_COMPLETED = "WORKFLOW_EXECUTION_STATUS_COMPLETED" + WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW = "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW" + + +class WorkflowInfo(BaseModel): + """Information about a workflow execution instance""" + workflow_id: str + run_id: Optional[str] = None + name: str + created_at: datetime + principal_id: str + execution_status: Optional[WorkflowExecutionStatus] = None + + +class ListWorkflowsResponse(BaseModel): + """Response for listing workflows""" + workflows: Optional[List[WorkflowInfo]] = [] + next_page_token: Optional[str] = None + total_count: Optional[int] = 0 + + APP_ID_PREFIX = "app_" APP_CONFIG_ID_PREFIX = "apcnf_" @@ -464,6 +494,47 @@ async def list_app_configurations( response = await self.post("/mcp_app/list_app_configurations", payload) return ListAppConfigurationsResponse(**response.json()) + async def list_workflows( + self, + app_id_or_config_id: str, + name_filter: Optional[str] = None, + max_results: int = 100, + page_token: Optional[str] = None, + ) -> ListWorkflowsResponse: + """List workflows for a specific app or app configuration. + + Args: + app_id_or_config_id: The app ID (e.g. app_abc123) or app config ID (e.g. apcnf_xyz789) + name_filter: Optional workflow name filter + max_results: Maximum number of results to return + page_token: Pagination token + + Returns: + ListWorkflowsResponse: The list of workflows + + Raises: + ValueError: If the app_id_or_config_id is invalid + httpx.HTTPError: If the API request fails + """ + payload: Dict[str, Any] = { + "max_results": max_results, + } + + if is_valid_app_id_format(app_id_or_config_id): + payload["app_specifier"] = {"app_id": app_id_or_config_id} + elif is_valid_app_config_id_format(app_id_or_config_id): + payload["app_specifier"] = {"app_config_id": app_id_or_config_id} + else: + raise ValueError(f"Invalid app ID or app config ID format: {app_id_or_config_id}. Expected format: app_xxx or apcnf_xxx") + + if name_filter: + payload["name"] = name_filter + if page_token: + payload["page_token"] = page_token + + response = await self.post("/workflow/list", payload) + return ListWorkflowsResponse(**response.json()) + async def delete_app(self, app_id: str) -> str: """Delete an MCP App via the API.