88#
99
1010import logging
11+ import os
1112from typing import Iterable
1213from urllib .parse import urljoin
1314
1415import requests
16+ from dotenv import load_dotenv
1517from packageurl import PackageURL
1618
1719from vulntotal .validator import DataSource
@@ -24,21 +26,19 @@ class VulnerableCodeDataSource(DataSource):
2426 spdx_license_expression = "CC-BY-SA-4.0"
2527 license_url = "https://github.com/nexB/vulnerablecode/blob/main/cc-by-sa-4.0.LICENSE"
2628
27- global_instance = None
29+ global_instance = "https://public.vulnerablecode.io/"
2830 vc_purl_search_api_path = "api/packages/bulk_search/"
2931
3032 def fetch_post_json (self , payload ):
31- vc_instance = self .global_instance if self .global_instance else "http://localhost:8001/"
32-
33- url = urljoin (vc_instance , self .vc_purl_search_api_path )
34- response = requests .post (url , json = payload )
33+ url = urljoin (self .global_instance , self .vc_purl_search_api_path )
34+ response = fetch_vulnerablecode_query (url = url , payload = payload )
3535 if not response .status_code == 200 :
3636 logger .error (f"Error while fetching { url } " )
3737 return
3838 return response .json ()
3939
4040 def fetch_get_json (self , url ):
41- response = requests . get (url )
41+ response = fetch_vulnerablecode_query (url = url , payload = None )
4242 if not response .status_code == 200 :
4343 logger .error (f"Error while fetching { url } " )
4444 return
@@ -85,3 +85,32 @@ def parse_advisory(fetched_advisory) -> VendorData:
8585 return VendorData (
8686 aliases = aliases , affected_versions = affected_versions , fixed_versions = fixed_versions
8787 )
88+
89+
90+ class VCIOTokenError (Exception ):
91+ pass
92+
93+
94+ def fetch_vulnerablecode_query (url : str , payload : dict ):
95+ """
96+ Requires VCIO API key in .env file
97+ For example::
98+
99+ VCIO_TOKEN="OJ78Os2IPfM80hqVT2ek+1QnrTKvsX1HdOMABq3pmQd"
100+ """
101+ load_dotenv ()
102+ vcio_token = os .environ .get ("VCIO_TOKEN" , None )
103+ if not vcio_token :
104+ msg = "Cannot call VulnerableCode API without a token set in the VCIO_TOKEN environment variable."
105+ raise VCIOTokenError (msg )
106+
107+ response = (
108+ requests .post (url , headers = {"Authorization" : f"Token { vcio_token } " }, json = payload )
109+ if payload is not None
110+ else requests .get (url , headers = {"Authorization" : f"Token { vcio_token } " })
111+ )
112+
113+ if response .text .startswith ('{"detail":' ):
114+ raise VCIOTokenError (f"{ response .json ().get ('detail' )} " )
115+
116+ return response
0 commit comments