107107global MAX_RETRIES
108108global SLEEP
109109MAX_RETRIES = 30
110- SLEEP = 5
110+ SLEEP = 10
111111
112112logging .basicConfig (
113113 level = logging .INFO ,
114114 format = "[%(asctime)s] {%(module)s:%(lineno)d} %(levelname)s - %(message)s"
115115)
116116
117+ logging .getLogger ("blackduck" ).setLevel (logging .CRITICAL )
118+
117119# Validates BD project and version
118120# Inputs:
119121# projname - Name of project
@@ -187,17 +189,18 @@ def get_sbom_mime_type(filename):
187189 return 'application/spdx'
188190 return None
189191
190- def poll_notifications_for_success (cl , proj_version_url , summaries_url ):
191- # We want to locate a notification for
192- # VERSION_BOM_CODE_LOCATION_BOM_COMPUTED
193- # matching our proj_version_url and our codelocation
192+ # Poll for notification alerting us of successful BOM computation
193+ #
194+ # Inputs:
195+ # cl_url - Code Loction URL to match
196+ # proj_version_url - Project Version URL to match
197+ # summaries_url - Summaries URL from codelocation
198+ #
199+ # Returns on success. Errors are fatal.
200+ def poll_notifications_for_success (cl_url , proj_version_url , summaries_url ):
194201 retries = MAX_RETRIES
195202 sleep_time = SLEEP
196203
197- # current theory: if a scan happened and we matched NOTHING, we
198- # aren't going to get a BOM_COMPUTED notification. so is there any type
199- # of notif that we DO get?
200-
201204 params = {
202205 'filter' : ["notificationType:VERSION_BOM_CODE_LOCATION_BOM_COMPUTED" ],
203206 'sort' : ["createdAt: ASC" ]
@@ -207,30 +210,35 @@ def poll_notifications_for_success(cl, proj_version_url, summaries_url):
207210 retries -= 1
208211 for result in bd .get_items ("/api/notifications" , params = params ):
209212 if 'projectVersion' not in result ['content' ]:
210- # skip it (shouldn 't be possible due to the filter)
213+ # Shouldn 't be possible due to the filter
211214 continue
212- # We're checking the entire list of notifications, but ours is
213- # likely to be the first. Walking the whole list to make
214- # sure we find an exact match.
215+ # We're checking the entire list of notifications, but ours should
216+ # be near the top.
215217 if result ['content' ]['projectVersion' ] == proj_version_url and \
216- result ['content' ]['codeLocation' ] == cl [ '_meta' ][ 'href' ] and \
218+ result ['content' ]['codeLocation' ] == cl_url and \
217219 result ['content' ]['scanSummary' ] == summaries_url :
218220 print ("BOM calculation complete" )
219221 return
220222
221223 print ("Waiting for BOM calculation to complete" )
222224 time .sleep (sleep_time )
223225
224- logging .error (f"Failed to verify successful BOM computed in { retries * sleep_time } seconds" )
226+ logging .error (f"Failed to verify successful BOM computed in { MAX_RETRIES * sleep_time } seconds" )
225227 sys .exit (1 )
226228
227229# Poll for successful scan of SBOM.
228- # Input: Name of SBOM document (not the filename, the name defined inside the json body)
230+ # Inputs:
231+ #
232+ # sbom_name: Name of SBOM document (not the filename, the name defined
233+ # inside the json body)
234+ # proj_version_url: Project version url
235+ #
229236# Returns on success. Errors will result in fatal exit.
230237def poll_for_sbom_complete (sbom_name , proj_version_url ):
231238 retries = MAX_RETRIES
232239 sleep_time = SLEEP
233240 matched_scan = False
241+ cl_url = None
234242
235243 # Replace any spaces in the name with a dash to match BD
236244 sbom_name = sbom_name .replace (' ' , '-' )
@@ -242,23 +250,29 @@ def poll_for_sbom_complete(sbom_name, proj_version_url):
242250 }
243251 cls = bd .get_resource ('codeLocations' , params = params )
244252 for cl in cls :
253+ if matched_scan :
254+ break
245255 # Force exact match of: spdx_doc_name + " spdx/sbom"
246256 # BD appends the "spdx/sbom" string to the name.
247257 if cl ['name' ] != sbom_name + " spdx/sbom" :
248258 continue
249259
250260 matched_scan = True
261+ cl_url = cl ['_meta' ]['href' ]
262+
251263 for link in (cl ['_meta' ]['links' ]):
252264 # Locate the scans URL to check for status
253265 if link ['rel' ] == "latest-scan" :
254266 latest_url = link ['href' ]
255267 break
256268
257- assert latest_url , "Failed to locate latest-scan reference"
258269 if not matched_scan :
259270 logging .error (f"No scan found for SBOM: { sbom_name } " )
260271 sys .exit (1 )
261272
273+ assert latest_url , "Failed to locate latest-scan reference"
274+ assert cl_url , "Failed to locate codelocation reference"
275+
262276 # Wait for scanState = SUCCESS
263277 while (retries ):
264278 json_data = bd .get_json (latest_url )
@@ -271,43 +285,40 @@ def poll_for_sbom_complete(sbom_name, proj_version_url):
271285 sys .exit (1 )
272286 else :
273287 # Only other state should be "STARTED" -- keep polling
274- print (f"Waiting for status success , currently: { json_data ['scanState' ]} " )
288+ print (f"Waiting for scan completion , currently: { json_data ['scanState' ]} " )
275289 time .sleep (sleep_time )
276290
277291 # If there were ZERO matches, there will never be a notification of
278- # BOM import success. Short-circuit that check and treat this as success.
292+ # BOM import success. Short-circuit the check and treat this as success.
279293 if json_data ['matchCount' ] == 0 :
280- print ("No KB matches in BOM , continuing..." )
294+ print ("No BOM KB matches, continuing..." )
281295 return
282296
283297 # Save the codelocation summaries_url
284298 summaries_url = json_data ['_meta' ]['href' ]
285299
286- # Greedy match - extract the scan id out of the URL
287- #scanid = re.findall(r'.*\/(.*)', json_data['_meta']['href'])
288- # proj_Version_url/bom-status/scanid does NOT WORK
289-
290- # TODO this seems actually fairly pointless - it get stuck in UP_TO_DATE
300+ # Check the bom-status endpoint for success
291301 retries = MAX_RETRIES
292302 while (retries ):
293303 json_data = bd .get_json (proj_version_url + "/bom-status" )
294304 retries -= 1
295305 if json_data ['status' ] == "UP_TO_DATE" :
296306 print ("BOM import complete" )
297307 break
298- elif json_data ['status' ] == "FAILURE" :
299- logging .error (f"BOM Import failure: { json_data ['status' ]} " )
308+ elif json_data ['status' ] == "UP_TO_DATE_WITH_ERRORS" or \
309+ json_data ['status' ] == "PROCESSING_WITH_ERRORS" :
310+ logging .error (f"BOM Import failure: status is { json_data ['status' ]} " )
300311 sys .exit (1 )
301312 else :
302313 print (f"Waiting for BOM import completion, current status: { json_data ['status' ]} " )
303314 time .sleep (sleep_time )
304315
305316 if retries == 0 :
306- logging .error ("Failed to verify successful SBOM import in {retries * sleep_time} seconds" )
317+ logging .error (f "Failed to verify successful SBOM import in { retries * sleep_time } seconds" )
307318 sys .exit (1 )
308319
309320 # Finally check notifications
310- poll_notifications_for_success (cl , proj_version_url , summaries_url )
321+ poll_notifications_for_success (cl_url , proj_version_url , summaries_url )
311322
312323 # Any errors above already resulted in fatal exit
313324 return
@@ -580,8 +591,7 @@ def main():
580591 package_count = 0
581592 cust_comp_count = 0
582593 cust_ver_count = 0
583- # Saving all encountered components by their name+version
584- # Used for debugging repeated package data
594+ # Used for tracking repeated package data
585595 packages = {}
586596 # Saved component data to write to file
587597 comps_out = []
@@ -595,8 +605,10 @@ def main():
595605
596606 if package .name == "" :
597607 # Strange case where the package name is empty. Skip it.
598- logging .warning ("WARNING: package name empty, skipping" )
608+ logging .warning ("WARNING: Skipping empty package name. Package info:" )
609+ pprint (package )
599610 continue
611+
600612 # Trim any odd leading/trailing space or newlines
601613 package .name = package .name .strip ()
602614
@@ -617,9 +629,8 @@ def main():
617629 if package .external_references :
618630 foundpurl = False
619631 for ref in package .external_references :
620- # There can be multiple extrefs - try to locate a purl
621- # If there should happen to be multiple purls,
622- # we only consider the first.
632+ # There can be multiple extrefs; try to locate a purl.
633+ # If there are multiple purls, use the first one.
623634 if (ref .reference_type == "purl" ):
624635 foundpurl = True
625636 kb_match = find_comp_in_kb (ref .locator )
@@ -668,8 +679,6 @@ def main():
668679 if kb_match :
669680 print (f" WARNING: { matchname } { matchver } in KB but not in SBOM" )
670681 add_to_sbom (proj_version_url , kb_match ['version' ])
671- # TODO TEMP DEBUG TO CATCH THIS
672- quit ()
673682 # short-circuit the rest
674683 continue
675684
0 commit comments