Skip to content

Commit 44f488f

Browse files
committed
Add a PyPa importer, organize the code using a shared osv.py
Signed-off-by: Ziad <[email protected]>
1 parent c94ed57 commit 44f488f

File tree

9 files changed

+710
-459
lines changed

9 files changed

+710
-459
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from vulnerabilities.importers import nginx
1515
from vulnerabilities.importers import nvd
1616
from vulnerabilities.importers import openssl
17+
from vulnerabilities.importers import pypa
1718
from vulnerabilities.importers import pysec
1819
from vulnerabilities.importers import redhat
1920

@@ -27,6 +28,7 @@
2728
pysec.PyPIImporter,
2829
debian.DebianImporter,
2930
gitlab.GitLabAPIImporter,
31+
pypa.PyPaImporter,
3032
]
3133

3234
IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}

vulnerabilities/importers/osv.py

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/nexB/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
from typing import Iterable
12+
from typing import List
13+
from typing import Optional
14+
15+
import dateparser
16+
from packageurl import PackageURL
17+
from univers.version_range import PypiVersionRange
18+
from univers.versions import InvalidVersion
19+
from univers.versions import PypiVersion
20+
from univers.versions import SemverVersion
21+
from univers.versions import Version
22+
23+
from vulnerabilities.importer import AdvisoryData
24+
from vulnerabilities.importer import AffectedPackage
25+
from vulnerabilities.importer import Reference
26+
from vulnerabilities.importer import VulnerabilitySeverity
27+
from vulnerabilities.severity_systems import SCORING_SYSTEMS
28+
from vulnerabilities.utils import build_description
29+
from vulnerabilities.utils import dedupe
30+
31+
logger = logging.getLogger(__name__)
32+
33+
34+
def parse_advisory_data(raw_data: dict) -> Optional[AdvisoryData]:
    """
    Return an AdvisoryData built from ``raw_data``, a mapping of OSV advisory data.

    The advisory ``id`` (when present) is appended to the aliases. The summary
    is built from the ``summary`` and ``details`` fields. One AffectedPackage is
    created for each fixed version found in the ``ranges`` of each ``affected``
    entry whose package URL has the ``pypi`` type; entries with another type, or
    for which no package URL can be built, are skipped.

    When ``raw_data`` has no ``affected`` key, an error is logged and the
    advisory is returned without any affected package.
    """
    raw_id = raw_data.get("id") or ""
    summary = raw_data.get("summary") or ""
    details = raw_data.get("details") or ""
    summary = build_description(summary=summary, description=details)
    # Copy the aliases so that appending the id does not mutate ``raw_data``.
    aliases = list(raw_data.get("aliases") or [])
    if raw_id:
        aliases.append(raw_id)
    date_published = get_published_date(raw_data)
    severity = list(get_severities(raw_data))
    references = get_references(raw_data, severity)

    if "affected" not in raw_data:
        logger.error(f"affected_packages not found - {raw_id !r}")
        return AdvisoryData(
            aliases=aliases,
            summary=summary,
            references=references,
            affected_packages=[],
            date_published=date_published,
        )

    affected_packages = []
    for affected_pkg in raw_data.get("affected") or []:
        purl = get_affected_purl(affected_pkg, raw_id)
        if purl is None:
            # get_affected_purl() has already logged why no purl could be built;
            # skip this entry rather than failing on ``purl.type`` below.
            continue

        if purl.type != "pypi":
            logger.error(f"Non PyPI package found in PYSEC advisories: {purl} - from: {raw_id !r}")
            continue

        affected_version_range = get_affected_version_range(affected_pkg, raw_id)
        # ``or []`` also covers an explicit null "ranges" value.
        for fixed_range in affected_pkg.get("ranges") or []:
            for version in get_fixed_version(fixed_range, raw_id):
                affected_packages.append(
                    AffectedPackage(
                        package=purl,
                        affected_version_range=affected_version_range,
                        fixed_version=version,
                    )
                )

    return AdvisoryData(
        aliases=aliases,
        summary=summary,
        affected_packages=affected_packages,
        references=references,
        date_published=date_published,
    )
