1111from typing import Iterable
1212
1313from dotenv import load_dotenv
14+ from packageurl import PackageURL
1415
1516from vulnerabilities import utils
1617from vulntotal .validator import DataSource
18+ from vulntotal .validator import InvalidCVEError
1719from vulntotal .validator import VendorData
1820from vulntotal .vulntotal_utils import get_item
1921from vulntotal .vulntotal_utils import github_constraints_satisfied
@@ -27,7 +29,7 @@ class GithubDataSource(DataSource):
2729
2830 def fetch_github (self , graphql_query ):
2931 """
30- Requires GitHub API key in .env file
32+ Requires GitHub API key in .env file.
3133 For example::
3234
3335 GH_TOKEN="your-github-token"
@@ -39,15 +41,36 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
3941 end_cursor = ""
4042 interesting_edges = []
4143 while True :
42- queryset = generate_graphql_payload (purl , end_cursor )
44+ queryset = generate_graphql_payload_from_purl (purl , end_cursor )
4345 response = self .fetch_github (queryset )
4446 self ._raw_dump .append (response )
4547 security_advisories = get_item (response , "data" , "securityVulnerabilities" )
4648 interesting_edges .extend (extract_interesting_edge (security_advisories ["edges" ], purl ))
4749 end_cursor = get_item (security_advisories , "pageInfo" , "endCursor" )
4850 if not security_advisories ["pageInfo" ]["hasNextPage" ]:
4951 break
50- return parse_advisory (interesting_edges )
52+ return parse_advisory (interesting_edges , purl )
53+
def datasource_advisory_from_cve(self, cve: str) -> Iterable[VendorData]:
    """
    Yield one VendorData per package affected by the given CVE.

    The CVE is looked up through the GitHub GraphQL API, the raw response
    replaces the content of ``self._raw_dump``, and the returned
    vulnerabilities are grouped by package before being yielded.

    Parameters:
        cve (str): CVE identifier; it must start with "CVE-" (any letter case).

    Yields:
        VendorData instance containing purl, aliases, affected_versions and fixed_versions.

    Raises:
        InvalidCVEError: If ``cve`` does not start with "CVE-". Because this is a
            generator, the error surfaces when iteration starts.
    """
    if not cve.upper().startswith("CVE-"):
        raise InvalidCVEError

    queryset = generate_graphql_payload_from_cve(cve)
    response = self.fetch_github(queryset)
    self._raw_dump = [response]

    def unique_sorted(advisory, key):
        # A missing key or null entries must not break set()/sorted().
        return sorted({value for value in advisory.get(key) or [] if value})

    for advisory in group_advisory_by_package(response, cve):
        ecosystem = get_purl_type(get_item(advisory, "package", "ecosystem"))
        package_name = get_item(advisory, "package", "name")
        # NOTE(review): get_purl_type returns None for ecosystems missing from
        # supported_ecosystem(), which yields a "pkg:None/..." purl string here;
        # confirm whether such packages should be skipped instead.
        purl = PackageURL.from_string(f"pkg:{ecosystem}/{package_name}")
        yield VendorData(
            purl=purl,
            aliases=unique_sorted(advisory, "identifiers"),
            # Vulnerable ranges describe what is affected; first patched
            # versions describe what is fixed (same mapping as parse_advisory).
            affected_versions=unique_sorted(advisory, "vulnerableVersionRange"),
            fixed_versions=unique_sorted(advisory, "firstPatchedVersion"),
        )
5174
5275 @classmethod
5376 def supported_ecosystem (cls ):
@@ -61,17 +84,29 @@ def supported_ecosystem(cls):
6184 "cargo" : "RUST" ,
6285 "npm" : "NPM" ,
6386 "hex" : "ERLANG" ,
87+ "pub" : "PUB" ,
6488 }
6589
6690
67- def parse_advisory (interesting_edges ) -> Iterable [VendorData ]:
91+ def parse_advisory (interesting_edges , purl ) -> Iterable [VendorData ]:
92+ """
93+ Parse the GraphQL response and yield VendorData instances.
94+
95+ Parameters:
96+ interesting_edges (list): List of edges containing security advisory.
97+ purl (PackageURL): PURL to be included in VendorData.
98+
99+ Yields:
100+ VendorData instance containing purl, aliases, affected_versions and fixed_versions.
101+ """
68102 for edge in interesting_edges :
69103 node = edge ["node" ]
70104 aliases = [aliase ["value" ] for aliase in get_item (node , "advisory" , "identifiers" )]
71105 affected_versions = node ["vulnerableVersionRange" ].strip ().replace (" " , "" ).split ("," )
72106 parsed_fixed_versions = get_item (node , "firstPatchedVersion" , "identifier" )
73107 fixed_versions = [parsed_fixed_versions ] if parsed_fixed_versions else []
74108 yield VendorData (
109+ purl = PackageURL (purl .type , purl .namespace , purl .name ),
75110 aliases = sorted (list (set (aliases ))),
76111 affected_versions = sorted (list (set (affected_versions ))),
77112 fixed_versions = sorted (list (set (fixed_versions ))),
@@ -86,39 +121,49 @@ def extract_interesting_edge(edges, purl):
86121 return interesting_edges
87122
88123
89- def generate_graphql_payload (purl , end_cursor ):
124+ def generate_graphql_payload_from_purl (purl , end_cursor = "" ):
125+ """
126+ Generate a GraphQL payload for querying security vulnerabilities related to a PURL.
127+
128+ Parameters:
129+ purl (PackageURL): The PURL to search for vulnerabilities.
130+ end_cursor (str): An optional end cursor to use for pagination.
131+
132+ Returns:
133+ dict: A dictionary containing the GraphQL query string with ecosystem and package.
134+ """
90135 GRAPHQL_QUERY_TEMPLATE = """
91136 query{
92137 securityVulnerabilities(first: 100, ecosystem: %s, package: "%s", %s){
93138 edges {
94- node {
95- advisory {
96- identifiers {
97- type
98- value
139+ node {
140+ advisory {
141+ identifiers {
142+ type
143+ value
144+ }
145+ summary
146+ references {
147+ url
148+ }
149+ severity
150+ publishedAt
99151 }
100- summary
101- references {
102- url
152+ firstPatchedVersion{
153+ identifier
103154 }
104- severity
105- publishedAt
106- }
107- firstPatchedVersion{
108- identifier
109- }
110- package {
111- name
155+ package {
156+ name
157+ }
158+ vulnerableVersionRange
112159 }
113- vulnerableVersionRange
114160 }
115- }
116- pageInfo {
117- hasNextPage
118- endCursor
161+ pageInfo {
162+ hasNextPage
163+ endCursor
164+ }
119165 }
120166 }
121- }
122167 """
123168
124169 supported_ecosystem = GithubDataSource .supported_ecosystem ()
@@ -149,3 +194,146 @@ def generate_graphql_payload(purl, end_cursor):
149194 package_name = f"{ purl .namespace } /{ purl .name } "
150195
151196 return {"query" : GRAPHQL_QUERY_TEMPLATE % (ecosystem , package_name , end_cursor_exp )}
197+
198+
def generate_graphql_payload_from_cve(cve: str):
    """
    Generate a GraphQL payload for querying security advisories related to a CVE.

    The identifier is embedded as an escaped GraphQL string literal, so quotes or
    backslashes in the input cannot terminate the literal and alter the query.

    Parameters:
        - cve (str): CVE identifier string to search for.

    Returns:
        - dict: Dictionary containing the GraphQL query string with the CVE identifier substituted in.
    """
    import json

    GRAPHQL_QUERY_TEMPLATE = """
    query {
        securityAdvisories(first: 100, identifier: { type: CVE, value: %s }) {
            nodes {
                vulnerabilities(first: 100) {
                    nodes {
                        package {
                            ecosystem
                            name
                        }
                        advisory {
                            identifiers {
                                type
                                value
                            }
                        }
                        firstPatchedVersion {
                            identifier
                        }
                        vulnerableVersionRange
                    }
                }
            }
        }
    }
    """
    # json.dumps emits the surrounding double quotes and escapes `"`, `\` and
    # control characters using escape sequences GraphQL string values accept.
    quoted_cve = json.dumps(str(cve))
    return {"query": GRAPHQL_QUERY_TEMPLATE % (quoted_cve,)}
236+
237+
def get_purl_type(github_ecosystem):
    """
    Map a GitHub ecosystem name to the purl type used for it.

    The lookup runs in reverse over ``GithubDataSource.supported_ecosystem()``,
    comparing its values against the upper-cased ``github_ecosystem``.

    Parameters:
        github_ecosystem (str): The GitHub ecosystem string.

    Returns:
        str or None: The lower-cased purl type of the first matching entry, or
        None if no supported ecosystem matches.
    """
    purl_type_to_github = GithubDataSource.supported_ecosystem()
    return next(
        (
            purl_type.lower()
            for purl_type, github_name in purl_type_to_github.items()
            if github_name == github_ecosystem.upper()
        ),
        None,
    )
253+
254+
def group_advisory_by_package(advisories_dict, cve):
    """
    Extract security advisory information from a dictionary and group it by package.

    Parameters:
        advisories_dict (dict): Dictionary containing security advisory. The dictionary
            should have the following structure:
                {
                    "data":{
                        "securityAdvisories":{
                            "nodes":[
                                {
                                    "vulnerabilities":{
                                        "nodes":[
                                            {
                                                "package": {
                                                    "ecosystem": str,
                                                    "name": str
                                                },
                                                "advisory":{
                                                    "identifiers":[
                                                        { "value": str },
                                                        ...
                                                    ]
                                                },
                                                "firstPatchedVersion":{
                                                    "identifier": str
                                                },
                                                "vulnerableVersionRange": str
                                            },
                                            ...
                                        ]
                                    }
                                },
                                ...
                            ]
                        }
                    }
                }
            "firstPatchedVersion" may be null (or absent) when no patched
            release exists; such entries contribute no patched version.

        cve (str): Used for filtering out advisories not matching this CVE. The
            comparison ignores letter case.

    Returns:
        list: List of dict containing advisory for package, in first-seen order.
            Each dict in the list represents advisory for a package and has the
            following keys:

            package (dict): Dict containing ecosystem and package name.
            identifiers (list of str): List of identifiers CVE and GHSA.
            firstPatchedVersion (list of str): List of first patched versions.
            vulnerableVersionRange (list of str): List of vulnerable version ranges.
    """
    wanted_cve = cve.upper()
    # Keyed by (ecosystem, name) so each vulnerability is grouped with a single
    # lookup; dicts keep insertion order, so first-seen ordering is preserved.
    grouped = {}

    for advisory in advisories_dict["data"]["securityAdvisories"]["nodes"]:
        for vulnerability in advisory["vulnerabilities"]["nodes"]:
            advisory_ids = [
                identifier["value"] for identifier in vulnerability["advisory"]["identifiers"]
            ]

            # Skip advisory if required CVE is not present in advisory.
            # GraphQL query for `CVE-2022-2922` may also include advisory for `CVE-2022-29221`
            # `CVE-2022-29222` and `CVE-2022-29229`
            if wanted_cve not in (advisory_id.upper() for advisory_id in advisory_ids):
                continue

            package = vulnerability["package"]
            # firstPatchedVersion is null when no fixed release has been published.
            first_patched_version = (vulnerability.get("firstPatchedVersion") or {}).get(
                "identifier"
            )
            vulnerable_version_range = vulnerability["vulnerableVersionRange"]

            package_key = (package.get("ecosystem"), package.get("name"))
            entry = grouped.get(package_key)
            if entry is None:
                entry = {
                    "package": package,
                    "identifiers": [],
                    "firstPatchedVersion": [],
                    "vulnerableVersionRange": [],
                }
                grouped[package_key] = entry

            entry["identifiers"].extend(advisory_ids)
            if first_patched_version:
                entry["firstPatchedVersion"].append(first_patched_version)
            entry["vulnerableVersionRange"].append(vulnerable_version_range)

    return list(grouped.values())
0 commit comments