From ed45728d7e965baa7bd851eaa5e8535fd952555f Mon Sep 17 00:00:00 2001 From: Flipmonster Date: Thu, 6 Mar 2025 19:09:33 +0100 Subject: [PATCH 1/4] fix: improve PR reference handling for workflow_dispatch events - Add better PR reference handling by trying head ref first, then merge ref - Add debug logging for reference attempts - Fix property access for CodeAlert and CodeScanningAnalysis classes - Improve error messages and validation --- src/ghastoolkit/octokit/codescanning.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/ghastoolkit/octokit/codescanning.py b/src/ghastoolkit/octokit/codescanning.py index 991c1e3..feacc70 100644 --- a/src/ghastoolkit/octokit/codescanning.py +++ b/src/ghastoolkit/octokit/codescanning.py @@ -371,20 +371,34 @@ def getAlertsInPR(self, base: str) -> list[CodeAlert]: if not self.repository.reference or not self.repository.isInPullRequest(): raise GHASToolkitError("Repository is not in a Pull Request") - # Try merge and then head - analysis = self.getAnalyses(reference=self.repository.reference) + # Get PR info to determine the correct reference + pr_info = self.repository.getPullRequestInfo() + if not pr_info: + raise GHASToolkitError("Could not get PR information") + + # Try head ref first, then merge ref + head_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/head" + merge_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/merge" + + logger.debug(f"Trying head ref first: {head_ref}") + analysis = self.getAnalyses(reference=head_ref) + + if len(analysis) == 0: + logger.debug(f"No analyses found for head ref, trying merge ref: {merge_ref}") + analysis = self.getAnalyses(reference=merge_ref) + if len(analysis) == 0: raise GHASToolkitError("No analyses found for the PR") # For CodeQL results using Default Setup - reference = analysis[0].get("ref") + reference = analysis[0].ref if not reference: raise GHASToolkitError("No ref found in the analysis") alerts = self.getAlerts("open", ref=reference) for alert in alerts: - number = alert.get("number") + number = alert.number alert_info = self.getAlertInstances(number, ref=base) if len(alert_info) == 0: results.append(alert) From be0cd1c7bcee09b6612c0e30357dec8bf6bf0d1f Mon Sep 17 00:00:00 2001 From: Flipmonster Date: Thu, 6 Mar 2025 19:20:05 +0100 Subject: [PATCH 2/4] add repo changes --- src/ghastoolkit/octokit/codescanning.py | 16 +++++++++++++--- src/ghastoolkit/octokit/repository.py | 8 +++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/ghastoolkit/octokit/codescanning.py b/src/ghastoolkit/octokit/codescanning.py index feacc70..33658f1 100644 --- a/src/ghastoolkit/octokit/codescanning.py +++ b/src/ghastoolkit/octokit/codescanning.py @@ -461,7 +461,11 @@ def getAnalyses( https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository """ ref = reference or self.repository.reference - logger.debug(f"Getting Analyses for {ref}") + logger.debug(f"Getting Analyses for ref: {ref}") + logger.debug(f"Repository reference: {self.repository.reference}") + logger.debug(f"Repository branch: {self.repository.branch}") + logger.debug(f"Is in PR: {self.repository.isInPullRequest()}") + if ref is None: raise GHASToolkitError("Reference is required for getting analyses") @@ -478,6 +482,8 @@ def getAnalyses( "/repos/{org}/{repo}/code-scanning/analyses", {"tool_name": tool, "ref": ref}, ) + logger.debug(f"Initial API response for ref {ref}: {len(results) if isinstance(results, list) else 'error'} results") + if not isinstance(results, list): raise GHASToolkitTypeError( "Error getting analyses from Repository", @@ -494,10 +500,14 @@ def getAnalyses( and (ref.endswith("/merge") or ref.endswith("/head")) ): logger.debug("No analyses found for `merge`, trying `head`") + head_ref = ref.replace("/merge", "/head") + logger.debug(f"Trying head ref: {head_ref}") results = self.rest.get( "/repos/{org}/{repo}/code-scanning/analyses", - {"tool_name": tool, "ref": ref.replace("/merge", "/head")}, + {"tool_name": tool, "ref": head_ref}, ) + logger.debug(f"Head ref API response: {len(results) if isinstance(results, list) else 'error'} results") + if not isinstance(results, list): raise GHASToolkitTypeError( "Error getting analyses from Repository", @@ -695,4 +705,4 @@ def downloadExtractorPack(self, repository_name: str, output: str) -> Optional[s version = latest_release.get("tag_name", "0.0.0") logger.debug(f"Latest Releases :: {version}") - # for asset in latest_release.get("assets", []): + # for asset in latest_release.get("assets", []): \ No newline at end of file diff --git a/src/ghastoolkit/octokit/repository.py b/src/ghastoolkit/octokit/repository.py index 721a943..95e6300 100644 --- a/src/ghastoolkit/octokit/repository.py +++ b/src/ghastoolkit/octokit/repository.py @@ -53,6 +53,12 @@ def __post_init__(self) -> None: if not self.isInPullRequest(): _, _, branch = self.reference.split("/", 2) self.branch = branch + else: + # For PRs, try to get the head branch + pr_info = self.getPullRequestInfo() + if pr_info and "head" in pr_info: + self.branch = pr_info["head"].get("ref") + if self.branch and not self.reference: self.reference = f"refs/heads/{self.branch}" @@ -270,4 +276,4 @@ def parseRepository(name: str) -> "Repository": owner, repo, path = name.split("/", 2) else: owner, repo = name.split("/", 1) - return Repository(owner, repo, reference=ref, branch=branch, path=path) + return Repository(owner, repo, reference=ref, branch=branch, path=path) \ No newline at end of file From 91a1067aff06f7ae12ffbac416fb66b64c444396 Mon Sep 17 00:00:00 2001 From: Philip von Wielligh Date: Fri, 7 Mar 2025 17:26:37 +0100 Subject: [PATCH 3/4] Improve code scanning analysis ref handling --- src/ghastoolkit/octokit/codescanning.py | 95 ++++++++++--------------- 1 file changed, 37 insertions(+), 58 deletions(-) diff --git a/src/ghastoolkit/octokit/codescanning.py b/src/ghastoolkit/octokit/codescanning.py index 33658f1..9e21dbe 100644 --- a/src/ghastoolkit/octokit/codescanning.py +++ b/src/ghastoolkit/octokit/codescanning.py @@ -470,68 +470,47 @@ def getAnalyses( raise GHASToolkitError("Reference is required for getting analyses") counter = 0 - - logger.debug( - f"Fetching Analyses (retries {self.retry_count} every {self.retry_sleep}s)" - ) - - while counter < self.retry_count: - counter += 1 - - results = self.rest.get( - "/repos/{org}/{repo}/code-scanning/analyses", - {"tool_name": tool, "ref": ref}, - ) - logger.debug(f"Initial API response for ref {ref}: {len(results) if isinstance(results, list) else 'error'} results") - - if not isinstance(results, list): - raise GHASToolkitTypeError( - "Error getting analyses from Repository", - permissions=[ - '"Code scanning alerts" repository permissions (read)' - ], - docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository", - ) - - # Try default setup `head` if no results (required for default setup) - if ( - len(results) == 0 - and self.repository.isInPullRequest() - and (ref.endswith("/merge") or ref.endswith("/head")) - ): - logger.debug("No analyses found for `merge`, trying `head`") - head_ref = ref.replace("/merge", "/head") - logger.debug(f"Trying head ref: {head_ref}") - results = self.rest.get( - "/repos/{org}/{repo}/code-scanning/analyses", - {"tool_name": tool, "ref": head_ref}, - ) - logger.debug(f"Head ref API response: {len(results) if isinstance(results, list) else 'error'} results") - - if not isinstance(results, list): - raise GHASToolkitTypeError( - "Error getting analyses from Repository", - permissions=[ - '"Code scanning alerts" repository permissions (read)' - ], - docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository", - ) - - if len(results) == 0: - # If the retry count is less than 1, we don't retry - if self.retry_count > 1: - logger.debug( - f"No analyses found, retrying ({counter}/{self.retry_count})" + logger.debug(f"Fetching Analyses (retries {self.retry_count} every {self.retry_sleep}s)") + + # If we're in a PR, try merge ref first + if self.repository.isInPullRequest() and "/pull/" in ref: + pr_number = self.repository.getPullRequestNumber() + refs_to_try = [ + f"refs/pull/{pr_number}/merge", # Try merge first + f"refs/pull/{pr_number}/head", # Then head + ref # Then original ref + ] + else: + refs_to_try = [ref] + + for try_ref in refs_to_try: + counter = 0 + while counter < self.retry_count: + counter += 1 + logger.debug(f"Attempting with ref {try_ref} (attempt {counter}/{self.retry_count})") + + try: + results = self.rest.get( + "/repos/{org}/{repo}/code-scanning/analyses", + {"tool_name": tool, "ref": try_ref}, ) + + if isinstance(results, list) and len(results) > 0: + logger.debug(f"Found {len(results)} analyses with ref {try_ref}") + return [loadOctoItem(CodeScanningAnalysis, analysis) for analysis in results] + + logger.debug(f"No results found with ref {try_ref}, will retry or try next ref") + time.sleep(self.retry_sleep) + + except Exception as e: + logger.debug(f"Error getting analyses with ref {try_ref}: {str(e)}") + if "Code scanning alerts" in str(e) and "repository permissions" in str(e): + raise # Re-raise permission errors immediately time.sleep(self.retry_sleep) - else: - return [ - loadOctoItem(CodeScanningAnalysis, analysis) for analysis in results - ] - # If we get here, we have retried the max number of times and still no results + # If we get here, we have tried all refs and retried the max number of times raise GHASToolkitError( - "Error getting analyses from Repository (retry limit reached)", + "Error getting analyses from Repository (all refs and retries exhausted)", permissions=['"Code scanning alerts" repository permissions (read)'], docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository", ) From f7a3ad9af4dd8af3e6a4f0c1a85d7f0f1f97dd12 Mon Sep 17 00:00:00 2001 From: GeekMasher Date: Wed, 19 Mar 2025 13:00:05 +0000 Subject: [PATCH 4/4] style: Update style using black --- src/ghastoolkit/octokit/codescanning.py | 49 ++++++++++++++++--------- src/ghastoolkit/octokit/repository.py | 4 +- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/ghastoolkit/octokit/codescanning.py b/src/ghastoolkit/octokit/codescanning.py index 9e21dbe..a8593a3 100644 --- a/src/ghastoolkit/octokit/codescanning.py +++ b/src/ghastoolkit/octokit/codescanning.py @@ -375,16 +375,18 @@ def getAlertsInPR(self, base: str) -> list[CodeAlert]: pr_info = self.repository.getPullRequestInfo() if not pr_info: raise GHASToolkitError("Could not get PR information") - + # Try head ref first, then merge ref head_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/head" merge_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/merge" - + logger.debug(f"Trying head ref first: {head_ref}") analysis = self.getAnalyses(reference=head_ref) - + if len(analysis) == 0: - logger.debug(f"No analyses found for head ref, trying merge ref: {merge_ref}") + logger.debug( + f"No analyses found for head ref, trying merge ref: {merge_ref}" + ) analysis = self.getAnalyses(reference=merge_ref) if len(analysis) == 0: @@ -465,20 +467,22 @@ def getAnalyses( logger.debug(f"Repository reference: {self.repository.reference}") logger.debug(f"Repository branch: {self.repository.branch}") logger.debug(f"Is in PR: {self.repository.isInPullRequest()}") - + if ref is None: raise GHASToolkitError("Reference is required for getting analyses") counter = 0 - logger.debug(f"Fetching Analyses (retries {self.retry_count} every {self.retry_sleep}s)") + logger.debug( + f"Fetching Analyses (retries {self.retry_count} every {self.retry_sleep}s)" + ) # If we're in a PR, try merge ref first if self.repository.isInPullRequest() and "/pull/" in ref: pr_number = self.repository.getPullRequestNumber() refs_to_try = [ f"refs/pull/{pr_number}/merge", # Try merge first - f"refs/pull/{pr_number}/head", # Then head - ref # Then original ref + f"refs/pull/{pr_number}/head", # Then head + ref, # Then original ref ] else: refs_to_try = [ref] @@ -487,24 +491,35 @@ def getAnalyses( counter = 0 while counter < self.retry_count: counter += 1 - logger.debug(f"Attempting with ref {try_ref} (attempt {counter}/{self.retry_count})") + logger.debug( + f"Attempting with ref {try_ref} (attempt {counter}/{self.retry_count})" + ) try: results = self.rest.get( "/repos/{org}/{repo}/code-scanning/analyses", {"tool_name": tool, "ref": try_ref}, ) - + if isinstance(results, list) and len(results) > 0: - logger.debug(f"Found {len(results)} analyses with ref {try_ref}") - return [loadOctoItem(CodeScanningAnalysis, analysis) for analysis in results] - - logger.debug(f"No results found with ref {try_ref}, will retry or try next ref") + logger.debug( + f"Found {len(results)} analyses with ref {try_ref}" + ) + return [ + loadOctoItem(CodeScanningAnalysis, analysis) + for analysis in results + ] + + logger.debug( + f"No results found with ref {try_ref}, will retry or try next ref" + ) time.sleep(self.retry_sleep) - + except Exception as e: logger.debug(f"Error getting analyses with ref {try_ref}: {str(e)}") - if "Code scanning alerts" in str(e) and "repository permissions" in str(e): + if "Code scanning alerts" in str( + e + ) and "repository permissions" in str(e): raise # Re-raise permission errors immediately time.sleep(self.retry_sleep) @@ -684,4 +699,4 @@ def downloadExtractorPack(self, repository_name: str, output: str) -> Optional[s version = latest_release.get("tag_name", "0.0.0") logger.debug(f"Latest Releases :: {version}") - # for asset in latest_release.get("assets", []): \ No newline at end of file + # for asset in latest_release.get("assets", []): diff --git a/src/ghastoolkit/octokit/repository.py b/src/ghastoolkit/octokit/repository.py index 95e6300..c6b7c8d 100644 --- a/src/ghastoolkit/octokit/repository.py +++ b/src/ghastoolkit/octokit/repository.py @@ -58,7 +58,7 @@ def __post_init__(self) -> None: pr_info = self.getPullRequestInfo() if pr_info and "head" in pr_info: self.branch = pr_info["head"].get("ref") - + if self.branch and not self.reference: self.reference = f"refs/heads/{self.branch}" @@ -276,4 +276,4 @@ def parseRepository(name: str) -> "Repository": owner, repo, path = name.split("/", 2) else: owner, repo = name.split("/", 1) - return Repository(owner, repo, reference=ref, branch=branch, path=path) \ No newline at end of file + return Repository(owner, repo, reference=ref, branch=branch, path=path)