Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/mcp_agent/cli/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
This package provides utilities for authentication (for now, api keys).
"""

from .main import load_api_key_credentials, save_api_key_credentials
from .main import (
clear_credentials,
load_api_key_credentials,
load_credentials,
save_credentials,
)
from .models import UserCredentials

__all__ = ["load_api_key_credentials", "save_api_key_credentials"]
__all__ = [
"clear_credentials",
"load_api_key_credentials",
"load_credentials",
"save_credentials",
"UserCredentials",
]
2 changes: 1 addition & 1 deletion src/mcp_agent/cli/auth/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Constants for the MCP Agent auth utilities."""

# Default values
DEFAULT_CREDENTIALS_PATH = "~/.mcp_agent/cloud/auth/credentials"
DEFAULT_CREDENTIALS_PATH = "~/.mcp-agent/credentials.json"
59 changes: 48 additions & 11 deletions src/mcp_agent/cli/auth/main.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,69 @@
import json
import os
from typing import Optional

from .constants import DEFAULT_CREDENTIALS_PATH
from .models import UserCredentials


def save_api_key_credentials(api_key: str):
"""Save an API key to the credentials file.
def save_credentials(credentials: UserCredentials) -> None:
"""Save user credentials to the credentials file.

Args:
api_key: API key to persist
credentials: UserCredentials object to persist

Returns:
None
"""
credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH)
os.makedirs(os.path.dirname(credentials_path), exist_ok=True)
with open(credentials_path, "w", encoding="utf-8") as f:
f.write(api_key)
cred_dir = os.path.dirname(credentials_path)
os.makedirs(cred_dir, exist_ok=True)
try:
os.chmod(cred_dir, 0o700)
except OSError:
pass

# Create file with restricted permissions (0600) to prevent leakage
fd = os.open(credentials_path, os.O_WRONLY | os.O_CREAT, 0o600)
with os.fdopen(fd, "w") as f:
f.write(credentials.to_json())

def load_api_key_credentials() -> Optional[str]:
"""Load an API key from the credentials file.

def load_credentials() -> Optional[UserCredentials]:
"""Load user credentials from the credentials file.

Returns:
String. API key if it exists, None otherwise
UserCredentials object if it exists, None otherwise
"""
credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH)
if os.path.exists(credentials_path):
with open(credentials_path, "r", encoding="utf-8") as f:
return f.read().strip()
try:
with open(credentials_path, "r", encoding="utf-8") as f:
return UserCredentials.from_json(f.read())
except (json.JSONDecodeError, KeyError, ValueError):
# Handle corrupted or old format credentials
return None
return None


def clear_credentials() -> bool:
"""Clear stored credentials.

Returns:
bool: True if credentials were cleared, False if none existed
"""
credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH)
if os.path.exists(credentials_path):
os.remove(credentials_path)
return True
return False


def load_api_key_credentials() -> Optional[str]:
"""Load an API key from the credentials file (backward compatibility).

Returns:
String. API key if it exists, None otherwise
"""
credentials = load_credentials()
return credentials.api_key if credentials else None
64 changes: 64 additions & 0 deletions src/mcp_agent/cli/auth/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Authentication models for MCP Agent Cloud CLI."""

import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class UserCredentials:
"""User authentication credentials and identity information."""

# Authentication
api_key: str = field(repr=False)
token_expires_at: Optional[datetime] = None

# Identity
username: Optional[str] = None
email: Optional[str] = None

@property
def is_token_expired(self) -> bool:
"""Check if the token is expired."""
if not self.token_expires_at:
return False
return datetime.now() > self.token_expires_at

def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
result = {
"api_key": self.api_key,
"username": self.username,
"email": self.email,
}

if self.token_expires_at:
result["token_expires_at"] = self.token_expires_at.isoformat()

return result

@classmethod
def from_dict(cls, data: dict) -> "UserCredentials":
"""Create from dictionary loaded from JSON."""

token_expires_at = None
if "token_expires_at" in data:
token_expires_at = datetime.fromisoformat(data["token_expires_at"])

return cls(
api_key=data["api_key"],
token_expires_at=token_expires_at,
username=data.get("username"),
email=data.get("email"),
)

def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), indent=2)

@classmethod
def from_json(cls, json_str: str) -> "UserCredentials":
"""Create from JSON string."""
data = json.loads(json_str)
return cls.from_dict(data)
4 changes: 2 additions & 2 deletions src/mcp_agent/cli/cloud/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@

from .configure.main import configure_app
from .deploy.main import deploy_config
from .login import login
from .auth import login, logout, whoami

__all__ = ["configure_app", "deploy_config", "login"]
__all__ = ["configure_app", "deploy_config", "login", "logout", "whoami"]
7 changes: 7 additions & 0 deletions src/mcp_agent/cli/cloud/commands/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""MCP Agent Cloud authentication commands."""

