|
11 | 11 | from typing import Iterable |
12 | 12 | from typing import Optional |
13 | 13 |
|
| 14 | +from cwe2.database import Database |
14 | 15 | from dateutil import parser as dateparser |
15 | 16 | from packageurl import PackageURL |
16 | 17 | from univers.version_range import RANGE_CLASS_BY_SCHEMES |
|
24 | 25 | from vulnerabilities.importer import Reference |
25 | 26 | from vulnerabilities.importer import VulnerabilitySeverity |
26 | 27 | from vulnerabilities.utils import dedupe |
| 28 | +from vulnerabilities.utils import get_cwe_id |
27 | 29 | from vulnerabilities.utils import get_item |
28 | 30 |
|
29 | 31 | logger = logging.getLogger(__name__) |
30 | 32 |
|
31 | | - |
32 | 33 | PACKAGE_TYPE_BY_GITHUB_ECOSYSTEM = { |
33 | 34 | "MAVEN": "maven", |
34 | 35 | "NUGET": "nuget", |
|
63 | 64 | url |
64 | 65 | } |
65 | 66 | severity |
| 67 | + cwes(first: 10){ |
| 68 | + nodes { |
| 69 | + cweId |
| 70 | + } |
| 71 | + } |
66 | 72 | publishedAt |
67 | 73 | } |
68 | 74 | firstPatchedVersion{ |
@@ -227,10 +233,34 @@ def process_response(resp: dict, package_type: str) -> Iterable[AdvisoryData]: |
227 | 233 | else: |
228 | 234 | logger.error(f"Unknown identifier type {identifier_type!r} and value {value!r}") |
229 | 235 |
|
| 236 | + weaknesses = get_cwes_from_github_advisory(advisory) |
| 237 | + |
230 | 238 | yield AdvisoryData( |
231 | 239 | aliases=sorted(dedupe(aliases)), |
232 | 240 | summary=summary, |
233 | 241 | references=references, |
234 | 242 | affected_packages=affected_packages, |
235 | 243 | date_published=date_published, |
| 244 | + weaknesses=weaknesses, |
236 | 245 | ) |
| 246 | + |
| 247 | + |
| 248 | +def get_cwes_from_github_advisory(advisory) -> [int]: |
| 249 | + """ |
| 250 | + Return the cwe-id list from advisory ex: [ 522 ] |
| 251 | + by extracting the cwe_list from advisory ex: [{'cweId': 'CWE-522'}] |
| 252 | + then remove the CWE- from string and convert it to integer 522 and Check if the CWE in CWE-Database |
| 253 | + """ |
| 254 | + weaknesses = [] |
| 255 | + db = Database() |
| 256 | + cwe_list = get_item(advisory, "cwes", "nodes") or [] |
| 257 | + for cwe_item in cwe_list: |
| 258 | + cwe_string = get_item(cwe_item, "cweId") |
| 259 | + if cwe_string: |
| 260 | + cwe_id = get_cwe_id(cwe_string) |
| 261 | + try: |
| 262 | + db.get(cwe_id) |
| 263 | + weaknesses.append(cwe_id) |
| 264 | + except Exception: |
| 265 | + logger.error("Invalid CWE id") |
| 266 | + return weaknesses |
0 commit comments