|
| 1 | +# |
| 2 | +# Copyright (c) nexB Inc. and others. All rights reserved. |
| 3 | +# VulnerableCode is a trademark of nexB Inc. |
| 4 | +# SPDX-License-Identifier: Apache-2.0 |
| 5 | +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. |
| 6 | +# See https://github.com/nexB/vulnerablecode for support or download. |
| 7 | +# See https://aboutcode.org for more information about nexB OSS projects. |
| 8 | +# |
| 9 | + |
| 10 | +import logging |
| 11 | +from datetime import datetime |
| 12 | +from datetime import timezone |
| 13 | +from typing import Iterable |
| 14 | +from typing import Mapping |
| 15 | + |
| 16 | +from cwe2.database import Database |
| 17 | +from packageurl import PackageURL |
| 18 | +from univers.version_range import GenericVersionRange |
| 19 | +from univers.versions import SemverVersion |
| 20 | + |
| 21 | +from vulnerabilities.importer import AdvisoryData |
| 22 | +from vulnerabilities.importer import AffectedPackage |
| 23 | +from vulnerabilities.importer import Importer |
| 24 | +from vulnerabilities.importer import Reference |
| 25 | +from vulnerabilities.importer import VulnerabilitySeverity |
| 26 | +from vulnerabilities.severity_systems import SCORING_SYSTEMS |
| 27 | +from vulnerabilities.utils import fetch_response |
| 28 | +from vulnerabilities.utils import get_cwe_id |
| 29 | +from vulnerabilities.utils import get_item |
| 30 | + |
| 31 | +logger = logging.getLogger(__name__) |
| 32 | + |
| 33 | + |
| 34 | +class CurlImporter(Importer): |
| 35 | + |
| 36 | + spdx_license_expression = "curl" |
| 37 | + license_url = "https://curl.se/docs/copyright.html" |
| 38 | + repo_url = "https://github.com/curl/curl-www/" |
| 39 | + importer_name = "Curl Importer" |
| 40 | + api_url = "https://curl.se/docs/vuln.json" |
| 41 | + |
| 42 | + def fetch(self) -> Iterable[Mapping]: |
| 43 | + response = fetch_response(self.api_url) |
| 44 | + return response.json() |
| 45 | + |
| 46 | + def advisory_data(self) -> Iterable[AdvisoryData]: |
| 47 | + raw_data = self.fetch() |
| 48 | + for data in raw_data: |
| 49 | + cve_id = data.get("aliases") or [] |
| 50 | + cve_id = cve_id[0] if len(cve_id) > 0 else None |
| 51 | + if not cve_id.startswith("CVE"): |
| 52 | + package = data.get("database_specific").get("package") |
| 53 | + logger.error(f"Invalid CVE ID: {cve_id} in package {package}") |
| 54 | + continue |
| 55 | + yield parse_advisory_data(data) |
| 56 | + |
| 57 | + |
| 58 | +def parse_advisory_data(raw_data) -> AdvisoryData: |
| 59 | + """ |
| 60 | + Parse advisory data from raw JSON data and return an AdvisoryData object. |
| 61 | +
|
| 62 | + Args: |
| 63 | + raw_data (dict): Raw JSON data containing advisory information. |
| 64 | +
|
| 65 | + Returns: |
| 66 | + AdvisoryData: Parsed advisory data as an AdvisoryData object. |
| 67 | +
|
| 68 | + Example: |
| 69 | + >>> raw_data = { |
| 70 | + ... "aliases": ["CVE-2024-2379"], |
| 71 | + ... "summary": "QUIC certificate check bypass with wolfSSL", |
| 72 | + ... "database_specific": { |
| 73 | + ... "package": "curl", |
| 74 | + ... "URL": "https://curl.se/docs/CVE-2024-2379.json", |
| 75 | + ... "www": "https://curl.se/docs/CVE-2024-2379.html", |
| 76 | + ... "issue": "https://hackerone.com/reports/2410774", |
| 77 | + ... "severity": "Low", |
| 78 | + ... "CWE": { |
| 79 | + ... "id": "CWE-297", |
| 80 | + ... "desc": "Improper Validation of Certificate with Host Mismatch" |
| 81 | + ... }, |
| 82 | + ... }, |
| 83 | + ... "published": "2024-03-27T08:00:00.00Z", |
| 84 | + ... "affected": [ |
| 85 | + ... { |
| 86 | + ... "ranges": [ |
| 87 | + ... { |
| 88 | + ... "type": "SEMVER", |
| 89 | + ... "events": [ |
| 90 | + ... {"introduced": "8.6.0"}, |
| 91 | + ... {"fixed": "8.7.0"} |
| 92 | + ... ] |
| 93 | + ... } |
| 94 | + ... ], |
| 95 | + ... "versions": ["8.6.0"] |
| 96 | + ... } |
| 97 | + ... ] |
| 98 | + ... } |
| 99 | + >>> parse_advisory_data(raw_data) |
| 100 | + AdvisoryData(aliases=['CVE-2024-2379'], summary='QUIC certificate check bypass with wolfSSL', affected_packages=[AffectedPackage(package=PackageURL(type='generic', namespace='curl.se', name='curl', version=None, qualifiers={}, subpath=None), affected_version_range=GenericVersionRange(constraints=(VersionConstraint(comparator='=', version=SemverVersion(string='8.6.0')),)), fixed_version=SemverVersion(string='8.7.0'))], references=[Reference(reference_id='', reference_type='', url='https://curl.se/docs/CVE-2024-2379.html', severities=[VulnerabilitySeverity(system=Cvssv3ScoringSystem(identifier='cvssv3.1', name='CVSSv3.1 Base Score', url='https://www.first.org/cvss/v3-1/', notes='CVSSv3.1 base score and vector'), value='Low', scoring_elements='', published_at=None)]), Reference(reference_id='', reference_type='', url='https://hackerone.com/reports/2410774', severities=[])], date_published=datetime.datetime(2024, 3, 27, 8, 0, tzinfo=datetime.timezone.utc), weaknesses=[297], url='https://curl.se/docs/CVE-2024-2379.json') |
| 101 | + """ |
| 102 | + |
| 103 | + affected = get_item(raw_data, "affected")[0] if len(get_item(raw_data, "affected")) > 0 else [] |
| 104 | + |
| 105 | + ranges = get_item(affected, "ranges")[0] if len(get_item(affected, "ranges")) > 0 else [] |
| 106 | + events = get_item(ranges, "events")[1] if len(get_item(ranges, "events")) > 1 else {} |
| 107 | + version_type = get_item(ranges, "type") if get_item(ranges, "type") else "" |
| 108 | + fixed_version = events.get("fixed") |
| 109 | + if version_type == "SEMVER" and fixed_version: |
| 110 | + fixed_version = SemverVersion(fixed_version) |
| 111 | + |
| 112 | + purl = PackageURL(type="generic", namespace="curl.se", name="curl") |
| 113 | + versions = affected.get("versions") or [] |
| 114 | + affected_version_range = GenericVersionRange.from_versions(versions) |
| 115 | + |
| 116 | + affected_package = AffectedPackage( |
| 117 | + package=purl, affected_version_range=affected_version_range, fixed_version=fixed_version |
| 118 | + ) |
| 119 | + |
| 120 | + database_specific = raw_data.get("database_specific") or {} |
| 121 | + severity = VulnerabilitySeverity( |
| 122 | + system=SCORING_SYSTEMS["cvssv3.1"], value=database_specific.get("severity", "") |
| 123 | + ) |
| 124 | + |
| 125 | + references = [] |
| 126 | + ref_www = database_specific.get("www") or "" |
| 127 | + ref_issue = database_specific.get("issue") or "" |
| 128 | + if ref_www: |
| 129 | + references.append(Reference(url=ref_www, severities=[severity])) |
| 130 | + if ref_issue: |
| 131 | + references.append(Reference(url=ref_issue)) |
| 132 | + |
| 133 | + date_published = datetime.strptime( |
| 134 | + raw_data.get("published") or "", "%Y-%m-%dT%H:%M:%S.%fZ" |
| 135 | + ).replace(tzinfo=timezone.utc) |
| 136 | + weaknesses = get_cwe_from_curl_advisory(raw_data) |
| 137 | + |
| 138 | + return AdvisoryData( |
| 139 | + aliases=raw_data.get("aliases") or [], |
| 140 | + summary=raw_data.get("summary") or "", |
| 141 | + affected_packages=[affected_package], |
| 142 | + references=references, |
| 143 | + date_published=date_published, |
| 144 | + weaknesses=weaknesses, |
| 145 | + url=raw_data.get("database_specific", {}).get("URL", ""), |
| 146 | + ) |
| 147 | + |
| 148 | + |
| 149 | +def get_cwe_from_curl_advisory(raw_data): |
| 150 | + """ |
| 151 | + Extracts CWE IDs from the given raw_data and returns a list of CWE IDs. |
| 152 | +
|
| 153 | + >>> get_cwe_from_curl_advisory({"database_specific": {"CWE": {"id": "CWE-333"}}}) |
| 154 | + [333] |
| 155 | + >>> get_cwe_from_curl_advisory({"database_specific": {"CWE": {"id": ""}}}) |
| 156 | + [] |
| 157 | + """ |
| 158 | + weaknesses = [] |
| 159 | + db = Database() |
| 160 | + cwe_string = get_item(raw_data, "database_specific", "CWE", "id") or "" |
| 161 | + |
| 162 | + if cwe_string: |
| 163 | + cwe_id = get_cwe_id(cwe_string) |
| 164 | + try: |
| 165 | + db.get(cwe_id) |
| 166 | + weaknesses.append(cwe_id) |
| 167 | + except Exception: |
| 168 | + logger.error("Invalid CWE id") |
| 169 | + return weaknesses |
0 commit comments