Skip to content

Commit dc172c8

Browse files
Merge pull request #34 from Contrast-Security-OSS/AIML-49_open_external_issue
AIML-49 open external issue
2 parents d05ece6 + 752b80d commit dc172c8

File tree

9 files changed

+1393
-17
lines changed

9 files changed

+1393
-17
lines changed

src/closed_handler.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
# Import from src package to ensure correct module resolution
2525
from src import contrast_api
2626
from src.config import get_config # Using get_config function instead of direct import
27-
from src.utils import debug_log, extract_remediation_id_from_branch, log
27+
from src.utils import debug_log, extract_remediation_id_from_branch, extract_remediation_id_from_labels, log
2828
import src.telemetry_handler as telemetry_handler
2929

3030
def handle_closed_pr():
3131
"""Handles the logic when a pull request is closed without merging."""
32+
telemetry_handler.initialize_telemetry()
33+
3234
log("--- Handling Closed (Unmerged) Contrast AI SmartFix Pull Request ---")
3335

3436
# Get PR event details from environment variables set by GitHub Actions
@@ -63,11 +65,24 @@ def handle_closed_pr():
6365

6466
debug_log(f"Branch name: {branch_name}")
6567

66-
# Extract remediation ID from branch name
67-
remediation_id = extract_remediation_id_from_branch(branch_name)
68+
# Extract remediation ID from branch name or PR labels
69+
remediation_id = None
70+
71+
# Check if this is a branch created by external agent (e.g., GitHub Copilot)
72+
if branch_name.startswith("copilot/fix"):
73+
debug_log("Branch appears to be created by external agent. Extracting remediation ID from PR labels.")
74+
# Get labels from the PR
75+
labels = pull_request.get("labels", [])
76+
remediation_id = extract_remediation_id_from_labels(labels)
77+
else:
78+
# Use original method for branches created by SmartFix
79+
remediation_id = extract_remediation_id_from_branch(branch_name)
6880

6981
if not remediation_id:
70-
log(f"Error: Could not extract remediation ID from branch name: {branch_name}", is_error=True)
82+
if branch_name.startswith("copilot/fix"):
83+
log(f"Error: Could not extract remediation ID from PR labels for external agent branch: {branch_name}", is_error=True)
84+
else:
85+
log(f"Error: Could not extract remediation ID from branch name: {branch_name}", is_error=True)
7186
# If we can't find the remediation ID, we can't proceed
7287
sys.exit(1)
7388

