diff --git a/src/mcp_agent/cli/cloud/commands/auth/logout/main.py b/src/mcp_agent/cli/cloud/commands/auth/logout/main.py index 03941eea8..34ec34990 100644 --- a/src/mcp_agent/cli/cloud/commands/auth/logout/main.py +++ b/src/mcp_agent/cli/cloud/commands/auth/logout/main.py @@ -17,19 +17,16 @@ def logout() -> None: print_info("Not currently logged in.") return - # Show who is being logged out user_info = "current user" if credentials.username: user_info = f"user '{credentials.username}'" elif credentials.email: user_info = f"user '{credentials.email}'" - # Confirm logout action if not Confirm.ask(f"Are you sure you want to logout {user_info}?", default=False): print_info("Logout cancelled.") return - # Clear credentials if clear_credentials(): print_success("Successfully logged out.") else: diff --git a/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py b/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py index 14e3fa2a6..5c5b8cafb 100644 --- a/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py +++ b/src/mcp_agent/cli/cloud/commands/auth/whoami/main.py @@ -28,18 +28,15 @@ def whoami() -> None: console = Console() - # Create user info table user_table = Table(show_header=False, box=None) user_table.add_column("Field", style="bold") user_table.add_column("Value") - # Add user information if credentials.username: user_table.add_row("Username", credentials.username) if credentials.email: user_table.add_row("Email", credentials.email) - # Add token expiry if available if credentials.token_expires_at: user_table.add_row( "Token Expires", @@ -48,6 +45,5 @@ def whoami() -> None: else: user_table.add_row("Token Expires", "Never") - # Create user panel user_panel = Panel(user_table, title="User Information", title_align="left") console.print(user_panel) diff --git a/src/mcp_agent/cli/cloud/commands/logger/tail/main.py b/src/mcp_agent/cli/cloud/commands/logger/tail/main.py index d99c6df70..a2d857bf5 100644 --- a/src/mcp_agent/cli/cloud/commands/logger/tail/main.py +++ b/src/mcp_agent/cli/cloud/commands/logger/tail/main.py @@ -18,10 +18,7 @@ from mcp_agent.cli.exceptions import CLIError from mcp_agent.cli.auth import load_credentials, UserCredentials from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL -from mcp_agent.cli.cloud.commands.logger.utils import ( - parse_app_identifier, - resolve_server_url, -) +from mcp_agent.cli.core.utils import parse_app_identifier, resolve_server_url console = Console() @@ -76,103 +73,89 @@ def tail_logs( ), ) -> None: """Tail logs for an MCP app deployment. - + Retrieve and optionally stream logs from deployed MCP apps. Supports filtering by time duration, text patterns, and continuous streaming. - + Examples: # Get last 50 logs from an app mcp-agent cloud logger tail app_abc123 --limit 50 - + # Stream logs continuously mcp-agent cloud logger tail https://app.mcpac.dev/abc123 --follow - + # Show logs from the last hour with error filtering mcp-agent cloud logger tail app_abc123 --since 1h --grep "ERROR|WARN" - + # Follow logs and filter for specific patterns mcp-agent cloud logger tail app_abc123 --follow --grep "authentication.*failed" """ - + credentials = load_credentials() if not credentials: - console.print( - "[red]Error: Not authenticated. Run 'mcp-agent login' first.[/red]" - ) + console.print("[red]Error: Not authenticated. Run 'mcp-agent login' first.[/red]") raise typer.Exit(4) - + # Validate conflicting options if follow and since: - console.print( - "[red]Error: --since cannot be used with --follow (streaming mode)[/red]" - ) + console.print("[red]Error: --since cannot be used with --follow (streaming mode)[/red]") raise typer.Exit(6) - + if follow and limit != DEFAULT_LOG_LIMIT: - console.print( - "[red]Error: --limit cannot be used with --follow (streaming mode)[/red]" - ) + console.print("[red]Error: --limit cannot be used with --follow (streaming mode)[/red]") raise typer.Exit(6) - + if follow and order_by: - console.print( - "[red]Error: --order-by cannot be used with --follow (streaming mode)[/red]" - ) + console.print("[red]Error: --order-by cannot be used with --follow (streaming mode)[/red]") raise typer.Exit(6) - + if follow and (asc or desc): - console.print( - "[red]Error: --asc/--desc cannot be used with --follow (streaming mode)[/red]" - ) + console.print("[red]Error: --asc/--desc cannot be used with --follow (streaming mode)[/red]") raise typer.Exit(6) - + # Validate order_by values if order_by and order_by not in ["timestamp", "severity"]: console.print("[red]Error: --order-by must be 'timestamp' or 'severity'[/red]") raise typer.Exit(6) - + # Validate that both --asc and --desc are not used together if asc and desc: console.print("[red]Error: Cannot use both --asc and --desc together[/red]") raise typer.Exit(6) - + # Validate format values if format and format not in ["text", "json", "yaml"]: console.print("[red]Error: --format must be 'text', 'json', or 'yaml'[/red]") raise typer.Exit(6) - + app_id, config_id, server_url = parse_app_identifier(app_identifier) - + try: if follow: - asyncio.run( - _stream_logs( - app_id=app_id, - config_id=config_id, - server_url=server_url, - credentials=credentials, - grep_pattern=grep, - app_identifier=app_identifier, - format=format, - ) - ) + asyncio.run(_stream_logs( + app_id=app_id, + config_id=config_id, + server_url=server_url, + credentials=credentials, + grep_pattern=grep, + app_identifier=app_identifier, + format=format, + )) else: - asyncio.run( - _fetch_logs( - app_id=app_id, - config_id=config_id, - server_url=server_url, - credentials=credentials, - since=since, - grep_pattern=grep, - limit=limit, - order_by=order_by, - asc=asc, - desc=desc, - format=format, - ) - ) - + asyncio.run(_fetch_logs( + app_id=app_id, + config_id=config_id, + server_url=server_url, + credentials=credentials, + since=since, + grep_pattern=grep, + limit=limit, + order_by=order_by, + asc=asc, + desc=desc, + format=format, + )) + except KeyboardInterrupt: console.print("\n[yellow]Interrupted by user[/yellow]") sys.exit(0) @@ -183,7 +166,7 @@ def tail_logs( async def _fetch_logs( app_id: Optional[str], - config_id: Optional[str], + config_id: Optional[str], server_url: Optional[str], credentials: UserCredentials, since: Optional[str], @@ -195,40 +178,38 @@ async def _fetch_logs( format: str, ) -> None: """Fetch logs one-time via HTTP API.""" - + api_base = DEFAULT_API_BASE_URL headers = { "Authorization": f"Bearer {credentials.api_key}", "Content-Type": "application/json", } - + payload = {} - + if app_id: payload["app_id"] = app_id elif config_id: payload["app_configuration_id"] = config_id else: - raise CLIError( - "Unable to determine app or configuration ID from provided identifier" - ) - + raise CLIError("Unable to determine app or configuration ID from provided identifier") + if since: payload["since"] = since if limit: payload["limit"] = limit - + if order_by: if order_by == "timestamp": payload["orderBy"] = "LOG_ORDER_BY_TIMESTAMP" elif order_by == "severity": payload["orderBy"] = "LOG_ORDER_BY_LEVEL" - + if asc: payload["order"] = "LOG_ORDER_ASC" elif desc: payload["order"] = "LOG_ORDER_DESC" - + with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), @@ -236,7 +217,7 @@ async def _fetch_logs( transient=True, ) as progress: progress.add_task("Fetching logs...", total=None) - + try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( @@ -244,148 +225,128 @@ async def _fetch_logs( json=payload, headers=headers, ) - + if response.status_code == 401: - raise CLIError( - "Authentication failed. Try running 'mcp-agent login'" - ) + raise CLIError("Authentication failed. Try running 'mcp-agent login'") elif response.status_code == 404: raise CLIError("App or configuration not found") elif response.status_code != 200: - raise CLIError( - f"API request failed: {response.status_code} {response.text}" - ) - + raise CLIError(f"API request failed: {response.status_code} {response.text}") + data = response.json() log_entries = data.get("logEntries", []) - + except httpx.RequestError as e: raise CLIError(f"Failed to connect to API: {e}") - - filtered_logs = ( - _filter_logs(log_entries, grep_pattern) if grep_pattern else log_entries - ) - + + filtered_logs = _filter_logs(log_entries, grep_pattern) if grep_pattern else log_entries + if not filtered_logs: console.print("[yellow]No logs found matching the criteria[/yellow]") return - + _display_logs(filtered_logs, title=f"Logs for {app_id or config_id}", format=format) + async def _stream_logs( app_id: Optional[str], config_id: Optional[str], - server_url: Optional[str], + server_url: Optional[str], credentials: UserCredentials, grep_pattern: Optional[str], app_identifier: str, format: str, ) -> None: """Stream logs continuously via SSE.""" - + if not server_url: server_url = await resolve_server_url(app_id, config_id, credentials) - + parsed = urlparse(server_url) stream_url = f"{parsed.scheme}://{parsed.netloc}/logs" hostname = parsed.hostname or "" - deployment_id = hostname.split(".")[0] if "." in hostname else hostname - + deployment_id = hostname.split('.')[0] if '.' in hostname else hostname + headers = { "Accept": "text/event-stream", "Cache-Control": "no-cache", "X-Routing-Key": deployment_id, } - + if credentials.api_key: headers["Authorization"] = f"Bearer {credentials.api_key}" - - console.print( - f"[blue]Streaming logs from {app_identifier} (Press Ctrl+C to stop)[/blue]" - ) - + + console.print(f"[blue]Streaming logs from {app_identifier} (Press Ctrl+C to stop)[/blue]") + # Setup signal handler for graceful shutdown def signal_handler(signum, frame): console.print("\n[yellow]Stopping log stream...[/yellow]") sys.exit(0) - + signal.signal(signal.SIGINT, signal_handler) - + try: async with httpx.AsyncClient(timeout=None) as client: async with client.stream("GET", stream_url, headers=headers) as response: + if response.status_code == 401: - raise CLIError( - "Authentication failed. Try running 'mcp-agent login'" - ) + raise CLIError("Authentication failed. Try running 'mcp-agent login'") elif response.status_code == 404: raise CLIError("Log stream not found for the specified app") elif response.status_code != 200: - raise CLIError( - f"Failed to connect to log stream: {response.status_code}" - ) - + raise CLIError(f"Failed to connect to log stream: {response.status_code}") + console.print("[green]✓ Connected to log stream[/green]\n") - + buffer = "" async for chunk in response.aiter_text(): buffer += chunk - lines = buffer.split("\n") - + lines = buffer.split('\n') + for line in lines[:-1]: - if line.startswith("data:"): - data_content = line.removeprefix("data:") - + if line.startswith('data:'): + data_content = line.removeprefix('data:') + try: log_data = json.loads(data_content) - - if "message" in log_data: - timestamp = log_data.get("time") + + if 'message' in log_data: + timestamp = log_data.get('time') if timestamp: - formatted_timestamp = ( - _convert_timestamp_to_local(timestamp) - ) + formatted_timestamp = _convert_timestamp_to_local(timestamp) else: formatted_timestamp = datetime.now().isoformat() - + log_entry = { - "timestamp": formatted_timestamp, - "message": log_data["message"], - "level": log_data.get("level", "INFO"), + 'timestamp': formatted_timestamp, + 'message': log_data['message'], + 'level': log_data.get('level', 'INFO') } - - if not grep_pattern or _matches_pattern( - log_entry["message"], grep_pattern - ): + + if not grep_pattern or _matches_pattern(log_entry['message'], grep_pattern): _display_log_entry(log_entry, format=format) - + except json.JSONDecodeError: # Skip malformed JSON continue - + except httpx.RequestError as e: raise CLIError(f"Failed to connect to log stream: {e}") -def _filter_logs( - log_entries: List[Dict[str, Any]], pattern: str -) -> List[Dict[str, Any]]: + + +def _filter_logs(log_entries: List[Dict[str, Any]], pattern: str) -> List[Dict[str, Any]]: """Filter log entries by pattern.""" if not pattern: return log_entries - + try: regex = re.compile(pattern, re.IGNORECASE) - return [ - entry for entry in log_entries if regex.search(entry.get("message", "")) - ] + return [entry for entry in log_entries if regex.search(entry.get('message', ''))] except re.error: - return [ - entry - for entry in log_entries - if pattern.lower() in entry.get("message", "").lower() - ] + return [entry for entry in log_entries if pattern.lower() in entry.get('message', '').lower()] def _matches_pattern(message: str, pattern: str) -> bool: @@ -400,21 +361,21 @@ def _matches_pattern(message: str, pattern: str) -> bool: def _clean_log_entry(entry: Dict[str, Any]) -> Dict[str, Any]: """Clean up a log entry for structured output formats.""" cleaned_entry = entry.copy() - cleaned_entry["severity"] = _parse_log_level(entry.get("level", "INFO")) - cleaned_entry["message"] = _clean_message(entry.get("message", "")) - cleaned_entry.pop("level", None) + cleaned_entry['severity'] = _parse_log_level(entry.get('level', 'INFO')) + cleaned_entry['message'] = _clean_message(entry.get('message', '')) + cleaned_entry.pop('level', None) return cleaned_entry def _display_text_log_entry(entry: Dict[str, Any]) -> None: """Display a single log entry in text format.""" - timestamp = _format_timestamp(entry.get("timestamp", "")) - raw_level = entry.get("level", "INFO") + timestamp = _format_timestamp(entry.get('timestamp', '')) + raw_level = entry.get('level', 'INFO') level = _parse_log_level(raw_level) - message = _clean_message(entry.get("message", "")) - + message = _clean_message(entry.get('message', '')) + level_style = _get_level_style(level) - + console.print( f"[bright_black not bold]{timestamp}[/bright_black not bold] " f"[{level_style}]{level:7}[/{level_style}] " @@ -422,13 +383,11 @@ def _display_text_log_entry(entry: Dict[str, Any]) -> None: ) -def _display_logs( - log_entries: List[Dict[str, Any]], title: str = "Logs", format: str = "text" -) -> None: +def _display_logs(log_entries: List[Dict[str, Any]], title: str = "Logs", format: str = "text") -> None: """Display logs in the specified format.""" if not log_entries: return - + if format == "json": cleaned_entries = [_clean_log_entry(entry) for entry in log_entries] print(json.dumps(cleaned_entries, indent=2)) @@ -438,7 +397,7 @@ def _display_logs( else: # text format (default) if title: console.print(f"[bold blue]{title}[/bold blue]\n") - + for entry in log_entries: _display_text_log_entry(entry) @@ -467,20 +426,20 @@ def _format_timestamp(timestamp_str: str) -> str: try: if timestamp_str: # Parse UTC timestamp and convert to local time - dt_utc = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + dt_utc = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) dt_local = dt_utc.astimezone() - return dt_local.strftime("%H:%M:%S") - return datetime.now().strftime("%H:%M:%S") + return dt_local.strftime('%H:%M:%S') + return datetime.now().strftime('%H:%M:%S') except (ValueError, TypeError): return timestamp_str[:8] if len(timestamp_str) >= 8 else timestamp_str def _parse_log_level(level: str) -> str: """Parse log level from API format to clean display format.""" - if level.startswith("LOG_LEVEL_"): - clean_level = level.replace("LOG_LEVEL_", "") - if clean_level == "UNSPECIFIED": - return "UNKNOWN" + if level.startswith('LOG_LEVEL_'): + clean_level = level.replace('LOG_LEVEL_', '') + if clean_level == 'UNSPECIFIED': + return 'UNKNOWN' return clean_level return level.upper() @@ -488,36 +447,29 @@ def _parse_log_level(level: str) -> str: def _clean_message(message: str) -> str: """Remove redundant log level prefix from message if present.""" prefixes = [ - "ERROR:", - "WARNING:", - "INFO:", - "DEBUG:", - "TRACE:", - "WARN:", - "FATAL:", - "UNKNOWN:", - "UNSPECIFIED:", + 'ERROR:', 'WARNING:', 'INFO:', 'DEBUG:', 'TRACE:', + 'WARN:', 'FATAL:', 'UNKNOWN:', 'UNSPECIFIED:' ] - + for prefix in prefixes: if message.startswith(prefix): - return message[len(prefix) :].lstrip() - + return message[len(prefix):].lstrip() + return message def _get_level_style(level: str) -> str: """Get Rich style for log level.""" level = level.upper() - if level in ["ERROR", "FATAL"]: + if level in ['ERROR', 'FATAL']: return "red bold" - elif level in ["WARN", "WARNING"]: + elif level in ['WARN', 'WARNING']: return "yellow bold" - elif level == "INFO": + elif level == 'INFO': return "blue" - elif level in ["DEBUG", "TRACE"]: + elif level in ['DEBUG', 'TRACE']: return "dim" - elif level in ["UNKNOWN", "UNSPECIFIED"]: + elif level in ['UNKNOWN', 'UNSPECIFIED']: return "magenta" else: return "white" diff --git a/src/mcp_agent/cli/cloud/commands/logger/utils.py b/src/mcp_agent/cli/cloud/commands/logger/utils.py deleted file mode 100644 index 3e05bad20..000000000 --- a/src/mcp_agent/cli/cloud/commands/logger/utils.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Utility functions for logger commands.""" - -from typing import Optional, Tuple - -import httpx - -from mcp_agent.cli.exceptions import CLIError -from mcp_agent.cli.auth import UserCredentials -from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL - - -def parse_app_identifier( - identifier: str, -) -> Tuple[Optional[str], Optional[str], Optional[str]]: - """Parse app identifier to extract app ID, config ID, and server URL.""" - - # Check if it's a URL - if identifier.startswith(("http://", "https://")): - return None, None, identifier - - # Check if it's an MCPAppConfig ID (starts with apcnf_) - if identifier.startswith("apcnf_"): - return None, identifier, None - - # Check if it's an MCPApp ID (starts with app_) - if identifier.startswith("app_"): - return identifier, None, None - - # If no specific prefix, assume it's an app ID for backward compatibility - return identifier, None, None - - -async def resolve_server_url( - app_id: Optional[str], - config_id: Optional[str], - credentials: UserCredentials, -) -> str: - """Resolve server URL from app ID or configuration ID.""" - - if not app_id and not config_id: - raise CLIError("Either app_id or config_id must be provided") - - # Determine the endpoint and payload based on identifier type - if app_id: - endpoint = "/mcp_app/get_app" - payload = {"app_id": app_id} - response_key = "app" - not_found_msg = f"App '{app_id}' not found" - not_deployed_msg = f"App '{app_id}' is not deployed yet" - no_url_msg = f"No server URL found for app '{app_id}'" - offline_msg = f"App '{app_id}' server is offline" - api_error_msg = "Failed to get app info" - else: # config_id - endpoint = "/mcp_app/get_app_configuration" - payload = {"app_configuration_id": config_id} - response_key = "appConfiguration" - not_found_msg = f"App configuration '{config_id}' not found" - not_deployed_msg = f"App configuration '{config_id}' is not deployed yet" - no_url_msg = f"No server URL found for app configuration '{config_id}'" - offline_msg = f"App configuration '{config_id}' server is offline" - api_error_msg = "Failed to get app configuration" - - api_base = DEFAULT_API_BASE_URL - headers = { - "Authorization": f"Bearer {credentials.api_key}", - "Content-Type": "application/json", - } - - try: - async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.post( - f"{api_base}{endpoint}", json=payload, headers=headers - ) - - if response.status_code == 404: - raise CLIError(not_found_msg) - elif response.status_code != 200: - raise CLIError( - f"{api_error_msg}: {response.status_code} {response.text}" - ) - - data = response.json() - resource_info = data.get(response_key, {}) - server_info = resource_info.get("appServerInfo") - - if not server_info: - raise CLIError(not_deployed_msg) - - server_url = server_info.get("serverUrl") - if not server_url: - raise CLIError(no_url_msg) - - status = server_info.get("status", "APP_SERVER_STATUS_UNSPECIFIED") - if status == "APP_SERVER_STATUS_OFFLINE": - raise CLIError(offline_msg) - - return server_url - - except httpx.RequestError as e: - raise CLIError(f"Failed to connect to API: {e}") diff --git a/src/mcp_agent/cli/cloud/commands/servers/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/__init__.py new file mode 100644 index 000000000..907613d43 --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/__init__.py @@ -0,0 +1,11 @@ +"""Server management commands for MCP Agent Cloud.""" + +from .list.main import list_servers +from .describe.main import describe_server +from .delete.main import delete_server + +__all__ = [ + "list_servers", + "describe_server", + "delete_server", +] \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/servers/delete/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/delete/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/mcp_agent/cli/cloud/commands/servers/delete/main.py b/src/mcp_agent/cli/cloud/commands/servers/delete/main.py new file mode 100644 index 000000000..58ea37f0c --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/delete/main.py @@ -0,0 +1,78 @@ + +import typer +from rich.panel import Panel + +from mcp_agent.cli.core.utils import run_async +from mcp_agent.cli.exceptions import CLIError +from mcp_agent.cli.mcp_app.api_client import MCPApp +from ..utils import ( + setup_authenticated_client, + resolve_server, + handle_server_api_errors, + get_server_name, + get_server_id, +) +from mcp_agent.cli.utils.ux import console, print_info + + +@handle_server_api_errors +def delete_server( + id_or_url: str = typer.Argument(..., help="Server ID or URL to delete"), + force: bool = typer.Option(False, "--force", "-f", help="Force deletion without confirmation prompt"), +) -> None: + """Delete a specific MCP Server.""" + client = setup_authenticated_client() + server = resolve_server(client, id_or_url) + + # Determine server type and delete function + if isinstance(server, MCPApp): + server_type = "Deployed Server" + delete_function = client.delete_app + else: + server_type = "Configured Server" + delete_function = client.delete_app_configuration + + server_name = get_server_name(server) + server_id = get_server_id(server) + + if not force: + console.print( + Panel( + f"Name: [cyan]{server_name}[/cyan]\n" + f"Type: [cyan]{server_type}[/cyan]\n" + f"ID: [cyan]{server_id}[/cyan]\n\n" + f"[bold red]⚠️ This action cannot be undone![/bold red]", + title="Server to Delete", + border_style="red", + expand=False, + ) + ) + + confirm = typer.confirm(f"\nAre you sure you want to delete this {server_type.lower()}?") + if not confirm: + print_info("Deletion cancelled.") + return + + if isinstance(server, MCPApp): + can_delete = run_async(client.can_delete_app(server_id)) + else: + can_delete = run_async(client.can_delete_app_configuration(server_id)) + + if not can_delete: + raise CLIError( + f"You do not have permission to delete this {server_type.lower()}. " + f"You can only delete servers that you created." + ) + deleted_id = run_async(delete_function(server_id)) + + console.print( + Panel( + f"[green]✅ Successfully deleted {server_type.lower()}[/green]\n\n" + f"Name: [cyan]{server_name}[/cyan]\n" + f"ID: [cyan]{deleted_id}[/cyan]", + title="Deletion Complete", + border_style="green", + expand=False, + ) + ) + diff --git a/src/mcp_agent/cli/cloud/commands/servers/describe/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/describe/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/mcp_agent/cli/cloud/commands/servers/describe/main.py b/src/mcp_agent/cli/cloud/commands/servers/describe/main.py new file mode 100644 index 000000000..bc06f2897 --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/describe/main.py @@ -0,0 +1,164 @@ +import json +from typing import Optional, Union + +import typer +import yaml +from rich.panel import Panel + +from mcp_agent.cli.exceptions import CLIError +from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration +from ..utils import ( + setup_authenticated_client, + validate_output_format, + resolve_server, + handle_server_api_errors, + clean_server_status, +) +from mcp_agent.cli.utils.ux import console + + +@handle_server_api_errors +def describe_server( + id_or_url: str = typer.Argument(..., help="Server ID or URL to describe"), + format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), +) -> None: + """Describe a specific MCP Server.""" + validate_output_format(format) + client = setup_authenticated_client() + server = resolve_server(client, id_or_url) + print_server_description(server, format) + + +def print_server_description(server: Union[MCPApp, MCPAppConfiguration], output_format: str = "text") -> None: + """Print detailed description information for a server.""" + + valid_formats = ["text", "json", "yaml"] + if output_format not in valid_formats: + raise CLIError(f"Invalid format '{output_format}'. Valid options are: {', '.join(valid_formats)}") + + if output_format == "json": + _print_server_json(server) + elif output_format == "yaml": + _print_server_yaml(server) + else: + _print_server_text(server) + + +def _print_server_json(server: Union[MCPApp, MCPAppConfiguration]) -> None: + """Print server in JSON format.""" + server_data = _server_to_dict(server) + print(json.dumps(server_data, indent=2, default=str)) + + +def _print_server_yaml(server: Union[MCPApp, MCPAppConfiguration]) -> None: + """Print server in YAML format.""" + server_data = _server_to_dict(server) + print(yaml.dump(server_data, default_flow_style=False)) + + +def _server_to_dict(server: Union[MCPApp, MCPAppConfiguration]) -> dict: + """Convert server to dictionary.""" + if isinstance(server, MCPApp): + server_type = "deployed" + server_id = server.appId + server_name = server.name + server_description = server.description + created_at = server.createdAt + server_info = server.appServerInfo + underlying_app = None + else: + server_type = "configured" + server_id = server.appConfigurationId + server_name = server.app.name if server.app else "Unnamed" + server_description = server.app.description if server.app else None + created_at = server.createdAt + server_info = server.appServerInfo + underlying_app = { + "app_id": server.app.appId, + "name": server.app.name + } if server.app else None + + status_raw = server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + server_url = server_info.serverUrl if server_info else None + + data = { + "id": server_id, + "name": server_name, + "type": server_type, + "status": clean_server_status(status_raw), + "server_url": server_url, + "description": server_description, + "created_at": created_at.isoformat() if created_at else None + } + + if underlying_app: + data["underlying_app"] = underlying_app + + return data + + + + +def _print_server_text(server: Union[MCPApp, MCPAppConfiguration]) -> None: + """Print server in text format.""" + if isinstance(server, MCPApp): + server_type = "Deployed Server" + server_id = server.appId + server_name = server.name + server_description = server.description + created_at = server.createdAt + server_info = server.appServerInfo + else: + server_type = "Configured Server" + server_id = server.appConfigurationId + server_name = server.app.name if server.app else "Unnamed" + server_description = server.app.description if server.app else None + created_at = server.createdAt + server_info = server.appServerInfo + + status_text = "❓ Unknown" + server_url = "N/A" + + if server_info: + status_text = _server_status_text(server_info.status) + server_url = server_info.serverUrl + content_lines = [ + f"Name: [cyan]{server_name}[/cyan]", + f"Type: [cyan]{server_type}[/cyan]", + f"ID: [cyan]{server_id}[/cyan]", + f"Status: {status_text}", + f"Server URL: [cyan]{server_url}[/cyan]", + ] + + if server_description: + content_lines.append(f"Description: [cyan]{server_description}[/cyan]") + + if created_at: + content_lines.append(f"Created: [cyan]{created_at.strftime('%Y-%m-%d %H:%M:%S')}[/cyan]") + + if isinstance(server, MCPAppConfiguration) and server.app: + content_lines.extend([ + "", + "[bold]Underlying App:[/bold]", + f" App ID: [cyan]{server.app.appId}[/cyan]", + f" App Name: [cyan]{server.app.name}[/cyan]", + ]) + + console.print( + Panel( + "\n".join(content_lines), + title="Server Description", + border_style="blue", + expand=False, + ) + ) + + +def _server_status_text(status: str) -> str: + """Convert server status code to emoji and text.""" + if status == "APP_SERVER_STATUS_ONLINE": + return "[green]🟢 Active[/green]" + elif status == "APP_SERVER_STATUS_OFFLINE": + return "[red]🔴 Offline[/red]" + else: + return "❓ Unknown" \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/servers/list/__init__.py b/src/mcp_agent/cli/cloud/commands/servers/list/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/mcp_agent/cli/cloud/commands/servers/list/main.py b/src/mcp_agent/cli/cloud/commands/servers/list/main.py new file mode 100644 index 000000000..1ea27740c --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/list/main.py @@ -0,0 +1,335 @@ +import asyncio +import json +from typing import List, Optional, Union + +import typer +import yaml +from rich.panel import Panel + +from mcp_agent.cli.core.utils import run_async +from mcp_agent.cli.mcp_app.api_client import MCPApp, MCPAppConfiguration +from ..utils import ( + setup_authenticated_client, + validate_output_format, + handle_server_api_errors, + clean_server_status, +) +from mcp_agent.cli.utils.ux import console, print_info +from datetime import datetime + + +@handle_server_api_errors +def list_servers( + limit: Optional[int] = typer.Option(None, "--limit", help="Maximum number of results to return"), + filter: Optional[str] = typer.Option(None, "--filter", help="Filter by name, description, or status (case-insensitive)"), + sort_by: Optional[str] = typer.Option(None, "--sort-by", help="Sort by field: name, created, status (prefix with - for reverse)"), + format: Optional[str] = typer.Option("text", "--format", help="Output format (text|json|yaml)"), +) -> None: + """List MCP Servers with optional filtering and sorting. + + Examples: + # Filter servers containing 'api' + mcp-agent cloud servers list --filter api + + # Sort by creation date (newest first) + mcp-agent cloud servers list --sort-by -created + + # Filter active servers and sort by name + mcp-agent cloud servers list --filter active --sort-by name + + # Get JSON output with filtering + mcp-agent cloud servers list --filter production --format json + """ + validate_output_format(format) + client = setup_authenticated_client() + + # Use limit or default + max_results = limit or 100 + + async def parallel_requests(): + return await asyncio.gather( + client.list_apps(max_results=max_results), + client.list_app_configurations(max_results=max_results), + ) + + list_apps_res, list_app_configs_res = run_async(parallel_requests()) + + # Apply client-side filtering and sorting + filtered_deployed = _apply_filter(list_apps_res.apps, filter) if filter else list_apps_res.apps + filtered_configured = _apply_filter(list_app_configs_res.appConfigurations, filter) if filter else list_app_configs_res.appConfigurations + + sorted_deployed = _apply_sort(filtered_deployed, sort_by) if sort_by else filtered_deployed + sorted_configured = _apply_sort(filtered_configured, sort_by) if sort_by else filtered_configured + + if format == "json": + _print_servers_json(sorted_deployed, sorted_configured) + elif format == "yaml": + _print_servers_yaml(sorted_deployed, sorted_configured) + else: + _print_servers_text(sorted_deployed, sorted_configured, filter, sort_by) + + + +def _apply_filter(servers: List[Union[MCPApp, MCPAppConfiguration]], filter_expr: str) -> List[Union[MCPApp, MCPAppConfiguration]]: + """Apply client-side filtering to servers.""" + if not filter_expr: + return servers + + filtered_servers = [] + # Support basic filtering by name, status, description + filter_lower = filter_expr.lower() + + for server in servers: + # Get server attributes for filtering + try: + if isinstance(server, MCPApp): + name = server.name or "" + description = server.description or "" + status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + elif hasattr(server, 'app'): # MCPAppConfiguration + name = server.app.name if server.app else "" + description = server.app.description if server.app else "" + status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + else: # Fallback for other types (like test mocks) + name = getattr(server, 'name', '') or "" + description = getattr(server, 'description', '') or "" + server_info = getattr(server, 'appServerInfo', None) + status = server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + except Exception: + # Skip servers that can't be processed + continue + + # Clean status for filtering + clean_status = clean_server_status(status).lower() + + # Check if filter matches name, description, or status + if (filter_lower in name.lower() or + filter_lower in description.lower() or + filter_lower in clean_status): + filtered_servers.append(server) + + return filtered_servers + + +def _apply_sort(servers: List[Union[MCPApp, MCPAppConfiguration]], sort_field: str) -> List[Union[MCPApp, MCPAppConfiguration]]: + """Apply client-side sorting to servers.""" + if not sort_field: + return servers + + # Normalize sort field + sort_field_lower = sort_field.lower() + reverse = False + + # Support reverse sorting with - prefix + if sort_field_lower.startswith('-'): + reverse = True + sort_field_lower = sort_field_lower[1:] + + def get_sort_key(server): + try: + if isinstance(server, MCPApp): + name = server.name or "" + created_at = server.createdAt + status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + elif hasattr(server, 'app'): # MCPAppConfiguration + name = server.app.name if server.app else "" + created_at = server.createdAt + status = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + else: # Fallback for other types (like test mocks) + name = getattr(server, 'name', '') or "" + created_at = getattr(server, 'createdAt', None) + server_info = getattr(server, 'appServerInfo', None) + status = server_info.status if server_info else "APP_SERVER_STATUS_OFFLINE" + except Exception: + # Return default values for sorting if server can't be processed + name = "" + created_at = None + status = "APP_SERVER_STATUS_OFFLINE" + + if sort_field_lower == 'name': + return name.lower() + elif sort_field_lower in ['created', 'created_at', 'date']: + return created_at or datetime.min.replace(tzinfo=None if created_at is None else created_at.tzinfo) + elif sort_field_lower == 'status': + return clean_server_status(status).lower() + else: + # Default to name if sort field not recognized + return name.lower() + + try: + return sorted(servers, key=get_sort_key, reverse=reverse) + except Exception: + # If sorting fails, return original list + return servers + + +def _print_servers_text(deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration], filter_param: Optional[str], sort_by: Optional[str]) -> None: + """Print servers in text format.""" + print_info_header() + + # Display deployed servers + if deployed_servers: + num_servers = len(deployed_servers) + print_info(f"Found {num_servers} deployed server(s):") + print_servers(deployed_servers) + else: + console.print("\n[bold blue]🖥️ Deployed MCP Servers (0)[/bold blue]") + print_info("No deployed servers found.") + + console.print("\n" + "─" * 80 + "\n") + + # Display configured servers + if configured_servers: + num_configs = len(configured_servers) + print_info(f"Found {num_configs} configured server(s):") + print_server_configs(configured_servers) + else: + console.print("\n[bold blue]⚙️ Configured MCP Servers (0)[/bold blue]") + print_info("No configured servers found.") + + if filter_param or sort_by: + console.print(f"\n[dim]Applied filters: filter={filter_param or 'None'}, sort-by={sort_by or 'None'}[/dim]") + filter_desc = f"filter='{filter_param}'" if filter_param else "filter=None" + sort_desc = f"sort-by='{sort_by}'" if sort_by else "sort-by=None" + print_info(f"Client-side {filter_desc}, {sort_desc}. Sort fields: name, created, status (-prefix for reverse).") + + +def _print_servers_json(deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration]) -> None: + """Print servers in JSON format.""" + deployed_data = [_server_to_dict(server) for server in deployed_servers] + configured_data = [_server_config_to_dict(config) for config in configured_servers] + + output = { + "deployed_servers": deployed_data, + "configured_servers": configured_data + } + print(json.dumps(output, indent=2, default=str)) + + +def _print_servers_yaml(deployed_servers: List[MCPApp], configured_servers: List[MCPAppConfiguration]) -> None: + """Print servers in YAML format.""" + deployed_data = [_server_to_dict(server) for server in deployed_servers] + configured_data = [_server_config_to_dict(config) for config in configured_servers] + + output = { + "deployed_servers": deployed_data, + "configured_servers": configured_data + } + print(yaml.dump(output, default_flow_style=False)) + + +def _server_to_dict(server: MCPApp) -> dict: + """Convert MCPApp to dictionary.""" + status_raw = server.appServerInfo.status if server.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + return { + "id": server.appId, + "name": server.name or "Unnamed", + "description": server.description, + "status": clean_server_status(status_raw), + "server_url": server.appServerInfo.serverUrl if server.appServerInfo else None, + "creator_id": server.creatorId, + "created_at": server.createdAt.isoformat() if server.createdAt else None, + "type": "deployed" + } + + +def _server_config_to_dict(config: MCPAppConfiguration) -> dict: + """Convert MCPAppConfiguration to dictionary.""" + status_raw = config.appServerInfo.status if config.appServerInfo else "APP_SERVER_STATUS_OFFLINE" + return { + "config_id": config.appConfigurationId, + "app_id": config.app.appId if config.app else None, + "name": config.app.name if config.app else "Unnamed", + "description": config.app.description if config.app else None, + "status": clean_server_status(status_raw), + "server_url": config.appServerInfo.serverUrl if config.appServerInfo else None, + "creator_id": config.creatorId, + "created_at": config.createdAt.isoformat() if config.createdAt else None, + "type": "configured" + } + + + + +def print_info_header() -> None: + """Print a styled header explaining the following tables""" + console.print( + Panel( + "Deployed Servers: [cyan]MCP Servers which you have bundled and deployed, as a developer[/cyan]\n" + "Configured Servers: [cyan]MCP Servers which you have configured to use with your MCP clients[/cyan]", + title="MCP Servers", + border_style="blue", + expand=False, + ) + ) + + +def print_servers(servers: List[MCPApp]) -> None: + """Print a list of deployed servers in a clean, copyable format.""" + console.print(f"\n[bold blue]🖥️ Deployed MCP Servers ({len(servers)})[/bold blue]") + + for i, server in enumerate(servers): + if i > 0: + console.print() + + status = _server_status_text( + server.appServerInfo.status + if server.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) + + console.print(f"[bold cyan]{server.name or 'Unnamed'}[/bold cyan] {status}") + console.print(f" Server ID: {server.appId}") + + if server.appServerInfo and server.appServerInfo.serverUrl: + console.print(f" Server URL: {server.appServerInfo.serverUrl}") + + if server.description: + console.print(f" Description: {server.description}") + + console.print(f" Created: {server.createdAt.strftime('%Y-%m-%d %H:%M:%S')}") + + +def print_server_configs(server_configs: List[MCPAppConfiguration]) -> None: + """Print a list of configured servers in a clean, copyable format.""" + console.print( + f"\n[bold blue]⚙️ Configured MCP Servers ({len(server_configs)})[/bold blue]" + ) + + for i, config in enumerate(server_configs): + if i > 0: + console.print() + + status = _server_status_text( + config.appServerInfo.status + if config.appServerInfo + else "APP_SERVER_STATUS_OFFLINE" + ) + + console.print( + f"[bold cyan]{config.app.name if config.app else 'Unnamed'}[/bold cyan] {status}" + ) + console.print(f" Config ID: {config.appConfigurationId}") + + if config.app: + console.print(f" Server ID: {config.app.appId}") + if config.app.description: + console.print(f" Description: {config.app.description}") + + if config.appServerInfo and config.appServerInfo.serverUrl: + console.print(f" Server URL: {config.appServerInfo.serverUrl}") + + if config.createdAt: + console.print( + f" Created: {config.createdAt.strftime('%Y-%m-%d %H:%M:%S')}" + ) + + +def _server_status_text(status: str) -> str: + """Convert server status code to emoji.""" + if status == "APP_SERVER_STATUS_ONLINE": + return "[green]🟢 Active[/green]" + elif status == "APP_SERVER_STATUS_OFFLINE": + return "[red]🔴 Offline[/red]" + else: + return "❓ Unknown" \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/commands/servers/utils.py b/src/mcp_agent/cli/cloud/commands/servers/utils.py new file mode 100644 index 000000000..380150b73 --- /dev/null +++ b/src/mcp_agent/cli/cloud/commands/servers/utils.py @@ -0,0 +1,152 @@ +"""Shared utilities for server commands.""" + +from functools import wraps +from typing import Union + +from mcp_agent.cli.auth import load_api_key_credentials +from mcp_agent.cli.core.api_client import UnauthenticatedError +from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL +from mcp_agent.cli.core.utils import parse_app_identifier, run_async +from mcp_agent.cli.exceptions import CLIError +from mcp_agent.cli.mcp_app.api_client import ( + MCPApp, + MCPAppClient, + MCPAppConfiguration, +) + + +def setup_authenticated_client() -> MCPAppClient: + """Setup authenticated MCP App client. + + Returns: + Configured MCPAppClient instance + + Raises: + CLIError: If authentication fails + """ + effective_api_key = load_api_key_credentials() + + if not effective_api_key: + raise CLIError( + "Must be logged in to access servers. Run 'mcp-agent login'." + ) + + return MCPAppClient( + api_url=DEFAULT_API_BASE_URL, api_key=effective_api_key + ) + + +def validate_output_format(format: str) -> None: + """Validate output format parameter. + + Args: + format: Output format to validate + + Raises: + CLIError: If format is invalid + """ + valid_formats = ["text", "json", "yaml"] + if format not in valid_formats: + raise CLIError(f"Invalid format '{format}'. Valid options are: {', '.join(valid_formats)}") + + +def resolve_server(client: MCPAppClient, id_or_url: str) -> Union[MCPApp, MCPAppConfiguration]: + """Resolve server from ID or URL. + + Args: + client: Authenticated MCP App client + id_or_url: Server identifier (ID, config ID, or URL) + + Returns: + Server object (MCPApp or MCPAppConfiguration) + + Raises: + CLIError: If server resolution fails + """ + try: + app_id, config_id, server_url = parse_app_identifier(id_or_url) + + if server_url: + return run_async(client.get_app(server_url=server_url)) + elif config_id: + return run_async(client.get_app_configuration(app_config_id=config_id)) + else: + return run_async(client.get_app(app_id=app_id)) + + except Exception as e: + raise CLIError(f"Failed to resolve server '{id_or_url}': {str(e)}") from e + + +def handle_server_api_errors(func): + """Decorator to handle common API errors for server commands. + + Args: + func: Function to wrap with error handling + + Returns: + Wrapped function with error handling + """ + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except UnauthenticatedError as e: + raise CLIError( + "Invalid API key. Run 'mcp-agent login' or set MCP_API_KEY environment variable with new API key." + ) from e + except CLIError: + # Re-raise CLIErrors as-is + raise + except Exception as e: + # Get the original function name for better error messages + func_name = func.__name__.replace('_', ' ') + raise CLIError(f"Error in {func_name}: {str(e)}") from e + + return wrapper + + +def get_server_name(server: Union[MCPApp, MCPAppConfiguration]) -> str: + """Get display name for a server. + + Args: + server: Server object + + Returns: + Server display name + """ + if isinstance(server, MCPApp): + return server.name or "Unnamed" + else: + return server.app.name if server.app else "Unnamed" + + +def get_server_id(server: Union[MCPApp, MCPAppConfiguration]) -> str: + """Get ID for a server. + + Args: + server: Server object + + Returns: + Server ID + """ + if isinstance(server, MCPApp): + return server.appId + else: + return server.appConfigurationId + + +def clean_server_status(status: str) -> str: + """Convert server status from API format to clean format. + + Args: + status: API status string + + Returns: + Clean status string + """ + if status == "APP_SERVER_STATUS_ONLINE": + return "active" + elif status == "APP_SERVER_STATUS_OFFLINE": + return "offline" + else: + return "unknown" \ No newline at end of file diff --git a/src/mcp_agent/cli/cloud/main.py b/src/mcp_agent/cli/cloud/main.py index 392bec91d..b09beaf7d 100644 --- a/src/mcp_agent/cli/cloud/main.py +++ b/src/mcp_agent/cli/cloud/main.py @@ -13,13 +13,7 @@ from rich.panel import Panel from typer.core import TyperGroup -from mcp_agent.cli.cloud.commands import ( - configure_app, - deploy_config, - login, - logout, - whoami, -) +from mcp_agent.cli.cloud.commands import configure_app, deploy_config, login, logout, whoami from mcp_agent.cli.cloud.commands.logger import tail_logs from mcp_agent.cli.cloud.commands.app import ( delete_app, @@ -28,6 +22,11 @@ ) from mcp_agent.cli.cloud.commands.apps import list_apps from mcp_agent.cli.cloud.commands.workflow import get_workflow_status +from mcp_agent.cli.cloud.commands.servers import ( + list_servers, + describe_server, + delete_server, +) from mcp_agent.cli.exceptions import CLIError from mcp_agent.cli.utils.ux import print_error @@ -141,6 +140,20 @@ def invoke(self, ctx): app_cmd_workflow.command(name="status")(get_workflow_status) app.add_typer(app_cmd_workflow, name="workflow", help="Manage MCP Workflows") +# Sub-typer for `mcp-agent servers` commands +app_cmd_servers = typer.Typer( + help="Management commands for MCP Servers", + no_args_is_help=True, + cls=HelpfulTyperGroup, +) +app_cmd_servers.command(name="list")(list_servers) +app_cmd_servers.command(name="describe")(describe_server) +app_cmd_servers.command(name="delete")(delete_server) +app.add_typer(app_cmd_servers, name="servers", help="Manage MCP Servers") + +# Alias for servers - apps should behave identically +app.add_typer(app_cmd_servers, name="apps", help="Manage MCP Apps (alias for servers)") + # Sub-typer for `mcp-agent cloud` commands app_cmd_cloud = typer.Typer( help="Cloud operations and management", @@ -179,9 +192,9 @@ def invoke(self, ctx): # Add sub-typers to cloud app_cmd_cloud.add_typer(app_cmd_cloud_auth, name="auth", help="Authentication commands") -app_cmd_cloud.add_typer( - app_cmd_cloud_logger, name="logger", help="Logging and observability" -) +app_cmd_cloud.add_typer(app_cmd_cloud_logger, name="logger", help="Logging and observability") +app_cmd_cloud.add_typer(app_cmd_servers, name="servers", help="Server management commands") +app_cmd_cloud.add_typer(app_cmd_servers, name="apps", help="App management commands (alias for servers)") # Register cloud commands app.add_typer(app_cmd_cloud, name="cloud", help="Cloud operations and management") # Top-level auth commands that map to cloud auth commands diff --git a/src/mcp_agent/cli/core/utils.py b/src/mcp_agent/cli/core/utils.py index 836769b28..b3409c111 100644 --- a/src/mcp_agent/cli/core/utils.py +++ b/src/mcp_agent/cli/core/utils.py @@ -1,4 +1,11 @@ import asyncio +from typing import Optional, Tuple + +import httpx + +from mcp_agent.cli.exceptions import CLIError +from mcp_agent.cli.auth import UserCredentials +from mcp_agent.cli.core.constants import DEFAULT_API_BASE_URL def run_async(coro): @@ -17,3 +24,83 @@ def run_async(coro): loop = asyncio.get_event_loop() return loop.run_until_complete(coro) raise + + +def parse_app_identifier(identifier: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: + """Parse app identifier to extract app ID, config ID, and server URL.""" + + if identifier.startswith(('http://', 'https://')): + return None, None, identifier + + if identifier.startswith('apcnf_'): + return None, identifier, None + + if identifier.startswith('app_'): + return identifier, None, None + + return identifier, None, None + + +async def resolve_server_url( + app_id: Optional[str], + config_id: Optional[str], + credentials: UserCredentials, +) -> str: + """Resolve server URL from app ID or configuration ID.""" + + if not app_id and not config_id: + raise CLIError("Either app_id or config_id must be provided") + + if app_id: + endpoint = "/mcp_app/get_app" + payload = {"app_id": app_id} + response_key = "app" + not_found_msg = f"App '{app_id}' not found" + not_deployed_msg = f"App '{app_id}' is not deployed yet" + no_url_msg = f"No server URL found for app '{app_id}'" + offline_msg = f"App '{app_id}' server is offline" + api_error_msg = "Failed to get app info" + else: + endpoint = "/mcp_app/get_app_configuration" + payload = {"app_configuration_id": config_id} + response_key = "appConfiguration" + not_found_msg = f"App configuration '{config_id}' not found" + not_deployed_msg = f"App configuration '{config_id}' is not deployed yet" + no_url_msg = f"No server URL found for app configuration '{config_id}'" + offline_msg = f"App configuration '{config_id}' server is offline" + api_error_msg = "Failed to get app configuration" + + api_base = DEFAULT_API_BASE_URL + headers = { + "Authorization": f"Bearer {credentials.api_key}", + "Content-Type": "application/json", + } + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post(f"{api_base}{endpoint}", json=payload, headers=headers) + + if response.status_code == 404: + raise CLIError(not_found_msg) + elif response.status_code != 200: + raise CLIError(f"{api_error_msg}: {response.status_code} {response.text}") + + data = response.json() + resource_info = data.get(response_key, {}) + server_info = resource_info.get("appServerInfo") + + if not server_info: + raise CLIError(not_deployed_msg) + + server_url = server_info.get("serverUrl") + if not server_url: + raise CLIError(no_url_msg) + + status = server_info.get("status", "APP_SERVER_STATUS_UNSPECIFIED") + if status == "APP_SERVER_STATUS_OFFLINE": + raise CLIError(offline_msg) + + return server_url + + except httpx.RequestError as e: + raise CLIError(f"Failed to connect to API: {e}") diff --git a/uv.lock b/uv.lock index 09b0c7340..63660bdb1 100644 --- a/uv.lock +++ b/uv.lock @@ -2040,7 +2040,7 @@ wheels = [ [[package]] name = "mcp-agent" -version = "0.1.17" +version = "0.1.16" source = { editable = "." } dependencies = [ { name = "aiohttp" },