83+
84+
85+
def fixed_filter(fixed_range) -> Iterable[str]:
    """
    Yield the fixed version strings of a ``fixed_range`` mapping of OSV data.

    Only the non-empty ``fixed`` values of the ``events`` are yielded, in order.
    >>> list(fixed_filter({"type": "SEMVER", "events": [{"introduced": "0"}, {"fixed": "1.6.0"}]}))
    ['1.6.0']
    >>> list(fixed_filter({"type": "ECOSYSTEM","events":[{"introduced": "0"},{"fixed": "1.0.0"},{"fixed": "9.0.0"}]}))
    ['1.0.0', '9.0.0']
    """
    events = fixed_range.get("events") or []
    candidates = (event.get("fixed") for event in events)
    # filter(None, ...) drops events without a "fixed" value (missing or empty).
    yield from filter(None, candidates)
97+
98+
99+
def get_published_date(raw_data):
    """
    Return the ``published`` date of ``raw_data`` parsed with dateparser.

    When ``published`` is missing or empty, that falsy value is returned as-is
    (None when the key is absent).
    """
    published = raw_data.get("published")
    if not published:
        return published
    return dateparser.parse(published)
102+
103+
104+
def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:
    """
    Yield the VulnerabilitySeverity objects found in ``raw_data``, a mapping of
    OSV advisory data.

    Entries of the top-level ``severity`` list with the ``CVSS_V3`` type are
    yielded with the "cvssv3.1_vector" scoring system; an error is logged for
    any other type. A ``severity`` value found in the ``ecosystem_specific`` and
    ``database_specific`` mappings is then yielded, in that order, with the
    "generic_textual" scoring system.
    """
    for sever_list in raw_data.get("severity") or []:
        if sever_list.get("type") == "CVSS_V3":
            yield VulnerabilitySeverity(
                system=SCORING_SYSTEMS["cvssv3.1_vector"], value=sever_list["score"]
            )
        else:
            # Use .get(): an advisory without an "id" must not raise a KeyError
            # while merely reporting an unsupported severity type.
            logger.error(f"NotImplementedError severity type- {raw_data.get('id')!r}")

    # Both mappings carry the same kind of free-text severity.
    for section in ("ecosystem_specific", "database_specific"):
        sever = (raw_data.get(section) or {}).get("severity")
        if sever:
            yield VulnerabilitySeverity(
                system=SCORING_SYSTEMS["generic_textual"],
                value=sever,
            )
128+
129+
130+
def get_references(raw_data, severities) -> List[Reference]:
    """
    Return a list of Reference built from the ``references`` of ``raw_data``.

    Each non-empty reference entry contributes one Reference using its ``url``;
    the same ``severities`` are attached to every Reference.
    """
    collected = []
    for ref in raw_data.get("references") or []:
        if not ref:
            # Skip empty or null entries.
            continue
        collected.append(Reference(url=ref["url"], severities=severities))
    return collected
133+
134+
135+
def get_affected_purl(affected_pkg, raw_id):
    """
    Return a PackageURL for the ``package`` of an ``affected_pkg`` mapping, or
    None when it cannot be determined.

    The ``purl`` string of the package is parsed first. When it is missing or
    invalid, the PackageURL is built from the ``ecosystem`` and ``name`` fields.
    ``raw_id`` is only used in the logged error messages.
    """
    package = affected_pkg.get("package") or {}

    purl = package.get("purl")
    if purl:
        try:
            return PackageURL.from_string(purl)
        except ValueError:
            # Fall through to the ecosystem/name based construction below.
            logger.error(f"PackageURL ValueError - {raw_id !r} - purl: {purl !r}")

    ecosys = package.get("ecosystem")
    name = package.get("name")
    if not (ecosys and name):
        logger.error(f"purl affected_pkg not found - {raw_id !r}")
        return None

    return PackageURL(type=ecosys, name=name)
