1010import logging
1111from typing import Iterable
1212
13+ from dotenv import load_dotenv
14+
1315from vulnerabilities import utils
1416from vulntotal .validator import DataSource
1517from vulntotal .validator import VendorData
18+ from vulntotal .vulntotal_utils import get_item
1619from vulntotal .vulntotal_utils import github_constraints_satisfied
1720
1821logger = logging .getLogger (__name__ )
@@ -23,6 +26,13 @@ class GithubDataSource(DataSource):
2326 license_url = "TODO"
2427
2528 def fetch_github (self , graphql_query ):
29+ """
30+ Requires GitHub API key in .env file
31+ For example::
32+
33+ GH_TOKEN="your-github-token"
34+ """
35+ load_dotenv ()
2636 return utils .fetch_github_graphql_query (graphql_query )
2737
2838 def datasource_advisory (self , purl ) -> Iterable [VendorData ]:
@@ -32,9 +42,9 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
3242 queryset = generate_graphql_payload (purl , end_cursor )
3343 response = self .fetch_github (queryset )
3444 self ._raw_dump .append (response )
35- security_advisories = response [ "data" ][ "securityVulnerabilities" ]
45+ security_advisories = get_item ( response , "data" , "securityVulnerabilities" )
3646 interesting_edges .extend (extract_interesting_edge (security_advisories ["edges" ], purl ))
37- end_cursor = security_advisories [ "pageInfo" ][ "endCursor" ]
47+ end_cursor = get_item ( security_advisories , "pageInfo" , "endCursor" )
3848 if not security_advisories ["pageInfo" ]["hasNextPage" ]:
3949 break
4050 return parse_advisory (interesting_edges )
@@ -57,9 +67,10 @@ def supported_ecosystem(cls):
5767def parse_advisory (interesting_edges ) -> Iterable [VendorData ]:
5868 for edge in interesting_edges :
5969 node = edge ["node" ]
60- aliases = [aliase ["value" ] for aliase in node [ "advisory" ][ "identifiers" ] ]
70+ aliases = [aliase ["value" ] for aliase in get_item ( node , "advisory" , "identifiers" ) ]
6171 affected_versions = node ["vulnerableVersionRange" ].strip ().replace (" " , "" ).split ("," )
62- fixed_versions = [node ["firstPatchedVersion" ]["identifier" ]]
72+ parsed_fixed_versions = get_item (node , "firstPatchedVersion" , "identifier" )
73+ fixed_versions = [parsed_fixed_versions ] if parsed_fixed_versions else []
6374 yield VendorData (
6475 aliases = sorted (list (set (aliases ))),
6576 affected_versions = sorted (list (set (affected_versions ))),
0 commit comments