- 
                Notifications
    
You must be signed in to change notification settings  - Fork 780
 
Add cloud auth #410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add cloud auth #410
Changes from all commits
a985f77
              2028710
              a914f66
              603e44b
              467c243
              71daae2
              47c9c99
              0a9e2db
              521eca9
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| """Constants for the MCP Agent auth utilities.""" | ||
| 
     | 
||
| # Default values | ||
| DEFAULT_CREDENTIALS_PATH = "~/.mcp_agent/cloud/auth/credentials" | ||
| DEFAULT_CREDENTIALS_PATH = "~/.mcp-agent/credentials.json" | ||
                
      
                  jtcorbett marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,32 +1,69 @@ | ||
| import json | ||
| import os | ||
| from typing import Optional | ||
| 
     | 
||
| from .constants import DEFAULT_CREDENTIALS_PATH | ||
| from .models import UserCredentials | ||
| 
     | 
||
| 
     | 
||
| def save_api_key_credentials(api_key: str): | ||
| """Save an API key to the credentials file. | ||
| def save_credentials(credentials: UserCredentials) -> None: | ||
| """Save user credentials to the credentials file. | ||
| 
     | 
||
| Args: | ||
| api_key: API key to persist | ||
| credentials: UserCredentials object to persist | ||
| 
     | 
||
| Returns: | ||
| None | ||
| """ | ||
| credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) | ||
| os.makedirs(os.path.dirname(credentials_path), exist_ok=True) | ||
| with open(credentials_path, "w", encoding="utf-8") as f: | ||
| f.write(api_key) | ||
| cred_dir = os.path.dirname(credentials_path) | ||
| os.makedirs(cred_dir, exist_ok=True) | ||
| try: | ||
| os.chmod(cred_dir, 0o700) | ||
| except OSError: | ||
| pass | ||
| 
     | 
||
| # Create file with restricted permissions (0600) to prevent leakage | ||
| fd = os.open(credentials_path, os.O_WRONLY | os.O_CREAT, 0o600) | ||
| with os.fdopen(fd, "w") as f: | ||
| f.write(credentials.to_json()) | ||
| 
     | 
||
| def load_api_key_credentials() -> Optional[str]: | ||
| """Load an API key from the credentials file. | ||
| 
     | 
||
| def load_credentials() -> Optional[UserCredentials]: | ||
| """Load user credentials from the credentials file. | ||
| 
     | 
||
| Returns: | ||
| String. API key if it exists, None otherwise | ||
| UserCredentials object if it exists, None otherwise | ||
| """ | ||
| credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) | ||
| if os.path.exists(credentials_path): | ||
| with open(credentials_path, "r", encoding="utf-8") as f: | ||
| return f.read().strip() | ||
| try: | ||
| with open(credentials_path, "r", encoding="utf-8") as f: | ||
| return UserCredentials.from_json(f.read()) | ||
| except (json.JSONDecodeError, KeyError, ValueError): | ||
| # Handle corrupted or old format credentials | ||
| return None | ||
| return None | ||
| 
     | 
||
                
      
                  jtcorbett marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| def clear_credentials() -> bool: | ||
| """Clear stored credentials. | ||
| 
     | 
||
| Returns: | ||
| bool: True if credentials were cleared, False if none existed | ||
| """ | ||
| credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) | ||
| if os.path.exists(credentials_path): | ||
| os.remove(credentials_path) | ||
| return True | ||
| return False | ||
| 
     | 
||
| 
     | 
||
| def load_api_key_credentials() -> Optional[str]: | ||
| """Load an API key from the credentials file (backward compatibility). | ||
| 
     | 
||
| Returns: | ||
| String. API key if it exists, None otherwise | ||
| """ | ||
| credentials = load_credentials() | ||
| return credentials.api_key if credentials else None | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| """Authentication models for MCP Agent Cloud CLI.""" | ||
| 
     | 
||
| import json | ||
| from dataclasses import dataclass, field | ||
| from datetime import datetime | ||
| from typing import Optional | ||
| 
     | 
||
| 
     | 
||
| @dataclass | ||
| class UserCredentials: | ||
| """User authentication credentials and identity information.""" | ||
| 
     | 
||
| # Authentication | ||
| api_key: str = field(repr=False) | ||
| token_expires_at: Optional[datetime] = None | ||
| 
     | 
||
| # Identity | ||
| username: Optional[str] = None | ||
| email: Optional[str] = None | ||
| 
     | 
||
| @property | ||
| def is_token_expired(self) -> bool: | ||
| """Check if the token is expired.""" | ||
| if not self.token_expires_at: | ||
| return False | ||
| return datetime.now() > self.token_expires_at | ||
| 
     | 
||
                
      
                  jtcorbett marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| def to_dict(self) -> dict: | ||
