3232
3333
3434def parse_advisory_data (raw_data : dict , supported_ecosystem ) -> Optional [AdvisoryData ]:
35+ """
36+ Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
37+ a ``supported_ecosystem`` string.
38+ """
3539 raw_id = raw_data .get ("id" ) or ""
3640 summary = raw_data .get ("summary" ) or ""
3741 details = raw_data .get ("details" ) or ""
3842 summary = build_description (summary = summary , description = details )
3943 aliases = raw_data .get ("aliases" ) or []
4044 if raw_id :
4145 aliases .append (raw_id )
42- aliases = dedupe (aliases )
43- date_published = get_published_date (raw_data )
44- severity = list (get_severities (raw_data ))
45- references = get_references (raw_data , severity )
46+ aliases = dedupe (original = aliases )
47+
48+ date_published = get_published_date (raw_data = raw_data )
49+ severities = list (get_severities (raw_data = raw_data ))
50+ references = get_references (raw_data = raw_data , severities = severities )
4651
4752 affected_packages = []
48- if "affected" not in raw_data :
49- return AdvisoryData (
50- aliases = aliases ,
51- summary = summary ,
52- references = references ,
53- affected_packages = [],
54- date_published = date_published ,
55- )
5653
5754 for affected_pkg in raw_data .get ("affected" ) or []:
58- purl = get_affected_purl (affected_pkg , raw_id )
55+ purl = get_affected_purl (affected_pkg = affected_pkg , raw_id = raw_id )
5956 if purl .type != supported_ecosystem :
60- logger .error (
61- f"un supported ecosystem package found in the advisories: { purl } - from: { raw_id !r} "
62- )
57+ logger .error (f"Unsupported package type: { purl !r} in OSV: { raw_id !r} " )
6358 continue
6459
6560 affected_version_range = get_affected_version_range (
66- affected_pkg , raw_id , supported_ecosystem
61+ affected_pkg = affected_pkg ,
62+ raw_id = raw_id ,
63+ supported_ecosystem = supported_ecosystem ,
6764 )
68- for fixed_range in affected_pkg .get ("ranges" , []):
69- fixed_version = get_fixed_version (fixed_range , raw_id )
65+
66+ for fixed_range in affected_pkg .get ("ranges" ) or []:
67+ fixed_version = get_fixed_versions (fixed_range = fixed_range , raw_id = raw_id )
7068
7169 for version in fixed_version :
7270 affected_packages .append (
@@ -80,18 +78,24 @@ def parse_advisory_data(raw_data: dict, supported_ecosystem) -> Optional[Advisor
8078 return AdvisoryData (
8179 aliases = aliases ,
8280 summary = summary ,
83- affected_packages = affected_packages ,
8481 references = references ,
82+ affected_packages = affected_packages ,
8583 date_published = date_published ,
8684 )
8785
8886
89- def fixed_filter (fixed_range ) -> Iterable [str ]:
87+ def extract_fixed_versions (fixed_range ) -> Iterable [str ]:
9088 """
91- Return a list of fixed version strings given a ``fixed_range`` mapping of OSV data.
92- >>> list(fixed_filter({"type": "SEMVER", "events": [{"introduced": "0"}, {"fixed": "1.6.0"}]}))
89+ Return a list of fixed version strings given a ``fixed_range`` mapping of
90+ OSV data.
91+
92+ >>> list(extract_fixed_versions(
93+ ... {"type": "SEMVER", "events": [{"introduced": "0"},{"fixed": "1.6.0"}]}))
9394 ['1.6.0']
94- >>> list(fixed_filter({"type": "ECOSYSTEM","events":[{"introduced": "0"},{"fixed": "1.0.0"},{"fixed": "9.0.0"}]}))
95+
96+ >>> list(extract_fixed_versions(
97+ ... {"type": "ECOSYSTEM","events":[{"introduced": "0"},
98+ ... {"fixed": "1.0.0"},{"fixed": "9.0.0"}]}))
9599 ['1.0.0', '9.0.0']
96100 """
97101 for event in fixed_range .get ("events" ) or []:
@@ -102,97 +106,141 @@ def fixed_filter(fixed_range) -> Iterable[str]:
102106
103107def get_published_date (raw_data ):
104108 published = raw_data .get ("published" )
105- return published and dateparser .parse (published )
109+ return published and dateparser .parse (date_string = published )
106110
107111
108112def get_severities (raw_data ) -> Iterable [VulnerabilitySeverity ]:
109- for sever_list in raw_data .get ("severity" ) or []:
110- if sever_list .get ("type" ) == "CVSS_V3" :
113+ """
114+ Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``
115+ """
116+ for severity in raw_data .get ("severity" ) or []:
117+ if severity .get ("type" ) == "CVSS_V3" :
111118 yield VulnerabilitySeverity (
112- system = SCORING_SYSTEMS ["cvssv3.1_vector" ], value = sever_list ["score" ]
119+ system = SCORING_SYSTEMS ["cvssv3.1_vector" ],
120+ value = severity ["score" ],
113121 )
114122 else :
115- logger .error (f"NotImplementedError severity type- { raw_data ['id' ]!r} " )
123+ logger .error (f"Unsupported severity type: { severity !r } for OSV id: { raw_data ['id' ]!r} " )
116124
117- ecosys = raw_data .get ("ecosystem_specific" ) or {}
118- sever = ecosys .get ("severity" )
119- if sever :
125+ ecosystem_specific = raw_data .get ("ecosystem_specific" ) or {}
126+ severity = ecosystem_specific .get ("severity" )
127+ if severity :
120128 yield VulnerabilitySeverity (
121129 system = SCORING_SYSTEMS ["generic_textual" ],
122- value = sever ,
130+ value = severity ,
123131 )
124132
125133 database_specific = raw_data .get ("database_specific" ) or {}
126- sever = database_specific .get ("severity" )
127- if sever :
134+ severity = database_specific .get ("severity" )
135+ if severity :
128136 yield VulnerabilitySeverity (
129137 system = SCORING_SYSTEMS ["generic_textual" ],
130- value = sever ,
138+ value = severity ,
131139 )
132140
133141
134142def get_references (raw_data , severities ) -> List [Reference ]:
135- references = raw_data .get ("references" ) or []
136- return [Reference (url = ref ["url" ], severities = severities ) for ref in references if ref ]
143+ """
144+ Return a list Reference extracted from a mapping of OSV ``raw_data`` given a
145+ ``severities`` list of VulnerabilitySeverity.
146+ """
147+ references = []
148+ for ref in raw_data .get ("references" ) or []:
149+ if not ref :
150+ continue
151+
152+ url = ref ["url" ]
153+ if not url :
154+ logger .error (f"Reference without URL : { ref !r} for OSV id: { raw_data ['id' ]!r} " )
155+ continue
156+ references .append (Reference (url = ref ["url" ], severities = severities ))
157+ return references
137158
138159
139160def get_affected_purl (affected_pkg , raw_id ):
161+ """
162+ Return an affected PackageURL or None given a mapping of ``affected_pkg``
163+ data and a ``raw_id``.
164+ """
140165 package = affected_pkg .get ("package" ) or {}
141166 purl = package .get ("purl" )
142167 if purl :
143168 try :
144169 return PackageURL .from_string (purl )
145170 except ValueError :
146- logger .error (f"PackageURL ValueError - { raw_id !r} - purl: { purl !r} " )
171+ logger .error (
172+ f"Invalid PackageURL: { purl !r} for OSV "
173+ f"affected_pkg { affected_pkg } and id: { raw_id } "
174+ )
147175
148176 ecosys = package .get ("ecosystem" )
149177 name = package .get ("name" )
150178 if ecosys and name :
151179 return PackageURL (type = ecosys , name = name )
152- else :
153- logger .error (f"purl affected_pkg not found - { raw_id !r} " )
180+
181+ logger .error (
182+ f"No PackageURL possible: { purl !r} for affected_pkg { affected_pkg } for OSV id: { raw_id } "
183+ )
154184
155185
156186def get_affected_version_range (affected_pkg , raw_id , supported_ecosystem ):
187+ """
188+ Return a univers VersionRange for the ``affected_pkg`` package data mapping
189+ or None. Use a ``raw_id`` OSV id and ``supported_ecosystem``.
190+ """
157191 affected_versions = affected_pkg .get ("versions" )
158192 if affected_versions :
159193 try :
160194 return RANGE_CLASS_BY_SCHEMES [supported_ecosystem ].from_versions (affected_versions )
161195 except Exception as e :
162196 logger .error (
163- f"InvalidVersionRange affected_pkg_version_range Error - { raw_id !r} { e !r} "
197+ f"Invalid VersionRange for affected_pkg: { affected_pkg } "
198+ f"for OSV id: { raw_id !r} : error:{ e !r} "
164199 )
165- # else:
166- # logger.error(f"affected_pkg_version_range not found - {raw_id !r} ")
167200
168201
169- def get_fixed_version (fixed_range , raw_id ) -> List [Version ]:
202+ def get_fixed_versions (fixed_range , raw_id ) -> List [Version ]:
170203 """
171- Return a list of fixed versions, using fixed_filter we get the list of fixed version strings,
172- then we pass every element to their univers.versions , then we dedupe the result
173- >>> get_fixed_version({}, "GHSA-j3f7-7rmc-6wqj")
204+ Return a list of unique fixed univers Versions given a ``fixed_range``
205+ univers VersionRange and a ``raw_id``.
206+
207+ For example::
208+
209+ >>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj")
174210 []
175- >>> get_fixed_version({"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}]}, "GHSA-j3f7-7rmc-6wqj")
211+ >>> get_fixed_versions(
212+ ... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}]},
213+ ... raw_id="GHSA-j3f7-7rmc-6wqj"
214+ ... )
176215 [PypiVersion(string='1.7.0')]
177216 """
178- fixed_version = []
217+ fixed_versions = []
179218 if "type" not in fixed_range :
180- logger .error (f"Invalid type - { raw_id !r} " )
181- else :
182- list_fixed = fixed_filter (fixed_range )
183- fixed_range_type = fixed_range ["type" ]
184- for i in list_fixed :
185- if fixed_range_type == "ECOSYSTEM" :
186- try :
187- fixed_version .append (PypiVersion (i ))
188- except InvalidVersion :
189- logger .error (f"Invalid Version - PypiVersion - { raw_id !r} - { i !r} " )
190- if fixed_range_type == "SEMVER" :
191- try :
192- fixed_version .append (SemverVersion (i ))
193- except InvalidVersion :
194- logger .error (f"Invalid Version - SemverVersion - { raw_id !r} - { i !r} " )
195- # if fixed_range_type == "GIT":
196- # TODO add GitHubVersion univers fix_version
197- # logger.error(f"NotImplementedError GIT Version - {raw_id !r} - {i !r}")
198- return dedupe (fixed_version )
219+ logger .error (f"Invalid fixed_range type for: { fixed_range } for OSV id: { raw_id !r} " )
220+ return []
221+
222+ fixed_range_type = fixed_range ["type" ]
223+
224+ for version in extract_fixed_versions (fixed_range ):
225+
226+ # FIXME: ECOSYSTEM does not imply PyPI!!!!
227+ if fixed_range_type == "ECOSYSTEM" :
228+ try :
229+ fixed_versions .append (PypiVersion (version ))
230+ except InvalidVersion :
231+ logger .error (f"Invalid PypiVersion: { version !r} for OSV id: { raw_id !r} " )
232+
233+ elif fixed_range_type == "SEMVER" :
234+ try :
235+ fixed_versions .append (SemverVersion (version ))
236+ except InvalidVersion :
237+ logger .error (f"Invalid SemverVersion: { version !r} for OSV id: { raw_id !r} " )
238+
239+ else :
240+ logger .error (f"Unsupported fixed version type: { version !r} for OSV id: { raw_id !r} " )
241+
242+ # if fixed_range_type == "GIT":
243+ # TODO add GitHubVersion univers fix_version
244+ # logger.error(f"NotImplementedError GIT Version - {raw_id !r} - {i !r}")
245+
246+ return dedupe (fixed_versions )
0 commit comments