from .login import login
from .logout import logout
from .whoami import whoami

__all__ = ["login", "logout", "whoami"]
167 changes: 167 additions & 0 deletions src/mcp_agent/cli/cloud/commands/auth/login/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import asyncio
from typing import Optional

import typer
from rich.prompt import Confirm, Prompt

from mcp_agent.cli.auth import (
UserCredentials,
load_credentials,
save_credentials,
)
from mcp_agent.cli.config import settings
from mcp_agent.cli.core.api_client import APIClient
from mcp_agent.cli.exceptions import CLIError
from mcp_agent.cli.utils.ux import (
print_info,
print_success,
print_warning,
)

from .constants import DEFAULT_API_AUTH_PATH


def _load_user_credentials(api_key: str) -> UserCredentials:
"""Load credentials with user profile data fetched from API.

Args:
api_key: The API key

Returns:
UserCredentials object with profile data if available
"""

async def fetch_profile() -> UserCredentials:
"""Fetch user profile from the API."""
client = APIClient(settings.API_BASE_URL, api_key)

response = await client.post("user/get_profile", {})
user_data = response.json()

user_profile = user_data.get("user", {})

return UserCredentials(
api_key=api_key,
username=user_profile.get("name"),
email=user_profile.get("email"),
)

try:
return asyncio.run(fetch_profile())
except Exception as e:
print_warning(f"Could not fetch user profile: {str(e)}")
# Fallback to minimal credentials
return UserCredentials(api_key=api_key)


def login(
api_key: Optional[str] = typer.Option(
None,
"--api-key",
help="Optionally set an existing API key to use for authentication, bypassing manual login.",
envvar="MCP_API_KEY",
),
no_open: bool = typer.Option(
False,
"--no-open",
help="Don't automatically open browser for authentication.",
),
) -> str:
"""Authenticate to MCP Agent Cloud API.

Direct to the api keys page for obtaining credentials, routing through login.

Args:
api_key: Optionally set an existing API key to use for authentication, bypassing manual login.
no_open: Don't automatically open browser for authentication.

Returns:
API key string. Prints success message if login is successful.
"""

existing_credentials = load_credentials()
if existing_credentials and not existing_credentials.is_token_expired:
if not Confirm.ask("You are already logged in. Do you want to login again?"):
print_info("Using existing credentials.")
return existing_credentials.api_key

if api_key:
print_info("Using provided API key for authentication (MCP_API_KEY).")
if not _is_valid_api_key(api_key):
raise CLIError("Invalid API key provided.")

credentials = _load_user_credentials(api_key)

save_credentials(credentials)
print_success("API key set.")
if credentials.username:
print_info(f"Logged in as: {credentials.username}")
return api_key

base_url = settings.API_BASE_URL

return _handle_browser_auth(base_url, no_open)


def _handle_browser_auth(base_url: str, no_open: bool) -> str:
"""Handle browser-based authentication flow.

Args:
base_url: API base URL
no_open: Whether to skip automatic browser opening

Returns:
API key string
"""
auth_url = f"{base_url}/{DEFAULT_API_AUTH_PATH}"

# TODO: This flow should be updated to OAuth2. Probably need to spin up local server to handle
# the oauth2 callback url.
if not no_open:
print_info("Opening MCP Agent Cloud API login in browser...")
print_info(
f"If the browser doesn't automatically open, you can manually visit: {auth_url}"
)
typer.launch(auth_url)
else:
print_info(f"Please visit: {auth_url}")

return _handle_manual_key_input()


def _handle_manual_key_input() -> str:
"""Handle manual API key input.

Returns:
API key string
"""
input_api_key = Prompt.ask("Please enter your API key :key:")

if not input_api_key:
print_warning("No API key provided.")
raise CLIError("Failed to set valid API key")

if not _is_valid_api_key(input_api_key):
print_warning("Invalid API key provided.")
raise CLIError("Failed to set valid API key")

credentials = _load_user_credentials(input_api_key)

save_credentials(credentials)
print_success("API key set.")
if credentials.username:
print_info(f"Logged in as: {credentials.username}")

return input_api_key


def _is_valid_api_key(api_key: str) -> bool:
"""Validate the API key.

Args:
api_key: The API key to validate.

Returns:
bool: True if the API key is valid, False otherwise.
"""
return api_key.startswith("lm_mcp_api_")
5 changes: 5 additions & 0 deletions src/mcp_agent/cli/cloud/commands/auth/logout/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""MCP Agent Cloud logout command."""

from .main import logout

__all__ = ["logout"]
Loading
Loading