diff --git a/.claude/commands/release.md b/.claude/commands/release.md index 8bf74fb..ee77b4a 100644 --- a/.claude/commands/release.md +++ b/.claude/commands/release.md @@ -1,3 +1,5 @@ -1. Make a vx.y.z tag for the next release based on CHANGELOG.md and push it to origin. -2. Use pbcopy to copy the relevant release notes from CHANGELOG.md to the clipboard. -3. Bump the patch version, commit, and push that to main, updating CHANGELOG.md with the new section. \ No newline at end of file +1. Update CHANGELOG.md with the release notes and date for the current version in pyproject.toml. +2. Commit the changelog changes. +3. Make a vx.y.z tag for the release (using the current version from pyproject.toml) and push it to origin. +4. Use pbcopy to copy the relevant release notes from CHANGELOG.md to the clipboard. +5. Bump the patch version in pyproject.toml to the next version, commit, and push that to main, updating CHANGELOG.md with the new unreleased section. diff --git a/CHANGELOG.md b/CHANGELOG.md index d835d16..5d6d298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,29 @@ ### Added +## 0.1.4 - TBD + +### Added + +## 0.1.3 - 2025-07-16 + +### Added +- Improved release process documentation +- Fixed version management workflow + +### Changed +- Corrected release workflow to maintain proper version sequencing + +## 0.1.2 - 2025-07-16 + +### Added +- --repo argument for specifying repository +- Log parsing fallback improvements +- Better command-line documentation + +### Changed +- Refactored codebase for improved maintainability + ## 0.1.0 - 2025-01-13 Initial release diff --git a/cimonitor/cli.py b/cimonitor/cli.py index 8d10e08..ebbac08 100644 --- a/cimonitor/cli.py +++ b/cimonitor/cli.py @@ -194,7 +194,14 @@ def status(repo, branch, commit, pr, verbose): @click.option("--verbose", "-v", is_flag=True, help="Show verbose output") @click.option("--raw", is_flag=True, help="Show complete raw logs (for debugging)") @click.option("--job-id", type=int, help="Show logs for specific job ID only") -def logs(repo, branch, commit, pr, verbose, raw, job_id): +@click.option( + "--show-groups/--no-show-groups", default=True, help="Show available log groups/steps summary" +) +@click.option("--step-filter", help="Filter to steps matching this pattern (e.g., 'test', 'build')") +@click.option( + "--group-filter", help="Filter to groups matching this pattern (e.g., 'Run actions', 'mise')" +) +def logs(repo, branch, commit, pr, verbose, raw, job_id, show_groups, step_filter, group_filter): """Show error logs for failed CI jobs.""" try: validate_target_options(branch, commit, pr) @@ -205,7 +212,16 @@ def logs(repo, branch, commit, pr, verbose, raw, job_id): # Get logs using business logic log_result = get_job_logs( - fetcher, owner, repo_name, commit_sha, target_description, job_id, raw + fetcher, + owner, + repo_name, + commit_sha, + target_description, + job_id, + raw, + show_groups, + step_filter, + group_filter, ) # Display logs based on type @@ -437,15 +453,49 @@ def _display_raw_logs(log_result): def _display_filtered_logs(log_result): - """Display filtered error logs.""" + """Display filtered error logs with group information.""" target_description = log_result["target_description"] failed_jobs = log_result["failed_jobs"] + show_groups = log_result.get("show_groups", True) + groups = log_result.get("groups", []) + filters = log_result.get("filters", {}) if not log_result["has_failures"]: click.echo(f"✅ No failing CI jobs found for {target_description}!") return - click.echo(f"📄 Error logs for {len(failed_jobs)} failing job(s) in {target_description}:") + # Show group summary and step status at the top + if show_groups and groups: + click.echo(f"📋 Available log groups in {target_description}:") + click.echo("=" * 60) + + # Show groups with nesting + _display_groups_with_nesting(groups) + + # Show step status summary + all_step_status = {} + for job_log in failed_jobs: + if "step_status" in job_log: + all_step_status.update(job_log["step_status"]) + + if all_step_status: + _display_step_status_summary(all_step_status) + + # Show active filters + if filters.get("step_filter") or filters.get("group_filter"): + click.echo("🔍 Active Filters:") + if filters.get("step_filter"): + click.echo(f" • Step filter: '{filters['step_filter']}'") + if filters.get("group_filter"): + click.echo(f" • Group filter: '{filters['group_filter']}'") + + click.echo("=" * 60) + click.echo("💡 Use --step-filter or --group-filter to focus on specific sections") + click.echo('💡 Example: --group-filter="mise run test"') + click.echo("💡 Use --show-groups=false to hide this summary") + click.echo() + + click.echo(f"📄 Error logs for {len(failed_jobs)} failing job(s):") click.echo() for i, job_log in enumerate(failed_jobs, 1): @@ -469,6 +519,54 @@ def _display_filtered_logs(log_result): click.echo() +def _display_groups_with_nesting(groups): + """Display groups with proper nesting indentation.""" + step_groups = [g for g in groups if g["type"] == "step"] + setup_groups = [g for g in groups if g["type"] == "setup"] + + if setup_groups: + click.echo("🔧 Setup/System Groups:") + for group in setup_groups: + indent = " " + " " * group.get("nesting_level", 0) + click.echo(f"{indent}• {group['name']} (line {group['line_number']})") + + if step_groups: + click.echo("🏃 Step Groups:") + for group in step_groups: + indent = " " + " " * group.get("nesting_level", 0) + click.echo(f"{indent}• {group['name']} (line {group['line_number']})") + + +def _display_step_status_summary(step_status): + """Display deterministic step status summary.""" + click.echo("📊 Step Status Summary:") + + success_steps = [] + failed_steps = [] + other_steps = [] + + for step_name, status in step_status.items(): + if status["conclusion"] == "success": + success_steps.append(step_name) + elif status["conclusion"] == "failure": + failed_steps.append(step_name) + else: + other_steps.append((step_name, status["conclusion"])) + + if success_steps: + click.echo(f" ✅ {len(success_steps)} successful steps") + + if failed_steps: + click.echo(f" ❌ {len(failed_steps)} failed steps:") + for step in failed_steps: + click.echo(f" • {step}") + + if other_steps: + for step_name, conclusion in other_steps: + icon = "⏭️" if conclusion == "skipped" else "🚫" if conclusion == "cancelled" else "❓" + click.echo(f" {icon} {step_name} ({conclusion})") + + def _display_failed_jobs_status(fetcher, owner, repo_name, ci_status): """Display failed jobs status with step details.""" click.echo( diff --git a/cimonitor/log_parser.py b/cimonitor/log_parser.py index eef7815..3bf202a 100644 --- a/cimonitor/log_parser.py +++ b/cimonitor/log_parser.py @@ -5,27 +5,257 @@ class LogParser: @staticmethod - def extract_step_logs(full_logs: str, failed_steps: list[dict[str, Any]]) -> dict[str, str]: - """Extract log sections for specific failed steps using GitHub's step markers.""" + def extract_step_logs( + full_logs: str, failed_steps: list[dict[str, Any]], all_steps: list[dict[str, Any]] = None + ) -> dict[str, str]: + """Extract log sections for specific failed steps using exact name matching only.""" step_logs = {} log_lines = full_logs.split("\n") for step in failed_steps: step_name = step["name"] - # Try exact name matching first + # Try exact name matching only - no heuristics step_content = LogParser._extract_step_by_exact_name(log_lines, step_name) if step_content: step_logs[step_name] = step_content - continue - - # Fallback: try partial name matching - step_content = LogParser._extract_step_by_partial_name(log_lines, step_name) - if step_content: - step_logs[step_name] = step_content return step_logs + @staticmethod + def parse_log_groups(full_logs: str) -> list[dict[str, str]]: + """Parse all ##[group] sections in the logs and return metadata with nesting.""" + log_lines = full_logs.split("\n") + groups = [] + group_stack = [] + + for i, line in enumerate(log_lines): + if "##[group]" in line: + # Extract the group name + if "##[group]Run " in line: + # For Run commands, extract the command + group_name = ( + line.split("##[group]Run ", 1)[1] if "##[group]Run " in line else line + ) + group_type = "step" + else: + # For other groups, extract the group name + group_name = line.split("##[group]", 1)[1] if "##[group]" in line else line + group_type = "setup" + + # Extract timestamp if present + timestamp = "" + if line.startswith("20") and "T" in line and "Z" in line: + timestamp = line.split("Z")[0] + "Z" + + # Track nesting level + nesting_level = len(group_stack) + group_info = { + "name": group_name.strip(), + "type": group_type, + "timestamp": timestamp, + "line_number": i + 1, + "nesting_level": nesting_level, + "parent": group_stack[-1]["name"] if group_stack else None, + } + + groups.append(group_info) + group_stack.append(group_info) + + elif "##[endgroup]" in line and group_stack: + group_stack.pop() + + return groups + + @staticmethod + def get_step_status_info(all_steps: list[dict], failed_steps: list[dict]) -> dict[str, dict]: + """Get deterministic status information for all steps.""" + step_status = {} + + for step in all_steps: + step_name = step.get("name", "Unknown") + step_status[step_name] = { + "status": step.get("status", "unknown"), + "conclusion": step.get("conclusion", "unknown"), + "number": step.get("number"), + "started_at": step.get("started_at"), + "completed_at": step.get("completed_at"), + "is_failed": step in failed_steps, + } + + return step_status + + @staticmethod + def _extract_step_by_timestamp( + log_lines: list[str], step_name: str, started_at: str + ) -> str | None: + """Extract step logs using exact timestamp matching with GitHub API step start time.""" + from datetime import datetime + + try: + # Parse the step start time from GitHub API + step_start = datetime.fromisoformat(started_at.replace("Z", "+00:00")) + + # Look for exact timestamp match in logs + for i, line in enumerate(log_lines): + # Look for ##[group]Run markers + if "##[group]Run " not in line: + continue + + # Extract timestamp from log line + if line.startswith("20") and "T" in line and "Z" in line: + try: + # Extract timestamp (format: 2025-07-16T03:13:13.5152643Z) + log_timestamp_str = line.split("Z")[0] + "Z" + log_timestamp = datetime.fromisoformat( + log_timestamp_str.replace("Z", "+00:00") + ) + + # Only accept exact matches within 1 second (to account for subsecond precision) + time_diff = abs((log_timestamp - step_start).total_seconds()) + + if time_diff <= 1.0: + # Extract this section + step_lines = [line] + + # Capture until endgroup + for j in range(i + 1, len(log_lines)): + next_line = log_lines[j] + step_lines.append(next_line) + + if "##[endgroup]" in next_line: + # Continue capturing a few more lines for errors that appear after endgroup + LogParser._capture_post_endgroup_lines(log_lines, j, step_lines) + break + + return "\n".join(step_lines) if step_lines else None + + except Exception: + continue + + except Exception: + # If timestamp parsing fails, fall back to other methods + pass + + return None + + @staticmethod + def _extract_step_by_number_with_context( + log_lines: list[str], step_number: int, step_name: str, all_steps: list[dict[str, Any]] + ) -> str | None: + """Extract step logs using semantic matching between API step name and log markers.""" + # For "Run tests", look for test-related log markers + if "test" in step_name.lower(): + # Find Run markers that contain test-related keywords (be specific to avoid false matches) + test_keywords = ["test", "pytest", "jest", "spec"] + + for i, line in enumerate(log_lines): + if "##[group]Run " in line: + line_lower = line.lower() + if any(keyword in line_lower for keyword in test_keywords): + # Extract this section + step_lines = [line] + + # Capture until endgroup + for j in range(i + 1, len(log_lines)): + next_line = log_lines[j] + step_lines.append(next_line) + + if "##[endgroup]" in next_line: + # Continue capturing a few more lines for errors that appear after endgroup + LogParser._capture_post_endgroup_lines(log_lines, j, step_lines) + break + + return "\n".join(step_lines) if step_lines else None + + # For other step types, try to match by semantic similarity + # Extract key words from step name (excluding "Run") + step_words = [ + word.lower() for word in step_name.replace("Run ", "").split() if len(word) > 2 + ] + + if step_words: + for i, line in enumerate(log_lines): + if "##[group]Run " in line: + line_lower = line.lower() + # Check if any key words from step name appear in the log marker + if any(word in line_lower for word in step_words): + # Extract this section + step_lines = [line] + + # Capture until endgroup + for j in range(i + 1, len(log_lines)): + next_line = log_lines[j] + step_lines.append(next_line) + + if "##[endgroup]" in next_line: + # Continue capturing a few more lines for errors that appear after endgroup + LogParser._capture_post_endgroup_lines(log_lines, j, step_lines) + break + + return "\n".join(step_lines) if step_lines else None + + return None + + @staticmethod + def _extract_step_by_number( + log_lines: list[str], step_number: int, step_name: str + ) -> str | None: + """Extract step logs using step number to find the correct ##[group]Run section. + + Note: This requires access to the full job steps to determine which Run marker + corresponds to the failed step. For now, this is a placeholder that should be + enhanced to use the job context. + """ + # This is a simplified approach - we would need the full job steps context + # to properly map API step numbers to log Run markers + + # For the common case where step_name starts with "Run", we can try to find + # the corresponding log marker by counting Run steps + if not step_name.startswith("Run"): + return None + + # Find all ##[group]Run markers + run_markers = [] + for i, line in enumerate(log_lines): + if "##[group]Run " in line: + run_markers.append(i) + + # This is a heuristic - would be better to use proper job context + # For now, assume the step_number corresponds roughly to run marker position + # This works for many cases but isn't perfect + + # Try different mapping strategies + possible_indices = [ + step_number - 1, # Direct mapping (step 5 -> run 5) + len([i for i in range(1, step_number) if i <= len(run_markers)]) + - 1, # Count preceding runs + ] + + for run_index in possible_indices: + if 0 <= run_index < len(run_markers): + step_start_index = run_markers[run_index] + + # Extract the section from this marker + step_lines = [log_lines[step_start_index]] + + # Capture until endgroup + for j in range(step_start_index + 1, len(log_lines)): + next_line = log_lines[j] + step_lines.append(next_line) + + if "##[endgroup]" in next_line: + # Continue capturing a few more lines for errors that appear after endgroup + LogParser._capture_post_endgroup_lines(log_lines, j, step_lines) + break + + content = "\n".join(step_lines) + # Basic validation - check if this looks like the right step + if step_name.lower().replace(" ", "") in content.lower().replace(" ", ""): + return content + + return None + @staticmethod def _extract_step_by_exact_name(log_lines: list[str], step_name: str) -> str | None: """Extract step logs using exact name matching.""" diff --git a/cimonitor/services.py b/cimonitor/services.py index fef2ad3..6baacea 100644 --- a/cimonitor/services.py +++ b/cimonitor/services.py @@ -106,6 +106,9 @@ def get_job_logs( target_description: str, job_id: int | None = None, raw: bool = False, + show_groups: bool = True, + step_filter: str | None = None, + group_filter: str | None = None, ) -> dict[str, Any]: """Get logs for failed CI jobs. @@ -117,20 +120,34 @@ def get_job_logs( target_description: Human-readable description of target job_id: Specific job ID to get logs for (optional) raw: Whether to return raw logs + show_groups: Whether to show only group summary + step_filter: Filter pattern for steps + group_filter: Filter pattern for groups Returns: Dictionary with log information and metadata """ # Handle specific job ID request if job_id: - return _get_specific_job_logs(fetcher, owner, repo_name, job_id) + return _get_specific_job_logs( + fetcher, owner, repo_name, job_id, show_groups, step_filter, group_filter + ) # Handle raw logs request if raw: return _get_raw_logs_for_commit(fetcher, owner, repo_name, commit_sha) - # Default: return filtered error logs - return _get_filtered_error_logs(fetcher, owner, repo_name, commit_sha, target_description) + # Default: return filtered error logs with group analysis + return _get_filtered_error_logs( + fetcher, + owner, + repo_name, + commit_sha, + target_description, + show_groups, + step_filter, + group_filter, + ) def watch_ci_status( @@ -272,13 +289,32 @@ def _calculate_step_duration(step: dict[str, Any]) -> str: def _get_specific_job_logs( - fetcher: GitHubCIFetcher, owner: str, repo_name: str, job_id: int + fetcher: GitHubCIFetcher, + owner: str, + repo_name: str, + job_id: int, + show_groups: bool = True, + step_filter: str | None = None, + group_filter: str | None = None, ) -> dict[str, Any]: """Get logs for a specific job ID.""" job_info = fetcher.get_job_by_id(owner, repo_name, job_id) logs_content = fetcher.get_job_logs(owner, repo_name, job_id) - return {"type": "specific_job", "job_info": job_info, "logs": logs_content} + # Add group analysis + groups = LogParser.parse_log_groups(logs_content) + + # Apply filters + filtered_groups = _apply_group_filters(groups, step_filter, group_filter) + + return { + "type": "specific_job", + "job_info": job_info, + "logs": logs_content, + "groups": filtered_groups, + "show_groups": show_groups, + "filters": {"step_filter": step_filter, "group_filter": group_filter}, + } def _get_raw_logs_for_commit( @@ -304,8 +340,52 @@ def _get_raw_logs_for_commit( return {"type": "raw_logs", "failed_jobs": job_logs, "has_failures": True} +def _apply_group_filters( + groups: list[dict], step_filter: str | None, group_filter: str | None +) -> list[dict]: + """Apply step and group filters to the list of groups.""" + filtered = groups + + if step_filter: + filtered = [ + g for g in filtered if g["type"] == "step" and step_filter.lower() in g["name"].lower() + ] + + if group_filter: + filtered = [g for g in filtered if group_filter.lower() in g["name"].lower()] + + return filtered + + +def _remove_timestamps(logs: str) -> str: + """Remove timestamp prefixes from log lines for cleaner output.""" + lines = logs.split("\n") + cleaned_lines = [] + + for line in lines: + # Remove timestamp prefix (format: 2025-07-16T03:13:13.5152643Z) + if line.startswith("20") and "T" in line and "Z" in line: + # Find the first space after the timestamp and remove everything before it + parts = line.split(" ", 1) + if len(parts) > 1: + cleaned_lines.append(parts[1]) + else: + cleaned_lines.append(line) + else: + cleaned_lines.append(line) + + return "\n".join(cleaned_lines) + + def _get_filtered_error_logs( - fetcher: GitHubCIFetcher, owner: str, repo_name: str, commit_sha: str, target_description: str + fetcher: GitHubCIFetcher, + owner: str, + repo_name: str, + commit_sha: str, + target_description: str, + show_groups: bool = True, + step_filter: str | None = None, + group_filter: str | None = None, ) -> dict[str, Any]: """Get filtered error logs for failed jobs.""" failed_check_runs = fetcher.find_failed_jobs_in_latest_run(owner, repo_name, commit_sha) @@ -316,24 +396,48 @@ def _get_filtered_error_logs( "target_description": target_description, "failed_jobs": [], "has_failures": False, + "groups": [], + "show_groups": show_groups, + "filters": {"step_filter": step_filter, "group_filter": group_filter}, } job_logs = [] + all_groups = [] + seen_groups = set() # Track unique groups by (name, type) + for check_run in failed_check_runs: - job_log_info = _process_check_run_for_logs(fetcher, owner, repo_name, check_run) + job_log_info = _process_check_run_for_logs( + fetcher, owner, repo_name, check_run, show_groups, step_filter, group_filter + ) if job_log_info: job_logs.append(job_log_info) + # Collect groups from each job, avoiding duplicates + if "groups" in job_log_info: + for group in job_log_info["groups"]: + group_key = (group["name"], group["type"]) + if group_key not in seen_groups: + seen_groups.add(group_key) + all_groups.append(group) return { "type": "filtered_logs", "target_description": target_description, "failed_jobs": job_logs, "has_failures": True, + "groups": all_groups, + "show_groups": show_groups, + "filters": {"step_filter": step_filter, "group_filter": group_filter}, } def _process_check_run_for_logs( - fetcher: GitHubCIFetcher, owner: str, repo_name: str, check_run: dict[str, Any] + fetcher: GitHubCIFetcher, + owner: str, + repo_name: str, + check_run: dict[str, Any], + show_groups: bool = True, + step_filter: str | None = None, + group_filter: str | None = None, ) -> dict[str, Any] | None: """Process a single check run to extract step logs.""" name = check_run.get("name", "Unknown Job") @@ -354,7 +458,9 @@ def _process_check_run_for_logs( return None jobs = fetcher.get_workflow_jobs(owner, repo_name, run_id) - return _extract_step_logs_from_jobs(fetcher, owner, repo_name, jobs, name) + return _extract_step_logs_from_jobs( + fetcher, owner, repo_name, jobs, name, show_groups, step_filter, group_filter + ) except Exception as e: return { @@ -366,7 +472,14 @@ def _process_check_run_for_logs( def _extract_step_logs_from_jobs( - fetcher: GitHubCIFetcher, owner: str, repo_name: str, jobs: list[dict[str, Any]], job_name: str + fetcher: GitHubCIFetcher, + owner: str, + repo_name: str, + jobs: list[dict[str, Any]], + job_name: str, + show_groups: bool = True, + step_filter: str | None = None, + group_filter: str | None = None, ) -> dict[str, Any] | None: """Extract step logs from workflow jobs.""" for job in jobs: @@ -380,35 +493,47 @@ def _extract_step_logs_from_jobs( continue logs_content = fetcher.get_job_logs(owner, repo_name, job_id) - step_logs = LogParser.extract_step_logs(logs_content, failed_steps) + all_steps = job.get("steps", []) + + # Add group analysis and step status + groups = LogParser.parse_log_groups(logs_content) + filtered_groups = _apply_group_filters(groups, step_filter, group_filter) + step_status = LogParser.get_step_status_info(all_steps, failed_steps) + + step_logs = LogParser.extract_step_logs(logs_content, failed_steps, all_steps) if step_logs: - # Filter each step's logs for errors + # Filter each step's logs for errors and remove timestamps filtered_step_logs = {} for step_name, step_log in step_logs.items(): if step_log.strip(): shown_lines = LogParser.filter_error_lines(step_log) if shown_lines: - filtered_step_logs[step_name] = "\n".join(shown_lines) + clean_log = "\n".join(shown_lines) + filtered_step_logs[step_name] = _remove_timestamps(clean_log) else: # Fallback to last few lines step_lines = step_log.split("\n") - filtered_step_logs[step_name] = "\n".join( - line for line in step_lines[-10:] if line.strip() - ) + clean_log = "\n".join(line for line in step_lines[-10:] if line.strip()) + filtered_step_logs[step_name] = _remove_timestamps(clean_log) return { "name": job_name, "job_name": job.get("name", "Unknown"), "step_logs": filtered_step_logs, + "groups": filtered_groups, + "step_status": step_status, "error": None, } else: - # Fallback: show all logs when step parsing fails + # Fallback: show all logs when step parsing fails (remove timestamps) + clean_logs = _remove_timestamps(logs_content) return { "name": job_name, "job_name": job.get("name", "Unknown"), - "step_logs": {"Full Job Logs": logs_content}, + "step_logs": {"Full Job Logs": clean_logs}, + "groups": filtered_groups, + "step_status": step_status, "error": None, } diff --git a/pyproject.toml b/pyproject.toml index e5ad178..174c826 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cimonitor" -version = "0.1.1" +version = "0.1.4" description = "Monitor GitHub CI workflows, fetch logs, and track build status" readme = "README.md" license = { text = "MIT" } diff --git a/tests/test_cli_refactored.py b/tests/test_cli_refactored.py index b1a1c30..6c040e1 100644 --- a/tests/test_cli_refactored.py +++ b/tests/test_cli_refactored.py @@ -162,7 +162,7 @@ def test_logs_command_filtered(mock_get_job_logs, mock_get_target_info, mock_fet result = runner.invoke(cli, ["logs"]) assert result.exit_code == 0 - assert "📄 Error logs for 1 failing job(s) in test branch:" in result.output + assert "📄 Error logs for 1 failing job(s):" in result.output assert "LOGS #1: Test Job" in result.output assert "📄 Logs for Failed Step: Test Step" in result.output assert "error log content" in result.output diff --git a/tests/test_log_parser.py b/tests/test_log_parser.py index 93721d4..e055aa5 100644 --- a/tests/test_log_parser.py +++ b/tests/test_log_parser.py @@ -33,7 +33,7 @@ def test_filter_error_lines(): def test_extract_step_logs(): - """Test extracting logs for specific failed steps using real GitHub Actions log structure.""" + """Test extracting logs for specific failed steps with exact name matching.""" # This is based on actual GitHub Actions log data full_logs = """2025-07-13T04:07:48.8923897Z ##[group]Run actions/checkout@v4 2025-07-13T04:07:48.8924737Z with: @@ -55,7 +55,7 @@ def test_extract_step_logs(): failed_steps = [ { - "name": "Intentionally failing step", + "name": 'echo "This step will fail intentionally to test CI log fetching"', # Exact match (without "Run ") "number": 3, "started_at": "2025-07-13T04:07:49.4696757Z", "completed_at": "2025-07-13T04:07:49.4732326Z", @@ -64,8 +64,8 @@ def test_extract_step_logs(): step_logs = LogParser.extract_step_logs(full_logs, failed_steps) - assert "Intentionally failing step" in step_logs - failing_log = step_logs["Intentionally failing step"] + assert 'echo "This step will fail intentionally to test CI log fetching"' in step_logs + failing_log = step_logs['echo "This step will fail intentionally to test CI log fetching"'] # Should include the step content and error context after endgroup assert ( @@ -80,8 +80,8 @@ def test_extract_step_logs(): assert "Run actions/checkout@v4" not in failing_log -def test_extract_step_logs_with_partial_name_matching(): - """Test that step extraction works with partial name matching when exact names don't match.""" +def test_extract_step_logs_fallback_when_no_match(): + """Test that step extraction returns empty dict when no exact match is found.""" # Real scenario where step name doesn't exactly match the ##[group]Run line full_logs = """2025-07-13T04:07:49.4696757Z ##[group]Run echo "This step will fail intentionally to test CI log fetching" 2025-07-13T04:07:49.4697730Z echo "This step will fail intentionally to test CI log fetching" @@ -93,7 +93,7 @@ def test_extract_step_logs_with_partial_name_matching(): # The actual step name as reported by GitHub API vs the command in ##[group]Run failed_steps = [ { - "name": "Intentionally failing step", # This is how GitHub API reports it + "name": "Intentionally failing step", # This won't match exactly "number": 3, "started_at": "2025-07-13T04:07:49.4696757Z", "completed_at": "2025-07-13T04:07:49.4732326Z", @@ -102,7 +102,5 @@ def test_extract_step_logs_with_partial_name_matching(): step_logs = LogParser.extract_step_logs(full_logs, failed_steps) - # Should find the step using partial name matching ("intentionally" keyword) - assert "Intentionally failing step" in step_logs - failing_log = step_logs["Intentionally failing step"] - assert "ERROR: Simulated failure for testing purposes" in failing_log + # Should return empty dict when no exact match is found (deterministic behavior) + assert step_logs == {} diff --git a/uv.lock b/uv.lock index 6f65465..afddbd3 100644 --- a/uv.lock +++ b/uv.lock @@ -83,7 +83,7 @@ wheels = [ [[package]] name = "cimonitor" -version = "0.1.1" +version = "0.1.4" source = { editable = "." } dependencies = [ { name = "click" },