| """Convert to dictionary for JSON serialization.""" | ||
| result = { | ||
| "api_key": self.api_key, | ||
| "username": self.username, | ||
| "email": self.email, | ||
| } | ||
| 
     | 
||
| if self.token_expires_at: | ||
| result["token_expires_at"] = self.token_expires_at.isoformat() | ||
| 
     | 
||
| return result | ||
| 
     | 
||
| @classmethod | ||
| def from_dict(cls, data: dict) -> "UserCredentials": | ||
| """Create from dictionary loaded from JSON.""" | ||
| 
     | 
||
| token_expires_at = None | ||
| if "token_expires_at" in data: | ||
| token_expires_at = datetime.fromisoformat(data["token_expires_at"]) | ||
| 
     | 
||
| return cls( | ||
| api_key=data["api_key"], | ||
| token_expires_at=token_expires_at, | ||
| username=data.get("username"), | ||
| email=data.get("email"), | ||
| ) | ||
| 
         
      Comment on lines
    
      +41
     to 
      +54
    
   
  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Harden ISO8601 parsing and normalize timezone. datetime.fromisoformat does not accept a trailing 'Z'; also normalize naive timestamps to UTC. Apply:      @classmethod
     def from_dict(cls, data: dict) -> "UserCredentials":
         """Create from dictionary loaded from JSON."""
 
-        token_expires_at = None
-        if "token_expires_at" in data:
-            token_expires_at = datetime.fromisoformat(data["token_expires_at"])
+        token_expires_at = None
+        raw = data.get("token_expires_at")
+        if raw:
+            ts = raw
+            if isinstance(ts, str) and ts.endswith("Z"):
+                ts = ts[:-1] + "+00:00"
+            try:
+                token_expires_at = datetime.fromisoformat(ts)
+                if token_expires_at.tzinfo is None:
+                    token_expires_at = token_expires_at.replace(tzinfo=timezone.utc)
+            except ValueError:
+                token_expires_at = None
 🤖 Prompt for AI Agents | 
||
| 
     | 
||
| def to_json(self) -> str: | ||
| """Convert to JSON string.""" | ||
| return json.dumps(self.to_dict(), indent=2) | ||
| 
     | 
||
| @classmethod | ||
| def from_json(cls, json_str: str) -> "UserCredentials": | ||
| """Create from JSON string.""" | ||
| data = json.loads(json_str) | ||
| return cls.from_dict(data) | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| """MCP Agent Cloud authentication commands.""" | ||
| 
     | 
||
| from .login import login | ||
| from .logout import logout | ||
| from .whoami import whoami | ||
| 
     | 
||
| __all__ = ["login", "logout", "whoami"] | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| import asyncio | ||
| from typing import Optional | ||
| 
     | 
||
| import typer | ||
| from rich.prompt import Confirm, Prompt | ||
| 
     | 
||
| from mcp_agent.cli.auth import ( | ||
| UserCredentials, | ||
| load_credentials, | ||
| save_credentials, | ||
| ) | ||
| from mcp_agent.cli.config import settings | ||
| from mcp_agent.cli.core.api_client import APIClient | ||
| from mcp_agent.cli.exceptions import CLIError | ||
| from mcp_agent.cli.utils.ux import ( | ||
| print_info, | ||
| print_success, | ||
| print_warning, | ||
| ) | ||
| 
     | 
||
| from .constants import DEFAULT_API_AUTH_PATH | ||
| 
     | 
||
| 
     | 
||
| def _load_user_credentials(api_key: str) -> UserCredentials: | ||
| """Load credentials with user profile data fetched from API. | ||
| Args: | ||
| api_key: The API key | ||
| Returns: | ||
| UserCredentials object with profile data if available | ||
| """ | ||
| 
     | 
||
| async def fetch_profile() -> UserCredentials: | ||
| """Fetch user profile from the API.""" | ||
| client = APIClient(settings.API_BASE_URL, api_key) | ||
| 
     | 
||
| response = await client.post("user/get_profile", {}) | ||
| user_data = response.json() | ||
| 
     | 
||
| user_profile = user_data.get("user", {}) | ||
| 
     | 
||
| return UserCredentials( | ||
| api_key=api_key, | ||
| username=user_profile.get("name"), | ||
| email=user_profile.get("email"), | ||
| ) | ||
| 
     | 
||
| try: | ||
| return asyncio.run(fetch_profile()) | ||
| except Exception as e: | ||
| print_warning(f"Could not fetch user profile: {str(e)}") | ||
| # Fallback to minimal credentials | ||
| return UserCredentials(api_key=api_key) | ||
| 
     | 
||
| 
     | 
