-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsemgrep_client.py
More file actions
292 lines (246 loc) · 14.4 KB
/
semgrep_client.py
File metadata and controls
292 lines (246 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import requests
from urllib.parse import quote
from typing import List, Dict, Any, Optional
from datetime import datetime
class SemgrepClient:
"""Client for interacting with the Semgrep API"""
def __init__(self, api_token: str, deployment_slug: str, deployment_id: str):
"""Initialize the Semgrep API client
Args:
api_token (str): Semgrep API token
deployment_slug (str): Deployment slug
deployment_id (str): Deployment ID
"""
self.base_url = "https://semgrep.dev/api/v1"
self.api_token = api_token.strip()
self.deployment_slug = deployment_slug.strip()
self.deployment_id = deployment_id.strip()
self.headers = {
"Authorization": f"Bearer {self.api_token}",
"Accept": "application/json"
}
def get_projects(self) -> List[Dict[str, Any]]:
"""Get all projects for the deployment
Returns:
List[Dict[str, Any]]: List of project dictionaries
"""
url = f"{self.base_url}/deployments/{quote(self.deployment_id)}/projects"
params = {
"page": 0,
"page_size": 100
}
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
json_response = response.json()
return json_response.get("projects", [])
def _fetch_detailed_findings(self, project_name: str, issue_type: str, severity_levels: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Fetch detailed findings for a specific issue type using the deployment slug endpoint.
Args:
project_name (str): Name of the project (e.g., 'org/repo')
issue_type (str): 'sast' for code, 'sca' for supply-chain
severity_levels (Optional[List[str]]): Severities to include
Returns:
List[Dict[str, Any]]: True-positive findings for the requested type
"""
findings_url = f"{self.base_url}/deployments/{quote(self.deployment_slug)}/findings"
params: Dict[str, Any] = {
"issue_type": issue_type,
"repos": project_name,
"page_size": 3000 # Get up to 3000 findings per request
}
# For SCA findings, don't filter by status in API call - filter in response processing
# For SAST findings, add status and severity filters
if issue_type == 'sast':
params["status"] = "open"
if severity_levels:
params["severity"] = severity_levels
try:
print(f"Fetching {issue_type.upper()} findings from: {findings_url}")
print(f"With params: {params}")
response = requests.get(findings_url, headers=self.headers, params=params)
print(f"Response status: {response.status_code}")
print(f"Response text (first 500 chars): {response.text[:500]}")
response.raise_for_status()
data = response.json()
print(f"Response JSON keys: {list(data.keys()) if data else 'None'}")
findings = data.get("findings", []) if data else []
if not findings and data:
findings = data.get("results", [])
print(f"{issue_type.upper()} total findings returned: {len(findings)}")
# For SCA findings, filter by reachability and status
if issue_type == 'sca':
# Debug: Let's see what fields are available in SCA findings
if findings:
sample_finding = findings[0]
print(f"SCA finding sample keys: {list(sample_finding.keys())}")
print(f"Sample reachability: {sample_finding.get('reachability')}")
print(f"Sample status: {sample_finding.get('status')}")
print(f"Sample severity: {sample_finding.get('severity')}")
def is_actionable_sca(item: Dict[str, Any]) -> bool:
# Filter by reachability and status as specified
reachability = item.get('reachability', '')
status = item.get('status', '')
# Must have open status
if status != 'open':
return False
# Must have specific reachability values only
return reachability in [
'reachable',
'no reachability analysis',
'conditionally reachable',
'always reachable'
]
actionable_sca = [f for f in findings if is_actionable_sca(f)]
print(f"{issue_type.upper()} total findings: {len(findings)}")
print(f"{issue_type.upper()} actionable findings (reachable + open): {len(actionable_sca)}")
# Show breakdown of reachability values for debugging
reachability_counts = {}
status_counts = {}
for finding in findings:
reach = finding.get('reachability', 'unknown')
stat = finding.get('status', 'unknown')
reachability_counts[reach] = reachability_counts.get(reach, 0) + 1
status_counts[stat] = status_counts.get(stat, 0) + 1
print(f"Reachability breakdown: {reachability_counts}")
print(f"Status breakdown: {status_counts}")
return actionable_sca
# For SAST findings, filter to Semgrep Assistant true positives if available
def is_true_positive(item: Dict[str, Any]) -> bool:
assistant = item.get("assistant") or {}
autotriage = assistant.get("autotriage") or {}
verdict = autotriage.get("verdict")
return verdict == "true_positive"
true_positives = [f for f in findings if is_true_positive(f)]
print(f"{issue_type.upper()} true_positive findings: {len(true_positives)}")
# Debug: Show sample SAST finding structure
if issue_type == 'sast' and true_positives:
sample = true_positives[0]
print(f"SAST sample keys: {list(sample.keys())}")
print(f"Sample CWE names: {sample.get('cwe_names')}")
print(f"Sample OWASP names: {sample.get('owasp_names')}")
print(f"Sample vulnerability classes: {sample.get('vulnerability_classes')}")
# Check if we need to fetch individual finding details for these fields
finding_id = sample.get('id')
if finding_id:
print(f"Sample finding ID: {finding_id}")
# Try to fetch individual finding details
try:
finding_detail_url = f"{self.base_url}/deployments/{quote(self.deployment_slug)}/findings/{finding_id}"
print(f"Trying to fetch detailed finding from: {finding_detail_url}")
detail_resp = requests.get(finding_detail_url, headers=self.headers)
if detail_resp.status_code == 200:
detail_data = detail_resp.json()
print(f"Detailed finding keys: {list(detail_data.keys())}")
print(f"Detailed CWE names: {detail_data.get('cwe_names')}")
print(f"Detailed OWASP names: {detail_data.get('owasp_names')}")
print(f"Detailed vulnerability classes: {detail_data.get('vulnerability_classes')}")
else:
print(f"Failed to fetch finding details: {detail_resp.status_code}")
except Exception as e:
print(f"Error fetching finding details: {e}")
return true_positives
except Exception as e:
print(f"Error fetching {issue_type} findings for {project_name}: {str(e)}")
return []
def get_project_findings(self, project_name: str, severity_levels: Optional[List[str]] = None) -> Dict[str, Any]:
"""Get findings (SAST, SCA, secrets) and scan metrics for a project.
Args:
project_name (str): Repository string, e.g., 'org/repo'
severity_levels (Optional[List[str]]): Filter severities
Returns:
Dict[str, Any]: { 'findings': {..}, 'metrics': {...} }
"""
findings: Dict[str, List[Dict[str, Any]]] = {"sast": [], "sca": [], "secrets": []}
scan_metrics: Dict[str, Any] = {}
# Map project name -> id (for scans/search)
try:
projects = self.get_projects()
project_data = next((p for p in projects if p.get("name") == project_name), None)
except Exception:
project_data = None
if not project_data or not project_data.get("id"):
print(f"Could not resolve project id for repo '{project_name}'")
return {"findings": findings, "metrics": scan_metrics}
project_id = project_data["id"]
# 1) Fetch latest scans via deployment_id/scans/search (POST)
scans_url = f"{self.base_url}/deployments/{self.deployment_id}/scans/search"
scans_body = {
"repository_id": project_id,
"branch": "",
"cursor": "",
"limit": 1
}
try:
print(f"\nFetching scans from: {scans_url}")
print(f"With body: {scans_body}")
scans_resp = requests.post(scans_url, headers=self.headers, json=scans_body)
scans_resp.raise_for_status()
print(f"Scans response: {scans_resp.text[:500]}") # Log response
scans = scans_resp.json().get("scans", [])
print(f"Found {len(scans)} scans")
if scans:
# Get the first scan (most recent)
latest_scan = scans[0]
scan_id = latest_scan["id"]
print(f"Latest scan ID: {scan_id}")
# Fetch detailed scan metrics
scan_url = f"{self.base_url}/deployments/{self.deployment_id}/scan/{scan_id}"
print(f"Fetching scan metrics from: {scan_url}")
scan_resp = requests.get(scan_url, headers=self.headers)
scan_resp.raise_for_status()
scan_metrics = scan_resp.json()
print(f"Scan metrics response: {scan_resp.text[:500]}") # Log response
# Calculate duration from timestamps in the detailed response
if 'started_at' in scan_metrics and 'completed_at' in scan_metrics:
started_at_str = scan_metrics.get('started_at')
completed_at_str = scan_metrics.get('completed_at')
print(f"Raw timestamps - Started: {started_at_str}, Completed: {completed_at_str}")
# Handle both Z and +00:00 timezone formats
if started_at_str.endswith('Z'):
started_at_str = started_at_str.replace('Z', '+00:00')
if completed_at_str.endswith('Z'):
completed_at_str = completed_at_str.replace('Z', '+00:00')
started_at = datetime.fromisoformat(started_at_str)
completed_at = datetime.fromisoformat(completed_at_str)
duration_seconds = round((completed_at - started_at).total_seconds())
print(f"Calculated scan duration: {duration_seconds} seconds ({duration_seconds/60:.1f} minutes)")
# Also check if there's a total_time in the stats for comparison
if 'stats' in scan_metrics and 'total_time' in scan_metrics['stats']:
api_total_time = scan_metrics['stats']['total_time']
print(f"API total_time field: {api_total_time} seconds ({api_total_time/60:.1f} minutes)")
else:
# Fallback: use total_time from the scan search response if available
duration_seconds = latest_scan.get('total_time', 0)
print(f"Using total_time from scan search: {duration_seconds} seconds")
# Ensure scan_metrics has the complete stats structure from the API response
# Use the API's total_time field as it's more accurate than timestamp calculation
print(f"Scan metrics keys: {list(scan_metrics.keys())}")
if 'stats' in scan_metrics:
print(f"Stats keys: {list(scan_metrics['stats'].keys())}")
# Keep the API's total_time as it's more accurate
api_total_time = scan_metrics['stats'].get('total_time', duration_seconds)
print(f"Using API total_time: {api_total_time} seconds ({api_total_time/60:.1f} minutes)")
# Don't override - keep the API value
else:
# Create stats structure if missing, use calculated duration as fallback
scan_metrics['stats'] = {'total_time': duration_seconds}
else:
print("No scans found")
scan_metrics = {}
except Exception as e:
print(f"Error fetching scan metrics: {str(e)}")
scan_metrics = {}
# 3) Fetch detailed SAST and SCA findings using deployment_slug endpoint
findings["sast"] = self._fetch_detailed_findings(project_name, "sast", severity_levels)
findings["sca"] = self._fetch_detailed_findings(project_name, "sca", severity_levels)
# 4) Fetch secrets using deployment_id
try:
secrets_url = f"{self.base_url}/deployments/{self.deployment_id}/secrets"
secrets_params = {"repo": project_name}
secrets_resp = requests.get(secrets_url, headers=self.headers, params=secrets_params)
secrets_resp.raise_for_status()
secrets = secrets_resp.json().get("results", [])
findings["secrets"] = secrets
except Exception as e:
print(f"Error fetching secrets for {project_name}: {str(e)}")
return {"findings": findings, "metrics": scan_metrics}