1313from typing import Optional
1414
1515import dateparser
16+ from cvss .exceptions import CVSS3MalformedError
1617from packageurl import PackageURL
1718from univers .version_range import RANGE_CLASS_BY_SCHEMES
1819from univers .versions import InvalidVersion
19- from univers .versions import PypiVersion
2020from univers .versions import SemverVersion
2121from univers .versions import Version
2222
3131
3232logger = logging .getLogger (__name__ )
3333
34+ PURL_TYPE_BY_OSV_ECOSYSTEM = {
35+ "npm" : "npm" ,
36+ "pypi" : "pypi" ,
37+ "maven" : "maven" ,
38+ "nuget" : "nuget" ,
39+ "packagist" : "composer" ,
40+ "rubygems" : "gem" ,
41+ "go" : "golang" ,
42+ "hex" : "hex" ,
43+ "cargo" : "cargo" ,
44+ }
45+
3446
3547def parse_advisory_data (
36- raw_data : dict , supported_ecosystem , advisory_url : str
48+ raw_data : dict , supported_ecosystems , advisory_url : str
3749) -> Optional [AdvisoryData ]:
3850 """
3951 Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
@@ -56,18 +68,21 @@ def parse_advisory_data(
5668
5769 for affected_pkg in raw_data .get ("affected" ) or []:
5870 purl = get_affected_purl (affected_pkg = affected_pkg , raw_id = raw_id )
59- if purl .type != supported_ecosystem :
60- logger .error (f"Unsupported package type: { purl !r} in OSV: { raw_id !r} " )
71+
72+ if not purl or purl .type not in supported_ecosystems :
73+ logger .error (f"Unsupported package type: { affected_pkg !r} in OSV: { raw_id !r} " )
6174 continue
6275
6376 affected_version_range = get_affected_version_range (
6477 affected_pkg = affected_pkg ,
6578 raw_id = raw_id ,
66- supported_ecosystem = supported_ecosystem ,
79+ supported_ecosystem = purl . type ,
6780 )
6881
6982 for fixed_range in affected_pkg .get ("ranges" ) or []:
70- fixed_version = get_fixed_versions (fixed_range = fixed_range , raw_id = raw_id )
83+ fixed_version = get_fixed_versions (
84+ fixed_range = fixed_range , raw_id = raw_id , supported_ecosystem = purl .type
85+ )
7186
7287 for version in fixed_version :
7388 affected_packages .append (
@@ -121,14 +136,22 @@ def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:
121136 """
122137 Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``
123138 """
124- for severity in raw_data .get ("severity" ) or []:
125- if severity .get ("type" ) == "CVSS_V3" :
126- vector = severity ["score" ]
127- system = SCORING_SYSTEMS ["cvssv3.1" ]
128- score = system .compute (vector )
129- yield VulnerabilitySeverity (system = system , value = score , scoring_elements = vector )
130- else :
131- logger .error (f"Unsupported severity type: { severity !r} for OSV id: { raw_data ['id' ]!r} " )
139+ try :
140+ for severity in raw_data .get ("severity" ) or []:
141+ if severity .get ("type" ) == "CVSS_V3" :
142+ vector = severity .get ("score" )
143+ # remove the / from the end of the vector if / exist
144+ valid_vector = vector [:- 1 ] if vector and vector [- 1 ] == "/" else vector
145+ system = SCORING_SYSTEMS ["cvssv3.1" ]
146+ score = system .compute (valid_vector )
147+ yield VulnerabilitySeverity (system = system , value = score , scoring_elements = vector )
148+
149+ else :
150+ logger .error (
151+ f"Unsupported severity type: { severity !r} for OSV id: { raw_data ['id' ]!r} "
152+ )
153+ except CVSS3MalformedError as e :
154+ logger .error (f"Invalid severity { e } " )
132155
133156 ecosystem_specific = raw_data .get ("ecosystem_specific" ) or {}
134157 severity = ecosystem_specific .get ("severity" )
@@ -173,21 +196,31 @@ def get_affected_purl(affected_pkg, raw_id):
173196 purl = package .get ("purl" )
174197 if purl :
175198 try :
176- return PackageURL .from_string (purl )
199+ purl = PackageURL .from_string (purl )
177200 except ValueError :
178201 logger .error (
179202 f"Invalid PackageURL: { purl !r} for OSV "
180203 f"affected_pkg { affected_pkg } and id: { raw_id } "
181204 )
182-
183- ecosys = package .get ("ecosystem" )
184- name = package .get ("name" )
185- if ecosys and name :
186- return PackageURL (type = ecosys , name = name )
187-
188- logger .error (
189- f"No PackageURL possible: { purl !r} for affected_pkg { affected_pkg } for OSV id: { raw_id } "
190- )
205+ else :
206+ ecosys = package .get ("ecosystem" )
207+ name = package .get ("name" )
208+ if ecosys and name :
209+ ecosys = ecosys .lower ()
210+ purl_type = PURL_TYPE_BY_OSV_ECOSYSTEM .get (ecosys )
211+ if not purl_type :
212+ return
213+ namespace = ""
214+ if purl_type == "maven" :
215+ namespace , _ , name = name .partition (":" )
216+
217+ purl = PackageURL (type = purl_type , namespace = namespace , name = name )
218+ else :
219+ logger .error (
220+ f"No PackageURL possible: { purl !r} for affected_pkg { affected_pkg } for OSV id: { raw_id } "
221+ )
222+ return
223+ return PackageURL .from_string (str (purl ))
191224
192225
193226def get_affected_version_range (affected_pkg , raw_id , supported_ecosystem ):
@@ -206,18 +239,17 @@ def get_affected_version_range(affected_pkg, raw_id, supported_ecosystem):
206239 )
207240
208241
209- def get_fixed_versions (fixed_range , raw_id ) -> List [Version ]:
242+ def get_fixed_versions (fixed_range , raw_id , supported_ecosystem ) -> List [Version ]:
210243 """
211244 Return a list of unique fixed univers Versions given a ``fixed_range``
212245 univers VersionRange and a ``raw_id``.
213-
214246 For example::
215-
216- >>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj")
247+ >>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj", supported_ecosystem="pypi",)
217248 []
218249 >>> get_fixed_versions(
219- ... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}]},
220- ... raw_id="GHSA-j3f7-7rmc-6wqj"
250+ ... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}], },
251+ ... raw_id="GHSA-j3f7-7rmc-6wqj",
252+ ... supported_ecosystem="pypi",
221253 ... )
222254 [PypiVersion(string='1.7.0')]
223255 """
@@ -228,21 +260,27 @@ def get_fixed_versions(fixed_range, raw_id) -> List[Version]:
228260
229261 fixed_range_type = fixed_range ["type" ]
230262
231- for version in extract_fixed_versions (fixed_range ):
263+ version_range_class = RANGE_CLASS_BY_SCHEMES .get (supported_ecosystem )
264+ version_class = version_range_class .version_class if version_range_class else None
232265
233- # FIXME: ECOSYSTEM does not imply PyPI!!!!
266+ for version in extract_fixed_versions ( fixed_range ):
234267 if fixed_range_type == "ECOSYSTEM" :
235268 try :
236- fixed_versions .append (PypiVersion (version ))
269+ if not version_class :
270+ raise InvalidVersion (
271+ f"Unsupported version for ecosystem: { supported_ecosystem } "
272+ )
273+ fixed_versions .append (version_class (version ))
237274 except InvalidVersion :
238- logger .error (f"Invalid PypiVersion: { version !r} for OSV id: { raw_id !r} " )
275+ logger .error (
276+ f"Invalid version class: { version_class } - { version !r} for OSV id: { raw_id !r} "
277+ )
239278
240279 elif fixed_range_type == "SEMVER" :
241280 try :
242281 fixed_versions .append (SemverVersion (version ))
243282 except InvalidVersion :
244283 logger .error (f"Invalid SemverVersion: { version !r} for OSV id: { raw_id !r} " )
245-
246284 else :
247285 logger .error (f"Unsupported fixed version type: { version !r} for OSV id: { raw_id !r} " )
248286
0 commit comments