diff --git a/README.md b/README.md index 775c4f3..265921a 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ CI Monitor is a command-line tool that lets AI agents and humans instantly acces ```bash # Single command to investigate and fix your active branch's CI failures -cimonitor | claude "If the tests fail, fix and push. Notify me when finished or if you can't solve the problem." +cimonitor | claude "If the tests fail, fix and push. Notify me when finished or if you can't solve the problem. Think hard." # Auto-retry flaky tests and get notified only for real failures cimonitor watch --pr 123 --retry 3 | claude \ diff --git a/cimonitor/cli.py b/cimonitor/cli.py index e32cbe1..1444824 100644 --- a/cimonitor/cli.py +++ b/cimonitor/cli.py @@ -2,12 +2,17 @@ import sys import time -from datetime import datetime import click from .fetcher import GitHubCIFetcher -from .log_parser import LogParser +from .services import ( + get_ci_status, + get_job_details_for_status, + get_job_logs, + retry_failed_workflows, + watch_ci_status, +) # Shared options for targeting commits/PRs/branches @@ -82,76 +87,22 @@ def cli(ctx): def status(branch, commit, pr, verbose): """Show CI status for the target commit/branch/PR.""" try: - # Validate options first, before any API calls validate_target_options(branch, commit, pr) - fetcher = GitHubCIFetcher() owner, repo_name, commit_sha, target_description = get_target_info( fetcher, branch, commit, pr, verbose ) - # Find failed jobs for the target commit - failed_check_runs = fetcher.find_failed_jobs_in_latest_run(owner, repo_name, commit_sha) + # Get CI status using business logic + ci_status = get_ci_status(fetcher, owner, repo_name, commit_sha, target_description) - if not failed_check_runs: + # Early return for no failures + if not ci_status.has_failures: click.echo(f"āœ… No failing CI jobs found for {target_description}!") return - click.echo(f"āŒ Found {len(failed_check_runs)} failing CI job(s) for {target_description}:") - click.echo() - - for i, check_run in enumerate(failed_check_runs, 1): - name = check_run.get("name", "Unknown Job") - conclusion = check_run.get("conclusion", "unknown") - html_url = check_run.get("html_url", "") - - click.echo(f"{'=' * 60}") - click.echo(f"FAILED JOB #{i}: {name}") - click.echo(f"Status: {conclusion}") - click.echo(f"URL: {html_url}") - click.echo(f"{'=' * 60}") - - # Try to get workflow run info and step details - if "actions/runs" in html_url: - try: - # Extract run ID from URL - run_id = html_url.split("/runs/")[1].split("/")[0] - jobs = fetcher.get_workflow_jobs(owner, repo_name, int(run_id)) - - for job in jobs: - if job.get("conclusion") == "failure": - job_name = job.get("name", "Unknown") - - # Show failed steps summary - failed_steps = fetcher.get_failed_steps(job) - - if failed_steps: - click.echo(f"\\nšŸ“‹ Failed Steps in {job_name}:") - for step in failed_steps: - step_name = step["name"] - step_num = step["number"] - duration = "Unknown" - - if step["started_at"] and step["completed_at"]: - start = datetime.fromisoformat( - step["started_at"].replace("Z", "+00:00") - ) - end = datetime.fromisoformat( - step["completed_at"].replace("Z", "+00:00") - ) - duration = f"{(end - start).total_seconds():.1f}s" - - click.echo( - f" āŒ Step {step_num}: {step_name} (took {duration})" - ) - - click.echo() - except Exception as e: - click.echo(f"Error processing job details: {e}") - else: - click.echo("Cannot retrieve detailed information for this check run type") - - click.echo() + # Display failed jobs + _display_failed_jobs_status(fetcher, owner, repo_name, ci_status) except ValueError as e: click.echo(f"Error: {e}", err=True) @@ -169,135 +120,19 @@ def status(branch, commit, pr, verbose): def logs(branch, commit, pr, verbose, raw, job_id): """Show error logs for failed CI jobs.""" try: - # Validate options first, before any API calls validate_target_options(branch, commit, pr) - fetcher = GitHubCIFetcher() owner, repo_name, commit_sha, target_description = get_target_info( fetcher, branch, commit, pr, verbose ) - # Handle specific job ID request - if job_id: - click.echo(f"šŸ“„ Raw logs for job ID {job_id}:") - click.echo("=" * 80) - job_info = fetcher.get_job_by_id(owner, repo_name, job_id) - click.echo(f"Job: {job_info.get('name', 'Unknown')}") - click.echo(f"Status: {job_info.get('conclusion', 'unknown')}") - click.echo(f"URL: {job_info.get('html_url', '')}") - click.echo("-" * 80) - logs_content = fetcher.get_job_logs(owner, repo_name, job_id) - click.echo(logs_content) - return - - # Handle raw logs for all failed jobs - if raw: - all_jobs = fetcher.get_all_jobs_for_commit(owner, repo_name, commit_sha) - failed_jobs = [job for job in all_jobs if job.get("conclusion") == "failure"] - - if not failed_jobs: - click.echo("āœ… No failing jobs found for this commit!") - return - - click.echo(f"šŸ“„ Raw logs for {len(failed_jobs)} failed job(s):") - click.echo() - - for i, job in enumerate(failed_jobs, 1): - job_name = job.get("name", "Unknown") - job_id = job.get("id") - - click.echo(f"{'=' * 80}") - click.echo(f"RAW LOGS #{i}: {job_name} (ID: {job_id})") - click.echo(f"{'=' * 80}") - - if job_id: - logs_content = fetcher.get_job_logs(owner, repo_name, job_id) - click.echo(logs_content) - else: - click.echo("No job ID available") - - click.echo("\\n" + "=" * 80 + "\\n") - return - - # Default: show filtered error logs - failed_check_runs = fetcher.find_failed_jobs_in_latest_run(owner, repo_name, commit_sha) - - if not failed_check_runs: - click.echo(f"āœ… No failing CI jobs found for {target_description}!") - return - - click.echo( - f"šŸ“„ Error logs for {len(failed_check_runs)} failing job(s) in {target_description}:" + # Get logs using business logic + log_result = get_job_logs( + fetcher, owner, repo_name, commit_sha, target_description, job_id, raw ) - click.echo() - for i, check_run in enumerate(failed_check_runs, 1): - name = check_run.get("name", "Unknown Job") - html_url = check_run.get("html_url", "") - - click.echo(f"{'=' * 60}") - click.echo(f"LOGS #{i}: {name}") - click.echo(f"{'=' * 60}") - - # Try to get workflow run info and step details - if "actions/runs" in html_url: - try: - # Extract run ID from URL - run_id = html_url.split("/runs/")[1].split("/")[0] - jobs = fetcher.get_workflow_jobs(owner, repo_name, int(run_id)) - - for job in jobs: - if job.get("conclusion") == "failure": - job_name = job.get("name", "Unknown") - job_id = job.get("id") - - # Get failed steps - failed_steps = fetcher.get_failed_steps(job) - - if job_id and failed_steps: - logs_content = fetcher.get_job_logs(owner, repo_name, job_id) - - # Extract logs for just the failed steps - step_logs = LogParser.extract_step_logs(logs_content, failed_steps) - - if step_logs: - for step_name, step_log in step_logs.items(): - click.echo(f"\\nšŸ“„ Logs for Failed Step: {step_name}") - click.echo("-" * 50) - - # Show only the step-specific logs - if step_log.strip(): - # Filter for error-related content within the step - shown_lines = LogParser.filter_error_lines(step_log) - - if shown_lines: - for line in shown_lines: - if line.strip(): - click.echo(line) - else: - # Fallback to last few lines of the step - step_lines = step_log.split("\\n") - for line in step_lines[-10:]: - if line.strip(): - click.echo(line) - else: - click.echo("No logs found for this step") - else: - click.echo( - f"\\nšŸ“„ Could not extract step-specific logs for {job_name}" - ) - click.echo("šŸ’” This might be due to log format differences") - else: - click.echo("Could not retrieve job logs") - - click.echo() - - except Exception as e: - click.echo(f"Error processing job details: {e}") - else: - click.echo("Cannot retrieve detailed information for this check run type") - - click.echo() + # Display logs based on type + _display_job_logs(log_result) except ValueError as e: click.echo(f"Error: {e}", err=True) @@ -316,151 +151,25 @@ def logs(branch, commit, pr, verbose, raw, job_id): def watch(branch, commit, pr, verbose, until_complete, until_fail, retry): """Watch CI status with real-time updates.""" try: - # Validate options first, before any API calls validate_target_options(branch, commit, pr) - - # Validate watch options - if until_complete and until_fail: - click.echo("Error: Cannot specify both --until-complete and --until-fail", err=True) - sys.exit(1) - - if retry is not None and retry < 1: - click.echo("Error: --retry must be a positive integer", err=True) - sys.exit(1) - - if retry and (until_complete or until_fail): - click.echo( - "Error: Cannot specify --retry with other watch options (retry includes polling)", - err=True, - ) - sys.exit(1) + _validate_watch_options(until_complete, until_fail, retry) fetcher = GitHubCIFetcher() owner, repo_name, commit_sha, target_description = get_target_info( fetcher, branch, commit, pr, verbose ) - click.echo(f"šŸ”„ Watching CI status for {target_description}...") - click.echo(f"šŸ“‹ Commit: {commit_sha}") - if retry: - click.echo(f"šŸ” Will retry failed jobs up to {retry} time(s)") - click.echo("Press Ctrl+C to stop watching\\n") - - poll_interval = 10 # seconds - max_polls = 120 # 20 minutes total - poll_count = 0 - retry_count = 0 - - try: - while poll_count < max_polls: - workflow_runs = fetcher.get_workflow_runs_for_commit(owner, repo_name, commit_sha) - - if not workflow_runs: - click.echo("ā³ No workflow runs found yet...") - else: - click.echo(f"šŸ“Š Found {len(workflow_runs)} workflow run(s):") - - all_completed = True - any_failed = False - failed_runs = [] - - for run in workflow_runs: - name = run.get("name", "Unknown Workflow") - status = run.get("status", "unknown") - conclusion = run.get("conclusion") - created_at = run.get("created_at", "") - updated_at = run.get("updated_at", "") - run_id = run.get("id") - - # Calculate duration - try: - start = datetime.fromisoformat(created_at.replace("Z", "+00:00")) - if updated_at: - end = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) - else: - end = datetime.now(start.tzinfo) - duration = end - start - duration_str = f"{int(duration.total_seconds())}s" - except Exception: - duration_str = "unknown" - - # Status emoji and tracking - if status == "completed": - if conclusion == "success": - emoji = "āœ…" - elif conclusion == "failure": - emoji = "āŒ" - any_failed = True - failed_runs.append(run_id) - elif conclusion == "cancelled": - emoji = "🚫" - any_failed = True - failed_runs.append(run_id) - else: - emoji = "āš ļø" - any_failed = True - failed_runs.append(run_id) - elif status == "in_progress": - emoji = "šŸ”„" - all_completed = False - elif status == "queued": - emoji = "ā³" - all_completed = False - else: - emoji = "ā“" - all_completed = False - - click.echo(f" {emoji} {name} ({status}) - {duration_str}") - - # Check stopping conditions - if until_fail and any_failed: - click.echo("\\nšŸ’„ Stopping on first failure!") - sys.exit(1) - - if all_completed: - if any_failed and retry and retry_count < retry: - retry_count += 1 - click.echo( - f"\\nšŸ” Retrying failed jobs (attempt {retry_count}/{retry})..." - ) - - # Retry failed runs - for run_id in failed_runs: - if fetcher.rerun_failed_jobs(owner, repo_name, run_id): - click.echo(f" āœ… Restarted failed jobs in run {run_id}") - else: - click.echo(f" āŒ Failed to restart jobs in run {run_id}") - - # Reset polling for the retry - poll_count = 0 - time.sleep(30) # Wait a bit longer before starting to poll again - continue - elif any_failed: - if retry and retry_count >= retry: - click.echo( - f"\\nšŸ’„ Max retries ({retry}) reached. Some workflows still failed!" - ) - else: - click.echo("\\nšŸ’„ Some workflows failed!") - sys.exit(1) - else: - click.echo("\\nšŸŽ‰ All workflows completed successfully!") - sys.exit(0) - - if poll_count < max_polls - 1: # Don't sleep on last iteration - click.echo( - f"\\nā° Waiting {poll_interval}s... (poll {poll_count + 1}/{max_polls})" - ) - time.sleep(poll_interval) - - poll_count += 1 - - click.echo("\\nā° Polling timeout reached") - sys.exit(1) - - except KeyboardInterrupt: - click.echo("\\nšŸ‘‹ Watching stopped by user") - sys.exit(0) + _display_watch_header(target_description, commit_sha, retry) + _run_watch_loop( + fetcher, + owner, + repo_name, + commit_sha, + target_description, + until_complete, + until_fail, + retry, + ) except ValueError as e: click.echo(f"Error: {e}", err=True) @@ -470,6 +179,255 @@ def watch(branch, commit, pr, verbose, until_complete, until_fail, retry): sys.exit(1) +def _validate_watch_options(until_complete, until_fail, retry): + """Validate watch command options with early returns.""" + if until_complete and until_fail: + click.echo("Error: Cannot specify both --until-complete and --until-fail", err=True) + sys.exit(1) + + if retry is not None and retry < 1: + click.echo("Error: --retry must be a positive integer", err=True) + sys.exit(1) + + if retry and (until_complete or until_fail): + click.echo( + "Error: Cannot specify --retry with other watch options (retry includes polling)", + err=True, + ) + sys.exit(1) + + +def _display_watch_header(target_description, commit_sha, retry): + """Display header information for watch command.""" + click.echo(f"šŸ”„ Watching CI status for {target_description}...") + click.echo(f"šŸ“‹ Commit: {commit_sha}") + if retry: + click.echo(f"šŸ” Will retry failed jobs up to {retry} time(s)") + click.echo("Press Ctrl+C to stop watching\\n") + + +def _run_watch_loop( + fetcher, owner, repo_name, commit_sha, target_description, until_complete, until_fail, retry +): + """Run the main watch polling loop.""" + poll_interval = 10 # seconds + max_polls = 120 # 20 minutes total + poll_count = 0 + retry_count = 0 + + try: + while poll_count < max_polls: + # Get status for this poll cycle + watch_result = watch_ci_status( + fetcher, + owner, + repo_name, + commit_sha, + target_description, + until_complete, + until_fail, + retry_count if retry else None, + ) + + # Display current status + _display_watch_status(watch_result) + + # Handle watch result with early returns + if not watch_result["continue_watching"]: + _handle_watch_completion(watch_result, retry, retry_count) + return + + if watch_result["status"] == "retry_needed" and retry and retry_count < retry: + retry_count += 1 + click.echo(f"\\nšŸ” Retrying failed jobs (attempt {retry_count}/{retry})...") + + # Retry failed runs + retry_results = retry_failed_workflows( + fetcher, owner, repo_name, watch_result["failed_runs"] + ) + _display_retry_results(retry_results) + + # Reset polling for the retry + poll_count = 0 + time.sleep(30) # Wait longer before starting to poll again + continue + + # Continue polling + if poll_count < max_polls - 1: # Don't sleep on last iteration + click.echo(f"\\nā° Waiting {poll_interval}s... (poll {poll_count + 1}/{max_polls})") + time.sleep(poll_interval) + + poll_count += 1 + + click.echo("\\nā° Polling timeout reached") + sys.exit(1) + + except KeyboardInterrupt: + click.echo("\\nšŸ‘‹ Watching stopped by user") + sys.exit(0) + + +def _display_watch_status(watch_result): + """Display status for current watch poll.""" + if watch_result["status"] == "no_runs": + click.echo("ā³ No workflow runs found yet...") + return + + workflows = watch_result.get("workflows", []) + click.echo(f"šŸ“Š Found {len(workflows)} workflow run(s):") + + for workflow in workflows: + click.echo( + f" {workflow['emoji']} {workflow['name']} ({workflow['status']}) - {workflow['duration']}" + ) + + +def _handle_watch_completion(watch_result, retry, retry_count): + """Handle watch completion with appropriate exit codes.""" + status = watch_result["status"] + + if status == "stop_on_failure": + click.echo("\\nšŸ’„ Stopping on first failure!") + sys.exit(1) + elif status == "failed": + if retry and retry_count >= retry: + click.echo(f"\\nšŸ’„ Max retries ({retry}) reached. Some workflows still failed!") + else: + click.echo("\\nšŸ’„ Some workflows failed!") + sys.exit(1) + elif status == "success": + click.echo("\\nšŸŽ‰ All workflows completed successfully!") + sys.exit(0) + + +def _display_retry_results(retry_results): + """Display results of retry attempts.""" + for run_id, success in retry_results.items(): + if success: + click.echo(f" āœ… Restarted failed jobs in run {run_id}") + else: + click.echo(f" āŒ Failed to restart jobs in run {run_id}") + + +def _display_job_logs(log_result): + """Display job logs based on result type.""" + log_type = log_result["type"] + + if log_type == "specific_job": + _display_specific_job_logs(log_result) + elif log_type == "raw_logs": + _display_raw_logs(log_result) + elif log_type == "filtered_logs": + _display_filtered_logs(log_result) + + +def _display_specific_job_logs(log_result): + """Display logs for a specific job ID.""" + job_info = log_result["job_info"] + logs = log_result["logs"] + + click.echo(f"šŸ“„ Raw logs for job ID {job_info.get('id', 'Unknown')}:") + click.echo("=" * 80) + click.echo(f"Job: {job_info.get('name', 'Unknown')}") + click.echo(f"Status: {job_info.get('conclusion', 'unknown')}") + click.echo(f"URL: {job_info.get('html_url', '')}") + click.echo("-" * 80) + click.echo(logs) + + +def _display_raw_logs(log_result): + """Display raw logs for all failed jobs.""" + failed_jobs = log_result["failed_jobs"] + + if not log_result["has_failures"]: + click.echo("āœ… No failing jobs found for this commit!") + return + + click.echo(f"šŸ“„ Raw logs for {len(failed_jobs)} failed job(s):") + click.echo() + + for i, job_log in enumerate(failed_jobs, 1): + job = job_log["job"] + logs = job_log["logs"] + job_name = job.get("name", "Unknown") + job_id = job.get("id") + + click.echo(f"{'=' * 80}") + click.echo(f"RAW LOGS #{i}: {job_name} (ID: {job_id})") + click.echo(f"{'=' * 80}") + click.echo(logs) + click.echo("\\n" + "=" * 80 + "\\n") + + +def _display_filtered_logs(log_result): + """Display filtered error logs.""" + target_description = log_result["target_description"] + failed_jobs = log_result["failed_jobs"] + + if not log_result["has_failures"]: + click.echo(f"āœ… No failing CI jobs found for {target_description}!") + return + + click.echo(f"šŸ“„ Error logs for {len(failed_jobs)} failing job(s) in {target_description}:") + click.echo() + + for i, job_log in enumerate(failed_jobs, 1): + click.echo(f"{'=' * 60}") + click.echo(f"LOGS #{i}: {job_log['name']}") + click.echo(f"{'=' * 60}") + + if job_log.get("error"): + click.echo(job_log["error"]) + elif job_log["step_logs"]: + for step_name, step_log in job_log["step_logs"].items(): + click.echo(f"\\nšŸ“„ Logs for Failed Step: {step_name}") + click.echo("-" * 50) + if step_log.strip(): + click.echo(step_log) + else: + click.echo("No logs found for this step") + else: + click.echo("Could not retrieve job logs") + + click.echo() + + +def _display_failed_jobs_status(fetcher, owner, repo_name, ci_status): + """Display failed jobs status with step details.""" + click.echo( + f"āŒ Found {len(ci_status.failed_check_runs)} failing CI job(s) for {ci_status.target_description}:" + ) + click.echo() + + for i, check_run in enumerate(ci_status.failed_check_runs, 1): + name = check_run.get("name", "Unknown Job") + conclusion = check_run.get("conclusion", "unknown") + html_url = check_run.get("html_url", "") + + click.echo(f"{'=' * 60}") + click.echo(f"FAILED JOB #{i}: {name}") + click.echo(f"Status: {conclusion}") + click.echo(f"URL: {html_url}") + click.echo(f"{'=' * 60}") + + # Get detailed job information + job_details = get_job_details_for_status(fetcher, owner, repo_name, check_run) + + if not job_details: + click.echo("Cannot retrieve detailed information for this check run type") + click.echo() + continue + + if job_details.failed_steps: + click.echo(f"\\nšŸ“‹ Failed Steps in {job_details.name}:") + for step in job_details.failed_steps: + click.echo(f" āŒ Step {step.number}: {step.name} (took {step.duration})") + click.echo() + else: + click.echo("Cannot retrieve detailed information for this check run type") + click.echo() + + def main(): """Entry point for the CLI.""" cli() diff --git a/cimonitor/log_parser.py b/cimonitor/log_parser.py index 51fd646..eef7815 100644 --- a/cimonitor/log_parser.py +++ b/cimonitor/log_parser.py @@ -12,92 +12,115 @@ def extract_step_logs(full_logs: str, failed_steps: list[dict[str, Any]]) -> dic for step in failed_steps: step_name = step["name"] - step_lines = [] - - # GitHub Actions uses ##[group]Run STEP_NAME and ##[endgroup] as boundaries - # Look for the step by name in the ##[group]Run pattern - capturing = False - - for i, line in enumerate(log_lines): - # Start capturing when we find the step's group marker - if f"##[group]Run {step_name}" in line: - capturing = True - step_lines.append(line) - elif capturing: - step_lines.append(line) - - # Stop capturing when we hit the endgroup for this step - if "##[endgroup]" in line: - # Continue capturing a few more lines for errors that appear after endgroup - for j in range(i + 1, min(i + 10, len(log_lines))): - next_line = log_lines[j] - step_lines.append(next_line) - - # Stop if we hit another group or significant marker - if "##[group]" in next_line or "Post job cleanup" in next_line: - break - break - - if step_lines: - step_logs[step_name] = "\n".join(step_lines) - else: - # Fallback: try partial name matching for steps with complex names - for i, line in enumerate(log_lines): - # Look for key words from the step name in group markers - if "##[group]Run" in line and any( - word.lower() in line.lower() for word in step_name.split() if len(word) > 3 - ): - capturing = True - step_lines = [line] - - # Capture until endgroup - for j in range(i + 1, len(log_lines)): - next_line = log_lines[j] - step_lines.append(next_line) - - if "##[endgroup]" in next_line: - # Get a few more lines for error context - for k in range(j + 1, min(j + 10, len(log_lines))): - error_line = log_lines[k] - step_lines.append(error_line) - if ( - "##[group]" in error_line - or "Post job cleanup" in error_line - ): - break - break - - if step_lines: - step_logs[step_name] = "\n".join(step_lines) - break + + # Try exact name matching first + step_content = LogParser._extract_step_by_exact_name(log_lines, step_name) + if step_content: + step_logs[step_name] = step_content + continue + + # Fallback: try partial name matching + step_content = LogParser._extract_step_by_partial_name(log_lines, step_name) + if step_content: + step_logs[step_name] = step_content return step_logs + @staticmethod + def _extract_step_by_exact_name(log_lines: list[str], step_name: str) -> str | None: + """Extract step logs using exact name matching.""" + step_lines = [] + capturing = False + + for i, line in enumerate(log_lines): + # Early return if we find the exact step + if f"##[group]Run {step_name}" in line: + capturing = True + step_lines.append(line) + continue + + if not capturing: + continue + + step_lines.append(line) + + # Stop capturing when we hit the endgroup for this step + if "##[endgroup]" in line: + # Continue capturing a few more lines for errors that appear after endgroup + LogParser._capture_post_endgroup_lines(log_lines, i, step_lines) + break + + return "\n".join(step_lines) if step_lines else None + + @staticmethod + def _extract_step_by_partial_name(log_lines: list[str], step_name: str) -> str | None: + """Extract step logs using partial name matching as fallback.""" + keywords = [word for word in step_name.split() if len(word) > 3] + if not keywords: + return None + + for i, line in enumerate(log_lines): + # Look for key words from the step name in group markers + if "##[group]Run" not in line: + continue + + if not any(word.lower() in line.lower() for word in keywords): + continue + + step_lines = [line] + + # Capture until endgroup + for j in range(i + 1, len(log_lines)): + next_line = log_lines[j] + step_lines.append(next_line) + + if "##[endgroup]" in next_line: + # Get a few more lines for error context + LogParser._capture_post_endgroup_lines(log_lines, j, step_lines) + break + + return "\n".join(step_lines) if step_lines else None + + return None + + @staticmethod + def _capture_post_endgroup_lines( + log_lines: list[str], endgroup_index: int, step_lines: list[str] + ) -> None: + """Capture additional lines after ##[endgroup] for error context.""" + for k in range(endgroup_index + 1, min(endgroup_index + 10, len(log_lines))): + error_line = log_lines[k] + step_lines.append(error_line) + + # Stop if we hit another group or significant marker + if "##[group]" in error_line or "Post job cleanup" in error_line: + break + @staticmethod def filter_error_lines(step_log: str) -> list[str]: """Filter step logs to show only error-related content.""" step_lines = step_log.split("\n") shown_lines = [] + error_keywords = ["error", "failed", "failure", "āŒ", "āœ—", "exit code", "##[error]"] + for line in step_lines: - # Show lines with error indicators or important info - if ( - any( - keyword in line.lower() - for keyword in [ - "error", - "failed", - "failure", - "āŒ", - "āœ—", - "exit code", - "##[error]", - ] - ) - or "##[group]" in line - or "##[endgroup]" in line - or not line.startswith("2025-") - ): # Include non-timestamp lines + # Early continue for empty lines + if not line.strip(): + continue + + # Include lines with error keywords + if any(keyword in line.lower() for keyword in error_keywords): + shown_lines.append(line) + continue + + # Include GitHub Actions markers + if "##[group]" in line or "##[endgroup]" in line: + shown_lines.append(line) + continue + + # Include non-timestamp lines (command output, not just timestamps) + if not line.startswith("2025-"): shown_lines.append(line) return shown_lines diff --git a/cimonitor/services.py b/cimonitor/services.py new file mode 100644 index 0000000..f503359 --- /dev/null +++ b/cimonitor/services.py @@ -0,0 +1,495 @@ +"""Business logic services for CI Monitor. + +This module contains the core business logic separated from presentation concerns. +All functions use early-return patterns to avoid deep nesting. +""" + +from datetime import datetime +from typing import Any + +from .fetcher import GitHubCIFetcher +from .log_parser import LogParser + + +class CIStatusResult: + """Result object for CI status operations.""" + + def __init__(self, failed_check_runs: list[dict[str, Any]], target_description: str): + self.failed_check_runs = failed_check_runs + self.target_description = target_description + self.has_failures = len(failed_check_runs) > 0 + + +class WorkflowStepInfo: + """Information about a workflow step.""" + + def __init__(self, name: str, number: int, duration: str = "Unknown"): + self.name = name + self.number = number + self.duration = duration + + +class JobDetails: + """Detailed information about a failed job.""" + + def __init__(self, name: str, html_url: str, conclusion: str): + self.name = name + self.html_url = html_url + self.conclusion = conclusion + self.failed_steps: list[WorkflowStepInfo] = [] + self.step_logs: dict[str, str] = {} + + +def get_ci_status( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, commit_sha: str, target_description: str +) -> CIStatusResult: + """Get CI status for a commit with detailed job information. + + Args: + fetcher: GitHub CI fetcher instance + owner: Repository owner + repo_name: Repository name + commit_sha: Target commit SHA + target_description: Human-readable description of target + + Returns: + CIStatusResult with failed jobs and details + """ + failed_check_runs = fetcher.find_failed_jobs_in_latest_run(owner, repo_name, commit_sha) + return CIStatusResult(failed_check_runs, target_description) + + +def get_job_details_for_status( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, check_run: dict[str, Any] +) -> JobDetails | None: + """Get detailed information for a failed job for status display. + + Args: + fetcher: GitHub CI fetcher instance + owner: Repository owner + repo_name: Repository name + check_run: Check run data from GitHub API + + Returns: + JobDetails object or None if details cannot be retrieved + """ + name = check_run.get("name", "Unknown Job") + conclusion = check_run.get("conclusion", "unknown") + html_url = check_run.get("html_url", "") + + job_details = JobDetails(name, html_url, conclusion) + + # Early return if not a workflow run + if "actions/runs" not in html_url: + return job_details + + try: + run_id = _extract_run_id_from_url(html_url) + if not run_id: + return job_details + + jobs = fetcher.get_workflow_jobs(owner, repo_name, run_id) + _add_failed_steps_to_job_details(fetcher, jobs, job_details) + + except Exception: + # Return basic job details even if we can't get step information + pass + + return job_details + + +def get_job_logs( + fetcher: GitHubCIFetcher, + owner: str, + repo_name: str, + commit_sha: str, + target_description: str, + job_id: int | None = None, + raw: bool = False, +) -> dict[str, Any]: + """Get logs for failed CI jobs. + + Args: + fetcher: GitHub CI fetcher instance + owner: Repository owner + repo_name: Repository name + commit_sha: Target commit SHA + target_description: Human-readable description of target + job_id: Specific job ID to get logs for (optional) + raw: Whether to return raw logs + + Returns: + Dictionary with log information and metadata + """ + # Handle specific job ID request + if job_id: + return _get_specific_job_logs(fetcher, owner, repo_name, job_id) + + # Handle raw logs request + if raw: + return _get_raw_logs_for_commit(fetcher, owner, repo_name, commit_sha) + + # Default: return filtered error logs + return _get_filtered_error_logs(fetcher, owner, repo_name, commit_sha, target_description) + + +def watch_ci_status( + fetcher: GitHubCIFetcher, + owner: str, + repo_name: str, + commit_sha: str, + target_description: str, + until_complete: bool = False, + until_fail: bool = False, + retry_count: int | None = None, +) -> dict[str, Any]: + """Watch CI status for a single poll cycle. + + Args: + fetcher: GitHub CI fetcher instance + owner: Repository owner + repo_name: Repository name + commit_sha: Target commit SHA + target_description: Human-readable description of target + until_complete: Whether to wait until completion + until_fail: Whether to stop on first failure + retry_count: Number of retries remaining + + Returns: + Dictionary with workflow status and recommended actions + """ + workflow_runs = fetcher.get_workflow_runs_for_commit(owner, repo_name, commit_sha) + + if not workflow_runs: + return { + "status": "no_runs", + "message": "No workflow runs found yet", + "continue_watching": True, + } + + status_summary = _analyze_workflow_runs(workflow_runs) + + # Check stopping conditions with early returns + if until_fail and status_summary["any_failed"]: + return { + "status": "stop_on_failure", + "workflows": status_summary["workflows"], + "continue_watching": False, + } + + if not status_summary["all_completed"]: + return { + "status": "in_progress", + "workflows": status_summary["workflows"], + "continue_watching": True, + } + + # All completed - check if we need to retry + if status_summary["any_failed"] and retry_count and retry_count > 0: + return { + "status": "retry_needed", + "workflows": status_summary["workflows"], + "failed_runs": status_summary["failed_runs"], + "continue_watching": True, + } + + if status_summary["any_failed"]: + return { + "status": "failed", + "workflows": status_summary["workflows"], + "continue_watching": False, + } + + return { + "status": "success", + "workflows": status_summary["workflows"], + "continue_watching": False, + } + + +def retry_failed_workflows( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, failed_run_ids: list[int] +) -> dict[int, bool]: + """Retry failed workflow runs. + + Args: + fetcher: GitHub CI fetcher instance + owner: Repository owner + repo_name: Repository name + failed_run_ids: List of run IDs to retry + + Returns: + Dictionary mapping run_id to success status + """ + results = {} + for run_id in failed_run_ids: + results[run_id] = fetcher.rerun_failed_jobs(owner, repo_name, run_id) + return results + + +# Private helper functions + + +def _extract_run_id_from_url(html_url: str) -> int | None: + """Extract run ID from GitHub Actions URL.""" + if "actions/runs" not in html_url: + return None + try: + return int(html_url.split("/runs/")[1].split("/")[0]) + except (IndexError, ValueError): + return None + + +def _add_failed_steps_to_job_details( + fetcher: GitHubCIFetcher, jobs: list[dict[str, Any]], job_details: JobDetails +) -> None: + """Add failed step information to job details.""" + for job in jobs: + if job.get("conclusion") != "failure": + continue + + failed_steps = fetcher.get_failed_steps(job) + if not failed_steps: + continue + + for step in failed_steps: + duration = _calculate_step_duration(step) + step_info = WorkflowStepInfo(step["name"], step["number"], duration) + job_details.failed_steps.append(step_info) + + +def _calculate_step_duration(step: dict[str, Any]) -> str: + """Calculate duration for a workflow step.""" + if not step.get("started_at") or not step.get("completed_at"): + return "Unknown" + + try: + start = datetime.fromisoformat(step["started_at"].replace("Z", "+00:00")) + end = datetime.fromisoformat(step["completed_at"].replace("Z", "+00:00")) + return f"{(end - start).total_seconds():.1f}s" + except Exception: + return "Unknown" + + +def _get_specific_job_logs( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, job_id: int +) -> dict[str, Any]: + """Get logs for a specific job ID.""" + job_info = fetcher.get_job_by_id(owner, repo_name, job_id) + logs_content = fetcher.get_job_logs(owner, repo_name, job_id) + + return {"type": "specific_job", "job_info": job_info, "logs": logs_content} + + +def _get_raw_logs_for_commit( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, commit_sha: str +) -> dict[str, Any]: + """Get raw logs for all failed jobs in a commit.""" + all_jobs = fetcher.get_all_jobs_for_commit(owner, repo_name, commit_sha) + failed_jobs = [job for job in all_jobs if job.get("conclusion") == "failure"] + + if not failed_jobs: + return {"type": "raw_logs", "failed_jobs": [], "has_failures": False} + + # Fetch logs for each failed job + job_logs = [] + for job in failed_jobs: + job_id = job.get("id") + if not job_id: + continue + + logs_content = fetcher.get_job_logs(owner, repo_name, job_id) + job_logs.append({"job": job, "logs": logs_content}) + + return {"type": "raw_logs", "failed_jobs": job_logs, "has_failures": True} + + +def _get_filtered_error_logs( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, commit_sha: str, target_description: str +) -> dict[str, Any]: + """Get filtered error logs for failed jobs.""" + failed_check_runs = fetcher.find_failed_jobs_in_latest_run(owner, repo_name, commit_sha) + + if not failed_check_runs: + return { + "type": "filtered_logs", + "target_description": target_description, + "failed_jobs": [], + "has_failures": False, + } + + job_logs = [] + for check_run in failed_check_runs: + job_log_info = _process_check_run_for_logs(fetcher, owner, repo_name, check_run) + if job_log_info: + job_logs.append(job_log_info) + + return { + "type": "filtered_logs", + "target_description": target_description, + "failed_jobs": job_logs, + "has_failures": True, + } + + +def _process_check_run_for_logs( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, check_run: dict[str, Any] +) -> dict[str, Any] | None: + """Process a single check run to extract step logs.""" + name = check_run.get("name", "Unknown Job") + html_url = check_run.get("html_url", "") + + # Early return if not a workflow run + if "actions/runs" not in html_url: + return { + "name": name, + "html_url": html_url, + "step_logs": {}, + "error": "Cannot retrieve detailed information for this check run type", + } + + try: + run_id = _extract_run_id_from_url(html_url) + if not run_id: + return None + + jobs = fetcher.get_workflow_jobs(owner, repo_name, run_id) + return _extract_step_logs_from_jobs(fetcher, owner, repo_name, jobs, name) + + except Exception as e: + return { + "name": name, + "html_url": html_url, + "step_logs": {}, + "error": f"Error processing job details: {e}", + } + + +def _extract_step_logs_from_jobs( + fetcher: GitHubCIFetcher, owner: str, repo_name: str, jobs: list[dict[str, Any]], job_name: str +) -> dict[str, Any] | None: + """Extract step logs from workflow jobs.""" + for job in jobs: + if job.get("conclusion") != "failure": + continue + + job_id = job.get("id") + failed_steps = fetcher.get_failed_steps(job) + + if not job_id or not failed_steps: + continue + + logs_content = fetcher.get_job_logs(owner, repo_name, job_id) + step_logs = LogParser.extract_step_logs(logs_content, failed_steps) + + if step_logs: + # Filter each step's logs for errors + filtered_step_logs = {} + for step_name, step_log in step_logs.items(): + if step_log.strip(): + shown_lines = LogParser.filter_error_lines(step_log) + if shown_lines: + filtered_step_logs[step_name] = "\n".join(shown_lines) + else: + # Fallback to last few lines + step_lines = step_log.split("\n") + filtered_step_logs[step_name] = "\n".join( + line for line in step_lines[-10:] if line.strip() + ) + + return { + "name": job_name, + "job_name": job.get("name", "Unknown"), + "step_logs": filtered_step_logs, + "error": None, + } + + return {"name": job_name, "step_logs": {}, "error": "Could not retrieve job logs"} + + +def _analyze_workflow_runs(workflow_runs: list[dict[str, Any]]) -> dict[str, Any]: + """Analyze workflow runs and return status summary.""" + all_completed = True + any_failed = False + failed_runs = [] + workflows = [] + + for run in workflow_runs: + workflow_info = _process_single_workflow_run(run) + workflows.append(workflow_info) + + if not workflow_info["completed"]: + all_completed = False + + if workflow_info["failed"]: + any_failed = True + failed_runs.append(run.get("id")) + + return { + "all_completed": all_completed, + "any_failed": any_failed, + "failed_runs": failed_runs, + "workflows": workflows, + } + + +def _process_single_workflow_run(run: dict[str, Any]) -> dict[str, Any]: + """Process a single workflow run and return status info.""" + name = run.get("name", "Unknown Workflow") + status = run.get("status", "unknown") + conclusion = run.get("conclusion") + + # Calculate duration + duration_str = _calculate_workflow_duration(run) + + # Determine status info + if status == "completed": + completed = True + if conclusion == "success": + emoji = "āœ…" + failed = False + else: + emoji = "āŒ" if conclusion == "failure" else "🚫" if conclusion == "cancelled" else "āš ļø" + failed = True + elif status == "in_progress": + emoji = "šŸ”„" + completed = False + failed = False + elif status == "queued": + emoji = "ā³" + completed = False + failed = False + else: + emoji = "ā“" + completed = False + failed = False + + return { + "name": name, + "status": status, + "conclusion": conclusion, + "emoji": emoji, + "duration": duration_str, + "completed": completed, + "failed": failed, + } + + +def _calculate_workflow_duration(run: dict[str, Any]) -> str: + """Calculate duration for a workflow run.""" + created_at = run.get("created_at", "") + updated_at = run.get("updated_at", "") + + if not created_at: + return "unknown" + + try: + start = datetime.fromisoformat(created_at.replace("Z", "+00:00")) + if updated_at: + end = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) + else: + end = datetime.now(start.tzinfo) + duration = end - start + return f"{int(duration.total_seconds())}s" + except Exception: + return "unknown" diff --git a/tests/test_cli_refactored.py b/tests/test_cli_refactored.py new file mode 100644 index 0000000..b1a1c30 --- /dev/null +++ b/tests/test_cli_refactored.py @@ -0,0 +1,241 @@ +"""Tests for the refactored CLI presentation logic.""" + +from unittest.mock import Mock, patch + +import pytest +from click.testing import CliRunner + +from cimonitor.cli import cli +from cimonitor.services import CIStatusResult, JobDetails, WorkflowStepInfo + + +@pytest.fixture +def runner(): + """Create a Click test runner.""" + return CliRunner() + + +@pytest.fixture +def mock_fetcher(): + """Create a mock GitHubCIFetcher.""" + return Mock() + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.get_ci_status") +def test_status_command_no_failures( + mock_get_ci_status, mock_get_target_info, mock_fetcher_class, runner +): + """Test status command with no failures.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_get_ci_status.return_value = CIStatusResult([], "test branch") + + result = runner.invoke(cli, ["status"]) + + assert result.exit_code == 0 + assert "āœ… No failing CI jobs found for test branch!" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.get_ci_status") +@patch("cimonitor.cli.get_job_details_for_status") +def test_status_command_with_failures( + mock_get_job_details, mock_get_ci_status, mock_get_target_info, mock_fetcher_class, runner +): + """Test status command with failures.""" + # Setup mocks + mock_fetcher = Mock() + mock_fetcher_class.return_value = mock_fetcher + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + + failed_runs = [{"name": "Test Job", "conclusion": "failure", "html_url": "https://example.com"}] + mock_get_ci_status.return_value = CIStatusResult(failed_runs, "test branch") + + job_details = JobDetails("Test Job", "https://example.com", "failure") + job_details.failed_steps = [WorkflowStepInfo("Test Step", 1, "30.0s")] + mock_get_job_details.return_value = job_details + + result = runner.invoke(cli, ["status"]) + + assert result.exit_code == 0 + assert "āŒ Found 1 failing CI job(s) for test branch:" in result.output + assert "FAILED JOB #1: Test Job" in result.output + assert "šŸ“‹ Failed Steps in Test Job:" in result.output + assert "āŒ Step 1: Test Step (took 30.0s)" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.get_job_logs") +def test_logs_command_no_failures( + mock_get_job_logs, mock_get_target_info, mock_fetcher_class, runner +): + """Test logs command with no failures.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_get_job_logs.return_value = { + "type": "filtered_logs", + "target_description": "test branch", + "failed_jobs": [], + "has_failures": False, + } + + result = runner.invoke(cli, ["logs"]) + + assert result.exit_code == 0 + assert "āœ… No failing CI jobs found for test branch!" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.get_job_logs") +def test_logs_command_with_specific_job( + mock_get_job_logs, mock_get_target_info, mock_fetcher_class, runner +): + """Test logs command with specific job ID.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_get_job_logs.return_value = { + "type": "specific_job", + "job_info": { + "id": 123, + "name": "Test Job", + "conclusion": "failure", + "html_url": "https://example.com", + }, + "logs": "test log content", + } + + result = runner.invoke(cli, ["logs", "--job-id", "123"]) + + assert result.exit_code == 0 + assert "šŸ“„ Raw logs for job ID 123:" in result.output + assert "Job: Test Job" in result.output + assert "test log content" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.get_job_logs") +def test_logs_command_raw(mock_get_job_logs, mock_get_target_info, mock_fetcher_class, runner): + """Test logs command with raw flag.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_get_job_logs.return_value = { + "type": "raw_logs", + "failed_jobs": [{"job": {"name": "Test Job", "id": 123}, "logs": "raw log content"}], + "has_failures": True, + } + + result = runner.invoke(cli, ["logs", "--raw"]) + + assert result.exit_code == 0 + assert "šŸ“„ Raw logs for 1 failed job(s):" in result.output + assert "RAW LOGS #1: Test Job (ID: 123)" in result.output + assert "raw log content" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.get_job_logs") +def test_logs_command_filtered(mock_get_job_logs, mock_get_target_info, mock_fetcher_class, runner): + """Test logs command with filtered output.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_get_job_logs.return_value = { + "type": "filtered_logs", + "target_description": "test branch", + "failed_jobs": [ + {"name": "Test Job", "step_logs": {"Test Step": "error log content"}, "error": None} + ], + "has_failures": True, + } + + result = runner.invoke(cli, ["logs"]) + + assert result.exit_code == 0 + assert "šŸ“„ Error logs for 1 failing job(s) in test branch:" in result.output + assert "LOGS #1: Test Job" in result.output + assert "šŸ“„ Logs for Failed Step: Test Step" in result.output + assert "error log content" in result.output + + +def test_validate_watch_options(runner): + """Test watch option validation.""" + # Test conflicting options + result = runner.invoke(cli, ["watch", "--until-complete", "--until-fail"]) + assert result.exit_code == 1 + assert "Cannot specify both --until-complete and --until-fail" in result.output + + # Test invalid retry count + result = runner.invoke(cli, ["watch", "--retry", "0"]) + assert result.exit_code == 1 + assert "--retry must be a positive integer" in result.output + + # Test retry with other options + result = runner.invoke(cli, ["watch", "--retry", "2", "--until-complete"]) + assert result.exit_code == 1 + assert "Cannot specify --retry with other watch options" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.watch_ci_status") +@patch("cimonitor.cli.time.sleep") # Mock sleep to speed up test +def test_watch_command_success( + mock_sleep, mock_watch_ci_status, mock_get_target_info, mock_fetcher_class, runner +): + """Test watch command with successful completion.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_watch_ci_status.return_value = { + "status": "success", + "continue_watching": False, + "workflows": [{"name": "Test", "emoji": "āœ…", "status": "completed", "duration": "300s"}], + } + + result = runner.invoke(cli, ["watch"]) + + assert result.exit_code == 0 + assert "šŸ”„ Watching CI status for test branch..." in result.output + assert "šŸ“Š Found 1 workflow run(s):" in result.output + assert "šŸŽ‰ All workflows completed successfully!" in result.output + + +@patch("cimonitor.cli.GitHubCIFetcher") +@patch("cimonitor.cli.get_target_info") +@patch("cimonitor.cli.watch_ci_status") +def test_watch_command_failure( + mock_watch_ci_status, mock_get_target_info, mock_fetcher_class, runner +): + """Test watch command with failure.""" + # Setup mocks + mock_fetcher_class.return_value = Mock() + mock_get_target_info.return_value = ("owner", "repo", "abc123", "test branch") + mock_watch_ci_status.return_value = { + "status": "failed", + "continue_watching": False, + "workflows": [{"name": "Test", "emoji": "āŒ", "status": "completed", "duration": "300s"}], + } + + result = runner.invoke(cli, ["watch"]) + + assert result.exit_code == 1 + assert "šŸ’„ Some workflows failed!" in result.output + + +def test_target_validation(runner): + """Test target option validation.""" + result = runner.invoke(cli, ["status", "--branch", "main", "--commit", "abc123"]) + + assert result.exit_code == 1 + assert "Please specify only one of --branch, --commit, or --pr" in result.output diff --git a/tests/test_services.py b/tests/test_services.py new file mode 100644 index 0000000..d430207 --- /dev/null +++ b/tests/test_services.py @@ -0,0 +1,358 @@ +"""Tests for the services module.""" + +from unittest.mock import Mock + +import pytest + +from cimonitor.services import ( + CIStatusResult, + JobDetails, + WorkflowStepInfo, + get_ci_status, + get_job_details_for_status, + get_job_logs, + retry_failed_workflows, + watch_ci_status, +) + + +@pytest.fixture +def mock_fetcher(): + """Create a mock GitHubCIFetcher for testing.""" + return Mock() + + +def test_ci_status_result(): + """Test CIStatusResult class.""" + failed_runs = [{"name": "test", "conclusion": "failure"}] + result = CIStatusResult(failed_runs, "test branch") + + assert result.failed_check_runs == failed_runs + assert result.target_description == "test branch" + assert result.has_failures is True + + # Test with no failures + empty_result = CIStatusResult([], "test branch") + assert empty_result.has_failures is False + + +def test_workflow_step_info(): + """Test WorkflowStepInfo class.""" + step = WorkflowStepInfo("Test Step", 1, "30.5s") + + assert step.name == "Test Step" + assert step.number == 1 + assert step.duration == "30.5s" + + +def test_job_details(): + """Test JobDetails class.""" + job = JobDetails("Test Job", "https://example.com", "failure") + + assert job.name == "Test Job" + assert job.html_url == "https://example.com" + assert job.conclusion == "failure" + assert job.failed_steps == [] + assert job.step_logs == {} + + +def test_get_ci_status(mock_fetcher): + """Test get_ci_status function.""" + failed_runs = [{"name": "test", "conclusion": "failure"}] + mock_fetcher.find_failed_jobs_in_latest_run.return_value = failed_runs + + result = get_ci_status(mock_fetcher, "owner", "repo", "abc123", "test branch") + + assert isinstance(result, CIStatusResult) + assert result.failed_check_runs == failed_runs + assert result.target_description == "test branch" + assert result.has_failures is True + + mock_fetcher.find_failed_jobs_in_latest_run.assert_called_once_with("owner", "repo", "abc123") + + +def test_get_job_details_for_status_basic(mock_fetcher): + """Test get_job_details_for_status with basic check run.""" + check_run = { + "name": "Test Job", + "conclusion": "failure", + "html_url": "https://example.com/basic", + } + + result = get_job_details_for_status(mock_fetcher, "owner", "repo", check_run) + + assert result.name == "Test Job" + assert result.conclusion == "failure" + assert result.html_url == "https://example.com/basic" + assert result.failed_steps == [] + + +def test_get_job_details_for_status_with_workflow(mock_fetcher): + """Test get_job_details_for_status with workflow run.""" + check_run = { + "name": "Test Job", + "conclusion": "failure", + "html_url": "https://github.com/owner/repo/actions/runs/123456", + } + + mock_jobs = [ + { + "conclusion": "failure", + "name": "Job 1", + "steps": [ + { + "name": "Step 1", + "number": 1, + "conclusion": "failure", + "started_at": "2025-01-01T10:00:00Z", + "completed_at": "2025-01-01T10:01:00Z", + } + ], + } + ] + + mock_fetcher.get_workflow_jobs.return_value = mock_jobs + mock_fetcher.get_failed_steps.return_value = [ + { + "name": "Step 1", + "number": 1, + "started_at": "2025-01-01T10:00:00Z", + "completed_at": "2025-01-01T10:01:00Z", + } + ] + + result = get_job_details_for_status(mock_fetcher, "owner", "repo", check_run) + + assert result.name == "Test Job" + assert len(result.failed_steps) == 1 + assert result.failed_steps[0].name == "Step 1" + assert result.failed_steps[0].number == 1 + assert result.failed_steps[0].duration == "60.0s" + + +def test_get_job_logs_specific_job(mock_fetcher): + """Test get_job_logs with specific job ID.""" + job_info = {"id": 123, "name": "Test Job", "conclusion": "failure"} + logs = "test log content" + + mock_fetcher.get_job_by_id.return_value = job_info + mock_fetcher.get_job_logs.return_value = logs + + result = get_job_logs(mock_fetcher, "owner", "repo", "abc123", "test", job_id=123) + + assert result["type"] == "specific_job" + assert result["job_info"] == job_info + assert result["logs"] == logs + + +def test_get_job_logs_raw(mock_fetcher): + """Test get_job_logs with raw flag.""" + failed_jobs = [{"id": 123, "name": "Test Job", "conclusion": "failure"}] + + mock_fetcher.get_all_jobs_for_commit.return_value = failed_jobs + mock_fetcher.get_job_logs.return_value = "test logs" + + result = get_job_logs(mock_fetcher, "owner", "repo", "abc123", "test", raw=True) + + assert result["type"] == "raw_logs" + assert result["has_failures"] is True + assert len(result["failed_jobs"]) == 1 + assert result["failed_jobs"][0]["job"] == failed_jobs[0] + assert result["failed_jobs"][0]["logs"] == "test logs" + + +def test_get_job_logs_filtered(mock_fetcher): + """Test get_job_logs with filtered output.""" + failed_runs = [ + {"name": "Test Job", "html_url": "https://github.com/owner/repo/actions/runs/123"} + ] + + mock_fetcher.find_failed_jobs_in_latest_run.return_value = failed_runs + mock_fetcher.get_workflow_jobs.return_value = [] + + result = get_job_logs(mock_fetcher, "owner", "repo", "abc123", "test") + + assert result["type"] == "filtered_logs" + assert result["target_description"] == "test" + assert result["has_failures"] is True + + +def test_watch_ci_status_no_runs(mock_fetcher): + """Test watch_ci_status with no workflow runs.""" + mock_fetcher.get_workflow_runs_for_commit.return_value = [] + + result = watch_ci_status(mock_fetcher, "owner", "repo", "abc123", "test") + + assert result["status"] == "no_runs" + assert result["continue_watching"] is True + + +def test_watch_ci_status_success(mock_fetcher): + """Test watch_ci_status with successful workflows.""" + mock_runs = [ + { + "name": "Test Workflow", + "status": "completed", + "conclusion": "success", + "created_at": "2025-01-01T10:00:00Z", + "updated_at": "2025-01-01T10:05:00Z", + "id": 123, + } + ] + + mock_fetcher.get_workflow_runs_for_commit.return_value = mock_runs + + result = watch_ci_status(mock_fetcher, "owner", "repo", "abc123", "test") + + assert result["status"] == "success" + assert result["continue_watching"] is False + assert len(result["workflows"]) == 1 + assert result["workflows"][0]["name"] == "Test Workflow" + assert result["workflows"][0]["emoji"] == "āœ…" + + +def test_watch_ci_status_failure(mock_fetcher): + """Test watch_ci_status with failed workflows.""" + mock_runs = [ + { + "name": "Test Workflow", + "status": "completed", + "conclusion": "failure", + "created_at": "2025-01-01T10:00:00Z", + "updated_at": "2025-01-01T10:05:00Z", + "id": 123, + } + ] + + mock_fetcher.get_workflow_runs_for_commit.return_value = mock_runs + + result = watch_ci_status(mock_fetcher, "owner", "repo", "abc123", "test") + + assert result["status"] == "failed" + assert result["continue_watching"] is False + assert result["workflows"][0]["emoji"] == "āŒ" + + +def test_watch_ci_status_in_progress(mock_fetcher): + """Test watch_ci_status with in-progress workflows.""" + mock_runs = [ + { + "name": "Test Workflow", + "status": "in_progress", + "conclusion": None, + "created_at": "2025-01-01T10:00:00Z", + "updated_at": "2025-01-01T10:02:00Z", + "id": 123, + } + ] + + mock_fetcher.get_workflow_runs_for_commit.return_value = mock_runs + + result = watch_ci_status(mock_fetcher, "owner", "repo", "abc123", "test") + + assert result["status"] == "in_progress" + assert result["continue_watching"] is True + assert result["workflows"][0]["emoji"] == "šŸ”„" + + +def test_watch_ci_status_until_fail(mock_fetcher): + """Test watch_ci_status with until_fail flag.""" + mock_runs = [ + { + "name": "Test Workflow", + "status": "completed", + "conclusion": "failure", + "created_at": "2025-01-01T10:00:00Z", + "updated_at": "2025-01-01T10:05:00Z", + "id": 123, + } + ] + + mock_fetcher.get_workflow_runs_for_commit.return_value = mock_runs + + result = watch_ci_status(mock_fetcher, "owner", "repo", "abc123", "test", until_fail=True) + + assert result["status"] == "stop_on_failure" + assert result["continue_watching"] is False + + +def test_watch_ci_status_retry_needed(mock_fetcher): + """Test watch_ci_status with retry needed.""" + mock_runs = [ + { + "name": "Test Workflow", + "status": "completed", + "conclusion": "failure", + "created_at": "2025-01-01T10:00:00Z", + "updated_at": "2025-01-01T10:05:00Z", + "id": 123, + } + ] + + mock_fetcher.get_workflow_runs_for_commit.return_value = mock_runs + + result = watch_ci_status(mock_fetcher, "owner", "repo", "abc123", "test", retry_count=1) + + assert result["status"] == "retry_needed" + assert result["continue_watching"] is True + assert 123 in result["failed_runs"] + + +def test_retry_failed_workflows(mock_fetcher): + """Test retry_failed_workflows function.""" + mock_fetcher.rerun_failed_jobs.side_effect = [True, False] + + result = retry_failed_workflows(mock_fetcher, "owner", "repo", [123, 456]) + + assert result[123] is True + assert result[456] is False + assert mock_fetcher.rerun_failed_jobs.call_count == 2 + + +def test_workflow_duration_calculation(): + """Test workflow duration calculation edge cases.""" + from cimonitor.services import _calculate_workflow_duration + + # Test with missing created_at + run = {"updated_at": "2025-01-01T10:05:00Z"} + assert _calculate_workflow_duration(run) == "unknown" + + # Test with invalid datetime format + run = {"created_at": "invalid", "updated_at": "2025-01-01T10:05:00Z"} + assert _calculate_workflow_duration(run) == "unknown" + + # Test with valid datetimes + run = {"created_at": "2025-01-01T10:00:00Z", "updated_at": "2025-01-01T10:05:00Z"} + assert _calculate_workflow_duration(run) == "300s" + + +def test_step_duration_calculation(): + """Test step duration calculation edge cases.""" + from cimonitor.services import _calculate_step_duration + + # Test with missing timestamps + step = {"started_at": None, "completed_at": "2025-01-01T10:01:00Z"} + assert _calculate_step_duration(step) == "Unknown" + + # Test with invalid datetime format + step = {"started_at": "invalid", "completed_at": "2025-01-01T10:01:00Z"} + assert _calculate_step_duration(step) == "Unknown" + + # Test with valid timestamps + step = {"started_at": "2025-01-01T10:00:00Z", "completed_at": "2025-01-01T10:01:00Z"} + assert _calculate_step_duration(step) == "60.0s" + + +def test_extract_run_id_from_url(): + """Test run ID extraction from GitHub URLs.""" + from cimonitor.services import _extract_run_id_from_url + + # Test valid URL + url = "https://github.com/owner/repo/actions/runs/123456/jobs/789" + assert _extract_run_id_from_url(url) == 123456 + + # Test invalid URL + assert _extract_run_id_from_url("https://example.com") is None + + # Test malformed URL + assert _extract_run_id_from_url("https://github.com/actions/runs/invalid") is None