|
10 | 10 | import logging |
11 | 11 | from typing import Iterable |
12 | 12 | from urllib.parse import quote |
| 13 | +from urllib.parse import unquote_plus |
13 | 14 |
|
14 | 15 | import requests |
15 | 16 | from bs4 import BeautifulSoup |
16 | 17 | from packageurl import PackageURL |
17 | 18 |
|
18 | 19 | from vulntotal.validator import DataSource |
| 20 | +from vulntotal.validator import InvalidCVEError |
19 | 21 | from vulntotal.validator import VendorData |
20 | 22 | from vulntotal.vulntotal_utils import snyk_constraints_satisfied |
21 | 23 |
|
@@ -70,6 +72,38 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]: |
70 | 72 | if advisory_html: |
71 | 73 | yield parse_html_advisory(advisory_html, snyk_id, affected, purl) |
72 | 74 |
|
| 75 | + def datasource_advisory_from_cve(self, cve: str) -> Iterable[VendorData]: |
| 76 | + """ |
| 77 | + Fetch advisories from Snyk for a given CVE. |
| 78 | +
|
| 79 | + Parameters: |
| 80 | + cve : CVE ID |
| 81 | +
|
| 82 | + Yields: |
| 83 | + VendorData instance containing advisory information. |
| 84 | + """ |
| 85 | + if not cve.upper().startswith("CVE-"): |
| 86 | + raise InvalidCVEError |
| 87 | + |
| 88 | + package_list = generate_payload_from_cve(cve) |
| 89 | + response = self.fetch(package_list) |
| 90 | + self._raw_dump = [response] |
| 91 | + |
| 92 | + # get list of vulnerabilities for cve id |
| 93 | + vulns_list = parse_cve_advisory_html(response) |
| 94 | + |
| 95 | + # for each vulnerability get fixed version from snyk_id_url, get affected version from package_advisory_url |
| 96 | + for snyk_id, package_advisory_url in vulns_list.items(): |
| 97 | + package_advisories_list = self.fetch(package_advisory_url) |
| 98 | + package_advisories = extract_html_json_advisories(package_advisories_list) |
| 99 | + affected_versions = package_advisories[snyk_id] |
| 100 | + advisory_payload = generate_advisory_payload(snyk_id) |
| 101 | + advisory_html = self.fetch(advisory_payload) |
| 102 | + self._raw_dump.append(advisory_html) |
| 103 | + purl = generate_purl(package_advisory_url) |
| 104 | + if advisory_html and purl: |
| 105 | + yield parse_html_advisory(advisory_html, snyk_id, affected_versions, purl) |
| 106 | + |
73 | 107 | @classmethod |
74 | 108 | def supported_ecosystem(cls): |
75 | 109 | return { |
@@ -132,6 +166,61 @@ def generate_package_advisory_url(purl): |
132 | 166 | ) |
133 | 167 |
|
134 | 168 |
|
| 169 | +def generate_purl(package_advisory_url): |
| 170 | + """ |
| 171 | + Generates purl from Package advisory url. |
| 172 | +
|
| 173 | + Parameters: |
| 174 | + package_advisory_url: URL of the package on Snyk. |
| 175 | +
|
| 176 | + Returns: |
| 177 | + A PackageURL instance representing the package |
| 178 | + """ |
| 179 | + package_advisory_url = unquote_plus( |
| 180 | + package_advisory_url.replace("https://security.snyk.io/package/", "") |
| 181 | + ) |
| 182 | + supported_ecosystems = {v: k for (k, v) in SnykDataSource.supported_ecosystem().items()} |
| 183 | + |
| 184 | + package_url_split = package_advisory_url.split("/") |
| 185 | + pkg_type = package_url_split[0] |
| 186 | + |
| 187 | + pkg_name = None |
| 188 | + namespace = None |
| 189 | + qualifiers = {} |
| 190 | + |
| 191 | + if pkg_type == "maven": |
| 192 | + pkg_name = package_url_split[1].split(":")[1] |
| 193 | + namespace = package_url_split[1].split(":")[0] |
| 194 | + |
| 195 | + elif pkg_type == "composer": |
| 196 | + pkg_name = package_url_split[-1] |
| 197 | + namespace = package_url_split[-2] |
| 198 | + |
| 199 | + elif pkg_type == "golang": |
| 200 | + pkg_name = package_url_split[-1] |
| 201 | + namespace = "/".join(package_url_split[1:-1]) |
| 202 | + |
| 203 | + elif pkg_type == "npm": |
| 204 | + # handle scoped npm packages |
| 205 | + if "@" in package_advisory_url: |
| 206 | + namespace = package_url_split[-2] |
| 207 | + |
| 208 | + pkg_name = package_url_split[-1] |
| 209 | + |
| 210 | + elif pkg_type == "linux": |
| 211 | + pkg_name = package_url_split[-1] |
| 212 | + qualifiers["distro"] = package_url_split[1] |
| 213 | + |
| 214 | + elif pkg_type in ("cocoapods", "hex", "nuget", "pip", "rubygems", "unmanaged"): |
| 215 | + pkg_name = package_url_split[-1] |
| 216 | + |
| 217 | + if pkg_type is None or pkg_name is None: |
| 218 | + logger.error("Invalid package advisory url, package type or name is missing") |
| 219 | + return |
| 220 | + |
| 221 | + return PackageURL(type=supported_ecosystems[pkg_type], name=pkg_name, namespace=namespace) |
| 222 | + |
| 223 | + |
135 | 224 | def extract_html_json_advisories(package_advisories): |
136 | 225 | """ |
137 | 226 | Extract vulnerability information from HTML or JSON advisories. |
@@ -204,9 +293,41 @@ def parse_html_advisory(advisory_html, snyk_id, affected, purl) -> VendorData: |
204 | 293 | ) |
205 | 294 |
|
206 | 295 |
|
| 296 | +def parse_cve_advisory_html(cve_advisory_html): |
| 297 | + """ |
| 298 | + Parse CVE HTML advisory from Snyk and extract list of vulnerabilities and corresponding packages for that CVE. |
| 299 | +
|
| 300 | + Parameters: |
| 301 | + advisory_html: A string of HTML containing the vulnerabilities for given CVE. |
| 302 | +
|
| 303 | + Returns: |
| 304 | + A dictionary with each item representing a vulnerability. Key of each item is the SNYK_ID and value is the package advisory url on snyk website |
| 305 | + """ |
| 306 | + cve_advisory_soup = BeautifulSoup(cve_advisory_html, "html.parser") |
| 307 | + vulns_table = cve_advisory_soup.find("tbody", class_="vue--table__tbody") |
| 308 | + if not vulns_table: |
| 309 | + return None |
| 310 | + vulns_rows = vulns_table.find_all("tr", class_="vue--table__row") |
| 311 | + vulns_list = {} |
| 312 | + |
| 313 | + for row in vulns_rows: |
| 314 | + anchors = row.find_all("a", {"class": "vue--anchor"}) |
| 315 | + if len(anchors) != 2: |
| 316 | + continue |
| 317 | + snyk_id = anchors[0]["href"].split("/")[1] |
| 318 | + package_advisory_url = f"https://security.snyk.io{anchors[1]['href']}" |
| 319 | + vulns_list[snyk_id] = package_advisory_url |
| 320 | + |
| 321 | + return vulns_list |
| 322 | + |
| 323 | + |
207 | 324 | def is_purl_in_affected(version, affected): |
208 | 325 | return any(snyk_constraints_satisfied(affected_range, version) for affected_range in affected) |
209 | 326 |
|
210 | 327 |
|
211 | 328 | def generate_advisory_payload(snyk_id): |
212 | 329 | return f"https://security.snyk.io/vuln/{snyk_id}" |
| 330 | + |
| 331 | + |
| 332 | +def generate_payload_from_cve(cve_id): |
| 333 | + return f"https://security.snyk.io/vuln?search={cve_id}" |
0 commit comments