Skip to content

Commit 50d38db

Browse files
committed
Merge branch 'vulnerablecode_datasource' into vulntotal-clean
2 parents 3664c3e + fb7ef75 commit 50d38db

File tree

4 files changed

+2137
-0
lines changed

4 files changed

+2137
-0
lines changed

vulntotal/datasources/vulnerablecode.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,83 @@
2020
# for any legal advice.
2121
# VulnTotal is a free software tool from nexB Inc. and others.
2222
# Visit https://github.com/nexB/vulnerablecode/ for support and download.
23+
24+
import json
25+
import logging
26+
from typing import Iterable
27+
from urllib.parse import urljoin
28+
29+
import requests
30+
from packageurl import PackageURL
31+
32+
from vulntotal.validator import DataSource
33+
from vulntotal.validator import VendorData
34+
35+
logger = logging.getLogger(__name__)
36+
37+
38+
class VulnerableCodeDataSource(DataSource):
39+
spdx_license_expression = "CC-BY-SA-4.0"
40+
license_url = "https://github.com/nexB/vulnerablecode/blob/main/cc-by-sa-4.0.LICENSE"
41+
42+
global_instance = None
43+
vc_purl_search_api_path = "api/packages/bulk_search/"
44+
45+
def fetch_post_json(self, payload):
46+
vc_instance = self.global_instance if self.global_instance else "http://localhost:8001/"
47+
48+
url = urljoin(vc_instance, self.vc_purl_search_api_path)
49+
response = requests.post(url, json=payload)
50+
if not response.status_code == 200:
51+
logger.error(f"Error while fetching {url}")
52+
return
53+
return response.json()
54+
55+
def fetch_get_json(self, url):
56+
response = requests.get(url)
57+
if not response.status_code == 200:
58+
logger.error(f"Error while fetching {url}")
59+
return
60+
return response.json()
61+
62+
def datasource_advisory(self, purl) -> Iterable[VendorData]:
63+
if purl.type not in self.supported_ecosystem() or not purl.version:
64+
return
65+
metadata_advisories = self.fetch_post_json({"purls": [str(purl)]})
66+
self._raw_dump.append(metadata_advisories)
67+
if metadata_advisories and "affected_by_vulnerabilities" in metadata_advisories[0]:
68+
for advisory in metadata_advisories[0]["affected_by_vulnerabilities"]:
69+
fetched_advisory = self.fetch_get_json(advisory["url"])
70+
self._raw_dump.append(fetched_advisory)
71+
yield parse_advisory(fetched_advisory)
72+
73+
@classmethod
74+
def supported_ecosystem(cls):
75+
return {
76+
"alpine": "alpine",
77+
"cargo": "cargo",
78+
"composer": "composer",
79+
"deb": "deb",
80+
"golang": "golang",
81+
"maven": "maven",
82+
"nginx": "nginx",
83+
"npm": "npm",
84+
"nuget": "nuget",
85+
"pypi": "pypi",
86+
"rpm": "rpm",
87+
"gem": "gem",
88+
"openssl": "openssl",
89+
}
90+
91+
92+
def parse_advisory(fetched_advisory) -> VendorData:
93+
aliases = [aliase["alias"] for aliase in fetched_advisory["aliases"]]
94+
affected_versions = []
95+
fixed_versions = []
96+
for instance in fetched_advisory["affected_packages"]:
97+
affected_versions.append(PackageURL.from_string(instance["purl"]).version)
98+
for instance in fetched_advisory["fixed_packages"]:
99+
fixed_versions.append(PackageURL.from_string(instance["purl"]).version)
100+
return VendorData(
101+
aliases=aliases, affected_versions=affected_versions, fixed_versions=fixed_versions
102+
)

0 commit comments

Comments
 (0)