|
11 | 11 |
|
12 | 12 | from pathlib import Path |
13 | 13 | from typing import Iterable |
14 | | -from typing import List |
15 | 14 |
|
16 | 15 | import pytz |
17 | 16 | from dateutil.parser import parse |
| 17 | +from fetchcode.vcs import fetch_via_vcs |
18 | 18 | from packageurl import PackageURL |
19 | 19 | from univers.version_range import NpmVersionRange |
20 | 20 |
|
21 | 21 | from vulnerabilities.importer import AdvisoryData |
22 | 22 | from vulnerabilities.importer import AffectedPackage |
23 | | -from vulnerabilities.importer import Importer |
24 | 23 | from vulnerabilities.importer import Reference |
25 | 24 | from vulnerabilities.importer import VulnerabilitySeverity |
| 25 | +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline |
26 | 26 | from vulnerabilities.severity_systems import CVSSV2 |
27 | 27 | from vulnerabilities.severity_systems import CVSSV3 |
28 | 28 | from vulnerabilities.utils import build_description |
29 | 29 | from vulnerabilities.utils import load_json |
30 | 30 |
|
31 | 31 |
|
32 | | -class NpmImporter(Importer): |
| 32 | +class NpmImporterPipeline(VulnerableCodeBaseImporterPipeline): |
| 33 | + """Collect advisories from nodejs GitHub repository.""" |
| 34 | + |
| 35 | + pipeline_id = "npm_importer" |
| 36 | + |
33 | 37 | spdx_license_expression = "MIT" |
34 | 38 | license_url = "https://github.com/nodejs/security-wg/blob/main/LICENSE.md" |
35 | 39 | repo_url = "git+https://github.com/nodejs/security-wg" |
36 | 40 | importer_name = "Npm Importer" |
37 | 41 |
|
38 | | - def advisory_data(self) -> Iterable[AdvisoryData]: |
39 | | - try: |
40 | | - self.clone(self.repo_url) |
41 | | - path = Path(self.vcs_response.dest_dir) |
| 42 | + @classmethod |
| 43 | + def steps(cls): |
| 44 | + return ( |
| 45 | + cls.clone, |
| 46 | + cls.collect_and_store_advisories, |
| 47 | + cls.import_new_advisories, |
| 48 | + cls.clean_downloads, |
| 49 | + ) |
| 50 | + |
| 51 | + def clone(self): |
| 52 | + self.log(f"Cloning `{self.repo_url}`") |
| 53 | + self.vcs_response = fetch_via_vcs(self.repo_url) |
42 | 54 |
|
43 | | - vuln = path / "vuln" |
44 | | - npm_vulns = vuln / "npm" |
45 | | - for file in npm_vulns.glob("*.json"): |
46 | | - yield from self.to_advisory_data(file) |
47 | | - finally: |
48 | | - if self.vcs_response: |
49 | | - self.vcs_response.delete() |
| 55 | + def advisories_count(self): |
| 56 | + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" |
| 57 | + return sum(1 for _ in vuln_directory.glob("*.json")) |
50 | 58 |
|
51 | | - def to_advisory_data(self, file: Path) -> List[AdvisoryData]: |
| 59 | + def collect_advisories(self) -> Iterable[AdvisoryData]: |
| 60 | + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" |
| 61 | + |
| 62 | + for advisory in vuln_directory.glob("*.json"): |
| 63 | + yield from self.to_advisory_data(advisory) |
| 64 | + |
| 65 | + def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: |
52 | 66 | data = load_json(file) |
53 | 67 | id = data.get("id") |
54 | 68 | description = data.get("overview") or "" |
@@ -144,3 +158,8 @@ def get_affected_package(self, data, package_name): |
144 | 158 | affected_version_range=affected_version_range, |
145 | 159 | fixed_version=fixed_version, |
146 | 160 | ) |
| 161 | + |
| 162 | + def clean_downloads(self): |
| 163 | + if self.vcs_response: |
| 164 | + self.log(f"Removing cloned repository") |
| 165 | + self.vcs_response.delete() |
0 commit comments