@@ -55,14 +55,15 @@ class CVEFetcher:
5555 "baseSeverity" : "N/A"
5656 }
5757
58- def __init__ (self , cve_id : str ):
58+ def __init__ (self , cve_id : str = None , cpe_id : str = None ):
5959 """
6060 Initializes the CVEFetcher with a specific CVE ID.
6161
6262 Args:
6363 cve_id (str): The CVE identifier to fetch data for.
6464 """
6565 self .cve_id = cve_id
66+ self .cpe_id = cpe_id
6667 self .data = {}
6768 self .successful = False
6869
@@ -78,7 +79,7 @@ def fetch_from_nist_gov(self):
7879 """
7980 try :
8081 headers = {"apiKey" : NVD_API_KEY }
81- url = parse .urlunparse (NVD_ADDRESS ) + self .cve_id
82+ url = parse .urlunparse (NVD_ADDRESS ) + "?cveId=" + self .cve_id
8283 logger .info (f"Fetching CVE data from NIST for CVE ID: { self .cve_id } using URL: { url } " )
8384 response = requests .get (url , headers = headers )
8485
@@ -127,6 +128,71 @@ def fetch_from_nist_gov(self):
127128 except (ValueError , KeyError ) as e :
128129 logger .error (f"Failed to retrieve or process CVE data for CVE ID: { self .cve_id } . Error: { e } " )
129130
131+ def fetch_from_nist_by_cpe (self ):
132+ """
133+ Fetches CVE data from the NIST government server for a given CPE ID.
134+
135+ The function will retrieve all CVEs associated with the provided CPE ID.
136+ """
137+ if not self .cpe_id :
138+ logger .error ("CPE ID is required for this fetcher." )
139+ return
140+
141+ try :
142+ headers = {"apiKey" : NVD_API_KEY }
143+ url = parse .urlunparse (NVD_ADDRESS ) + "?cpeName=" + self .cpe_id
144+ logger .info (f"Fetching CVE data from NIST for CPE ID: { self .cpe_id } using URL: { url } " )
145+ response = self ._make_request (url , headers )
146+
147+ if response :
148+ self ._process_cve_data (response )
149+ self .successful = True
150+ logger .info (f"Successfully fetched CVE data for CPE ID: { self .cpe_id } " )
151+
152+ except Exception as e :
153+ logger .error (f"Failed to fetch CVE data for CPE ID: { self .cpe_id } . Error: { e } " )
154+
155+
156+ def _make_request (self , url , headers ):
157+ """
158+ Makes a GET request to the provided URL with the given headers and handles the response.
159+ """
160+ try :
161+ response = requests .get (url , headers = headers )
162+
163+ if response .status_code != 200 :
164+ logger .warning (f"Failed to fetch CVE data for CPE ID: { self .cpe_id } . HTTP status: { response .status_code } " )
165+ return None
166+
167+ return response .json ()
168+
169+ except requests .RequestException as e :
170+ logger .error (f"Request failed for CPE ID: { self .cpe_id } . Error: { e } " )
171+ return None
172+
173+ def _process_cve_data (self , response_json ):
174+ """
175+ Processes the CVE data from the response and fetches additional CVE data for each CVE ID.
176+ """
177+ if not isinstance (response_json , dict ) or "vulnerabilities" not in response_json :
178+ raise ValueError (f"Invalid response structure for CPE ID: { self .cpe_id } " )
179+
180+ cve_list = response_json ["vulnerabilities" ]
181+ for cve_entry in cve_list :
182+ cve_id = cve_entry .get ("cve" , {}).get ("id" , None )
183+ if cve_id :
184+ self .fetch_cve_by_id (cve_id )
185+
186+ def fetch_cve_by_id (self , cve_id ):
187+ """
188+ Fetches CVE data for a specific CVE ID (helper function for fetching individual CVEs).
189+ """
190+ # Reuse the original `fetch_from_nist_gov` logic here for individual CVEs
191+ logger .info (f"Fetching individual CVE data for { cve_id } " )
192+ fetcher = CVEFetcher (cve_id = cve_id )
193+ fetcher .fetch_from_nist_gov ()
194+ self .data [cve_id ] = fetcher .data
195+
130196 def fetch_epss (self ):
131197 """
132198 Fetches the Exploit Prediction Scoring System (EPSS) score for the given CVE ID.
0 commit comments