150+
151+
152+
def get_affected_version_range(affected_pkg, raw_id):
    """
    Return a PypiVersionRange built from the ``versions`` of an ``affected_pkg``
    mapping, or None.

    None is returned when there are no versions, or when building the range
    fails, in which case the error is logged with ``raw_id``.
    """
    affected_versions = affected_pkg.get("versions")
    if not affected_versions:
        return None

    try:
        version_range = PypiVersionRange.from_versions(affected_versions)
    except Exception as e:
        logger.error(
            f"InvalidVersionRange affected_pkg_version_range Error - {raw_id !r} {e!r}"
        )
        return None

    return version_range
163+
164+
165+
def get_fixed_version(fixed_range, raw_id) -> List[Version]:
    """
    Return a deduplicated list of fixed versions for a ``fixed_range`` mapping.

    The fixed version strings come from fixed_filter(). Each one is converted to
    a univers version according to the range ``type``: PypiVersion for
    "ECOSYSTEM" and SemverVersion for "SEMVER". "GIT" ranges are not supported
    yet and are only logged, as are invalid versions and a missing ``type``.
    >>> get_fixed_version({}, "GHSA-j3f7-7rmc-6wqj")
    []
    >>> get_fixed_version({"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}]}, "GHSA-j3f7-7rmc-6wqj")
    [PypiVersion(string='1.7.0')]
    """
    versions = []
    if "type" not in fixed_range:
        logger.error(f"Invalid type - {raw_id!r}")
        return dedupe(versions)

    range_type = fixed_range["type"]
    for i in fixed_filter(fixed_range):
        if range_type == "ECOSYSTEM":
            try:
                versions.append(PypiVersion(i))
            except InvalidVersion:
                logger.error(f"Invalid Version - PypiVersion - {raw_id !r} - {i !r}")
        elif range_type == "SEMVER":
            try:
                versions.append(SemverVersion(i))
            except InvalidVersion:
                logger.error(f"Invalid Version - SemverVersion - {raw_id !r} - {i !r}")
        elif range_type == "GIT":
            # TODO add GitHubVersion univers fix_version
            logger.error(f"NotImplementedError GIT Version - {raw_id !r} - {i !r}")

    return dedupe(versions)

vulnerabilities/importers/pypa.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/nexB/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
import logging
10+
import os
11+
from typing import Iterable
12+
13+
import saneyaml
14+
from fetchcode.vcs.git import fetch_via_git
15+
16+
from vulnerabilities.importer import AdvisoryData
17+
from vulnerabilities.importer import Importer
18+
from vulnerabilities.importers.osv import parse_advisory_data
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
class PyPaImporter(Importer):
    """
    Importer for the advisories of the pypa/advisory-database git repository.
    """

    url = "git+https://github.com/pypa/advisory-database"
    license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"
    spdx_license_expression = "CC-BY-4.0"

    def advisory_data(self) -> Iterable[AdvisoryData]:
        """
        Yield one AdvisoryData for each advisory loaded from the repository at
        ``self.url``.
        """
        yield from map(parse_advisory_data, fork_and_get_files(self.url))
31+
32+
33+
class ForkError(Exception):
    """Raised when the advisory git repository cannot be cloned."""
35+
36+
37+
def fork_and_get_files(url) -> Iterable[dict]:
    """
    Fetch the git repository at ``url`` and yield the data loaded from each
    ``.yaml`` file found under its ``vulns`` directory, walking its
    subdirectories. Files with any other extension are skipped with a warning.

    Raise a ForkError when the repository cannot be fetched. As this is a
    generator, the fetch (and any ForkError) only happens once iteration starts.
    """
    try:
        fork_directory = fetch_via_git(url=url)
    except Exception as e:
        logger.error(f"Can't clone url {url}")
        raise ForkError(url) from e

    # NOTE(review): the checkout under ``fork_directory.dest_dir`` is not removed
    # here - confirm whether a caller is expected to clean it up.
    advisory_dirs = os.path.join(fork_directory.dest_dir, "vulns")
    for root, _, files in os.walk(advisory_dirs):
        for file in files:
            if not file.endswith(".yaml"):
                logger.warning(f"unsupported file {file}")
                continue

            # Read as UTF-8 explicitly instead of relying on the locale-dependent
            # default encoding of open().
            with open(os.path.join(root, file), encoding="utf-8") as f:
                yield saneyaml.load(f.read())

0 commit comments

Comments
 (0)