From 61ecf0e9958ba972e8756f493eeb792d7a0e88b0 Mon Sep 17 00:00:00 2001 From: Rushil Patel Date: Sat, 16 Aug 2025 00:04:25 -0700 Subject: [PATCH 1/2] fix: add --background command --- src/codegen/cli/commands/claude/main.py | 119 ++++++++++++++++++------ 1 file changed, 92 insertions(+), 27 deletions(-) diff --git a/src/codegen/cli/commands/claude/main.py b/src/codegen/cli/commands/claude/main.py index e14a64b23..0d32d2a46 100644 --- a/src/codegen/cli/commands/claude/main.py +++ b/src/codegen/cli/commands/claude/main.py @@ -7,36 +7,82 @@ import threading import time +import requests import typer +from rich import box +from rich.panel import Panel + + +from codegen.cli.api.endpoints import API_ENDPOINT +from codegen.cli.auth.token_manager import get_current_token from codegen.cli.commands.claude.claude_log_watcher import ClaudeLogWatcherManager from codegen.cli.commands.claude.claude_session_api import end_claude_session, generate_session_id from codegen.cli.commands.claude.config.mcp_setup import add_codegen_mcp_server, cleanup_codegen_mcp_server from codegen.cli.commands.claude.hooks import cleanup_claude_hook, ensure_claude_hook, get_codegen_url from codegen.cli.commands.claude.quiet_console import console +from rich.console import Console + +t_console = Console() + +from codegen.cli.rich.spinners import create_spinner from codegen.cli.utils.org import resolve_org_id -def claude( - org_id: int | None = typer.Option(None, help="Organization ID (defaults to CODEGEN_ORG_ID/REPOSITORY_ORG_ID or auto-detect)"), - no_mcp: bool | None = typer.Option(False, "--no-mcp", help="Disable Codegen's MCP server with additional capabilities over HTTP"), -): - """Run Claude Code with session tracking. +def _run_claude_background(resolved_org_id: int, prompt: str | None) -> None: + """Create a background agent run with Claude context and exit.""" + token = get_current_token() + if not token: + console.print("[red]Error:[/red] Not authenticated. Please run 'codegen login' first.") + raise typer.Exit(1) + + payload = {"prompt": prompt or "Start a Claude Code background session"} - This command runs Claude Code and tracks the session in the backend API: - - Generates a unique session ID - - Creates an agent run when Claude starts - - Updates the agent run status when Claude exits - """ + spinner = create_spinner("Creating agent run...") + spinner.start() + try: + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "x-codegen-client": "codegen__claude_code", + } + url = f"{API_ENDPOINT.rstrip('/')}/v1/organizations/{resolved_org_id}/agent/run" + response = requests.post(url, headers=headers, json=payload) + response.raise_for_status() + agent_run_data = response.json() + finally: + spinner.stop() + + run_id = agent_run_data.get("id", "Unknown") + status = agent_run_data.get("status", "Unknown") + web_url = agent_run_data.get("web_url", "") + + result_lines = [ + f"[cyan]Agent Run ID:[/cyan] {run_id}", + f"[cyan]Status:[/cyan] {status}", + ] + if web_url: + result_lines.append(f"[cyan]Web URL:[/cyan] {web_url}") + + t_console.print( + Panel( + "\n".join(result_lines), + title="šŸ¤– [bold]Background Agent Run Created[/bold]", + border_style="green", + box=box.ROUNDED, + padding=(1, 2), + ) + ) + t_console.print("\n[dim]šŸ’” Track progress with:[/dim] [cyan]codegen agents[/cyan]") + if web_url: + t_console.print(f"[dim]🌐 View in browser:[/dim] [link]{web_url}[/link]") + + +def _run_claude_interactive(resolved_org_id: int, no_mcp: bool | None) -> None: + """Launch Claude Code with session tracking and log watching.""" # Generate session ID for tracking session_id = generate_session_id() console.print(f"šŸ†” Generated session ID: {session_id[:8]}...", style="dim") - - # Resolve org_id early for session management - resolved_org_id = resolve_org_id(org_id) - if resolved_org_id is None: - console.print("[red]Error:[/red] Organization ID not provided. Pass --org-id, set CODEGEN_ORG_ID, or REPOSITORY_ORG_ID.") - raise typer.Exit(1) - + console.print("šŸš€ Starting Claude Code with session tracking...", style="blue") console.print(f"šŸŽÆ Organization ID: {resolved_org_id}", style="dim") @@ -79,29 +125,27 @@ def claude( url = get_codegen_url(session_id) console.print(f"\nšŸ”µ Codegen URL: {url}\n", style="bold blue") - process = subprocess.Popen(["claude", "--session-id", session_id]) - # Start log watcher for the session console.print("šŸ“‹ Starting log watcher...", style="blue") log_watcher_started = log_watcher_manager.start_watcher( session_id=session_id, org_id=resolved_org_id, - poll_interval=1.0, # Check every second - on_log_entry=None + poll_interval=1.0, + on_log_entry=None, ) - + if not log_watcher_started: console.print("āš ļø Failed to start log watcher", style="yellow") # Handle Ctrl+C gracefully def signal_handler(signum, frame): console.print("\nšŸ›‘ Stopping Claude Code...", style="yellow") - log_watcher_manager.stop_all_watchers() # Stop log watchers + log_watcher_manager.stop_all_watchers() process.terminate() - cleanup_claude_hook() # Clean up our hook - cleanup_codegen_mcp_server() # Clean up MCP Server + cleanup_claude_hook() + cleanup_codegen_mcp_server() end_claude_session(session_id, "ERROR", resolved_org_id) sys.exit(0) @@ -140,7 +184,7 @@ def signal_handler(signum, frame): log_watcher_manager.stop_all_watchers() except Exception as e: console.print(f"āš ļø Error stopping log watchers: {e}", style="yellow") - + cleanup_claude_hook() # Show final session info @@ -148,4 +192,25 @@ def signal_handler(signum, frame): console.print(f"\nšŸ”µ Session URL: {url}", style="bold blue") console.print(f"šŸ†” Session ID: {session_id}", style="dim") console.print(f"šŸŽÆ Organization ID: {resolved_org_id}", style="dim") - console.print("šŸ’” Check your backend to see the session data", style="dim") \ No newline at end of file + console.print("šŸ’” Check your backend to see the session data", style="dim") + + +def claude( + org_id: int | None = typer.Option(None, help="Organization ID (defaults to CODEGEN_ORG_ID/REPOSITORY_ORG_ID or auto-detect)"), + no_mcp: bool | None = typer.Option(False, "--no-mcp", help="Disable Codegen's MCP server with additional capabilities over HTTP"), + background: str | None = typer.Option(None, "--background", "-b", help="Create a background agent run with this prompt instead of launching Claude Code"), +): + """Run Claude Code with session tracking or create a background run.""" + # Resolve org_id early for session management + resolved_org_id = resolve_org_id(org_id) + if resolved_org_id is None: + console.print("[red]Error:[/red] Organization ID not provided. Pass --org-id, set CODEGEN_ORG_ID, or REPOSITORY_ORG_ID.") + raise typer.Exit(1) + + if background is not None: + # Use the value from --background as the prompt, with --prompt as fallback + final_prompt = background or prompt + _run_claude_background(resolved_org_id, final_prompt) + return + + _run_claude_interactive(resolved_org_id, no_mcp) \ No newline at end of file From a68218f2bfa9607ba19682a461ddfb1b8b31f847 Mon Sep 17 00:00:00 2001 From: Rushil Patel Date: Sun, 17 Aug 2025 10:07:21 -0700 Subject: [PATCH 2/2] fix: new tui pages, org, repo selector, background agents --- src/codegen/cli/auth/token_manager.py | 174 +++++++++++- src/codegen/cli/cli.py | 4 + src/codegen/cli/commands/org/__init__.py | 5 + src/codegen/cli/commands/org/main.py | 129 +++++++++ src/codegen/cli/commands/org/tui.py | 325 ++++++++++++++++++++++ src/codegen/cli/commands/profile/main.py | 13 +- src/codegen/cli/commands/repo/__init__.py | 5 + src/codegen/cli/commands/repo/main.py | 159 +++++++++++ src/codegen/cli/commands/repo/tui.py | 303 ++++++++++++++++++++ src/codegen/cli/tui/agent_detail.py | 309 ++++++++++++++++++++ src/codegen/cli/tui/app.py | 180 ++++++++++-- src/codegen/cli/tui/codegen_theme.tcss | 185 ++++++++++++ src/codegen/cli/utils/org.py | 142 ++++++---- src/codegen/cli/utils/repo.py | 236 ++++++++++++++++ 14 files changed, 2083 insertions(+), 86 deletions(-) create mode 100644 src/codegen/cli/commands/org/__init__.py create mode 100644 src/codegen/cli/commands/org/main.py create mode 100644 src/codegen/cli/commands/org/tui.py create mode 100644 src/codegen/cli/commands/repo/__init__.py create mode 100644 src/codegen/cli/commands/repo/main.py create mode 100644 src/codegen/cli/commands/repo/tui.py create mode 100644 src/codegen/cli/tui/agent_detail.py create mode 100644 src/codegen/cli/tui/codegen_theme.tcss create mode 100644 src/codegen/cli/utils/repo.py diff --git a/src/codegen/cli/auth/token_manager.py b/src/codegen/cli/auth/token_manager.py index e25e22b52..2154032c2 100644 --- a/src/codegen/cli/auth/token_manager.py +++ b/src/codegen/cli/auth/token_manager.py @@ -58,8 +58,15 @@ def save_token_with_org_info(self, token: str) -> None: # Add organization info if available orgs = org_data.get("items", []) if orgs and len(orgs) > 0: - primary_org = orgs[0] # Use first org as primary - auth_data["organization"] = {"id": primary_org.get("id"), "name": primary_org.get("name"), "all_orgs": [{"id": org.get("id"), "name": org.get("name")} for org in orgs]} + # Store ALL organizations in cache for local resolution + all_orgs = [{"id": org.get("id"), "name": org.get("name")} for org in orgs] + primary_org = orgs[0] # Use first org as primary/default + auth_data["organization"] = { + "id": primary_org.get("id"), + "name": primary_org.get("name"), + "all_orgs": all_orgs + } + auth_data["organizations_cache"] = all_orgs # Separate cache for easy access except requests.RequestException as e: # If we can't fetch org info, still save the token but without org data @@ -171,6 +178,53 @@ def get_user_info(self) -> dict | None: return auth_data["user"] return None + def get_cached_organizations(self) -> list[dict] | None: + """Get all cached organizations. + + Returns: + List of organization dictionaries with 'id' and 'name' keys, or None if no cache. + """ + auth_data = self.get_auth_data() + if auth_data and "organizations_cache" in auth_data: + return auth_data["organizations_cache"] + # Fallback to legacy format + if auth_data and "organization" in auth_data and "all_orgs" in auth_data["organization"]: + return auth_data["organization"]["all_orgs"] + return None + + def is_org_id_in_cache(self, org_id: int) -> bool: + """Check if an organization ID exists in the local cache. + + Args: + org_id: The organization ID to check + + Returns: + True if the organization ID is found in cache, False otherwise. + """ + cached_orgs = self.get_cached_organizations() + if not cached_orgs: + return False + + return any(org.get("id") == org_id for org in cached_orgs) + + def get_org_name_from_cache(self, org_id: int) -> str | None: + """Get organization name from cache by ID. + + Args: + org_id: The organization ID to look up + + Returns: + Organization name if found in cache, None otherwise. + """ + cached_orgs = self.get_cached_organizations() + if not cached_orgs: + return None + + for org in cached_orgs: + if org.get("id") == org_id: + return org.get("name") + return None + def get_current_token() -> str | None: """Get the current authentication token if one exists. @@ -233,6 +287,42 @@ def get_current_org_name() -> str | None: return token_manager.get_org_name() +def get_cached_organizations() -> list[dict] | None: + """Get all cached organizations. + + Returns: + List of organization dictionaries with 'id' and 'name' keys, or None if no cache. + """ + token_manager = TokenManager() + return token_manager.get_cached_organizations() + + +def is_org_id_cached(org_id: int) -> bool: + """Check if an organization ID exists in the local cache. + + Args: + org_id: The organization ID to check + + Returns: + True if the organization ID is found in cache, False otherwise. + """ + token_manager = TokenManager() + return token_manager.is_org_id_in_cache(org_id) + + +def get_org_name_from_cache(org_id: int) -> str | None: + """Get organization name from cache by ID. + + Args: + org_id: The organization ID to look up + + Returns: + Organization name if found in cache, None otherwise. + """ + token_manager = TokenManager() + return token_manager.get_org_name_from_cache(org_id) + + def get_current_user_info() -> dict | None: """Get the stored user info if available. @@ -241,3 +331,83 @@ def get_current_user_info() -> dict | None: """ token_manager = TokenManager() return token_manager.get_user_info() + + +# Repository caching functions (similar to organization caching) + +def get_cached_repositories() -> list[dict] | None: + """Get all cached repositories. + + Returns: + List of repository dictionaries with 'id' and 'name' keys, or None if no cache. + """ + token_manager = TokenManager() + auth_data = token_manager.get_auth_data() + if auth_data and "repositories_cache" in auth_data: + return auth_data["repositories_cache"] + return None + + +def cache_repositories(repositories: list[dict]) -> None: + """Cache repositories to local storage. + + Args: + repositories: List of repository dictionaries to cache + """ + token_manager = TokenManager() + auth_data = token_manager.get_auth_data() + if auth_data: + auth_data["repositories_cache"] = repositories + # Save back to file + try: + import json + with open(token_manager.token_file, 'w') as f: + json.dump(auth_data, f, indent=2) + except Exception: + pass # Fail silently + + +def is_repo_id_cached(repo_id: int) -> bool: + """Check if a repository ID exists in the local cache. + + Args: + repo_id: The repository ID to check + + Returns: + True if the repository ID is found in cache, False otherwise. + """ + cached_repos = get_cached_repositories() + if not cached_repos: + return False + + return any(repo.get("id") == repo_id for repo in cached_repos) + + +def get_repo_name_from_cache(repo_id: int) -> str | None: + """Get repository name from cache by ID. + + Args: + repo_id: The repository ID to look up + + Returns: + Repository name if found in cache, None otherwise. + """ + cached_repos = get_cached_repositories() + if not cached_repos: + return None + + for repo in cached_repos: + if repo.get("id") == repo_id: + return repo.get("name") + + return None + + +def get_current_repo_name() -> str | None: + """Get the current repository name from environment or cache.""" + from codegen.cli.utils.repo import get_current_repo_id + + repo_id = get_current_repo_id() + if repo_id: + return get_repo_name_from_cache(repo_id) + return None diff --git a/src/codegen/cli/cli.py b/src/codegen/cli/cli.py index b1ca50c70..768177221 100644 --- a/src/codegen/cli/cli.py +++ b/src/codegen/cli/cli.py @@ -14,7 +14,9 @@ from codegen.cli.commands.integrations.main import integrations_app from codegen.cli.commands.login.main import login from codegen.cli.commands.logout.main import logout +from codegen.cli.commands.org.main import org from codegen.cli.commands.profile.main import profile +from codegen.cli.commands.repo.main import repo from codegen.cli.commands.style_debug.main import style_debug from codegen.cli.commands.tools.main import tools from codegen.cli.commands.tui.main import tui @@ -39,7 +41,9 @@ def version_callback(value: bool): main.command("init", help="Initialize or update the Codegen folder.")(init) main.command("login", help="Store authentication token.")(login) main.command("logout", help="Clear stored authentication token.")(logout) +main.command("org", help="Manage and switch between organizations.")(org) main.command("profile", help="Display information about the currently authenticated user.")(profile) +main.command("repo", help="Manage repository configuration and environment variables.")(repo) main.command("style-debug", help="Debug command to visualize CLI styling (spinners, etc).")(style_debug) main.command("tools", help="List available tools from the Codegen API.")(tools) main.command("tui", help="Launch the interactive TUI interface.")(tui) diff --git a/src/codegen/cli/commands/org/__init__.py b/src/codegen/cli/commands/org/__init__.py new file mode 100644 index 000000000..ba2d89354 --- /dev/null +++ b/src/codegen/cli/commands/org/__init__.py @@ -0,0 +1,5 @@ +"""Organization management command.""" + +from .main import org + +__all__ = ["org"] \ No newline at end of file diff --git a/src/codegen/cli/commands/org/main.py b/src/codegen/cli/commands/org/main.py new file mode 100644 index 000000000..5b5f2a4d2 --- /dev/null +++ b/src/codegen/cli/commands/org/main.py @@ -0,0 +1,129 @@ +"""Organization management command for switching between organizations.""" + +import os + +import typer +from rich.console import Console + +from codegen.cli.auth.token_manager import get_cached_organizations, get_current_token +from codegen.cli.commands.org.tui import OrgSelectorApp + +console = Console() + + +def org( + set_default: int | None = typer.Option(None, "--set-default", "-s", help="Set default organization ID"), + list_orgs: bool = typer.Option(False, "--list", "-l", help="List available organizations"), +): + """Manage and switch between organizations.""" + # Check if user is authenticated + token = get_current_token() + if not token: + console.print("[red]Error:[/red] Not authenticated. Please run 'codegen login' first.") + raise typer.Exit(1) + + # Get cached organizations + cached_orgs = get_cached_organizations() + if not cached_orgs: + console.print("[red]Error:[/red] No organizations found in cache. Please run 'codegen login' to refresh.") + raise typer.Exit(1) + + # Handle list mode + if list_orgs: + _list_organizations(cached_orgs) + return + + # Handle set default mode + if set_default is not None: + _set_default_organization(set_default, cached_orgs) + return + + # No flags provided, launch TUI + _run_org_selector_tui() + + +def _list_organizations(cached_orgs: list[dict]) -> None: + """List all available organizations.""" + from rich.table import Table + + table = Table(title="Available Organizations") + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Name", style="green") + + for org in cached_orgs: + table.add_row(str(org["id"]), org["name"]) + + console.print(table) + + +def _set_default_organization(org_id: int, cached_orgs: list[dict]) -> None: + """Set the default organization via environment variable.""" + # Check if org ID exists in cache + org_found = None + for org in cached_orgs: + if org["id"] == org_id: + org_found = org + break + + if not org_found: + available_orgs = ", ".join([f"{org['name']} ({org['id']})" for org in cached_orgs]) + console.print(f"[red]Error:[/red] Organization ID {org_id} not found in your accessible organizations.") + console.print(f"[yellow]Available organizations:[/yellow] {available_orgs}") + raise typer.Exit(1) + + # Set the environment variable + os.environ["CODEGEN_ORG_ID"] = str(org_id) + + # Try to update .env file if it exists + env_file_path = ".env" + if os.path.exists(env_file_path): + _update_env_file(env_file_path, "CODEGEN_ORG_ID", str(org_id)) + console.print(f"[green]āœ“ Updated {env_file_path} with CODEGEN_ORG_ID={org_id}[/green]") + else: + console.print(f"[yellow]Info:[/yellow] No .env file found. Set environment variable manually:") + console.print(f"[cyan]export CODEGEN_ORG_ID={org_id}[/cyan]") + + console.print(f"[green]āœ“ Default organization set to:[/green] {org_found['name']} ({org_id})") + + +def _update_env_file(file_path: str, key: str, value: str) -> None: + """Update or add an environment variable in the .env file.""" + lines = [] + key_found = False + + # Read existing lines + try: + with open(file_path) as f: + lines = f.readlines() + except FileNotFoundError: + pass + + # Ensure all lines end with newline + for i, line in enumerate(lines): + if not line.endswith('\n'): + lines[i] = line + '\n' + + # Update existing key or note if we need to add it + for i, line in enumerate(lines): + if line.strip().startswith(f"{key}="): + lines[i] = f"{key}={value}\n" + key_found = True + break + + # Add new key if not found + if not key_found: + lines.append(f"{key}={value}\n") + + # Write back to file + with open(file_path, "w") as f: + f.writelines(lines) + + +def _run_org_selector_tui() -> None: + """Launch the organization selector TUI.""" + try: + app = OrgSelectorApp() + app.run() + except Exception as e: + console.print(f"[red]Error launching TUI:[/red] {e}") + raise typer.Exit(1) \ No newline at end of file diff --git a/src/codegen/cli/commands/org/tui.py b/src/codegen/cli/commands/org/tui.py new file mode 100644 index 000000000..f1640103d --- /dev/null +++ b/src/codegen/cli/commands/org/tui.py @@ -0,0 +1,325 @@ +"""Organization selector TUI using Textual - Fixed version.""" + +import os + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Vertical +from textual.screen import Screen +from textual.widgets import DataTable, Footer, Header, Static + +from codegen.cli.auth.token_manager import get_cached_organizations, get_current_org_id +from codegen.cli.utils.org import resolve_org_id + + +class OrgSelectorTUI(Screen): + """TUI for selecting and switching organizations.""" + + BINDINGS = [ + Binding("escape,ctrl+c", "quit", "Quit", priority=True), + Binding("enter", "select_org", "Select", show=True), + Binding("q", "quit", "Quit", show=True), + ] + + def __init__(self): + super().__init__() + self.organizations = get_cached_organizations() or [] + self.current_org_id = get_current_org_id() + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Header() + + if not self.organizations: + yield Container( + Static("āš ļø No organizations found. Please run 'codegen login' first.", classes="warning-message"), + id="no-orgs-warning" + ) + else: + with Vertical(): + yield Static("šŸ¢ Select Your Organization", classes="title") + yield Static("Use ↑↓ to navigate, Enter to select, Q/Esc to quit", classes="help") + + table = DataTable(id="orgs-table", cursor_type="row") + table.add_columns("Current", "ID", "Organization Name") + + # Get the actual current org ID (checks environment variables first) + actual_current_org_id = resolve_org_id() + + for org in self.organizations: + org_id = org["id"] + org_name = org["name"] + is_current = "ā—" if org_id == actual_current_org_id else " " + + table.add_row(is_current, str(org_id), org_name, key=str(org_id)) + + yield table + + yield Static( + "\nšŸ’” Selecting an organization will update your CODEGEN_ORG_ID environment variable.", + classes="help" + ) + + yield Footer() + + def on_mount(self) -> None: + """Called when the screen is mounted.""" + # Set focus on the table if it exists + if self.organizations: + try: + table = self.query_one("#orgs-table", DataTable) + table.focus() + except Exception: + pass + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Handle DataTable row selection (Enter key).""" + if event.data_table.id == "orgs-table": + self._handle_org_selection() + + def action_select_org(self) -> None: + """Select the highlighted organization (fallback action).""" + self._handle_org_selection() + + def _handle_org_selection(self) -> None: + """Handle organization selection logic.""" + if not self.organizations: + self.notify("āŒ No organizations available", severity="error") + return + + try: + table = self.query_one("#orgs-table", DataTable) + + if table.cursor_row is not None and table.cursor_row < len(self.organizations): + # Get the selected organization directly from the cursor position + selected_org = self.organizations[table.cursor_row] + selected_org_id = selected_org["id"] + + # Set the organization + self._set_organization(selected_org_id, selected_org["name"]) + else: + self.notify(f"āŒ Invalid cursor position: {table.cursor_row}/{len(self.organizations)}", severity="error") + except Exception as e: + self.notify(f"āŒ Error in select org: {e}", severity="error") + + def _set_organization(self, org_id: int, org_name: str) -> None: + """Set the selected organization as default.""" + # Set environment variable + os.environ["CODEGEN_ORG_ID"] = str(org_id) + + # Try to update .env file + env_updated = self._update_env_file(org_id) + + if env_updated: + self.notify(f"āœ“ Set default organization: {org_name} (ID: {org_id})") + self.notify("āœ“ Updated .env file with CODEGEN_ORG_ID") + else: + self.notify(f"āœ“ Set organization: {org_name} (ID: {org_id})") + self.notify("ℹ Add 'export CODEGEN_ORG_ID={org_id}' to your shell for persistence") + + # Wait a moment for user to see the notifications, then close + self.set_timer(2.0, self._close_screen) + + def _update_env_file(self, org_id: int) -> bool: + """Update the .env file with the new organization ID.""" + env_file_path = ".env" + + try: + lines = [] + key_found = False + + # Read existing lines if file exists + if os.path.exists(env_file_path): + with open(env_file_path) as f: + lines = f.readlines() + + # Ensure all lines end with newline + for i, line in enumerate(lines): + if not line.endswith('\n'): + lines[i] = line + '\n' + + # Update existing CODEGEN_ORG_ID or note that we need to add it + for i, line in enumerate(lines): + if line.strip().startswith("CODEGEN_ORG_ID="): + lines[i] = f"CODEGEN_ORG_ID={org_id}\n" + key_found = True + break + + # Add new line if not found + if not key_found: + lines.append(f"CODEGEN_ORG_ID={org_id}\n") + + # Write back to file + with open(env_file_path, "w") as f: + f.writelines(lines) + + return True + + except Exception: + return False + + def _close_screen(self) -> None: + """Close the screen.""" + try: + # Pop ourselves from the screen stack + self.app.pop_screen() + except Exception: + # Fallback - try to dismiss the screen + self.dismiss() + + def action_quit(self) -> None: + """Quit the application or close the screen.""" + self._close_screen() + + +class OrgSelectorApp(App): + """Standalone app wrapper for the organization selector.""" + + CSS_PATH = "../../tui/codegen_theme.tcss" # Use custom Codegen theme + TITLE = "Organization Selector - Codegen CLI" + BINDINGS = [ + Binding("escape,ctrl+c", "quit", "Quit", priority=True), + Binding("enter", "select_org", "Select", show=True), + Binding("q", "quit", "Quit", show=True), + ] + + def __init__(self): + super().__init__() + self.organizations = get_cached_organizations() or [] + self.current_org_id = get_current_org_id() + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Header() + + if not self.organizations: + yield Container( + Static("āš ļø No organizations found. Please run 'codegen login' first.", classes="warning-message"), + id="no-orgs-warning" + ) + else: + with Vertical(): + yield Static("šŸ¢ Select Your Organization", classes="title") + yield Static("Use ↑↓ to navigate, Enter to select, Q/Esc to quit", classes="help") + + table = DataTable(id="orgs-table", cursor_type="row") + table.add_columns("Current", "ID", "Organization Name") + + # Get the actual current org ID (checks environment variables first) + actual_current_org_id = resolve_org_id() + + for org in self.organizations: + org_id = org["id"] + org_name = org["name"] + is_current = "ā—" if org_id == actual_current_org_id else " " + + table.add_row(is_current, str(org_id), org_name, key=str(org_id)) + + yield table + + yield Static( + "\nšŸ’” Selecting an organization will update your CODEGEN_ORG_ID environment variable.", + classes="help" + ) + + yield Footer() + + def on_mount(self) -> None: + """Called when the app mounts.""" + # Set focus on the table if it exists + if self.organizations: + try: + table = self.query_one("#orgs-table", DataTable) + table.focus() + except Exception: + pass + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Handle DataTable row selection (Enter key).""" + if event.data_table.id == "orgs-table": + self._handle_org_selection() + + def action_select_org(self) -> None: + """Select the highlighted organization (fallback action).""" + self._handle_org_selection() + + def _handle_org_selection(self) -> None: + """Handle organization selection logic.""" + if not self.organizations: + self.notify("āŒ No organizations available", severity="error") + return + + try: + table = self.query_one("#orgs-table", DataTable) + + if table.cursor_row is not None and table.cursor_row < len(self.organizations): + # Get the selected organization directly from the cursor position + selected_org = self.organizations[table.cursor_row] + selected_org_id = selected_org["id"] + + # Set the organization + self._set_organization(selected_org_id, selected_org["name"]) + else: + self.notify(f"āŒ Invalid cursor position: {table.cursor_row}/{len(self.organizations)}", severity="error") + except Exception as e: + self.notify(f"āŒ Error in select org: {e}", severity="error") + + def _set_organization(self, org_id: int, org_name: str) -> None: + """Set the selected organization as default.""" + # Set environment variable + os.environ["CODEGEN_ORG_ID"] = str(org_id) + + # Try to update .env file + env_updated = self._update_env_file(org_id) + + if env_updated: + self.notify(f"āœ“ Set default organization: {org_name} (ID: {org_id})") + self.notify("āœ“ Updated .env file with CODEGEN_ORG_ID") + else: + self.notify(f"āœ“ Set organization: {org_name} (ID: {org_id})") + self.notify("ℹ Add 'export CODEGEN_ORG_ID={org_id}' to your shell for persistence") + + # Wait a moment for user to see the notifications, then exit + self.set_timer(2.0, self.exit) + + def _update_env_file(self, org_id: int) -> bool: + """Update the .env file with the new organization ID.""" + env_file_path = ".env" + + try: + lines = [] + key_found = False + + # Read existing lines if file exists + if os.path.exists(env_file_path): + with open(env_file_path) as f: + lines = f.readlines() + + # Ensure all lines end with newline + for i, line in enumerate(lines): + if not line.endswith('\n'): + lines[i] = line + '\n' + + # Update existing CODEGEN_ORG_ID or note that we need to add it + for i, line in enumerate(lines): + if line.strip().startswith("CODEGEN_ORG_ID="): + lines[i] = f"CODEGEN_ORG_ID={org_id}\n" + key_found = True + break + + # Add new line if not found + if not key_found: + lines.append(f"CODEGEN_ORG_ID={org_id}\n") + + # Write back to file + with open(env_file_path, "w") as f: + f.writelines(lines) + + return True + + except Exception: + return False + + def action_quit(self) -> None: + """Quit the application.""" + self.exit() \ No newline at end of file diff --git a/src/codegen/cli/commands/profile/main.py b/src/codegen/cli/commands/profile/main.py index d5c6d4795..ae49c47a3 100644 --- a/src/codegen/cli/commands/profile/main.py +++ b/src/codegen/cli/commands/profile/main.py @@ -7,7 +7,12 @@ from rich.panel import Panel from codegen.cli.api.endpoints import API_ENDPOINT -from codegen.cli.auth.token_manager import get_current_org_name, get_current_token, get_current_user_info +from codegen.cli.auth.token_manager import ( + get_cached_organizations, + get_current_org_name, + get_current_token, + get_current_user_info, +) from codegen.cli.rich.spinners import create_spinner from codegen.cli.utils.org import resolve_org_id @@ -97,6 +102,12 @@ def profile(): if role: profile_info.append(f"[cyan]Role:[/cyan] {role}") + # Add available organizations from cache + cached_orgs = get_cached_organizations() + if cached_orgs and len(cached_orgs) > 1: + org_names = [f"{org['name']} ({org['id']})" for org in cached_orgs] + profile_info.append(f"[cyan]Available Orgs:[/cyan] {', '.join(org_names)}") + profile_text = "\n".join(profile_info) if profile_info else "No profile information available" console.print( diff --git a/src/codegen/cli/commands/repo/__init__.py b/src/codegen/cli/commands/repo/__init__.py new file mode 100644 index 000000000..af83c5eba --- /dev/null +++ b/src/codegen/cli/commands/repo/__init__.py @@ -0,0 +1,5 @@ +"""Repository management commands.""" + +from .main import repo + +__all__ = ["repo"] \ No newline at end of file diff --git a/src/codegen/cli/commands/repo/main.py b/src/codegen/cli/commands/repo/main.py new file mode 100644 index 000000000..d3eb00fa7 --- /dev/null +++ b/src/codegen/cli/commands/repo/main.py @@ -0,0 +1,159 @@ +"""Repository management command for managing repository configuration.""" + +import typer +from rich.console import Console +from rich.table import Table + +from codegen.cli.utils.repo import ( + get_current_repo_id, + get_repo_env_status, + set_repo_env_variable, + update_env_file_with_repo, + clear_repo_env_variables, + ensure_repositories_cached +) +from codegen.cli.auth.token_manager import get_current_token + +console = Console() + + +def repo( + set_default: int | None = typer.Option(None, "--set-default", "-s", help="Set default repository ID"), + clear: bool = typer.Option(False, "--clear", "-c", help="Clear repository configuration"), + list_config: bool = typer.Option(False, "--list", "-l", help="List current repository configuration"), + list_repos: bool = typer.Option(False, "--list-repos", "-lr", help="List available repositories"), +): + """Manage repository configuration and environment variables.""" + + # Handle list repositories mode + if list_repos: + _list_repositories() + return + + # Handle list config mode + if list_config: + _list_repo_config() + return + + # Handle clear mode + if clear: + _clear_repo_config() + return + + # Handle set default mode + if set_default is not None: + _set_default_repository(set_default) + return + + # No flags provided, launch TUI + _run_repo_selector_tui() + + +def _list_repo_config() -> None: + """List current repository configuration.""" + table = Table(title="Repository Configuration") + table.add_column("Setting", style="cyan", no_wrap=True) + table.add_column("Value", style="green") + table.add_column("Status", style="yellow") + + # Current repository ID + current_repo_id = get_current_repo_id() + if current_repo_id: + table.add_row("Current Repository ID", str(current_repo_id), "āœ… Active") + else: + table.add_row("Current Repository ID", "Not configured", "āŒ Inactive") + + # Environment variables + env_status = get_repo_env_status() + for var_name, value in env_status.items(): + status = "āœ… Set" if value != "Not set" else "āŒ Not set" + table.add_row(var_name, value, status) + + console.print(table) + + +def _list_repositories() -> None: + """List all available repositories.""" + # Check if user is authenticated + token = get_current_token() + if not token: + console.print("[red]Error:[/red] Not authenticated. Please run 'codegen login' first.") + raise typer.Exit(1) + + # Get cached or fetch repositories + repositories = ensure_repositories_cached() + if not repositories: + console.print("[red]Error:[/red] No repositories found.") + raise typer.Exit(1) + + table = Table(title="Available Repositories") + table.add_column("ID", style="cyan", no_wrap=True) + table.add_column("Name", style="green") + table.add_column("Description", style="dim") + table.add_column("Current", style="yellow") + + current_repo_id = get_current_repo_id() + + for repo in repositories: + repo_id = repo.get("id", "Unknown") + repo_name = repo.get("name", "Unknown") + repo_desc = repo.get("description", "") + is_current = "ā—" if repo_id == current_repo_id else "" + + table.add_row(str(repo_id), repo_name, repo_desc, is_current) + + console.print(table) + + +def _set_default_repository(repo_id: int) -> None: + """Set default repository ID.""" + try: + # Set in environment + success = set_repo_env_variable(repo_id, "CODEGEN_REPO_ID") + if not success: + console.print("[red]Error:[/red] Failed to set repository ID in environment.") + raise typer.Exit(1) + + # Try to update .env file + env_updated = update_env_file_with_repo(repo_id) + + if env_updated: + console.print(f"[green]āœ“[/green] Set default repository ID to: [cyan]{repo_id}[/cyan]") + console.print("[green]āœ“[/green] Updated .env file with CODEGEN_REPO_ID") + else: + console.print(f"[green]āœ“[/green] Set repository ID to: [cyan]{repo_id}[/cyan]") + console.print("[yellow]ℹ[/yellow] Could not update .env file. Add 'export CODEGEN_REPO_ID={repo_id}' to your shell for persistence") + + except Exception as e: + console.print(f"[red]Error:[/red] Failed to set default repository: {e}") + raise typer.Exit(1) + + +def _clear_repo_config() -> None: + """Clear repository configuration.""" + try: + clear_repo_env_variables() + console.print("[green]āœ“[/green] Cleared repository configuration from environment variables") + + # Note: We don't automatically clear the .env file to avoid data loss + console.print("[yellow]ℹ[/yellow] To permanently remove from .env file, manually delete the CODEGEN_REPO_ID line") + + except Exception as e: + console.print(f"[red]Error:[/red] Failed to clear repository configuration: {e}") + raise typer.Exit(1) + + +def _run_repo_selector_tui() -> None: + """Launch the repository selector TUI.""" + try: + from codegen.cli.commands.repo.tui import RepoSelectorApp + + app = RepoSelectorApp() + app.run() + + except ImportError: + console.print("[red]Error:[/red] Repository selector TUI not available") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Error:[/red] Failed to launch repository selector: {e}") + raise typer.Exit(1) \ No newline at end of file diff --git a/src/codegen/cli/commands/repo/tui.py b/src/codegen/cli/commands/repo/tui.py new file mode 100644 index 000000000..a9368724c --- /dev/null +++ b/src/codegen/cli/commands/repo/tui.py @@ -0,0 +1,303 @@ +"""Repository selector TUI using Textual.""" + +import os + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Vertical +from textual.screen import Screen +from textual.widgets import DataTable, Footer, Header, Static + +from codegen.cli.auth.token_manager import get_cached_repositories, get_current_token +from codegen.cli.utils.repo import ( + get_current_repo_id, + set_repo_env_variable, + update_env_file_with_repo, + ensure_repositories_cached +) +from codegen.cli.utils.org import resolve_org_id + + +class RepoSelectorTUI(Screen): + """TUI for selecting and switching repositories.""" + + BINDINGS = [ + Binding("escape,ctrl+c", "quit", "Quit", priority=True), + Binding("enter", "select_repo", "Select", show=True), + Binding("q", "quit", "Quit", show=True), + ] + + def __init__(self): + super().__init__() + self.repositories = ensure_repositories_cached() or [] + self.current_repo_id = get_current_repo_id() + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Header() + + if not self.repositories: + yield Container( + Static("āš ļø No repositories found. Fetching repositories...", classes="warning-message"), + id="no-repos-warning" + ) + else: + with Vertical(): + yield Static("šŸ—‚ļø Select Your Repository", classes="title") + yield Static("Use ↑↓ to navigate, Enter to select, Q/Esc to quit", classes="help") + + table = DataTable(id="repos-table", cursor_type="row") + table.add_columns("Current", "ID", "Repository Name") + + # Get the actual current repo ID (checks environment variables first) + actual_current_repo_id = get_current_repo_id() + + for repo in self.repositories: + repo_id = repo["id"] + repo_name = repo["name"] + is_current = "ā—" if repo_id == actual_current_repo_id else " " + + table.add_row(is_current, str(repo_id), repo_name, key=str(repo_id)) + + yield table + + yield Static( + "\nšŸ’” Selecting a repository will update your CODEGEN_REPO_ID environment variable.", + classes="help" + ) + + yield Footer() + + def on_mount(self) -> None: + """Called when the screen is mounted.""" + if self.repositories: + try: + table = self.query_one("#repos-table", DataTable) + table.focus() + except Exception: + pass + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Handle DataTable row selection (Enter key).""" + if event.data_table.id == "repos-table": + self._handle_repo_selection() + + def action_select_repo(self) -> None: + """Select repository (fallback for direct key binding).""" + self._handle_repo_selection() + + def _handle_repo_selection(self) -> None: + """Handle repository selection logic.""" + try: + table = self.query_one("#repos-table", DataTable) + if table.cursor_row is not None and table.cursor_row < len(self.repositories): + selected_repo = self.repositories[table.cursor_row] + repo_id = selected_repo["id"] + repo_name = selected_repo["name"] + + self._set_repository(repo_id, repo_name) + except Exception as e: + self.notify(f"āŒ Error selecting repository: {e}", severity="error") + + def _set_repository(self, repo_id: int, repo_name: str) -> None: + """Set the selected repository as the current one.""" + # Update environment variable + os.environ["CODEGEN_REPO_ID"] = str(repo_id) + + # Try to update .env file + env_updated = self._update_env_file(repo_id) + + if env_updated: + self.notify(f"āœ“ Set default repository: {repo_name} (ID: {repo_id})") + self.notify("āœ“ Updated .env file with CODEGEN_REPO_ID") + else: + self.notify(f"āœ“ Set repository: {repo_name} (ID: {repo_id})") + self.notify("ℹ Add 'export CODEGEN_REPO_ID={repo_id}' to your shell for persistence") + + # Wait a moment for user to see the notifications, then exit + self.set_timer(2.0, self._close_screen) + + def _update_env_file(self, repo_id: int) -> bool: + """Update the .env file with the new repository ID.""" + env_file_path = ".env" + + try: + lines = [] + key_updated = False + key_to_update = "CODEGEN_REPO_ID" + + # Read existing .env file if it exists + if os.path.exists(env_file_path): + with open(env_file_path, "r") as f: + lines = f.readlines() + + # Update or add the key + for i, line in enumerate(lines): + if line.strip().startswith(f"{key_to_update}="): + lines[i] = f"{key_to_update}={repo_id}\n" + key_updated = True + break + + # If key wasn't found, add it + if not key_updated: + if lines and not lines[-1].endswith('\n'): + lines.append('\n') + lines.append(f"{key_to_update}={repo_id}\n") + + # Write back to file + with open(env_file_path, "w") as f: + f.writelines(lines) + + return True + + except Exception: + return False + + def _close_screen(self) -> None: + """Close the screen.""" + if hasattr(self.app, 'pop_screen'): + self.app.pop_screen() + else: + self.app.exit() + + def action_quit(self) -> None: + """Quit the TUI.""" + self._close_screen() + + +class RepoSelectorApp(App): + """Standalone app wrapper for the repository selector.""" + + CSS_PATH = "../../tui/codegen_theme.tcss" # Use custom Codegen theme + TITLE = "Repository Selector - Codegen CLI" + BINDINGS = [ + Binding("escape,ctrl+c", "quit", "Quit", priority=True), + Binding("enter", "select_repo", "Select", show=True), + Binding("q", "quit", "Quit", show=True), + ] + + def __init__(self): + super().__init__() + self.repositories = ensure_repositories_cached() or [] + self.current_repo_id = get_current_repo_id() + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Header() + + if not self.repositories: + yield Container( + Static("āš ļø No repositories found. Fetching repositories...", classes="warning-message"), + id="no-repos-warning" + ) + else: + with Vertical(): + yield Static("šŸ—‚ļø Select Your Repository", classes="title") + yield Static("Use ↑↓ to navigate, Enter to select, Q/Esc to quit", classes="help") + + table = DataTable(id="repos-table", cursor_type="row") + table.add_columns("Current", "ID", "Repository Name") + + # Get the actual current repo ID (checks environment variables first) + actual_current_repo_id = get_current_repo_id() + + for repo in self.repositories: + repo_id = repo["id"] + repo_name = repo["name"] + is_current = "ā—" if repo_id == actual_current_repo_id else " " + + table.add_row(is_current, str(repo_id), repo_name, key=str(repo_id)) + + yield table + + yield Static( + "\nšŸ’” Selecting a repository will update your CODEGEN_REPO_ID environment variable.", + classes="help" + ) + + yield Footer() + + def on_mount(self) -> None: + """Called when the app starts.""" + if self.repositories: + try: + table = self.query_one("#repos-table", DataTable) + table.focus() + except Exception: + pass + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Handle DataTable row selection (Enter key).""" + if event.data_table.id == "repos-table": + self._handle_repo_selection() + + def action_select_repo(self) -> None: + """Select repository (fallback for direct key binding).""" + self._handle_repo_selection() + + def _handle_repo_selection(self) -> None: + """Handle repository selection logic.""" + try: + table = self.query_one("#repos-table", DataTable) + if table.cursor_row is not None and table.cursor_row < len(self.repositories): + selected_repo = self.repositories[table.cursor_row] + repo_id = selected_repo["id"] + repo_name = selected_repo["name"] + + self._set_repository(repo_id, repo_name) + except Exception as e: + self.notify(f"āŒ Error selecting repository: {e}", severity="error") + + def _set_repository(self, repo_id: int, repo_name: str) -> None: + """Set the selected repository as the current one.""" + # Update environment variable + os.environ["CODEGEN_REPO_ID"] = str(repo_id) + + # Try to update .env file + env_updated = self._update_env_file(repo_id) + + if env_updated: + self.notify(f"āœ“ Set default repository: {repo_name} (ID: {repo_id})") + self.notify("āœ“ Updated .env file with CODEGEN_REPO_ID") + else: + self.notify(f"āœ“ Set repository: {repo_name} (ID: {repo_id})") + self.notify("ℹ Add 'export CODEGEN_REPO_ID={repo_id}' to your shell for persistence") + + # Wait a moment for user to see the notifications, then exit + self.set_timer(2.0, self.exit) + + def _update_env_file(self, repo_id: int) -> bool: + """Update the .env file with the new repository ID.""" + env_file_path = ".env" + + try: + lines = [] + key_updated = False + key_to_update = "CODEGEN_REPO_ID" + + # Read existing .env file if it exists + if os.path.exists(env_file_path): + with open(env_file_path, "r") as f: + lines = f.readlines() + + # Update or add the key + for i, line in enumerate(lines): + if line.strip().startswith(f"{key_to_update}="): + lines[i] = f"{key_to_update}={repo_id}\n" + key_updated = True + break + + # If key wasn't found, add it + if not key_updated: + if lines and not lines[-1].endswith('\n'): + lines.append('\n') + lines.append(f"{key_to_update}={repo_id}\n") + + # Write back to file + with open(env_file_path, "w") as f: + f.writelines(lines) + + return True + + except Exception: + return False \ No newline at end of file diff --git a/src/codegen/cli/tui/agent_detail.py b/src/codegen/cli/tui/agent_detail.py new file mode 100644 index 000000000..d1ddd931b --- /dev/null +++ b/src/codegen/cli/tui/agent_detail.py @@ -0,0 +1,309 @@ +"""Agent Detail TUI screen for viewing individual agent runs.""" + +import asyncio +import json +from pathlib import Path +from typing import Any, Dict + +import requests +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Container, Vertical, Horizontal +from textual.screen import Screen +from textual.widgets import Button, Footer, Header, Static, DataTable + +from codegen.cli.auth.token_manager import get_current_token +from codegen.cli.utils.org import resolve_org_id +from codegen.cli.api.endpoints import API_ENDPOINT +from codegen.git.repo_operator.local_git_repo import LocalGitRepo + + +class AgentDetailTUI(Screen): + """TUI screen for viewing agent run details and performing actions.""" + + CSS_PATH = "codegen_theme.tcss" + BINDINGS = [ + Binding("escape,q", "back", "Back", show=True), + Binding("j", "view_json", "View JSON", show=True), + Binding("p", "pull_branch", "Pull Branch", show=True), + Binding("w", "open_web", "Open Web", show=True), + ] + + def __init__(self, agent_run: Dict[str, Any], org_id: int | None = None): + super().__init__() + self.agent_run = agent_run + self.org_id = org_id or resolve_org_id() + self.agent_data: Dict[str, Any] | None = None + self.is_loading = False + + def compose(self) -> ComposeResult: + """Create child widgets for the agent detail screen.""" + run_id = self.agent_run.get("id", "Unknown") + summary = self.agent_run.get("summary", "No summary available") + + yield Header() + + with Vertical(): + yield Static(f"šŸ¤– Agent Run Details - ID: {run_id}", classes="title", id="detail-title") + yield Static("Use J for JSON, P to pull branch, W for web, Q/Esc to go back", classes="help") + + # Basic info section + info_table = DataTable(id="info-table", cursor_type="none") + info_table.add_columns("Property", "Value") + yield info_table + + # Actions section + with Horizontal(id="actions-section"): + yield Button("šŸ“„ View JSON", id="json-btn", variant="primary") + yield Button("šŸ”€ Pull Branch", id="pull-btn", variant="default") + yield Button("🌐 Open Web", id="web-btn", variant="default") + yield Button("ā¬…ļø Back", id="back-btn", variant="default") + + # Status/loading area + yield Static("", id="status-text") + + yield Footer() + + def on_mount(self) -> None: + """Called when the screen is mounted.""" + self._populate_basic_info() + # Load detailed data in background + task = asyncio.create_task(self._load_detailed_data()) + self._load_task = task + + def _populate_basic_info(self) -> None: + """Populate the info table with basic agent run information.""" + info_table = self.query_one("#info-table", DataTable) + + # Basic info from the agent run data + run_id = self.agent_run.get("id", "Unknown") + status = self.agent_run.get("status", "Unknown") + created_at = self.agent_run.get("created_at", "Unknown") + summary = self.agent_run.get("summary", "No summary available") + web_url = self.agent_run.get("web_url", "Not available") + + # Format status with emoji + status_display = status + if status == "COMPLETE": + status_display = "āœ… Complete" + elif status == "RUNNING": + status_display = "šŸƒ Running" + elif status == "FAILED": + status_display = "āŒ Failed" + elif status == "STOPPED": + status_display = "ā¹ļø Stopped" + elif status == "PENDING": + status_display = "ā³ Pending" + + # Add rows to info table + info_table.add_row("ID", str(run_id)) + info_table.add_row("Status", status_display) + info_table.add_row("Created", created_at) + info_table.add_row("Summary", summary) + info_table.add_row("Web URL", web_url) + + async def _load_detailed_data(self) -> None: + """Load detailed agent run data from the API.""" + if self.is_loading: + return + + self.is_loading = True + status_text = self.query_one("#status-text", Static) + status_text.update("šŸ”„ Loading detailed agent data...") + + try: + token = get_current_token() + if not token: + status_text.update("āŒ Not authenticated") + return + + run_id = self.agent_run.get("id") + if not run_id: + status_text.update("āŒ No agent run ID available") + return + + headers = {"Authorization": f"Bearer {token}"} + url = f"{API_ENDPOINT.rstrip('/')}/v1/organizations/{self.org_id}/agent/run/{run_id}" + + response = requests.get(url, headers=headers) + response.raise_for_status() + self.agent_data = response.json() + + # Update info table with additional details + self._update_info_with_detailed_data() + status_text.update("āœ… Agent data loaded successfully") + + except requests.HTTPError as e: + if e.response.status_code == 404: + status_text.update(f"āŒ Agent run {run_id} not found") + elif e.response.status_code == 403: + status_text.update(f"āŒ Access denied to agent run {run_id}") + else: + status_text.update(f"āŒ HTTP {e.response.status_code}: {e}") + except Exception as e: + status_text.update(f"āŒ Error loading data: {e}") + finally: + self.is_loading = False + + def _update_info_with_detailed_data(self) -> None: + """Update the info table with detailed data from the API.""" + if not self.agent_data: + return + + info_table = self.query_one("#info-table", DataTable) + + # Check for GitHub PRs + github_prs = self.agent_data.get("github_pull_requests", []) + if github_prs: + pr_info = f"{len(github_prs)} PR(s) available" + for i, pr in enumerate(github_prs[:3]): # Show up to 3 PRs + branch = pr.get("head", {}).get("ref", "unknown") + pr_info += f"\n • {branch}" + if len(github_prs) > 3: + pr_info += f"\n • ... and {len(github_prs) - 3} more" + else: + pr_info = "No PRs available" + + info_table.add_row("PR Branches", pr_info) + + # Add model info if available + model = self.agent_data.get("model", "Unknown") + info_table.add_row("Model", model) + + # Action handlers + def action_back(self) -> None: + """Go back to the main screen.""" + self.app.pop_screen() + + def action_view_json(self) -> None: + """View the full JSON data for the agent run.""" + if not self.agent_data: + self.notify("āŒ Detailed data not loaded yet", severity="error") + return + + # Create a JSON viewer screen + json_screen = JSONViewerTUI(self.agent_data) + self.app.push_screen(json_screen) + + def action_pull_branch(self) -> None: + """Pull the PR branch for this agent run.""" + if not self.agent_data: + self.notify("āŒ Detailed data not loaded yet", severity="error") + return + + # Check if we're in a git repository + try: + current_repo = LocalGitRepo(Path.cwd()) + if not current_repo.has_remote(): + self.notify("āŒ Not in a git repository with remotes", severity="error") + return + except Exception: + self.notify("āŒ Not in a valid git repository", severity="error") + return + + # Check for GitHub PRs + github_prs = self.agent_data.get("github_pull_requests", []) + if not github_prs: + self.notify("āŒ No PR branches available for this agent run", severity="error") + return + + # For now, take the first PR - in the future we could show a selector + pr = github_prs[0] + branch_name = pr.get("head", {}).get("ref") + repo_clone_url = pr.get("head", {}).get("repo", {}).get("clone_url") + + if not branch_name or not repo_clone_url: + self.notify("āŒ Invalid PR data", severity="error") + return + + # Start the pull process + task = asyncio.create_task(self._pull_branch_async(branch_name, repo_clone_url)) + self._pull_task = task + + async def _pull_branch_async(self, branch_name: str, repo_clone_url: str) -> None: + """Asynchronously pull the PR branch.""" + status_text = self.query_one("#status-text", Static) + status_text.update(f"šŸ”„ Pulling branch {branch_name}...") + + try: + current_repo = LocalGitRepo(Path.cwd()) + + # Add remote if it doesn't exist + remote_name = "codegen-pr" + try: + current_repo.add_remote(remote_name, repo_clone_url) + except Exception: + # Remote might already exist + pass + + # Fetch and checkout the branch + current_repo.fetch_remote(remote_name) + current_repo.checkout_branch(f"{remote_name}/{branch_name}", branch_name) + + status_text.update(f"āœ… Successfully checked out branch: {branch_name}") + self.notify(f"āœ… Switched to branch: {branch_name}") + + except Exception as e: + error_msg = f"āŒ Failed to pull branch: {e}" + status_text.update(error_msg) + self.notify(error_msg, severity="error") + + def action_open_web(self) -> None: + """Open the agent run in the web browser.""" + web_url = self.agent_run.get("web_url") + if not web_url: + run_id = self.agent_run.get("id") + web_url = f"https://codegen.com/traces/{run_id}" + + try: + import webbrowser + webbrowser.open(web_url) + self.notify(f"🌐 Opened {web_url}") + except Exception as e: + self.notify(f"āŒ Failed to open URL: {e}", severity="error") + + # Button event handlers + def on_button_pressed(self, event: Button.Pressed) -> None: + """Handle button press events.""" + if event.button.id == "json-btn": + self.action_view_json() + elif event.button.id == "pull-btn": + self.action_pull_branch() + elif event.button.id == "web-btn": + self.action_open_web() + elif event.button.id == "back-btn": + self.action_back() + + +class JSONViewerTUI(Screen): + """TUI screen for viewing JSON data.""" + + CSS_PATH = "codegen_theme.tcss" + BINDINGS = [ + Binding("escape,q", "back", "Back", show=True), + ] + + def __init__(self, data: Dict[str, Any]): + super().__init__() + self.data = data + + def compose(self) -> ComposeResult: + """Create child widgets for the JSON viewer.""" + yield Header() + + with Vertical(): + yield Static("šŸ“„ Agent Run JSON Data", classes="title") + yield Static("Use Q/Esc to go back", classes="help") + + # Format JSON with pretty printing + try: + json_text = json.dumps(self.data, indent=2, sort_keys=True) + yield Static(json_text, id="json-content") + except Exception as e: + yield Static(f"Error formatting JSON: {e}", id="json-content") + + yield Footer() + + def action_back(self) -> None: + """Go back to the agent detail screen.""" + self.app.pop_screen() \ No newline at end of file diff --git a/src/codegen/cli/tui/app.py b/src/codegen/cli/tui/app.py index 8b7f5b0e2..8f9d4a000 100644 --- a/src/codegen/cli/tui/app.py +++ b/src/codegen/cli/tui/app.py @@ -8,19 +8,21 @@ from textual.containers import Container, Vertical from textual.widgets import DataTable, Footer, Header, Static -from codegen.cli.auth.token_manager import get_current_token +from codegen.cli.auth.token_manager import get_current_token, get_cached_organizations, get_current_org_name, get_org_name_from_cache from codegen.cli.utils.org import resolve_org_id class CodegenTUI(App): """Simple Codegen TUI for browsing agent runs.""" - CSS_PATH = "codegen_tui.tcss" - TITLE = "Recent Agent Runs - Codegen CLI" + CSS_PATH = "codegen_theme.tcss" + TITLE = "Codegen CLI" BINDINGS = [ Binding("escape,ctrl+c", "quit", "Quit", priority=True), - Binding("enter", "open_url", "Open", show=True), + Binding("enter", "open_url", "Details", show=True), Binding("r", "refresh", "Refresh", show=True), + Binding("o", "select_org", "Org", show=True), + Binding("p", "select_repo", "Repo", show=True), ] def __init__(self): @@ -38,8 +40,11 @@ def compose(self) -> ComposeResult: yield Container(Static("āš ļø Not authenticated. Please run 'codegen login' first.", classes="warning-message"), id="auth-warning") else: with Vertical(): - yield Static("šŸ¤– Your Recent API Agent Runs", classes="title") - yield Static("Use ↑↓ to navigate, Enter to open, R to refresh, Esc to quit", classes="help") + # Show current organization info - using id for updating + org_name = get_current_org_name() + org_display = f" ({org_name})" if org_name else f" (ID: {self.org_id})" if self.org_id else "" + yield Static(f"šŸ¤– Your Recent API Agent Runs{org_display}", classes="title", id="title-text") + yield Static("Use ↑↓ to navigate, Enter for details, R to refresh, O for org, P for repo, Esc to quit", classes="help") table = DataTable(id="agents-table", cursor_type="row") table.add_columns("Created", "Status", "Summary") yield table @@ -51,6 +56,14 @@ def on_mount(self) -> None: task = asyncio.create_task(self._load_agents_data()) # Store reference to prevent garbage collection self._load_task = task + + # Ensure the table has focus for key events + try: + table = self.query_one("#agents-table", DataTable) + table.focus() + except Exception: + # Table might not be ready yet, will focus after data loads + pass async def _load_agents_data(self) -> None: """Load agents data into the table.""" @@ -139,35 +152,150 @@ async def _load_agents_data(self) -> None: except Exception as e: # If API call fails, show error in table table.add_row("Error", f"Failed to load: {e}", "") + + # Ensure table has focus after data is loaded + table.focus() + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Handle DataTable row selection (Enter key).""" + if event.data_table.id == "agents-table": + self.notify("šŸ” Enter key pressed - opening agent details...", timeout=1) + self.action_open_url() def action_open_url(self) -> None: - """Open the selected agent run URL.""" + """Open the selected agent run detail screen.""" table = self.query_one("#agents-table", DataTable) if table.cursor_row is not None and table.cursor_row < len(self.agent_runs): agent_run = self.agent_runs[table.cursor_row] - run_id = agent_run.get("id") - web_url = agent_run.get("web_url") - - if not web_url: - # Construct URL if not provided - web_url = f"https://codegen.com/traces/{run_id}" - - # Try to open URL - try: - webbrowser.open(web_url) - self.notify(f"🌐 Opened {web_url}") - except Exception as e: - self.notify(f"āŒ Failed to open URL: {e}", severity="error") + run_id = agent_run.get("id", "Unknown") + + self.notify(f"šŸ“± Opening details for agent run {run_id}...", timeout=2) + + # Import here to avoid circular imports + from codegen.cli.tui.agent_detail import AgentDetailTUI + + # Create and push the agent detail screen + detail_screen = AgentDetailTUI(agent_run, self.org_id) + self.push_screen(detail_screen) + elif table.cursor_row is None: + self.notify("āŒ No row selected", severity="error", timeout=2) + elif len(self.agent_runs) == 0: + self.notify("āŒ No agent runs available", severity="error", timeout=2) + else: + self.notify(f"āŒ Invalid row selection: {table.cursor_row}/{len(self.agent_runs)}", severity="error", timeout=2) def action_refresh(self) -> None: """Refresh agent runs data.""" - if self.is_authenticated and self.org_id: - self.notify("šŸ”„ Refreshing...", timeout=1) - task = asyncio.create_task(self._load_agents_data()) - # Store reference to prevent garbage collection - self._refresh_task = task + if self.is_authenticated: + # Refresh org ID and title + old_org_id = self.org_id + self.org_id = resolve_org_id() + self._refresh_title() + + if self.org_id: + self.notify("šŸ”„ Refreshing...", timeout=1) + task = asyncio.create_task(self._load_agents_data()) + # Store reference to prevent garbage collection + self._refresh_task = task + else: + self.notify("āŒ No organization configured", severity="error") else: - self.notify("āŒ Not authenticated or no org ID", severity="error") + self.notify("āŒ Not authenticated", severity="error") + + def action_select_org(self) -> None: + """Launch the organization selector TUI.""" + if not self.is_authenticated: + self.notify("āŒ Not authenticated. Please login first.", severity="error") + return + + # Check if organizations are available in cache + cached_orgs = get_cached_organizations() + if not cached_orgs: + self.notify("āŒ No organizations found. Please run 'codegen login' to refresh.", severity="error") + return + + try: + # Import here to avoid circular imports + from codegen.cli.commands.org.tui import OrgSelectorTUI + + # Launch the organization selector as a sub-screen + def on_org_selected(): + # Debug callback trigger + self.notify("šŸ”§ Org selector callback triggered!", timeout=0.5) + + # Refresh the org_id and reload data when org selector closes + old_org_id = self.org_id + self.org_id = resolve_org_id() + + # Debug org ID change + self.notify(f"šŸ”§ Org ID: {old_org_id} → {self.org_id}", timeout=1) + + # Refresh the title to show new organization + self._refresh_title() + + if self.org_id and self.org_id != old_org_id: + self.notify("šŸ”„ Refreshing data for new organization...", timeout=1) + task = asyncio.create_task(self._load_agents_data()) + self._refresh_task = task + elif not self.org_id: + self.notify("āŒ No organization configured", severity="error") + else: + self.notify("ℹ Organization unchanged", timeout=1) + + org_selector = OrgSelectorTUI() + # Set up a callback when the screen is dismissed + self.push_screen(org_selector, callback=lambda _: on_org_selected()) + except Exception as e: + self.notify(f"āŒ Failed to launch org selector: {e}", severity="error") + + def action_select_repo(self) -> None: + """Launch the repository selector TUI.""" + if not self.is_authenticated: + self.notify("āŒ Not authenticated. Please login first.", severity="error") + return + + try: + # Import here to avoid circular imports + from codegen.cli.commands.repo.tui import RepoSelectorTUI + + # Launch the repository selector as a sub-screen + def on_repo_selected(): + # Debug callback trigger + self.notify("šŸ”§ Repo selector callback triggered!", timeout=0.5) + + # Notify user about repo change (repos don't affect agent runs directly in this TUI) + self.notify("āœ… Repository configuration updated!", timeout=2) + + repo_selector = RepoSelectorTUI() + # Set up a callback when the screen is dismissed + self.push_screen(repo_selector, callback=lambda _: on_repo_selected()) + except Exception as e: + self.notify(f"āŒ Failed to launch repo selector: {e}", severity="error") + + def _refresh_title(self) -> None: + """Refresh the title to show current organization.""" + try: + if self.is_authenticated: + title_widget = self.query_one("#title-text", Static) + + # Try to get org name from cache first (more reliable after org change) + org_name = None + if self.org_id: + org_name = get_org_name_from_cache(self.org_id) + + # Fallback to stored org name + if not org_name: + org_name = get_current_org_name() + + org_display = f" ({org_name})" if org_name else f" (ID: {self.org_id})" if self.org_id else "" + new_title = f"šŸ¤– Your Recent API Agent Runs{org_display}" + title_widget.update(new_title) + + # Debug notification + self.notify(f"šŸ”§ Title updated: {new_title}", timeout=0.5) + except Exception as e: + # Debug any errors + self.notify(f"āŒ Error updating title: {e}", severity="error", timeout=1) def action_quit(self) -> None: """Quit the application.""" diff --git a/src/codegen/cli/tui/codegen_theme.tcss b/src/codegen/cli/tui/codegen_theme.tcss new file mode 100644 index 000000000..cfddfd602 --- /dev/null +++ b/src/codegen/cli/tui/codegen_theme.tcss @@ -0,0 +1,185 @@ +/* Codegen Custom Theme - Indigo, Black, White, Teal */ + +/* +Color Palette: +- Indigo: #4f46e5 (primary), #6366f1 (light), #3730a3 (dark) +- Black/Charcoal: #000000, #1a1a1a +- White: #ffffff, #f8fafc +- Teal: #14b8a6 (accent), #2dd4bf (light), #0f766e (dark) +- Grays: #111827, #1f2937, #374151, #4b5563, #9ca3af, #d1d5db, #e5e7eb, #f3f4f6 +*/ + +/* Main app background */ +Screen { + background: #1a1a1a; +} + +/* Header and Footer - Primary Indigo */ +Header { + dock: top; + height: 3; + background: #4f46e5; + color: #ffffff; +} + +Footer { + dock: bottom; + height: 3; + background: #4f46e5; + color: #ffffff; +} + +/* Title styling - Light Indigo with bold text */ +.title { + text-style: bold; + margin: 1; + color: #6366f1; + text-align: center; +} + +/* Help text - Muted gray */ +.help { + margin-bottom: 1; + color: #9ca3af; + text-align: center; +} + +/* Warning messages - Black background with white text and teal border */ +.warning-message { + text-align: center; + margin: 2; + padding: 2; + background: #000000; + color: #ffffff; + text-style: bold; + border: solid #14b8a6; +} + +/* DataTable styling */ +DataTable { + height: 1fr; + margin-top: 1; + background: #111827; +} + +DataTable > .datatable--header { + text-style: bold; + background: #4f46e5; + color: #ffffff; +} + +DataTable > .datatable--odd-row { + background: #1f2937; + color: #ffffff; +} + +DataTable > .datatable--even-row { + background: #111827; + color: #ffffff; +} + +DataTable > .datatable--cursor { + background: #14b8a6; + color: #000000; + text-style: bold; +} + +DataTable > .datatable--hover { + background: #0f766e; + color: #ffffff; +} + +/* Organization selector specific styling */ +#auth-warning { + height: 100%; + align: center middle; +} + +Vertical { + height: 100%; + background: #1a1a1a; +} + +/* Button styling */ +Button { + background: #4f46e5; + color: #ffffff; + border: solid #14b8a6; +} + +Button:hover { + background: #6366f1; + color: #ffffff; +} + +Button.-active { + background: #14b8a6; + color: #000000; +} + +/* Static widget styling */ +Static { + color: #ffffff; +} + +/* Container styling */ +Container { + background: #1a1a1a; +} + +/* Focus styling */ +*:focus { + border: solid #14b8a6; +} + +/* Success/Error/Info colors using our palette */ +.success { + background: #14b8a6; + color: #000000; +} + +.error { + background: #000000; + color: #ffffff; + border: solid #14b8a6; +} + +.info { + background: #4f46e5; + color: #ffffff; +} + +/* Notification styling */ +.notification { + background: #1f2937; + color: #ffffff; + border: solid #14b8a6; +} + +/* Organization-specific styling for current org indicator */ +.org-current { + color: #14b8a6; + text-style: bold; +} + +.org-name { + color: #6366f1; +} + +/* Status indicators */ +.status-active { + color: #14b8a6; +} + +.status-complete { + color: #2dd4bf; +} + +.status-error { + color: #ffffff; + background: #000000; +} + +.status-pending { + color: #9ca3af; +} \ No newline at end of file diff --git a/src/codegen/cli/utils/org.py b/src/codegen/cli/utils/org.py index bbb800751..4dab35a9c 100644 --- a/src/codegen/cli/utils/org.py +++ b/src/codegen/cli/utils/org.py @@ -6,7 +6,13 @@ import requests from codegen.cli.api.endpoints import API_ENDPOINT -from codegen.cli.auth.token_manager import get_current_org_id, get_current_token +from codegen.cli.auth.token_manager import ( + get_cached_organizations, + get_current_org_id, + get_current_token, + get_org_name_from_cache, + is_org_id_cached, +) from codegen.cli.commands.claude.quiet_console import console # Cache for org resolution to avoid repeated API calls @@ -15,75 +21,97 @@ def resolve_org_id(explicit_org_id: int | None = None) -> int | None: - """Resolve the organization id from CLI input or environment. + """Resolve organization ID with fallback strategy and cache validation. Order of precedence: - 1) explicit_org_id passed by the caller - 2) CODEGEN_ORG_ID environment variable (dotenv is loaded by global_env) + 1) explicit_org_id passed by the caller (validated against cache) + 2) CODEGEN_ORG_ID environment variable (validated against cache if available) + 3) REPOSITORY_ORG_ID environment variable (validated against cache if available) + 4) stored org ID from auth data (fast, no API call) + 5) API auto-detection (uses first organization from user's organizations) Returns None if not found. """ global _org_cache + def _validate_org_id_with_cache(org_id: int, source: str) -> int | None: + """Validate an org ID against the cache and show helpful errors.""" + if is_org_id_cached(org_id): + return org_id + + # If we have a cache but the org ID is not in it, show helpful error + cached_orgs = get_cached_organizations() + if cached_orgs: + org_list = ", ".join([f"{org['name']} ({org['id']})" for org in cached_orgs]) + console.print(f"[red]Error:[/red] Organization ID {org_id} from {source} not found in your accessible organizations.") + console.print(f"[yellow]Available organizations:[/yellow] {org_list}") + return None + + # If no cache available, trust the org ID (will be validated by API) + return org_id + if explicit_org_id is not None: - return explicit_org_id + return _validate_org_id_with_cache(explicit_org_id, "command line") env_val = os.environ.get("CODEGEN_ORG_ID") - if env_val is None or env_val == "": - # Try repository-scoped org id from .env - repo_org = os.environ.get("REPOSITORY_ORG_ID") - if repo_org: - try: - return int(repo_org) - except ValueError: - pass - - # Try stored org ID from auth data (fast, no API call) - stored_org_id = get_current_org_id() - if stored_org_id: - return stored_org_id - - # Attempt auto-detection via API: if user belongs to organizations, use the first + if env_val is not None and env_val != "": try: - token = get_current_token() - if not token: - print("No token found") - return None - - # Check cache first - cache_key = f"org_auto_detect_{token[:10]}" # Use first 10 chars as key - current_time = time.time() - - if cache_key in _org_cache: - cached_data, cache_time = _org_cache[cache_key] - if current_time - cache_time < _cache_timeout: - return cached_data - - headers = {"Authorization": f"Bearer {token}"} - url = f"{API_ENDPOINT.rstrip('/')}/v1/organizations" - resp = requests.get(url, headers=headers, timeout=10) - resp.raise_for_status() - data = resp.json() - items = data.get("items") or [] - - org_id = None - if isinstance(items, list) and len(items) >= 1: - org = items[0] - org_id_raw = org.get("id") - try: - org_id = int(org_id_raw) - except Exception: - org_id = None - - # Cache the result - _org_cache[cache_key] = (org_id, current_time) - return org_id + env_org_id = int(env_val) + return _validate_org_id_with_cache(env_org_id, "CODEGEN_ORG_ID") + except ValueError: + console.print(f"[red]Error:[/red] Invalid CODEGEN_ORG_ID value: {env_val}") + return None - except Exception as e: - console.print(f"Exception: {e}") + # Try repository-scoped org id from .env + repo_org = os.environ.get("REPOSITORY_ORG_ID") + if repo_org: + try: + repo_org_id = int(repo_org) + return _validate_org_id_with_cache(repo_org_id, "REPOSITORY_ORG_ID") + except ValueError: + console.print(f"[red]Error:[/red] Invalid REPOSITORY_ORG_ID value: {repo_org}") return None + # Try stored org ID from auth data (fast, no API call) + stored_org_id = get_current_org_id() + if stored_org_id: + return stored_org_id + + # Attempt auto-detection via API: if user belongs to organizations, use the first try: - return int(env_val) - except ValueError: + token = get_current_token() + if not token: + return None + + # Check cache first + cache_key = f"org_auto_detect_{token[:10]}" # Use first 10 chars as key + current_time = time.time() + + if cache_key in _org_cache: + cached_data, cache_time = _org_cache[cache_key] + if current_time - cache_time < _cache_timeout: + return cached_data + + headers = {"Authorization": f"Bearer {token}"} + url = f"{API_ENDPOINT.rstrip('/')}/v1/organizations" + resp = requests.get(url, headers=headers, timeout=10) + resp.raise_for_status() + data = resp.json() + items = data.get("items") or [] + + org_id = None + if isinstance(items, list) and len(items) >= 1: + org = items[0] + org_id_raw = org.get("id") + try: + org_id = int(org_id_raw) + except Exception: + org_id = None + + # Cache the result + _org_cache[cache_key] = (org_id, current_time) + return org_id + + except Exception as e: + console.print(f"Error during organization auto-detection: {e}") return None diff --git a/src/codegen/cli/utils/repo.py b/src/codegen/cli/utils/repo.py new file mode 100644 index 000000000..5e0149af0 --- /dev/null +++ b/src/codegen/cli/utils/repo.py @@ -0,0 +1,236 @@ +"""Repository utilities for managing repository ID resolution and environment variables.""" + +import os +from typing import Dict, List, Any + +from rich.console import Console + +console = Console() + + +def resolve_repo_id(explicit_repo_id: int | None = None) -> int | None: + """Resolve repository ID with fallback strategy. + + Order of precedence: + 1) explicit_repo_id passed by the caller + 2) CODEGEN_REPO_ID environment variable + 3) REPOSITORY_ID environment variable + + Returns None if not found. + """ + if explicit_repo_id is not None: + return explicit_repo_id + + # Check CODEGEN_REPO_ID environment variable + env_val = os.environ.get("CODEGEN_REPO_ID") + if env_val is not None and env_val != "": + try: + return int(env_val) + except ValueError: + console.print(f"[red]Error:[/red] Invalid CODEGEN_REPO_ID value: {env_val}") + return None + + # Check REPOSITORY_ID environment variable + repo_id_env = os.environ.get("REPOSITORY_ID") + if repo_id_env is not None and repo_id_env != "": + try: + return int(repo_id_env) + except ValueError: + console.print(f"[red]Error:[/red] Invalid REPOSITORY_ID value: {repo_id_env}") + return None + + return None + + +def get_current_repo_id() -> int | None: + """Get the current repository ID from environment variables.""" + return resolve_repo_id() + + +def get_repo_env_status() -> Dict[str, str]: + """Get the status of repository-related environment variables.""" + return { + "CODEGEN_REPO_ID": os.environ.get("CODEGEN_REPO_ID", "Not set"), + "REPOSITORY_ID": os.environ.get("REPOSITORY_ID", "Not set"), + } + + +def set_repo_env_variable(repo_id: int, var_name: str = "CODEGEN_REPO_ID") -> bool: + """Set repository ID in environment variable. + + Args: + repo_id: Repository ID to set + var_name: Environment variable name (default: CODEGEN_REPO_ID) + + Returns: + True if successful, False otherwise + """ + try: + os.environ[var_name] = str(repo_id) + return True + except Exception as e: + console.print(f"[red]Error setting {var_name}:[/red] {e}") + return False + + +def clear_repo_env_variables() -> None: + """Clear all repository-related environment variables.""" + env_vars = ["CODEGEN_REPO_ID", "REPOSITORY_ID"] + for var in env_vars: + if var in os.environ: + del os.environ[var] + + +def update_env_file_with_repo(repo_id: int, env_file_path: str = ".env") -> bool: + """Update .env file with repository ID.""" + try: + lines = [] + key_updated = False + key_to_update = "CODEGEN_REPO_ID" + + # Read existing .env file if it exists + if os.path.exists(env_file_path): + with open(env_file_path, "r") as f: + lines = f.readlines() + + # Update or add the key + for i, line in enumerate(lines): + if line.strip().startswith(f"{key_to_update}="): + lines[i] = f"{key_to_update}={repo_id}\n" + key_updated = True + break + + # If key wasn't found, add it + if not key_updated: + if lines and not lines[-1].endswith('\n'): + lines.append('\n') + lines.append(f"{key_to_update}={repo_id}\n") + + # Write back to file + with open(env_file_path, "w") as f: + f.writelines(lines) + + return True + + except Exception as e: + console.print(f"[red]Error updating .env file:[/red] {e}") + return False + + +def get_repo_display_info() -> List[Dict[str, str]]: + """Get repository information for display in TUI.""" + repo_id = get_current_repo_id() + env_status = get_repo_env_status() + + info = [] + + # Current repository ID + if repo_id: + info.append({ + "label": "Current Repository ID", + "value": str(repo_id), + "status": "active" + }) + else: + info.append({ + "label": "Current Repository ID", + "value": "Not configured", + "status": "inactive" + }) + + # Environment variables status + for var_name, value in env_status.items(): + info.append({ + "label": f"{var_name}", + "value": value, + "status": "active" if value != "Not set" else "inactive" + }) + + return info + + +def fetch_repositories_for_org(org_id: int) -> List[Dict[str, Any]]: + """Fetch repositories for an organization. + + Args: + org_id: Organization ID to fetch repositories for + + Returns: + List of repository dictionaries + """ + try: + import requests + from codegen.cli.api.endpoints import API_ENDPOINT + from codegen.cli.auth.token_manager import get_current_token + + token = get_current_token() + if not token: + return [] + + headers = {"Authorization": f"Bearer {token}"} + + # Try the repository endpoint (may not exist yet) + url = f"{API_ENDPOINT.rstrip('/')}/v1/organizations/{org_id}/repositories" + response = requests.get(url, headers=headers) + + if response.status_code == 200: + data = response.json() + return data.get("items", []) + else: + # API endpoint doesn't exist yet, return mock data for demo + return get_mock_repositories() + + except Exception: + # If API fails, return mock data + return get_mock_repositories() + + +def get_mock_repositories() -> List[Dict[str, Any]]: + """Get mock repository data for demonstration. + + Returns: + List of mock repository dictionaries + """ + return [ + {"id": 1, "name": "codegen-sdk", "description": "Codegen SDK repository"}, + {"id": 2, "name": "web-frontend", "description": "Frontend web application"}, + {"id": 3, "name": "api-backend", "description": "Backend API service"}, + {"id": 4, "name": "mobile-app", "description": "Mobile application"}, + {"id": 5, "name": "docs-site", "description": "Documentation website"}, + {"id": 6, "name": "cli-tools", "description": "Command line tools"}, + {"id": 7, "name": "data-pipeline", "description": "Data processing pipeline"}, + {"id": 8, "name": "ml-models", "description": "Machine learning models"}, + ] + + +def ensure_repositories_cached(org_id: int | None = None) -> List[Dict[str, Any]]: + """Ensure repositories are cached for the given organization. + + Args: + org_id: Organization ID (will resolve if not provided) + + Returns: + List of cached repositories + """ + from codegen.cli.auth.token_manager import get_cached_repositories, cache_repositories + from codegen.cli.utils.org import resolve_org_id + + # Get cached repositories first + cached_repos = get_cached_repositories() + if cached_repos: + return cached_repos + + # If no cache, try to fetch from API + if org_id is None: + org_id = resolve_org_id() + + if org_id: + repositories = fetch_repositories_for_org(org_id) + if repositories: + cache_repositories(repositories) + return repositories + + # Fallback to mock data + mock_repos = get_mock_repositories() + cache_repositories(mock_repos) + return mock_repos \ No newline at end of file