diff --git a/src/ghastoolkit/octokit/codescanning.py b/src/ghastoolkit/octokit/codescanning.py index 991c1e3..a8593a3 100644 --- a/src/ghastoolkit/octokit/codescanning.py +++ b/src/ghastoolkit/octokit/codescanning.py @@ -371,20 +371,36 @@ def getAlertsInPR(self, base: str) -> list[CodeAlert]: if not self.repository.reference or not self.repository.isInPullRequest(): raise GHASToolkitError("Repository is not in a Pull Request") - # Try merge and then head - analysis = self.getAnalyses(reference=self.repository.reference) + # Get PR info to determine the correct reference + pr_info = self.repository.getPullRequestInfo() + if not pr_info: + raise GHASToolkitError("Could not get PR information") + + # Try head ref first, then merge ref + head_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/head" + merge_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/merge" + + logger.debug(f"Trying head ref first: {head_ref}") + analysis = self.getAnalyses(reference=head_ref) + + if len(analysis) == 0: + logger.debug( + f"No analyses found for head ref, trying merge ref: {merge_ref}" + ) + analysis = self.getAnalyses(reference=merge_ref) + if len(analysis) == 0: raise GHASToolkitError("No analyses found for the PR") # For CodeQL results using Default Setup - reference = analysis[0].get("ref") + reference = analysis[0].ref if not reference: raise GHASToolkitError("No ref found in the analysis") alerts = self.getAlerts("open", ref=reference) for alert in alerts: - number = alert.get("number") + number = alert.number alert_info = self.getAlertInstances(number, ref=base) if len(alert_info) == 0: results.append(alert) @@ -447,67 +463,69 @@ def getAnalyses( https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository """ ref = reference or self.repository.reference - logger.debug(f"Getting Analyses for {ref}") + logger.debug(f"Getting Analyses for ref: {ref}") + logger.debug(f"Repository reference: {self.repository.reference}") + logger.debug(f"Repository branch: {self.repository.branch}") + logger.debug(f"Is in PR: {self.repository.isInPullRequest()}") + if ref is None: raise GHASToolkitError("Reference is required for getting analyses") counter = 0 - logger.debug( f"Fetching Analyses (retries {self.retry_count} every {self.retry_sleep}s)" ) - while counter < self.retry_count: - counter += 1 - - results = self.rest.get( - "/repos/{org}/{repo}/code-scanning/analyses", - {"tool_name": tool, "ref": ref}, - ) - if not isinstance(results, list): - raise GHASToolkitTypeError( - "Error getting analyses from Repository", - permissions=[ - '"Code scanning alerts" repository permissions (read)' - ], - docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository", + # If we're in a PR, try merge ref first + if self.repository.isInPullRequest() and "/pull/" in ref: + pr_number = self.repository.getPullRequestNumber() + refs_to_try = [ + f"refs/pull/{pr_number}/merge", # Try merge first + f"refs/pull/{pr_number}/head", # Then head + ref, # Then original ref + ] + else: + refs_to_try = [ref] + + for try_ref in refs_to_try: + counter = 0 + while counter < self.retry_count: + counter += 1 + logger.debug( + f"Attempting with ref {try_ref} (attempt {counter}/{self.retry_count})" ) - # Try default setup `head` if no results (required for default setup) - if ( - len(results) == 0 - and self.repository.isInPullRequest() - and (ref.endswith("/merge") or ref.endswith("/head")) - ): - logger.debug("No analyses found for `merge`, trying `head`") - results = self.rest.get( - "/repos/{org}/{repo}/code-scanning/analyses", - {"tool_name": tool, "ref": ref.replace("/merge", "/head")}, - ) - if not isinstance(results, list): - raise GHASToolkitTypeError( - "Error getting analyses from Repository", - permissions=[ - '"Code scanning alerts" repository permissions (read)' - ], - docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository", + try: + results = self.rest.get( + "/repos/{org}/{repo}/code-scanning/analyses", + {"tool_name": tool, "ref": try_ref}, ) - if len(results) == 0: - # If the retry count is less than 1, we don't retry - if self.retry_count > 1: + if isinstance(results, list) and len(results) > 0: + logger.debug( + f"Found {len(results)} analyses with ref {try_ref}" + ) + return [ + loadOctoItem(CodeScanningAnalysis, analysis) + for analysis in results + ] + logger.debug( - f"No analyses found, retrying ({counter}/{self.retry_count})" + f"No results found with ref {try_ref}, will retry or try next ref" ) time.sleep(self.retry_sleep) - else: - return [ - loadOctoItem(CodeScanningAnalysis, analysis) for analysis in results - ] - # If we get here, we have retried the max number of times and still no results + except Exception as e: + logger.debug(f"Error getting analyses with ref {try_ref}: {str(e)}") + if "Code scanning alerts" in str( + e + ) and "repository permissions" in str(e): + raise # Re-raise permission errors immediately + time.sleep(self.retry_sleep) + + # If we get here, we have tried all refs and retried the max number of times raise GHASToolkitError( - "Error getting analyses from Repository (retry limit reached)", + "Error getting analyses from Repository (all refs and retries exhausted)", permissions=['"Code scanning alerts" repository permissions (read)'], docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository", ) diff --git a/src/ghastoolkit/octokit/repository.py b/src/ghastoolkit/octokit/repository.py index 721a943..c6b7c8d 100644 --- a/src/ghastoolkit/octokit/repository.py +++ b/src/ghastoolkit/octokit/repository.py @@ -53,6 +53,12 @@ def __post_init__(self) -> None: if not self.isInPullRequest(): _, _, branch = self.reference.split("/", 2) self.branch = branch + else: + # For PRs, try to get the head branch + pr_info = self.getPullRequestInfo() + if pr_info and "head" in pr_info: + self.branch = pr_info["head"].get("ref") + if self.branch and not self.reference: self.reference = f"refs/heads/{self.branch}"