|
6 | 6 | # See https://github.com/nexB/vulnerablecode for support or download. |
7 | 7 | # See https://aboutcode.org for more information about nexB OSS projects. |
8 | 8 | # |
9 | | -import asyncio |
| 9 | +from pathlib import Path |
10 | 10 | from typing import Set |
11 | 11 |
|
12 | 12 | from packageurl import PackageURL |
13 | | -from univers.version_range import VersionRange |
14 | | -from univers.versions import SemverVersion |
| 13 | +from univers.version_constraint import VersionConstraint |
| 14 | +from univers.version_range import HexVersionRange |
15 | 15 |
|
16 | 16 | from vulnerabilities.importer import AdvisoryData |
17 | | -from vulnerabilities.importer import GitImporter |
| 17 | +from vulnerabilities.importer import AffectedPackage |
| 18 | +from vulnerabilities.importer import Importer |
18 | 19 | from vulnerabilities.importer import Reference |
19 | | -from vulnerabilities.package_managers import HexVersionAPI |
| 20 | +from vulnerabilities.utils import is_cve |
20 | 21 | from vulnerabilities.utils import load_yaml |
21 | | -from vulnerabilities.utils import nearest_patched_package |
22 | 22 |
|
23 | 23 |
|
24 | | -class ElixirSecurityImporter(GitImporter): |
25 | | - def __enter__(self): |
26 | | - super(ElixirSecurityImporter, self).__enter__() |
| 24 | +class ElixirSecurityImporter(Importer): |
27 | 25 |
|
28 | | - if not getattr(self, "_added_files", None): |
29 | | - self._added_files, self._updated_files = self.file_changes( |
30 | | - recursive=True, file_ext="yml", subdir="./packages" |
31 | | - ) |
32 | | - self.pkg_manager_api = HexVersionAPI() |
33 | | - self.set_api(self.collect_packages()) |
34 | | - |
35 | | - def set_api(self, packages): |
36 | | - asyncio.run(self.pkg_manager_api.load_api(packages)) |
37 | | - |
38 | | - def updated_advisories(self) -> Set[AdvisoryData]: |
39 | | - files = self._updated_files.union(self._added_files) |
40 | | - advisories = [] |
41 | | - for f in files: |
42 | | - processed_data = self.process_file(f) |
43 | | - if processed_data: |
44 | | - advisories.append(processed_data) |
45 | | - return self.batch_advisories(advisories) |
46 | | - |
47 | | - def collect_packages(self): |
48 | | - packages = set() |
49 | | - files = self._updated_files.union(self._added_files) |
50 | | - for f in files: |
51 | | - data = load_yaml(f) |
52 | | - if data.get("package"): |
53 | | - packages.add(data["package"]) |
54 | | - |
55 | | - return packages |
56 | | - |
57 | | - def get_versions_for_pkg_from_range_list(self, version_range_list, pkg_name): |
58 | | - # Takes a list of version ranges(pathced and unaffected) of a package |
59 | | - # as parameter and returns a tuple of safe package versions and |
60 | | - # vulnerable package versions |
61 | | - |
62 | | - safe_pkg_versions = [] |
63 | | - vuln_pkg_versions = [] |
64 | | - all_version_list = self.pkg_manager_api.get(pkg_name).valid_versions |
65 | | - if not version_range_list: |
66 | | - return [], all_version_list |
67 | | - version_ranges = [ |
68 | | - VersionRange.from_scheme_version_spec_string("semver", r) for r in version_range_list |
69 | | - ] |
70 | | - for version in all_version_list: |
71 | | - version_obj = SemverVersion(version) |
72 | | - if any([version_obj in v for v in version_ranges]): |
73 | | - safe_pkg_versions.append(version) |
74 | | - |
75 | | - vuln_pkg_versions = set(all_version_list) - set(safe_pkg_versions) |
76 | | - return safe_pkg_versions, vuln_pkg_versions |
| 26 | + repo_url = "git+https://github.com/dependabot/elixir-security-advisories" |
| 27 | + license_url = "https://github.com/dependabot/elixir-security-advisories/blob/master/LICENSE.txt" |
| 28 | + spdx_license_expression = "CC0-1.0" |
| 29 | + |
| 30 | + def advisory_data(self) -> Set[AdvisoryData]: |
| 31 | + try: |
| 32 | + self.clone(self.repo_url) |
| 33 | + path = Path(self.vcs_response.dest_dir) |
| 34 | + vuln = path / "packages" |
| 35 | + for file in vuln.glob("**/*.yml"): |
| 36 | + yield from self.process_file(file) |
| 37 | + finally: |
| 38 | + if self.vcs_response: |
| 39 | + self.vcs_response.delete() |
77 | 40 |
|
78 | 41 | def process_file(self, path): |
| 42 | + path = str(path) |
79 | 43 | yaml_file = load_yaml(path) |
80 | | - pkg_name = yaml_file["package"] |
81 | | - safe_pkg_versions = [] |
82 | | - vuln_pkg_versions = [] |
83 | | - if not yaml_file.get("patched_versions"): |
84 | | - yaml_file["patched_versions"] = [] |
85 | | - |
86 | | - if not yaml_file.get("unaffected_versions"): |
87 | | - yaml_file["unaffected_versions"] = [] |
88 | | - |
89 | | - safe_pkg_versions, vuln_pkg_versions = self.get_versions_for_pkg_from_range_list( |
90 | | - yaml_file["patched_versions"] + yaml_file["unaffected_versions"], |
91 | | - pkg_name, |
92 | | - ) |
| 44 | + cve_id = "" |
| 45 | + summary = yaml_file.get("description") or "" |
| 46 | + pkg_name = yaml_file.get("package") or "" |
| 47 | + |
| 48 | + cve = yaml_file.get("cve") or "" |
| 49 | + |
| 50 | + if cve and not cve.startswith("CVE-"): |
| 51 | + cve_id = f"CVE-{cve}" |
| 52 | + |
| 53 | + if not cve_id: |
| 54 | + return [] |
| 55 | + |
| 56 | + if not is_cve(cve_id): |
| 57 | + return [] |
| 58 | + |
| 59 | + references = [] |
| 60 | + link = yaml_file.get("link") or "" |
| 61 | + if link: |
| 62 | + references.append( |
| 63 | + Reference( |
| 64 | + url=link, |
| 65 | + ) |
| 66 | + ) |
| 67 | + |
| 68 | + affected_packages = [] |
| 69 | + |
| 70 | + unaffected_versions = yaml_file.get("unaffected_versions") or [] |
| 71 | + patched_versions = yaml_file.get("patched_versions") or [] |
| 72 | + |
| 73 | + constraints = [] |
| 74 | + vrc = HexVersionRange.version_class |
| 75 | + |
| 76 | + for version in unaffected_versions: |
| 77 | + constraints.append(VersionConstraint.from_string(version_class=vrc, string=version)) |
| 78 | + |
| 79 | + for version in patched_versions: |
| 80 | + if version.startswith("~>"): |
| 81 | + version = version[2:] |
| 82 | + constraints.append( |
| 83 | + VersionConstraint.from_string(version_class=vrc, string=version).invert() |
| 84 | + ) |
| 85 | + |
| 86 | + if pkg_name: |
| 87 | + affected_packages.append( |
| 88 | + AffectedPackage( |
| 89 | + package=PackageURL( |
| 90 | + type="hex", |
| 91 | + name=pkg_name, |
| 92 | + ), |
| 93 | + affected_version_range=HexVersionRange(constraints=constraints), |
| 94 | + ) |
| 95 | + ) |
93 | 96 |
|
94 | | - if yaml_file.get("cve"): |
95 | | - cve_id = "CVE-" + yaml_file["cve"] |
96 | | - else: |
97 | | - cve_id = "" |
98 | | - |
99 | | - safe_purls = [] |
100 | | - vuln_purls = [] |
101 | | - |
102 | | - safe_purls = [ |
103 | | - PackageURL(name=pkg_name, type="hex", version=version) for version in safe_pkg_versions |
104 | | - ] |
105 | | - |
106 | | - vuln_purls = [ |
107 | | - PackageURL(name=pkg_name, type="hex", version=version) for version in vuln_pkg_versions |
108 | | - ] |
109 | | - |
110 | | - references = [ |
111 | | - Reference( |
112 | | - reference_id=yaml_file["id"], |
113 | | - ), |
114 | | - Reference( |
115 | | - url=yaml_file["link"], |
116 | | - ), |
117 | | - ] |
118 | | - |
119 | | - return AdvisoryData( |
120 | | - summary=yaml_file["description"], |
121 | | - affected_packages=nearest_patched_package(vuln_purls, safe_purls), |
122 | | - vulnerability_id=cve_id, |
| 97 | + yield AdvisoryData( |
| 98 | + aliases=[cve_id], |
| 99 | + summary=summary, |
123 | 100 | references=references, |
| 101 | + affected_packages=affected_packages, |
124 | 102 | ) |
0 commit comments