||
| def login( | ||
| api_key: Optional[str] = typer.Option( | ||
| None, | ||
| "--api-key", | ||
| help="Optionally set an existing API key to use for authentication, bypassing manual login.", | ||
| envvar="MCP_API_KEY", | ||
| ), | ||
| no_open: bool = typer.Option( | ||
| False, | ||
| "--no-open", | ||
| help="Don't automatically open browser for authentication.", | ||
| ), | ||
| ) -> str: | ||
| """Authenticate to MCP Agent Cloud API. | ||
                
      
                  jtcorbett marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| Direct to the api keys page for obtaining credentials, routing through login. | ||
| Args: | ||
| api_key: Optionally set an existing API key to use for authentication, bypassing manual login. | ||
| no_open: Don't automatically open browser for authentication. | ||
| Returns: | ||
| API key string. Prints success message if login is successful. | ||
| """ | ||
| 
     | 
||
| existing_credentials = load_credentials() | ||
| if existing_credentials and not existing_credentials.is_token_expired: | ||
| if not Confirm.ask("You are already logged in. Do you want to login again?"): | ||
| print_info("Using existing credentials.") | ||
| return existing_credentials.api_key | ||
| 
     | 
||
| if api_key: | ||
| print_info("Using provided API key for authentication (MCP_API_KEY).") | ||
| if not _is_valid_api_key(api_key): | ||
| raise CLIError("Invalid API key provided.") | ||
| 
     | 
||
| credentials = _load_user_credentials(api_key) | ||
| 
     | 
||
| save_credentials(credentials) | ||
| print_success("API key set.") | ||
| if credentials.username: | ||
| print_info(f"Logged in as: {credentials.username}") | ||
| return api_key | ||
| 
     | 
||
| base_url = settings.API_BASE_URL | ||
| 
     | 
||
| return _handle_browser_auth(base_url, no_open) | ||
| 
     | 
||
| 
     | 
||
| def _handle_browser_auth(base_url: str, no_open: bool) -> str: | ||
| """Handle browser-based authentication flow. | ||
| Args: | ||
| base_url: API base URL | ||
| no_open: Whether to skip automatic browser opening | ||
| Returns: | ||
| API key string | ||
| """ | ||
| auth_url = f"{base_url}/{DEFAULT_API_AUTH_PATH}" | ||
| 
         
      Comment on lines
    
      +106
     to 
      +116
    
   
  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Align browser-auth helper with non-returning flow. Make it return None and not pass secrets upward. -def _handle_browser_auth(base_url: str, no_open: bool) -> str:
+def _handle_browser_auth(base_url: str, no_open: bool) -> None:
     """Handle browser-based authentication flow.
 
     Args:
         base_url: API base URL
         no_open: Whether to skip automatic browser opening
 
-    Returns:
-        API key string
+    Returns:
+        None
     """
     auth_url = f"{base_url}/{DEFAULT_API_AUTH_PATH}"
@@
-    return _handle_manual_key_input()
+    _handle_manual_key_input()
+    return NoneAlso applies to: 129-129 🤖 Prompt for AI Agents | 
||
| 
     | 
||
| # TODO: This flow should be updated to OAuth2. Probably need to spin up local server to handle | ||
| # the oauth2 callback url. | ||
| if not no_open: | ||
| print_info("Opening MCP Agent Cloud API login in browser...") | ||
| print_info( | ||
| f"If the browser doesn't automatically open, you can manually visit: {auth_url}" | ||
| ) | ||
| typer.launch(auth_url) | ||
| else: | ||
| print_info(f"Please visit: {auth_url}") | ||
| 
     | 
||
| return _handle_manual_key_input() | ||
| 
     | 
||
| 
     | 
||
| def _handle_manual_key_input() -> str: | ||
| """Handle manual API key input. | ||
| Returns: | ||
| API key string | ||
| """ | ||
| input_api_key = Prompt.ask("Please enter your API key :key:") | ||
                
      
                  jtcorbett marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| if not input_api_key: | ||
| print_warning("No API key provided.") | ||
| raise CLIError("Failed to set valid API key") | ||
| 
     | 
||
| if not _is_valid_api_key(input_api_key): | ||
| print_warning("Invalid API key provided.") | ||
| raise CLIError("Failed to set valid API key") | ||
                
      
                  jtcorbett marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| credentials = _load_user_credentials(input_api_key) | ||
| 
     | 
||
| save_credentials(credentials) | ||
| print_success("API key set.") | ||
| if credentials.username: | ||
| print_info(f"Logged in as: {credentials.username}") | ||
| 
     | 
||
| return input_api_key | ||
| 
     | 
||
| 
     | 
||
| def _is_valid_api_key(api_key: str) -> bool: | ||
| """Validate the API key. | ||
| Args: | ||
| api_key: The API key to validate. | ||
| Returns: | ||
| bool: True if the API key is valid, False otherwise. | ||
| """ | ||
| return api_key.startswith("lm_mcp_api_") | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """MCP Agent Cloud logout command.""" | ||
| 
     | 
||
| from .main import logout | ||
| 
     | 
||
| __all__ = ["logout"] | 
Uh oh!
There was an error while loading. Please reload this page.