src/external_coding_agent.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
#-
2+
# #%L
3+
# Contrast AI SmartFix
4+
# %%
5+
# Copyright (C) 2025 Contrast Security, Inc.
6+
# %%
7+
8+
# License: Commercial
9+
# NOTICE: This Software and the patented inventions embodied within may only be
10+
# used as part of Contrast Security's commercial offerings. Even though it is
11+
# made available through public repositories, use of this Software is subject to
12+
# the applicable End User Licensing Agreement found at
13+
# https://www.contrastsecurity.com/enduser-terms-0317a or as otherwise agreed
14+
# between Contrast Security and the End User. The Software may not be reverse
15+
# engineered, modified, repackaged, sold, redistributed or otherwise used in a
16+
# way not consistent with the End User License Agreement.
17+
# #L%
18+
#
19+
20+
import time
21+
from typing import Optional
22+
from src.utils import log, debug_log, error_exit
23+
from src.contrast_api import FailureCategory, notify_remediation_pr_opened
24+
from src.config import Config
25+
from src import git_handler
26+
from src import telemetry_handler
27+
28+
class ExternalCodingAgent:
29+
"""
30+
A class that interfaces with an external coding agent through an API or command line.
31+
This agent is used as an alternative to the built-in SmartFix coding agent.
32+
"""
33+
34+
def __init__(self, config: Config):
35+
"""
36+
Initialize the ExternalCodingAgent with configuration settings.
37+
38+
Args:
39+
config: The application configuration object
40+
"""
41+
self.config = config
42+
debug_log(f"Initialized ExternalCodingAgent")
43+
44+
def generate_fixes(self, vuln_uuid: str, remediation_id: str, vuln_title: str) -> bool:
45+
"""
46+
Generate fixes for vulnerabilities.
47+
48+
Returns:
49+
bool: False if the CODING_AGENT is SMARTFIX, True otherwise
50+
"""
51+
if hasattr(self.config, 'CODING_AGENT') and self.config.CODING_AGENT == "SMARTFIX":
52+
debug_log("SMARTFIX agent detected, ExternalCodingAgent.generate_fixes returning False")
53+
return False
54+
55+
log(f"\n::group::--- Using External Coding Agent ({self.config.CODING_AGENT}) ---")
56+
57+
# Hard-coded vulnerability label for now, will be passed as argument later
58+
vulnerability_label = f"contrast-vuln-id:VULN-{vuln_uuid}"
59+
remediation_label = f"smartfix-id:{remediation_id}"
60+
issue_title = vuln_title
61+
issue_body = "This is a fake issue body for testing purposes."
62+
63+
# Use git_handler to find if there's an existing issue with this label
64+
issue_number = git_handler.find_issue_with_label(vulnerability_label)
65+
66+
if issue_number:
67+
debug_log(f"Found existing GitHub issue #{issue_number} with label {vulnerability_label}")
68+
if not git_handler.reset_issue(issue_number, remediation_label):
69+
log(f"Failed to reset issue #{issue_number} with labels {vulnerability_label}, {remediation_label}", is_error=True)
70+
error_exit(remediation_id, FailureCategory.AGENT_FAILURE.value)
71+
else:
72+
debug_log(f"No GitHub issue found with label {vulnerability_label}")
73+
issue_number = git_handler.create_issue(issue_title, issue_body, vulnerability_label, remediation_label)
74+
if not issue_number:
75+
log(f"Failed to create issue with labels {vulnerability_label}, {remediation_label}", is_error=True)
76+
error_exit(remediation_id, FailureCategory.AGENT_FAILURE.value)
77+
78+
telemetry_handler.update_telemetry("additionalAttributes.githubIssueNumber", issue_number)
79+
80+
# Poll for PR creation by the external agent
81+
log(f"Waiting for external agent to create a PR for issue #{issue_number}")
82+
telemetry_handler.update_telemetry("additionalAttributes.codingAgent", "EXTERNAL")
83+
84+
# Poll for a PR to be created by the external agent (100 attempts, 5 seconds apart = ~8.3 minutes max)
85+
pr_info = self._poll_for_pr(issue_number, remediation_id, vulnerability_label, remediation_label, max_attempts=100, sleep_seconds=5)
86+
87+
log("\n::endgroup::")
88+
if pr_info:
89+
pr_number = pr_info.get("number")
90+
pr_url = pr_info.get("url")
91+
log(f"External agent created PR #{pr_number} at {pr_url}")
92+
telemetry_handler.update_telemetry("resultInfo.prCreated", True)
93+
telemetry_handler.update_telemetry("additionalAttributes.prStatus", "OPEN")
94+
telemetry_handler.update_telemetry("additionalAttributes.prNumber", pr_number)
95+
telemetry_handler.update_telemetry("additionalAttributes.prUrl", pr_url)
96+
return True
97+
else:
98+
log("External agent failed to create a PR within the timeout period", is_error=True)
99+
telemetry_handler.update_telemetry("resultInfo.prCreated", False)
100+
telemetry_handler.update_telemetry("resultInfo.failureReason", "PR creation timeout")
101+
telemetry_handler.update_telemetry("resultInfo.failureCategory", FailureCategory.AGENT_FAILURE.name)
102+
return False
103+
104+
def _poll_for_pr(self, issue_number: int, remediation_id: str, vulnerability_label: str, remediation_label:str, max_attempts: int = 100, sleep_seconds: int = 5) -> Optional[dict]:
105+
"""
106+
Poll for a PR to be created by the external agent.
107+
108+
Args:
109+
issue_number: The issue number to check for a PR
110+
remediation_id: The remediation ID for telemetry and API notification
111+
max_attempts: Maximum number of polling attempts (default: 100)
112+
sleep_seconds: Time to sleep between attempts (default: 5 seconds)
113+
114+
Returns:
115+
Optional[dict]: PR information if found, None if not found after max attempts
116+
"""
117+
debug_log(f"Polling for PR creation for issue #{issue_number}, max {max_attempts} attempts with {sleep_seconds}s interval")
118+
119+
for attempt in range(1, max_attempts + 1):
120+
debug_log(f"Polling attempt {attempt}/{max_attempts} for PR related to issue #{issue_number}")
121+
122+
pr_info = git_handler.find_open_pr_for_issue(issue_number)
123+
124+
if pr_info:
125+
pr_number = pr_info.get("number")
126+
pr_url = pr_info.get("url")
127+
128+
# Add vulnerability and remediation labels to the PR
129+
labels_to_add = [vulnerability_label, remediation_label]
130+
if git_handler.add_labels_to_pr(pr_number, labels_to_add):
131+
debug_log(f"Successfully added labels to PR #{pr_number}: {labels_to_add}")
132+
else:
133+
log(f"Failed to add labels to PR #{pr_number}", is_error=True)
134+
return None
135+
136+
debug_log(f"Found PR #{pr_number} for issue #{issue_number} after {attempt} attempts")
137+
138+
# Notify the Remediation backend about the PR
139+
success = notify_remediation_pr_opened(
140+
remediation_id=remediation_id,
141+
pr_number=pr_number,
142+
pr_url=pr_url,
143+
contrast_host=self.config.CONTRAST_HOST,
144+
contrast_org_id=self.config.CONTRAST_ORG_ID,
145+
contrast_app_id=self.config.CONTRAST_APP_ID,
146+
contrast_auth_key=self.config.CONTRAST_AUTHORIZATION_KEY,
147+
contrast_api_key=self.config.CONTRAST_API_KEY
148+
)
149+
150+
if success:
151+
log(f"Successfully notified remediation backend about PR #{pr_number}")
152+
else:
153+
log(f"Failed to notify remediation backend about PR #{pr_number}", is_error=True)
154+
155+
return pr_info
156+
157+
# Sleep before the next attempt, but don't sleep after the last attempt
158+
if attempt < max_attempts:
159+
time.sleep(sleep_seconds)
160+
161+
log(f"No PR found for issue #{issue_number} after {max_attempts} polling attempts", is_error=True)
162+
return None
163+
164+
# Additional methods will be implemented later
165+
166+
# %%

0 commit comments

Comments
 (0)