diff --git a/src/mcp_agent/cli/cloud/commands/servers/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/__init__.py index 40bd4cf1f..03df48a2d 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/__init__.py +++ b/src/mcp_agent/cli/cloud/commands/servers/__init__.py @@ -3,11 +3,9 @@ from .list.main import list_servers from .describe.main import describe_server from .delete.main import delete_server -from .workflows.main import list_workflows_for_server __all__ = [ "list_servers", - "describe_server", + "describe_server", "delete_server", - "list_workflows_for_server", -] \ No newline at end of file +] diff --git a/src/mcp_agent/cli/cloud/commands/servers/delete/main.py b/src/mcp_agent/cli/cloud/commands/servers/delete/main.py index 48d9ab04d..9afcaf6ab 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/delete/main.py +++ b/src/mcp_agent/cli/cloud/commands/servers/delete/main.py @@ -1,4 +1,3 @@ - import typer from rich.panel import Panel @@ -17,21 +16,25 @@ @handle_server_api_errors def delete_server( - id_or_url: str = typer.Argument(..., help="Server ID or app configuration ID to delete"), - force: bool = typer.Option(False, "--force", "-f", help="Force deletion without confirmation prompt"), + id_or_url: str = typer.Argument( + ..., help="Server ID or app configuration ID to delete" + ), + force: bool = typer.Option( + False, "--force", "-f", help="Force deletion without confirmation prompt" + ), ) -> None: """Delete a specific MCP Server.""" client = setup_authenticated_client() server = resolve_server(client, id_or_url) - + # Determine server type and delete function if isinstance(server, MCPApp): server_type = "Deployed Server" delete_function = client.delete_app else: - server_type = "Configured Server" + server_type = "Configured Server" delete_function = client.delete_app_configuration - + server_name = get_server_name(server) server_id = get_server_id(server) @@ -47,8 +50,10 @@ def delete_server( expand=False, ) ) - - confirm = typer.confirm(f"\nAre you sure you want to delete this {server_type.lower()}?") + + confirm = typer.confirm( + f"\nAre you sure you want to delete this {server_type.lower()}?" + ) if not confirm: print_info("Deletion cancelled.") return @@ -57,14 +62,14 @@ def delete_server( can_delete = run_async(client.can_delete_app(server_id)) else: can_delete = run_async(client.can_delete_app_configuration(server_id)) - + if not can_delete: raise CLIError( f"You do not have permission to delete this {server_type.lower()}. " f"You can only delete servers that you created." ) deleted_id = run_async(delete_function(server_id)) - + console.print( Panel( f"[green]✅ Successfully deleted {server_type.lower()}[/green]\n\n" @@ -75,4 +80,3 @@ def delete_server( expand=False, ) ) - diff --git a/src/mcp_agent/cli/cloud/commands/servers/describe/main.py b/src/mcp_agent/cli/cloud/commands/servers/describe/main.py index ced0c8403..50ca0051c 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/describe/main.py +++ b/src/mcp_agent/cli/cloud/commands/servers/describe/main.py @@ -9,7 +9,7 @@ from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration from ...utils import ( setup_authenticated_client, - validate_output_format, + validate_output_format, resolve_server, handle_server_api_errors, clean_server_status, @@ -17,10 +17,14 @@ from mcp_agent.cli.utils.ux import console -@handle_server_api_errors +@handle_server_api_errors def describe_server( - id_or_url: str = typer.Argument(..., help="Server ID or app configuration ID to describe"), - format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), + id_or_url: str = typer.Argument( + ..., help="Server ID or app configuration ID to describe" + ), + format: Optional[str] = typer.Option( + "text", "--format", help="Output format (text|json|yaml)" + ), ) -> None: """Describe a specific MCP Server.""" validate_output_format(format) @@ -29,13 +33,17 @@ def describe_server( print_server_description(server, format) -def print_server_description(server: Union[MCPApp, MCPAppConfiguration], output_format: str = "text") -> None: +def print_server_description( + server: Union[MCPApp, MCPAppConfiguration], output_format: str = "text" +) -> None: """Print detailed description information for a server.""" - + valid_formats = ["text", "json", "yaml"] if output_format not in valid_formats: - raise CLIError(f"Invalid format '{output_format}'. Valid options are: {', '.join(valid_formats)}") - + raise CLIError( + f"Invalid format '{output_format}'. Valid options are: {', '.join(valid_formats)}" + ) + if output_format == "json": _print_server_json(server) elif output_format == "yaml": @@ -73,14 +81,15 @@ def _server_to_dict(server: Union[MCPApp, MCPAppConfiguration]) -> dict: server_description = server.app.description if server.app else None created_at = server.createdAt server_info = server.appServerInfo - underlying_app = { - "app_id": server.app.appId, - "name": server.app.name - } if server.app else None + underlying_app = ( + {"app_id": server.app.appId, "name": server.app.name} + if server.app + else None + ) status_raw = server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" server_url = server_info.serverUrl if server_info else None - + data = { "id": server_id, "name": server_name, @@ -88,15 +97,13 @@ def _server_to_dict(server: Union[MCPApp, MCPAppConfiguration]) -> dict: "status": clean_server_status(status_raw), "server_url": server_url, "description": server_description, - "created_at": created_at.isoformat() if created_at else None + "created_at": created_at.isoformat() if created_at else None, } - + if underlying_app: data["underlying_app"] = underlying_app - - return data - + return data def _print_server_text(server: Union[MCPApp, MCPAppConfiguration]) -> None: @@ -118,7 +125,7 @@ def _print_server_text(server: Union[MCPApp, MCPAppConfiguration]) -> None: status_text = "❓ Unknown" server_url = "N/A" - + if server_info: status_text = _server_status_text(server_info.status) server_url = server_info.serverUrl @@ -129,20 +136,24 @@ def _print_server_text(server: Union[MCPApp, MCPAppConfiguration]) -> None: f"Status: {status_text}", f"Server URL: [cyan]{server_url}[/cyan]", ] - + if server_description: content_lines.append(f"Description: [cyan]{server_description}[/cyan]") - + if created_at: - content_lines.append(f"Created: [cyan]{created_at.strftime('%Y-%m-%d %H:%M:%S')}[/cyan]") + content_lines.append( + f"Created: [cyan]{created_at.strftime('%Y-%m-%d %H:%M:%S')}[/cyan]" + ) if isinstance(server, MCPAppConfiguration) and server.app: - content_lines.extend([ - "", - "[bold]Underlying App:[/bold]", - f" App ID: [cyan]{server.app.appId}[/cyan]", - f" App Name: [cyan]{server.app.name}[/cyan]", - ]) + content_lines.extend( + [ + "", + "[bold]Underlying App:[/bold]", + f" App ID: [cyan]{server.app.appId}[/cyan]", + f" App Name: [cyan]{server.app.name}[/cyan]", + ] + ) console.print( Panel( @@ -161,4 +172,4 @@ def _server_status_text(status: str) -> str: elif status == "APP_SERVER_STATUS_OFFLINE": return "[red]🔴 Offline[/red]" else: - return "❓ Unknown" \ No newline at end of file + return "❓ Unknown" diff --git a/src/mcp_agent/cli/cloud/commands/servers/list/main.py b/src/mcp_agent/cli/cloud/commands/servers/list/main.py index daf39a4ef..d8d52fd3c 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/list/main.py +++ b/src/mcp_agent/cli/cloud/commands/servers/list/main.py @@ -10,7 +10,7 @@ from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration from ...utils import ( setup_authenticated_client, - validate_output_format, + validate_output_format, handle_server_api_errors, clean_server_status, ) @@ -20,26 +20,38 @@ @handle_server_api_errors def list_servers( - limit: Optional[int] = typer.Option(None, "--limit", help="Maximum number of results to return"), - filter: Optional[str] = typer.Option(None, "--filter", help="Filter by name, description, or status (case-insensitive)"), - sort_by: Optional[str] = typer.Option(None, "--sort-by", help="Sort by field: name, created, status (prefix with - for reverse)"), - format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), + limit: Optional[int] = typer.Option( + None, "--limit", help="Maximum number of results to return" + ), + filter: Optional[str] = typer.Option( + None, + "--filter", + help="Filter by name, description, or status (case-insensitive)", + ), + sort_by: Optional[str] = typer.Option( + None, + "--sort-by", + help="Sort by field: name, created, status (prefix with - for reverse)", + ), + format: Optional[str] = typer.Option( + "text", "--format", help="Output format (text|json|yaml)" + ), ) -> None: """List MCP Servers with optional filtering and sorting. - + Examples: - + mcp-agent cloud servers list --filter api - + mcp-agent cloud servers list --sort-by -created - + mcp-agent cloud servers list --filter active --sort-by name - + mcp-agent cloud servers list --filter production --format json """ validate_output_format(format) client = setup_authenticated_client() - + # Use limit or default max_results = limit or 100 @@ -52,11 +64,21 @@ async def parallel_requests(): list_apps_res, list_app_configs_res = run_async(parallel_requests()) # Apply client-side filtering and sorting - filtered_deployed = _apply_filter(list_apps_res.apps, filter) if filter else list_apps_res.apps - filtered_configured = _apply_filter(list_app_configs_res.appConfigurations, filter) if filter else list_app_configs_res.appConfigurations - - sorted_deployed = _apply_sort(filtered_deployed, sort_by) if sort_by else filtered_deployed - sorted_configured = _apply_sort(filtered_configured, sort_by) if sort_by else filtered_configured + filtered_deployed = ( + _apply_filter(list_apps_res.apps, filter) if filter else list_apps_res.apps + ) + filtered_configured = ( + _apply_filter(list_app_configs_res.appConfigurations, filter) + if filter + else list_app_configs_res.appConfigurations + ) + + sorted_deployed = ( + _apply_sort(filtered_deployed, sort_by) if sort_by else filtered_deployed + ) + sorted_configured = ( + _apply_sort(filtered_configured, sort_by) if sort_by else filtered_configured + ) if format == "json": _print_servers_json(sorted_deployed, sorted_configured) @@ -66,93 +88,120 @@ async def parallel_requests(): _print_servers_text(sorted_deployed, sorted_configured, filter, sort_by) - -def _apply_filter(servers: List[Union[MCPApp, MCPAppConfiguration]], filter_expr: str) -> List[Union[MCPApp, MCPAppConfiguration]]: +def _apply_filter( + servers: List[Union[MCPApp, MCPAppConfiguration]], filter_expr: str +) -> List[Union[MCPApp, MCPAppConfiguration]]: """Apply client-side filtering to servers.""" if not filter_expr: return servers - + filtered_servers = [] # Support basic filtering by name, status, description filter_lower = filter_expr.lower() - + for server in servers: # Get server attributes for filtering try: if isinstance(server, MCPApp): name = server.name or "" description = server.description or "" - status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" - elif hasattr(server, 'app'): # MCPAppConfiguration + status = ( + server.appServerInfo.status + if server.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) + elif hasattr(server, "app"): # MCPAppConfiguration name = server.app.name if server.app else "" description = server.app.description if server.app else "" - status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + status = ( + server.appServerInfo.status + if server.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) else: # Fallback for other types (like test mocks) - name = getattr(server, 'name', '') or "" - description = getattr(server, 'description', '') or "" - server_info = getattr(server, 'appServerInfo', None) - status = server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + name = getattr(server, "name", "") or "" + description = getattr(server, "description", "") or "" + server_info = getattr(server, "appServerInfo", None) + status = ( + server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + ) except Exception: # Skip servers that can't be processed continue - + # Clean status for filtering clean_status = clean_server_status(status).lower() - + # Check if filter matches name, description, or status - if (filter_lower in name.lower() or - filter_lower in description.lower() or - filter_lower in clean_status): + if ( + filter_lower in name.lower() + or filter_lower in description.lower() + or filter_lower in clean_status + ): filtered_servers.append(server) - + return filtered_servers -def _apply_sort(servers: List[Union[MCPApp, MCPAppConfiguration]], sort_field: str) -> List[Union[MCPApp, MCPAppConfiguration]]: +def _apply_sort( + servers: List[Union[MCPApp, MCPAppConfiguration]], sort_field: str +) -> List[Union[MCPApp, MCPAppConfiguration]]: """Apply client-side sorting to servers.""" if not sort_field: return servers - + # Normalize sort field sort_field_lower = sort_field.lower() reverse = False - + # Support reverse sorting with - prefix - if sort_field_lower.startswith('-'): + if sort_field_lower.startswith("-"): reverse = True sort_field_lower = sort_field_lower[1:] - + def get_sort_key(server): try: if isinstance(server, MCPApp): name = server.name or "" created_at = server.createdAt - status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" - elif hasattr(server, 'app'): # MCPAppConfiguration + status = ( + server.appServerInfo.status + if server.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) + elif hasattr(server, "app"): # MCPAppConfiguration name = server.app.name if server.app else "" created_at = server.createdAt - status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + status = ( + server.appServerInfo.status + if server.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) else: # Fallback for other types (like test mocks) - name = getattr(server, 'name', '') or "" - created_at = getattr(server, 'createdAt', None) - server_info = getattr(server, 'appServerInfo', None) - status = server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + name = getattr(server, "name", "") or "" + created_at = getattr(server, "createdAt", None) + server_info = getattr(server, "appServerInfo", None) + status = ( + server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + ) except Exception: # Return default values for sorting if server can't be processed name = "" created_at = None status = "APP_SERVER_STATUS_OFFLINE" - - if sort_field_lower == 'name': + + if sort_field_lower == "name": return name.lower() - elif sort_field_lower in ['created', 'created_at', 'date']: - return created_at or datetime.min.replace(tzinfo=None if created_at is None else created_at.tzinfo) - elif sort_field_lower == 'status': + elif sort_field_lower in ["created", "created_at", "date"]: + return created_at or datetime.min.replace( + tzinfo=None if created_at is None else created_at.tzinfo + ) + elif sort_field_lower == "status": return clean_server_status(status).lower() else: # Default to name if sort field not recognized return name.lower() - + try: return sorted(servers, key=get_sort_key, reverse=reverse) except Exception: @@ -160,7 +209,12 @@ def get_sort_key(server): return servers -def _print_servers_text(deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration], filter_param: Optional[str], sort_by: Optional[str]) -> None: +def _print_servers_text( + deployed_servers: List[MCPApp], + configured_servers: List[MCPAppConfiguration], + filter_param: Optional[str], + sort_by: Optional[str], +) -> None: """Print servers in text format.""" print_info_header() @@ -185,39 +239,45 @@ def _print_servers_text(deployed_servers: List[MCPApp], configured_servers: List print_info("No configured servers found.") if filter_param or sort_by: - console.print(f"\n[dim]Applied filters: filter={filter_param or 'None'}, sort-by={sort_by or 'None'}[/dim]") + console.print( + f"\n[dim]Applied filters: filter={filter_param or 'None'}, sort-by={sort_by or 'None'}[/dim]" + ) filter_desc = f"filter='{filter_param}'" if filter_param else "filter=None" sort_desc = f"sort-by='{sort_by}'" if sort_by else "sort-by=None" - print_info(f"Client-side {filter_desc}, {sort_desc}. Sort fields: name, created, status (-prefix for reverse).") + print_info( + f"Client-side {filter_desc}, {sort_desc}. Sort fields: name, created, status (-prefix for reverse)." + ) -def _print_servers_json(deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration]) -> None: +def _print_servers_json( + deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration] +) -> None: """Print servers in JSON format.""" deployed_data = [_server_to_dict(server) for server in deployed_servers] configured_data = [_server_config_to_dict(config) for config in configured_servers] - - output = { - "deployed_servers": deployed_data, - "configured_servers": configured_data - } + + output = {"deployed_servers": deployed_data, "configured_servers": configured_data} print(json.dumps(output, indent=2, default=str)) -def _print_servers_yaml(deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration]) -> None: +def _print_servers_yaml( + deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration] +) -> None: """Print servers in YAML format.""" deployed_data = [_server_to_dict(server) for server in deployed_servers] configured_data = [_server_config_to_dict(config) for config in configured_servers] - - output = { - "deployed_servers": deployed_data, - "configured_servers": configured_data - } + + output = {"deployed_servers": deployed_data, "configured_servers": configured_data} print(yaml.dump(output, default_flow_style=False)) def _server_to_dict(server: MCPApp) -> dict: """Convert MCPApp to dictionary.""" - status_raw = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + status_raw = ( + server.appServerInfo.status + if server.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) return { "id": server.appId, "name": server.name or "Unnamed", @@ -226,13 +286,17 @@ def _server_to_dict(server: MCPApp) -> dict: "server_url": server.appServerInfo.serverUrl if server.appServerInfo else None, "creator_id": server.creatorId, "created_at": server.createdAt.isoformat() if server.createdAt else None, - "type": "deployed" + "type": "deployed", } def _server_config_to_dict(config: MCPAppConfiguration) -> dict: """Convert MCPAppConfiguration to dictionary.""" - status_raw = config.appServerInfo.status if config.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + status_raw = ( + config.appServerInfo.status + if config.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) return { "config_id": config.appConfigurationId, "app_id": config.app.appId if config.app else None, @@ -242,12 +306,10 @@ def _server_config_to_dict(config: MCPAppConfiguration) -> dict: "server_url": config.appServerInfo.serverUrl if config.appServerInfo else None, "creator_id": config.creatorId, "created_at": config.createdAt.isoformat() if config.createdAt else None, - "type": "configured" + "type": "configured", } - - def print_info_header() -> None: """Print a styled header explaining the following tables""" console.print( @@ -329,4 +391,4 @@ def _server_status_text(status: str) -> str: elif status == "APP_SERVER_STATUS_OFFLINE": return "[red]🔴 Offline[/red]" else: - return "❓ Unknown" \ No newline at end of file + return "❓ Unknown" diff --git a/src/mcp_agent/cli/cloud/commands/servers/workflows/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/workflows/__init__.py deleted file mode 100644 index 1c9260a18..000000000 --- a/src/mcp_agent/cli/cloud/commands/servers/workflows/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Workflows subcommand for servers.""" - -from .main import list_workflows_for_server - -__all__ = [ - "list_workflows_for_server", -] \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/utils.py b/src/mcp_agent/cli/cloud/commands/utils.py index df258afe2..4cacb0391 100644 --- a/src/mcp_agent/cli/cloud/commands/utils.py +++ b/src/mcp_agent/cli/cloud/commands/utils.py @@ -17,60 +17,60 @@ def setup_authenticated_client() -> MCPAppClient: """Setup authenticated MCP App client. - + Returns: Configured MCPAppClient instance - + Raises: CLIError: If authentication fails """ effective_api_key = load_api_key_credentials() if not effective_api_key: - raise CLIError( - "Must be logged in to access servers. Run 'mcp-agent login'." - ) + raise CLIError("Must be logged in to access servers. Run 'mcp-agent login'.") - return MCPAppClient( - api_url=DEFAULT_API_BASE_URL, api_key=effective_api_key - ) + return MCPAppClient(api_url=DEFAULT_API_BASE_URL, api_key=effective_api_key) def validate_output_format(format: str) -> None: """Validate output format parameter. - + Args: format: Output format to validate - + Raises: CLIError: If format is invalid """ valid_formats = ["text", "json", "yaml"] if format not in valid_formats: - raise CLIError(f"Invalid format '{format}'. Valid options are: {', '.join(valid_formats)}") + raise CLIError( + f"Invalid format '{format}'. Valid options are: {', '.join(valid_formats)}" + ) -def resolve_server(client: MCPAppClient, id_or_url: str) -> Union[MCPApp, MCPAppConfiguration]: +def resolve_server( + client: MCPAppClient, id_or_url: str +) -> Union[MCPApp, MCPAppConfiguration]: """Resolve server from ID. - + Args: client: Authenticated MCP App client id_or_url: Server identifier (app ID or app config ID) - + Returns: Server object (MCPApp or MCPAppConfiguration) - + Raises: CLIError: If server resolution fails """ try: app_id, config_id = parse_app_identifier(id_or_url) - + if config_id: return run_async(client.get_app_configuration(app_config_id=config_id)) else: return run_async(client.get_app(app_id=app_id)) - + except ValueError as e: raise CLIError(str(e)) from e except Exception as e: @@ -79,13 +79,14 @@ def resolve_server(client: MCPAppClient, id_or_url: str) -> Union[MCPApp, MCPApp def handle_server_api_errors(func): """Decorator to handle common API errors for server commands. - + Args: func: Function to wrap with error handling - + Returns: Wrapped function with error handling """ + @wraps(func) def wrapper(*args, **kwargs): try: @@ -99,18 +100,18 @@ def wrapper(*args, **kwargs): raise except Exception as e: # Get the original function name for better error messages - func_name = func.__name__.replace('_', ' ') + func_name = func.__name__.replace("_", " ") raise CLIError(f"Error in {func_name}: {str(e)}") from e - + return wrapper def get_server_name(server: Union[MCPApp, MCPAppConfiguration]) -> str: """Get display name for a server. - + Args: server: Server object - + Returns: Server display name """ @@ -122,10 +123,10 @@ def get_server_name(server: Union[MCPApp, MCPAppConfiguration]) -> str: def get_server_id(server: Union[MCPApp, MCPAppConfiguration]) -> str: """Get ID for a server. - + Args: server: Server object - + Returns: Server ID """ @@ -137,10 +138,10 @@ def get_server_id(server: Union[MCPApp, MCPAppConfiguration]) -> str: def clean_server_status(status: str) -> str: """Convert server status from API format to clean format. - + Args: status: API status string - + Returns: Clean status string """ @@ -149,4 +150,4 @@ def clean_server_status(status: str) -> str: elif status == "APP_SERVER_STATUS_OFFLINE": return "offline" else: - return "unknown" \ No newline at end of file + return "unknown" diff --git a/src/mcp_agent/cli/cloud/commands/workflows/__init__.py b/src/mcp_agent/cli/cloud/commands/workflows/__init__.py index 5ecd86ba4..7aedee0d8 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/__init__.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/__init__.py @@ -3,10 +3,14 @@ from .describe import describe_workflow from .resume import resume_workflow, suspend_workflow from .cancel import cancel_workflow +from .list import list_workflows +from .runs import list_workflow_runs __all__ = [ "describe_workflow", "resume_workflow", "suspend_workflow", "cancel_workflow", + "list_workflows", + "list_workflow_runs", ] diff --git a/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py b/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py index fcc5524e7..5b480433b 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/cancel/main.py @@ -10,53 +10,63 @@ from mcp_agent.cli.utils.ux import console from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings from mcp_agent.mcp.gen_client import gen_client -from ...utils import setup_authenticated_client, resolve_server, handle_server_api_errors +from ...utils import ( + setup_authenticated_client, + resolve_server, + handle_server_api_errors, +) async def _cancel_workflow_async( - server_id_or_url: str, - run_id: str, - reason: Optional[str] = None + server_id_or_url: str, run_id: str, reason: Optional[str] = None ) -> None: """Cancel a workflow using MCP tool calls to a deployed server.""" - if server_id_or_url.startswith(('http://', 'https://')): + if server_id_or_url.startswith(("http://", "https://")): server_url = server_id_or_url else: client = setup_authenticated_client() server = resolve_server(client, server_id_or_url) - - if hasattr(server, 'appServerInfo') and server.appServerInfo: + + if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl else: - raise CLIError(f"Server '{server_id_or_url}' is not deployed or has no server URL") - + raise CLIError( + f"Server '{server_id_or_url}' is not deployed or has no server URL" + ) + if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - + quiet_settings = Settings(logger=LoggerSettings(level="error")) app = MCPApp(name="workflows_cli", settings=quiet_settings) - + try: async with app.run() as workflow_app: context = workflow_app.context - - sse_url = f"{server_url.rstrip('/')}/sse" if not server_url.endswith('/sse') else server_url + + sse_url = ( + f"{server_url.rstrip('/')}/sse" + if not server_url.endswith("/sse") + else server_url + ) context.server_registry.registry["workflow_server"] = MCPServerSettings( name="workflow_server", description=f"Deployed MCP server {server_url}", url=sse_url, - transport="sse" + transport="sse", ) - - async with gen_client("workflow_server", server_registry=context.server_registry) as client: + + async with gen_client( + "workflow_server", server_registry=context.server_registry + ) as client: tool_params = {"run_id": run_id} - + result = await client.call_tool("workflows-cancel", tool_params) - + success = result.content[0].text if result.content else False if isinstance(success, str): - success = success.lower() == 'true' - + success = success.lower() == "true" + if success: console.print("[yellow]⚠[/yellow] Successfully cancelled workflow") console.print(f" Run ID: [cyan]{run_id}[/cyan]") @@ -64,26 +74,32 @@ async def _cancel_workflow_async( console.print(f" Reason: [dim]{reason}[/dim]") else: raise CLIError(f"Failed to cancel workflow with run ID {run_id}") - + except Exception as e: - raise CLIError(f"Error cancelling workflow with run ID {run_id}: {str(e)}") from e + raise CLIError( + f"Error cancelling workflow with run ID {run_id}: {str(e)}" + ) from e @handle_server_api_errors def cancel_workflow( - server_id_or_url: str = typer.Argument(..., help="Server ID or URL hosting the workflow"), + server_id_or_url: str = typer.Argument( + ..., help="Server ID or URL hosting the workflow" + ), run_id: str = typer.Argument(..., help="Run ID of the workflow to cancel"), - reason: Optional[str] = typer.Option(None, "--reason", help="Optional reason for cancellation"), + reason: Optional[str] = typer.Option( + None, "--reason", help="Optional reason for cancellation" + ), ) -> None: """Cancel a workflow execution. - + Permanently stops a workflow execution. Unlike suspend, a cancelled workflow cannot be resumed and will be marked as cancelled. - + Examples: - + mcp-agent cloud workflows cancel app_abc123 run_xyz789 - + mcp-agent cloud workflows cancel app_abc123 run_xyz789 --reason "User requested" """ run_async(_cancel_workflow_async(server_id_or_url, run_id, reason)) diff --git a/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py b/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py index b06db73cf..7dd3eb974 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/describe/main.py @@ -13,53 +13,61 @@ from mcp_agent.cli.utils.ux import console from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings from mcp_agent.mcp.gen_client import gen_client -from ...utils import setup_authenticated_client, resolve_server, handle_server_api_errors +from ...utils import ( + setup_authenticated_client, + resolve_server, + handle_server_api_errors, +) async def _describe_workflow_async( - server_id_or_url: str, - run_id: str, - format: str = "text" + server_id_or_url: str, run_id: str, format: str = "text" ) -> None: """Describe a workflow using MCP tool calls to a deployed server.""" - if server_id_or_url.startswith(('http://', 'https://')): + if server_id_or_url.startswith(("http://", "https://")): server_url = server_id_or_url else: client = setup_authenticated_client() server = resolve_server(client, server_id_or_url) - - if hasattr(server, 'appServerInfo') and server.appServerInfo: + + if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl else: - raise CLIError(f"Server '{server_id_or_url}' is not deployed or has no server URL") - + raise CLIError( + f"Server '{server_id_or_url}' is not deployed or has no server URL" + ) + if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - + quiet_settings = Settings(logger=LoggerSettings(level="error")) app = MCPApp(name="workflows_cli", settings=quiet_settings) - + try: async with app.run() as workflow_app: context = workflow_app.context - sse_url = f"{server_url}/sse" if not server_url.endswith('/sse') else server_url + sse_url = ( + f"{server_url}/sse" if not server_url.endswith("/sse") else server_url + ) context.server_registry.registry["workflow_server"] = MCPServerSettings( name="workflow_server", description=f"Deployed MCP server {server_url}", url=sse_url, - transport="sse" + transport="sse", ) - - async with gen_client("workflow_server", server_registry=context.server_registry) as client: - result = await client.call_tool("workflows-get_status", { - "run_id": run_id - }) - + + async with gen_client( + "workflow_server", server_registry=context.server_registry + ) as client: + result = await client.call_tool( + "workflows-get_status", {"run_id": run_id} + ) + workflow_status = result.content[0].text if result.content else {} if isinstance(workflow_status, str): workflow_status = json.loads(workflow_status) - + if not workflow_status: raise CLIError(f"Workflow with run ID '{run_id}' not found.") @@ -69,26 +77,32 @@ async def _describe_workflow_async( print(yaml.dump(workflow_status, default_flow_style=False)) else: # text format print_workflow_status(workflow_status) - + except Exception as e: - raise CLIError(f"Error describing workflow with run ID {run_id}: {str(e)}") from e + raise CLIError( + f"Error describing workflow with run ID {run_id}: {str(e)}" + ) from e @handle_server_api_errors def describe_workflow( - server_id_or_url: str = typer.Argument(..., help="Server ID or URL hosting the workflow"), + server_id_or_url: str = typer.Argument( + ..., help="Server ID or URL hosting the workflow" + ), run_id: str = typer.Argument(..., help="Run ID of the workflow to describe"), - format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), + format: Optional[str] = typer.Option( + "text", "--format", help="Output format (text|json|yaml)" + ), ) -> None: """Describe a workflow execution (alias: status). - + Shows detailed information about a workflow execution including its current status, creation time, and other metadata. - + Examples: - + mcp-agent cloud workflows describe app_abc123 run_xyz789 - + mcp-agent cloud workflows describe app_abc123 run_xyz789 --format json """ if format not in ["text", "json", "yaml"]: @@ -101,19 +115,24 @@ def describe_workflow( def print_workflow_status(workflow_status: dict) -> None: """Print workflow status information in text format.""" name = workflow_status.get("name", "N/A") - workflow_id = workflow_status.get("workflow_id", workflow_status.get("workflowId", "N/A")) + workflow_id = workflow_status.get( + "workflow_id", workflow_status.get("workflowId", "N/A") + ) run_id = workflow_status.get("run_id", workflow_status.get("runId", "N/A")) status = workflow_status.get("status", "N/A") - - created_at = workflow_status.get("created_at", workflow_status.get("createdAt", "N/A")) + + created_at = workflow_status.get( + "created_at", workflow_status.get("createdAt", "N/A") + ) if created_at != "N/A" and isinstance(created_at, str): try: from datetime import datetime - created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00')) - created_at = created_dt.strftime('%Y-%m-%d %H:%M:%S') + + created_dt = datetime.fromisoformat(created_at.replace("Z", "+00:00")) + created_at = created_dt.strftime("%Y-%m-%d %H:%M:%S") except (ValueError, TypeError): pass # Keep original format if parsing fails - + console.print( Panel( f"Name: [cyan]{name}[/cyan]\n" @@ -131,7 +150,7 @@ def print_workflow_status(workflow_status: dict) -> None: def _format_status(status: str) -> str: """Format the execution status text.""" status_lower = str(status).lower() - + if "running" in status_lower: return "🔄 Running" elif "failed" in status_lower or "error" in status_lower: diff --git a/src/mcp_agent/cli/cloud/commands/workflows/list/__init__.py b/src/mcp_agent/cli/cloud/commands/workflows/list/__init__.py new file mode 100644 index 000000000..0b0f6a821 --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/workflows/list/__init__.py @@ -0,0 +1,5 @@ +"""Workflow list command module.""" + +from .main import list_workflows + +__all__ = ["list_workflows"] diff --git a/src/mcp_agent/cli/cloud/commands/workflows/list/main.py b/src/mcp_agent/cli/cloud/commands/workflows/list/main.py new file mode 100644 index 000000000..f93be1a13 --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/workflows/list/main.py @@ -0,0 +1,155 @@ +"""Workflow list command implementation.""" + +import json +from typing import Optional + +import typer +import yaml +from rich.table import Table + +from mcp_agent.app import MCPApp +from mcp_agent.cli.core.utils import run_async +from mcp_agent.cli.exceptions import CLIError +from mcp_agent.cli.utils.ux import console, print_info +from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings +from mcp_agent.mcp.gen_client import gen_client +from ...utils import ( + setup_authenticated_client, + resolve_server, + handle_server_api_errors, + validate_output_format, +) + + +async def _list_workflows_async(server_id_or_url: str, format: str = "text") -> None: + """List available workflows using MCP tool calls to a deployed server.""" + if server_id_or_url.startswith(("http://", "https://")): + server_url = server_id_or_url + else: + client = setup_authenticated_client() + server = resolve_server(client, server_id_or_url) + + if hasattr(server, "appServerInfo") and server.appServerInfo: + server_url = server.appServerInfo.serverUrl + else: + raise CLIError( + f"Server '{server_id_or_url}' is not deployed or has no server URL" + ) + + if not server_url: + raise CLIError(f"No server URL found for server '{server_id_or_url}'") + + quiet_settings = Settings(logger=LoggerSettings(level="error")) + app = MCPApp(name="workflows_cli", settings=quiet_settings) + + try: + async with app.run() as workflow_app: + context = workflow_app.context + + sse_url = ( + f"{server_url}/sse" if not server_url.endswith("/sse") else server_url + ) + context.server_registry.registry["workflow_server"] = MCPServerSettings( + name="workflow_server", + description=f"Deployed MCP server {server_url}", + url=sse_url, + transport="sse", + ) + + async with gen_client( + "workflow_server", server_registry=context.server_registry + ) as client: + result = await client.call_tool("workflows-list", {}) + + workflows_data = result.content[0].text if result.content else "{}" + if isinstance(workflows_data, str): + workflows_data = json.loads(workflows_data) + + if not workflows_data: + workflows_data = {} + + if format == "json": + print(json.dumps(workflows_data, indent=2)) + elif format == "yaml": + print(yaml.dump(workflows_data, default_flow_style=False)) + else: # text format + print_workflows_text(workflows_data, server_id_or_url) + + except Exception as e: + raise CLIError( + f"Error listing workflows for server {server_id_or_url}: {str(e)}" + ) from e + + +@handle_server_api_errors +def list_workflows( + server_id_or_url: str = typer.Argument( + ..., help="Server ID or URL to list workflows for" + ), + format: Optional[str] = typer.Option( + "text", "--format", help="Output format (text|json|yaml)" + ), +) -> None: + """List available workflow definitions for an MCP Server. + + This command lists the workflow definitions that a server provides, + showing what workflows can be executed. + + Examples: + + mcp-agent cloud workflows list app_abc123 + + mcp-agent cloud workflows list https://server.example.com --format json + """ + validate_output_format(format) + run_async(_list_workflows_async(server_id_or_url, format)) + + +def print_workflows_text(workflows_data: dict, server_id_or_url: str) -> None: + """Print workflows information in text format.""" + server_name = server_id_or_url + + console.print( + f"\n[bold blue]📋 Available Workflows for Server: {server_name}[/bold blue]" + ) + + if not workflows_data or not any(workflows_data.values()): + print_info("No workflows found for this server.") + return + + total_workflows = sum( + len(workflow_list) if isinstance(workflow_list, list) else 1 + for workflow_list in workflows_data.values() + ) + console.print(f"\nFound {total_workflows} workflow definition(s):") + + table = Table(show_header=True, header_style="bold blue") + table.add_column("Name", style="cyan", width=25) + table.add_column("Description", style="green", width=40) + table.add_column("Capabilities", style="yellow", width=25) + table.add_column("Tool Endpoints", style="dim", width=20) + + for workflow_name, workflow_info in workflows_data.items(): + if isinstance(workflow_info, dict): + name = workflow_info.get("name", workflow_name) + description = workflow_info.get("description", "N/A") + capabilities = ", ".join(workflow_info.get("capabilities", [])) + tool_endpoints = ", ".join( + [ep.split("-")[-1] for ep in workflow_info.get("tool_endpoints", [])] + ) + + table.add_row( + _truncate_string(name, 25), + _truncate_string(description, 40), + _truncate_string(capabilities if capabilities else "N/A", 25), + _truncate_string(tool_endpoints if tool_endpoints else "N/A", 20), + ) + + console.print(table) + + +def _truncate_string(text: str, max_length: int) -> str: + """Truncate string to max_length, adding ellipsis if truncated.""" + if len(text) <= max_length: + return text + return text[: max_length - 3] + "..." diff --git a/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py b/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py index 19e479900..3c2271db0 100644 --- a/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/resume/main.py @@ -11,86 +11,126 @@ from mcp_agent.cli.utils.ux import console from mcp_agent.config import MCPServerSettings, Settings, LoggerSettings from mcp_agent.mcp.gen_client import gen_client -from ...utils import setup_authenticated_client, resolve_server, handle_server_api_errors +from ...utils import ( + setup_authenticated_client, + resolve_server, + handle_server_api_errors, +) async def _signal_workflow_async( server_id_or_url: str, run_id: str, signal_name: str = "resume", - payload: Optional[str] = None + payload: Optional[str] = None, ) -> None: """Send a signal to a workflow using MCP tool calls to a deployed server.""" - if server_id_or_url.startswith(('http://', 'https://')): + if server_id_or_url.startswith(("http://", "https://")): server_url = server_id_or_url else: client = setup_authenticated_client() server = resolve_server(client, server_id_or_url) - - if hasattr(server, 'appServerInfo') and server.appServerInfo: + + if hasattr(server, "appServerInfo") and server.appServerInfo: server_url = server.appServerInfo.serverUrl else: - raise CLIError(f"Server '{server_id_or_url}' is not deployed or has no server URL") - + raise CLIError( + f"Server '{server_id_or_url}' is not deployed or has no server URL" + ) + if not server_url: raise CLIError(f"No server URL found for server '{server_id_or_url}'") - + quiet_settings = Settings(logger=LoggerSettings(level="error")) app = MCPApp(name="workflows_cli", settings=quiet_settings) - + try: async with app.run() as workflow_app: context = workflow_app.context - - sse_url = f"{server_url.rstrip('/')}/sse" if not server_url.endswith('/sse') else server_url + + sse_url = ( + f"{server_url.rstrip('/')}/sse" + if not server_url.endswith("/sse") + else server_url + ) context.server_registry.registry["workflow_server"] = MCPServerSettings( name="workflow_server", description=f"Deployed MCP server {server_url}", url=sse_url, - transport="sse" + transport="sse", ) - - async with gen_client("workflow_server", server_registry=context.server_registry) as client: + + async with gen_client( + "workflow_server", server_registry=context.server_registry + ) as client: tool_params = {"run_id": run_id, "signal_name": signal_name} if payload: tool_params["payload"] = payload - + result = await client.call_tool("workflows-resume", tool_params) - + success = result.content[0].text if result.content else False if isinstance(success, str): - success = success.lower() == 'true' - + success = success.lower() == "true" + if success: - action_past = "resumed" if signal_name == "resume" else "suspended" if signal_name == "suspend" else f"signaled ({signal_name})" - action_color = "green" if signal_name == "resume" else "yellow" if signal_name == "suspend" else "blue" - action_icon = "✓" if signal_name == "resume" else "⏸" if signal_name == "suspend" else "📡" - console.print(f"[{action_color}]{action_icon}[/{action_color}] Successfully {action_past} workflow") + action_past = ( + "resumed" + if signal_name == "resume" + else "suspended" + if signal_name == "suspend" + else f"signaled ({signal_name})" + ) + action_color = ( + "green" + if signal_name == "resume" + else "yellow" + if signal_name == "suspend" + else "blue" + ) + action_icon = ( + "✓" + if signal_name == "resume" + else "⏸" + if signal_name == "suspend" + else "📡" + ) + console.print( + f"[{action_color}]{action_icon}[/{action_color}] Successfully {action_past} workflow" + ) console.print(f" Run ID: [cyan]{run_id}[/cyan]") else: - raise CLIError(f"Failed to {signal_name} workflow with run ID {run_id}") - + raise CLIError( + f"Failed to {signal_name} workflow with run ID {run_id}" + ) + except Exception as e: - raise CLIError(f"Error {signal_name}ing workflow with run ID {run_id}: {str(e)}") from e + raise CLIError( + f"Error {signal_name}ing workflow with run ID {run_id}: {str(e)}" + ) from e @handle_server_api_errors def resume_workflow( - server_id_or_url: str = typer.Argument(..., help="Server ID or URL hosting the workflow"), + server_id_or_url: str = typer.Argument( + ..., help="Server ID or URL hosting the workflow" + ), run_id: str = typer.Argument(..., help="Run ID of the workflow to resume"), - payload: Optional[str] = typer.Option(None, "--payload", help="JSON or text payload to pass to resumed workflow"), + payload: Optional[str] = typer.Option( + None, "--payload", help="JSON or text payload to pass to resumed workflow" + ), ) -> None: """Resume a suspended workflow execution. - + Resumes execution of a previously suspended workflow. Optionally accepts a payload (JSON or text) to pass data to the resumed workflow. - + Examples: - + mcp-agent cloud workflows resume app_abc123 run_xyz789 - + mcp-agent cloud workflows resume app_abc123 run_xyz789 --payload '{"data": "value"}' - + mcp-agent cloud workflows resume app_abc123 run_xyz789 --payload "simple text" """ if payload: @@ -105,15 +145,19 @@ def resume_workflow( @handle_server_api_errors def suspend_workflow( - server_id_or_url: str = typer.Argument(..., help="Server ID or URL hosting the workflow"), + server_id_or_url: str = typer.Argument( + ..., help="Server ID or URL hosting the workflow" + ), run_id: str = typer.Argument(..., help="Run ID of the workflow to suspend"), - payload: Optional[str] = typer.Option(None, "--payload", help="JSON or text payload to pass to suspended workflow"), + payload: Optional[str] = typer.Option( + None, "--payload", help="JSON or text payload to pass to suspended workflow" + ), ) -> None: """Suspend a workflow execution. - + Temporarily pauses a workflow execution, which can later be resumed. Optionally accepts a payload (JSON or text) to pass data to the suspended workflow. - + Examples: mcp-agent cloud workflows suspend app_abc123 run_xyz789 mcp-agent cloud workflows suspend https://server.example.com run_xyz789 --payload '{"reason": "maintenance"}' diff --git a/src/mcp_agent/cli/cloud/commands/workflows/runs/__init__.py b/src/mcp_agent/cli/cloud/commands/workflows/runs/__init__.py new file mode 100644 index 000000000..96ad2805e --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/workflows/runs/__init__.py @@ -0,0 +1,5 @@ +"""Workflow runs command module.""" + +from .main import list_workflow_runs + +__all__ = ["list_workflow_runs"] diff --git a/src/mcp_agent/cli/cloud/commands/servers/workflows/main.py b/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py similarity index 71% rename from src/mcp_agent/cli/cloud/commands/servers/workflows/main.py rename to src/mcp_agent/cli/cloud/commands/workflows/runs/main.py index b133a881a..de2935c30 100644 --- a/src/mcp_agent/cli/cloud/commands/servers/workflows/main.py +++ b/src/mcp_agent/cli/cloud/commands/workflows/runs/main.py @@ -1,4 +1,4 @@ -"""Server workflows command implementation.""" +"""Workflow runs command implementation.""" import json from typing import Optional @@ -19,39 +19,51 @@ @handle_server_api_errors -def list_workflows_for_server( - server_id_or_url: str = typer.Argument(..., help="Server ID, app config ID, or server URL to list workflows for"), - limit: Optional[int] = typer.Option(None, "--limit", help="Maximum number of results to return"), - status: Optional[str] = typer.Option(None, "--status", help="Filter by status: running|failed|timed_out|canceled|terminated|completed|continued"), - format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), +def list_workflow_runs( + server_id_or_url: str = typer.Argument( + ..., help="Server ID, app config ID, or server URL to list workflow runs for" + ), + limit: Optional[int] = typer.Option( + None, "--limit", help="Maximum number of results to return" + ), + status: Optional[str] = typer.Option( + None, + "--status", + help="Filter by status: running|failed|timed_out|canceled|terminated|completed|continued", + ), + format: Optional[str] = typer.Option( + "text", "--format", help="Output format (text|json|yaml)" + ), ) -> None: - """List workflows for an MCP Server. - + """List workflow runs for an MCP Server. + Examples: - - mcp-agent cloud servers workflows app_abc123 - - mcp-agent cloud servers workflows https://server.example.com --status running - - mcp-agent cloud servers workflows apcnf_xyz789 --limit 10 --format json + + mcp-agent cloud workflows runs app_abc123 + + mcp-agent cloud workflows runs https://server.example.com --status running + + mcp-agent cloud workflows runs apcnf_xyz789 --limit 10 --format json """ validate_output_format(format) client = setup_authenticated_client() - - if server_id_or_url.startswith(('http://', 'https://')): + + if server_id_or_url.startswith(("http://", "https://")): resolved_server = resolve_server(client, server_id_or_url) - - if hasattr(resolved_server, 'appId'): + + if hasattr(resolved_server, "appId"): app_id_or_config_id = resolved_server.appId - elif hasattr(resolved_server, 'appConfigurationId'): + elif hasattr(resolved_server, "appConfigurationId"): app_id_or_config_id = resolved_server.appConfigurationId else: - raise ValueError(f"Could not extract app ID or config ID from server: {server_id_or_url}") + raise ValueError( + f"Could not extract app ID or config ID from server: {server_id_or_url}" + ) else: app_id_or_config_id = server_id_or_url - + max_results = limit or 100 - + status_filter = None if status: status_map = { @@ -69,20 +81,21 @@ def list_workflows_for_server( status_filter = status_map.get(status.lower()) if not status_filter: valid_statuses = "running|failed|timed_out|timeout|canceled|cancelled|terminated|completed|continued|continued_as_new" - raise typer.BadParameter(f"Invalid status '{status}'. Valid options: {valid_statuses}") + raise typer.BadParameter( + f"Invalid status '{status}'. Valid options: {valid_statuses}" + ) async def list_workflows_async(): return await client.list_workflows( - app_id_or_config_id=app_id_or_config_id, - max_results=max_results + app_id_or_config_id=app_id_or_config_id, max_results=max_results ) response = run_async(list_workflows_async()) workflows = response.workflows or [] - + if status_filter: workflows = [w for w in workflows if w.execution_status == status_filter] - + if format == "json": _print_workflows_json(workflows) elif format == "yaml": @@ -94,15 +107,17 @@ async def list_workflows_async(): def _print_workflows_text(workflows, status_filter, server_id_or_url): """Print workflows in text format.""" server_name = server_id_or_url - - console.print(f"\n[bold blue]📊 Workflows for Server: {server_name}[/bold blue]") - + + console.print( + f"\n[bold blue]📊 Workflow Runs for Server: {server_name}[/bold blue]" + ) + if not workflows: - print_info("No workflows found for this server.") + print_info("No workflow runs found for this server.") return - - console.print(f"\nFound {len(workflows)} workflow(s):") - + + console.print(f"\nFound {len(workflows)} workflow run(s):") + table = Table(show_header=True, header_style="bold blue") table.add_column("Workflow ID", style="cyan", width=20) table.add_column("Name", style="green", width=20) @@ -110,12 +125,16 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url): table.add_column("Run ID", style="dim", width=15) table.add_column("Created", style="dim", width=20) table.add_column("Principal", style="dim", width=15) - + for workflow in workflows: status_display = _get_status_display(workflow.execution_status) - created_display = workflow.created_at.strftime('%Y-%m-%d %H:%M:%S') if workflow.created_at else "N/A" + created_display = ( + workflow.created_at.strftime("%Y-%m-%d %H:%M:%S") + if workflow.created_at + else "N/A" + ) run_id_display = _truncate_string(workflow.run_id or "N/A", 15) - + table.add_row( _truncate_string(workflow.workflow_id, 20), _truncate_string(workflow.name, 20), @@ -124,9 +143,9 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url): created_display, _truncate_string(workflow.principal_id, 15), ) - + console.print(table) - + if status_filter: console.print(f"\n[dim]Filtered by status: {status_filter}[/dim]") @@ -134,13 +153,13 @@ def _print_workflows_text(workflows, status_filter, server_id_or_url): def _print_workflows_json(workflows): """Print workflows in JSON format.""" workflows_data = [_workflow_to_dict(workflow) for workflow in workflows] - print(json.dumps({"workflows": workflows_data}, indent=2, default=str)) + print(json.dumps({"workflow_runs": workflows_data}, indent=2, default=str)) def _print_workflows_yaml(workflows): """Print workflows in YAML format.""" workflows_data = [_workflow_to_dict(workflow) for workflow in workflows] - print(yaml.dump({"workflows": workflows_data}, default_flow_style=False)) + print(yaml.dump({"workflow_runs": workflows_data}, default_flow_style=False)) def _workflow_to_dict(workflow): @@ -151,7 +170,9 @@ def _workflow_to_dict(workflow): "name": workflow.name, "created_at": workflow.created_at.isoformat() if workflow.created_at else None, "principal_id": workflow.principal_id, - "execution_status": workflow.execution_status.value if workflow.execution_status else None, + "execution_status": workflow.execution_status.value + if workflow.execution_status + else None, } @@ -159,14 +180,14 @@ def _truncate_string(text: str, max_length: int) -> str: """Truncate string to max_length, adding ellipsis if truncated.""" if len(text) <= max_length: return text - return text[:max_length-3] + "..." + return text[: max_length - 3] + "..." def _get_status_display(status): """Convert WorkflowExecutionStatus to display string with emoji.""" if not status: return "❓ Unknown" - + status_map = { WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING: "[green]🟢 Running[/green]", WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED: "[blue]✅ Completed[/blue]", @@ -176,5 +197,5 @@ def _get_status_display(status): WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: "[orange]⏰ Timed Out[/orange]", WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: "[purple]🔄 Continued[/purple]", } - - return status_map.get(status, "❓ Unknown") \ No newline at end of file + + return status_map.get(status, "❓ Unknown") diff --git a/src/mcp_agent/cli/cloud/main.py b/src/mcp_agent/cli/cloud/main.py index ac18d3086..b416e36cb 100644 --- a/src/mcp_agent/cli/cloud/main.py +++ b/src/mcp_agent/cli/cloud/main.py @@ -32,12 +32,13 @@ resume_workflow, suspend_workflow, cancel_workflow, + list_workflows, + list_workflow_runs, ) from mcp_agent.cli.cloud.commands.servers import ( list_servers, describe_server, delete_server, - list_workflows_for_server, ) from mcp_agent.cli.exceptions import CLIError from mcp_agent.cli.utils.ux import print_error @@ -150,10 +151,14 @@ def invoke(self, ctx): cls=HelpfulTyperGroup, ) app_cmd_workflows.command(name="describe")(describe_workflow) -app_cmd_workflows.command(name="status")(describe_workflow) # alias for describe +app_cmd_workflows.command( + name="status", help="Describe a workflow execution (alias for 'describe')" +)(describe_workflow) app_cmd_workflows.command(name="resume")(resume_workflow) app_cmd_workflows.command(name="suspend")(suspend_workflow) app_cmd_workflows.command(name="cancel")(cancel_workflow) +app_cmd_workflows.command(name="list")(list_workflows) +app_cmd_workflows.command(name="runs")(list_workflow_runs) # Sub-typer for `mcp-agent servers` commands app_cmd_servers = typer.Typer( @@ -164,7 +169,10 @@ def invoke(self, ctx): app_cmd_servers.command(name="list")(list_servers) app_cmd_servers.command(name="describe")(describe_server) app_cmd_servers.command(name="delete")(delete_server) -app_cmd_servers.command(name="workflows")(list_workflows_for_server) +app_cmd_servers.command( + name="workflows", + help="List available workflows for a server (alias for 'workflows list')", +)(list_workflows) app.add_typer(app_cmd_servers, name="servers", help="Manage MCP Servers") # Alias for servers - apps should behave identically diff --git a/src/mcp_agent/cli/core/utils.py b/src/mcp_agent/cli/core/utils.py index e6a430e30..0bca83854 100644 --- a/src/mcp_agent/cli/core/utils.py +++ b/src/mcp_agent/cli/core/utils.py @@ -28,36 +28,38 @@ def run_async(coro): def parse_app_identifier(identifier: str) -> Tuple[Optional[str], Optional[str]]: """Parse app identifier to extract app ID and config ID. - + Args: identifier: App identifier (must be app_... or apcnf_...) - + Returns: Tuple of (app_id, config_id) - + Raises: ValueError: If identifier format is not recognized """ - - if identifier.startswith('apcnf_'): + + if identifier.startswith("apcnf_"): return None, identifier - - if identifier.startswith('app_'): + + if identifier.startswith("app_"): return identifier, None - - raise ValueError(f"Invalid identifier format: '{identifier}'. Must be an app ID (app_...) or app configuration ID (apcnf_...)") + + raise ValueError( + f"Invalid identifier format: '{identifier}'. Must be an app ID (app_...) or app configuration ID (apcnf_...)" + ) async def resolve_server_url( app_id: Optional[str], - config_id: Optional[str], + config_id: Optional[str], credentials: UserCredentials, ) -> str: """Resolve server URL from app ID or configuration ID.""" - + if not app_id and not config_id: raise CLIError("Either app_id or config_id must be provided") - + if app_id: endpoint = "/mcp_app/get_app" payload = {"app_id": app_id} @@ -76,38 +78,42 @@ async def resolve_server_url( no_url_msg = f"No server URL found for app configuration '{config_id}'" offline_msg = f"App configuration '{config_id}' server is offline" api_error_msg = "Failed to get app configuration" - + api_base = DEFAULT_API_BASE_URL headers = { "Authorization": f"Bearer {credentials.api_key}", "Content-Type": "application/json", } - + try: async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.post(f"{api_base}{endpoint}", json=payload, headers=headers) - + response = await client.post( + f"{api_base}{endpoint}", json=payload, headers=headers + ) + if response.status_code == 404: raise CLIError(not_found_msg) elif response.status_code != 200: - raise CLIError(f"{api_error_msg}: {response.status_code} {response.text}") - + raise CLIError( + f"{api_error_msg}: {response.status_code} {response.text}" + ) + data = response.json() resource_info = data.get(response_key, {}) server_info = resource_info.get("appServerInfo") - + if not server_info: raise CLIError(not_deployed_msg) - + server_url = server_info.get("serverUrl") if not server_url: raise CLIError(no_url_msg) - + status = server_info.get("status", "APP_SERVER_STATUS_UNSPECIFIED") if status == "APP_SERVER_STATUS_OFFLINE": raise CLIError(offline_msg) - + return server_url - + except httpx.RequestError as e: raise CLIError(f"Failed to connect to API: {e}") diff --git a/src/mcp_agent/cli/mcp_app/api_client.py b/src/mcp_agent/cli/mcp_app/api_client.py index 892cf3258..d3156cd17 100644 --- a/src/mcp_agent/cli/mcp_app/api_client.py +++ b/src/mcp_agent/cli/mcp_app/api_client.py @@ -66,6 +66,7 @@ class CanDoActionsResponse(BaseModel): class WorkflowExecutionStatus(Enum): """From temporal.api.enums.v1.WorkflowExecutionStatus""" + WORKFLOW_EXECUTION_STATUS_UNSPECIFIED = "WORKFLOW_EXECUTION_STATUS_UNSPECIFIED" WORKFLOW_EXECUTION_STATUS_RUNNING = "WORKFLOW_EXECUTION_STATUS_RUNNING" WORKFLOW_EXECUTION_STATUS_FAILED = "WORKFLOW_EXECUTION_STATUS_FAILED" @@ -73,11 +74,14 @@ class WorkflowExecutionStatus(Enum): WORKFLOW_EXECUTION_STATUS_CANCELED = "WORKFLOW_EXECUTION_STATUS_CANCELED" WORKFLOW_EXECUTION_STATUS_TERMINATED = "WORKFLOW_EXECUTION_STATUS_TERMINATED" WORKFLOW_EXECUTION_STATUS_COMPLETED = "WORKFLOW_EXECUTION_STATUS_COMPLETED" - WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW = "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW" + WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW = ( + "WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW" + ) class WorkflowInfo(BaseModel): """Information about a workflow execution instance""" + workflow_id: str run_id: Optional[str] = None name: str @@ -88,6 +92,7 @@ class WorkflowInfo(BaseModel): class ListWorkflowsResponse(BaseModel): """Response for listing workflows""" + workflows: Optional[List[WorkflowInfo]] = [] next_page_token: Optional[str] = None total_count: Optional[int] = 0 @@ -136,19 +141,21 @@ def is_valid_server_url_format(server_url: str) -> bool: class LogEntry(BaseModel): """Represents a single log entry.""" + timestamp: Optional[str] = None level: Optional[str] = None message: Optional[str] = None # Allow additional fields that might be present - + class Config: extra = "allow" class GetAppLogsResponse(BaseModel): """Response from get_app_logs API endpoint.""" + logEntries: Optional[List[LogEntry]] = [] - + @property def log_entries_list(self) -> List[LogEntry]: """Get log entries regardless of field name format.""" @@ -502,16 +509,16 @@ async def list_workflows( page_token: Optional[str] = None, ) -> ListWorkflowsResponse: """List workflows for a specific app or app configuration. - + Args: app_id_or_config_id: The app ID (e.g. app_abc123) or app config ID (e.g. apcnf_xyz789) name_filter: Optional workflow name filter max_results: Maximum number of results to return page_token: Pagination token - + Returns: ListWorkflowsResponse: The list of workflows - + Raises: ValueError: If the app_id_or_config_id is invalid httpx.HTTPError: If the API request fails @@ -519,19 +526,21 @@ async def list_workflows( payload: Dict[str, Any] = { "max_results": max_results, } - + if is_valid_app_id_format(app_id_or_config_id): payload["app_specifier"] = {"app_id": app_id_or_config_id} elif is_valid_app_config_id_format(app_id_or_config_id): payload["app_specifier"] = {"app_config_id": app_id_or_config_id} else: - raise ValueError(f"Invalid app ID or app config ID format: {app_id_or_config_id}. Expected format: app_xxx or apcnf_xxx") - + raise ValueError( + f"Invalid app ID or app config ID format: {app_id_or_config_id}. Expected format: app_xxx or apcnf_xxx" + ) + if name_filter: payload["name"] = name_filter if page_token: payload["page_token"] = page_token - + response = await self.post("/workflow/list", payload) return ListWorkflowsResponse(**response.json()) @@ -712,12 +721,18 @@ async def get_app_logs( if not app_id and not app_configuration_id: raise ValueError("Either app_id or app_configuration_id must be provided") if app_id and app_configuration_id: - raise ValueError("Only one of app_id or app_configuration_id can be provided") - + raise ValueError( + "Only one of app_id or app_configuration_id can be provided" + ) + if app_id and not is_valid_app_id_format(app_id): raise ValueError(f"Invalid app ID format: {app_id}") - if app_configuration_id and not is_valid_app_config_id_format(app_configuration_id): - raise ValueError(f"Invalid app configuration ID format: {app_configuration_id}") + if app_configuration_id and not is_valid_app_config_id_format( + app_configuration_id + ): + raise ValueError( + f"Invalid app configuration ID format: {app_configuration_id}" + ) # Prepare request payload payload = {}