|
12 | 12 | """
|
13 | 13 |
|
14 | 14 | from argparse import ArgumentParser
|
15 |
| -from collections import defaultdict |
16 | 15 | from dependencies import (
|
17 | 16 | ignore_list,
|
18 | 17 | dependencies_info,
|
@@ -40,7 +39,12 @@ def __init__(self, id: str, url: str, dependency: str, version: str):
|
40 | 39 | class VulnerabilityEncoder(json.JSONEncoder):
|
41 | 40 | def default(self, obj):
|
42 | 41 | if isinstance(obj, Vulnerability):
|
43 |
| - return {"id": obj.id, "url": obj.url, "dependency": obj.dependency, "version": obj.version} |
| 42 | + return { |
| 43 | + "id": obj.id, |
| 44 | + "url": obj.url, |
| 45 | + "dependency": obj.dependency, |
| 46 | + "version": obj.version, |
| 47 | + } |
44 | 48 | # Let the base class default method raise the TypeError
|
45 | 49 | return json.JSONEncoder.default(self, obj)
|
46 | 50 |
|
@@ -118,7 +122,10 @@ def query_ghad(
|
118 | 122 | found_vulnerabilities.extend(
|
119 | 123 | [
|
120 | 124 | Vulnerability(
|
121 |
| - id=vuln["advisory"]["ghsaId"], url=vuln["advisory"]["permalink"], dependency=name, version=dep_version |
| 125 | + id=vuln["advisory"]["ghsaId"], |
| 126 | + url=vuln["advisory"]["permalink"], |
| 127 | + dependency=name, |
| 128 | + version=dep_version, |
122 | 129 | )
|
123 | 130 | for vuln in matching_vulns
|
124 | 131 | ]
|
@@ -146,14 +153,22 @@ def query_nvd(
|
146 | 153 | query_results = [
|
147 | 154 | cve
|
148 | 155 | for cve in searchCVE(
|
149 |
| - cpeMatchString=dep.get_cpe(repo_path), keyword=dep.keyword, key=api_key |
| 156 | + virtualMatchString=dep.get_cpe(repo_path), |
| 157 | + keywordSearch=dep.keyword, |
| 158 | + key=api_key, |
| 159 | + delay=6 if api_key else False, |
150 | 160 | )
|
151 | 161 | if cve.id not in ignore_list
|
152 | 162 | ]
|
153 | 163 | if query_results:
|
154 | 164 | version = dep.version_parser(repo_path)
|
155 | 165 | found_vulnerabilities.extend(
|
156 |
| - [Vulnerability(id=cve.id, url=cve.url, dependency=name, version=version) for cve in query_results] |
| 166 | + [ |
| 167 | + Vulnerability( |
| 168 | + id=cve.id, url=cve.url, dependency=name, version=version |
| 169 | + ) |
| 170 | + for cve in query_results |
| 171 | + ] |
157 | 172 | )
|
158 | 173 |
|
159 | 174 | return found_vulnerabilities
|
@@ -185,7 +200,7 @@ def main() -> int:
|
185 | 200 | )
|
186 | 201 | parser.add_argument(
|
187 | 202 | "--json-output",
|
188 |
| - action='store_true', |
| 203 | + action="store_true", |
189 | 204 | help="the NVD API key for querying the National Vulnerability Database",
|
190 | 205 | )
|
191 | 206 | repo_path: Path = parser.parse_args().node_repo_path
|
@@ -216,13 +231,15 @@ def main() -> int:
|
216 | 231 | if name in dependencies_per_branch[repo_branch]
|
217 | 232 | }
|
218 | 233 | ghad_vulnerabilities: list[Vulnerability] = (
|
219 |
| - {} if gh_token is None else query_ghad(dependencies, gh_token, repo_path) |
| 234 | + list() if gh_token is None else query_ghad(dependencies, gh_token, repo_path) |
220 | 235 | )
|
221 | 236 | nvd_vulnerabilities: list[Vulnerability] = query_nvd(
|
222 | 237 | dependencies, nvd_key, repo_path
|
223 | 238 | )
|
224 | 239 |
|
225 |
| - all_vulnerabilities = {"vulnerabilities": ghad_vulnerabilities + nvd_vulnerabilities} |
| 240 | + all_vulnerabilities = { |
| 241 | + "vulnerabilities": ghad_vulnerabilities + nvd_vulnerabilities |
| 242 | + } |
226 | 243 | no_vulnerabilities_found = not ghad_vulnerabilities and not nvd_vulnerabilities
|
227 | 244 | if json_output:
|
228 | 245 | print(json.dumps(all_vulnerabilities, cls=VulnerabilityEncoder))
|
|
0 commit comments