Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 66 additions & 48 deletions src/ghastoolkit/octokit/codescanning.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,20 +371,36 @@ def getAlertsInPR(self, base: str) -> list[CodeAlert]:
if not self.repository.reference or not self.repository.isInPullRequest():
raise GHASToolkitError("Repository is not in a Pull Request")

# Try merge and then head
analysis = self.getAnalyses(reference=self.repository.reference)
# Get PR info to determine the correct reference
pr_info = self.repository.getPullRequestInfo()
if not pr_info:
raise GHASToolkitError("Could not get PR information")

# Try head ref first, then merge ref
head_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/head"
merge_ref = f"refs/pull/{self.repository.getPullRequestNumber()}/merge"

logger.debug(f"Trying head ref first: {head_ref}")
analysis = self.getAnalyses(reference=head_ref)

if len(analysis) == 0:
logger.debug(
f"No analyses found for head ref, trying merge ref: {merge_ref}"
)
analysis = self.getAnalyses(reference=merge_ref)

if len(analysis) == 0:
raise GHASToolkitError("No analyses found for the PR")

# For CodeQL results using Default Setup
reference = analysis[0].get("ref")
reference = analysis[0].ref
if not reference:
raise GHASToolkitError("No ref found in the analysis")

alerts = self.getAlerts("open", ref=reference)

for alert in alerts:
number = alert.get("number")
number = alert.number
alert_info = self.getAlertInstances(number, ref=base)
if len(alert_info) == 0:
results.append(alert)
Expand Down Expand Up @@ -447,67 +463,69 @@ def getAnalyses(
https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository
"""
ref = reference or self.repository.reference
logger.debug(f"Getting Analyses for {ref}")
logger.debug(f"Getting Analyses for ref: {ref}")
logger.debug(f"Repository reference: {self.repository.reference}")
logger.debug(f"Repository branch: {self.repository.branch}")
logger.debug(f"Is in PR: {self.repository.isInPullRequest()}")

if ref is None:
raise GHASToolkitError("Reference is required for getting analyses")

counter = 0

logger.debug(
f"Fetching Analyses (retries {self.retry_count} every {self.retry_sleep}s)"
)

while counter < self.retry_count:
counter += 1

results = self.rest.get(
"/repos/{org}/{repo}/code-scanning/analyses",
{"tool_name": tool, "ref": ref},
)
if not isinstance(results, list):
raise GHASToolkitTypeError(
"Error getting analyses from Repository",
permissions=[
'"Code scanning alerts" repository permissions (read)'
],
docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository",
# If we're in a PR, try merge ref first
if self.repository.isInPullRequest() and "/pull/" in ref:
pr_number = self.repository.getPullRequestNumber()
refs_to_try = [
f"refs/pull/{pr_number}/merge", # Try merge first
f"refs/pull/{pr_number}/head", # Then head
ref, # Then original ref
]
else:
refs_to_try = [ref]

for try_ref in refs_to_try:
counter = 0
while counter < self.retry_count:
counter += 1
logger.debug(
f"Attempting with ref {try_ref} (attempt {counter}/{self.retry_count})"
)

# Try default setup `head` if no results (required for default setup)
if (
len(results) == 0
and self.repository.isInPullRequest()
and (ref.endswith("/merge") or ref.endswith("/head"))
):
logger.debug("No analyses found for `merge`, trying `head`")
results = self.rest.get(
"/repos/{org}/{repo}/code-scanning/analyses",
{"tool_name": tool, "ref": ref.replace("/merge", "/head")},
)
if not isinstance(results, list):
raise GHASToolkitTypeError(
"Error getting analyses from Repository",
permissions=[
'"Code scanning alerts" repository permissions (read)'
],
docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository",
try:
results = self.rest.get(
"/repos/{org}/{repo}/code-scanning/analyses",
{"tool_name": tool, "ref": try_ref},
)

if len(results) == 0:
# If the retry count is less than 1, we don't retry
if self.retry_count > 1:
if isinstance(results, list) and len(results) > 0:
logger.debug(
f"Found {len(results)} analyses with ref {try_ref}"
)
return [
loadOctoItem(CodeScanningAnalysis, analysis)
for analysis in results
]

logger.debug(
f"No analyses found, retrying ({counter}/{self.retry_count})"
f"No results found with ref {try_ref}, will retry or try next ref"
)
time.sleep(self.retry_sleep)
else:
return [
loadOctoItem(CodeScanningAnalysis, analysis) for analysis in results
]

# If we get here, we have retried the max number of times and still no results
except Exception as e:
logger.debug(f"Error getting analyses with ref {try_ref}: {str(e)}")
if "Code scanning alerts" in str(
e
) and "repository permissions" in str(e):
raise # Re-raise permission errors immediately
time.sleep(self.retry_sleep)

# If we get here, we have tried all refs and retried the max number of times
raise GHASToolkitError(
"Error getting analyses from Repository (retry limit reached)",
"Error getting analyses from Repository (all refs and retries exhausted)",
permissions=['"Code scanning alerts" repository permissions (read)'],
docs="https://docs.github.com/en/enterprise-cloud@latest/rest/code-scanning#list-code-scanning-analyses-for-a-repository",
)
Expand Down
6 changes: 6 additions & 0 deletions src/ghastoolkit/octokit/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def __post_init__(self) -> None:
if not self.isInPullRequest():
_, _, branch = self.reference.split("/", 2)
self.branch = branch
else:
# For PRs, try to get the head branch
pr_info = self.getPullRequestInfo()
if pr_info and "head" in pr_info:
self.branch = pr_info["head"].get("ref")

if self.branch and not self.reference:
self.reference = f"refs/heads/{self.branch}"

Expand Down
Loading