@@ -1,5 +1,7 @@
 """
 Experimental script for bulk generation of MaD models based on a list of projects.
+
+Note: This file must be formatted using the Black Python formatter.
 """
 
 import os.path
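Per the note added above, the file is kept in Black's style; a minimal sketch of the corresponding check, assuming Black is installed and the script is named bulk_generate_mad.py (the file name here is an assumption):

    import subprocess

    # Exits non-zero if the file would be reformatted; --check and --diff are real Black flags.
    subprocess.run(
        ["python", "-m", "black", "--check", "--diff", "bulk_generate_mad.py"],
        check=True,
    )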
@@ -24,6 +26,7 @@
 )
 build_dir = os.path.join(gitroot, "mad-generation-build")
 
+
 # A project to generate models for
 class Project(TypedDict):
     """
@@ -132,7 +135,9 @@ def clone_projects(projects: List[Project]) -> List[tuple[Project, str]]:
     return project_dirs
 
 
-def build_database(language: str, extractor_options, project: Project, project_dir: str) -> str | None:
+def build_database(
+    language: str, extractor_options, project: Project, project_dir: str
+) -> str | None:
     """
     Build a CodeQL database for a project.
 
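Since build_database returns str | None, callers must treat None as a failed build; a minimal sketch of the pattern this script itself uses later (the example data is made up):

    results: list[tuple[str, str | None]] = [
        ("ok-project", "/tmp/ok-project-db"),
        ("bad-project", None),
    ]
    failed = [name for name, db_dir in results if db_dir is None]
    if failed:
        print(f"{len(failed)} database builds failed: {', '.join(failed)}")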
@@ -179,6 +184,7 @@ def build_database(language: str, extractor_options, project: Project, project_d
 
     return database_dir
 
+
 def generate_models(args, name: str, database_dir: str) -> None:
     """
     Generate models for a project.
@@ -196,7 +202,10 @@ def generate_models(args, name: str, database_dir: str) -> None:
     generator.setenvironment(database=database_dir, folder=name)
     generator.run()
 
-def build_databases_from_projects(language: str, extractor_options, projects: List[Project]) -> List[tuple[str, str | None]]:
+
+def build_databases_from_projects(
+    language: str, extractor_options, projects: List[Project]
+) -> List[tuple[str, str | None]]:
     """
     Build databases for all projects in parallel.
 
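The docstring promises parallel builds, but the mechanism is outside this hunk; a thread-pool sketch of the general pattern (ThreadPoolExecutor is an assumption here, not necessarily what the script uses):

    from concurrent.futures import ThreadPoolExecutor

    def fake_build(name: str) -> tuple[str, str | None]:
        # Stand-in for a per-project build that may fail and return None
        return (name, f"/tmp/{name}-db")

    with ThreadPoolExecutor(max_workers=4) as pool:
        database_results = list(pool.map(fake_build, ["a", "b", "c"]))
    print(database_results)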
@@ -215,11 +224,15 @@ def build_databases_from_projects(language: str, extractor_options, projects: Li
     # Phase 2: Build databases for all projects
     print("\n=== Phase 2: Building databases ===")
     database_results = [
-        (project["name"], build_database(language, extractor_options, project, project_dir))
+        (
+            project["name"],
+            build_database(language, extractor_options, project, project_dir),
+        )
         for project, project_dir in project_dirs
     ]
     return database_results
 
+
 def github(url: str, pat: str, extra_headers: dict[str, str] = {}) -> dict:
     """
     Download a JSON file from GitHub using a personal access token (PAT).
@@ -230,14 +243,15 @@ def github(url: str, pat: str, extra_headers: dict[str, str] = {}) -> dict:
     Returns:
         The JSON response as a dictionary.
     """
-    headers = { "Authorization": f"token {pat}" } | extra_headers
+    headers = {"Authorization": f"token {pat}"} | extra_headers
     response = requests.get(url, headers=headers)
     if response.status_code != 200:
         print(f"Failed to download JSON: {response.status_code} {response.text}")
         sys.exit(1)
     else:
         return response.json()
 
+
 def download_artifact(url: str, artifact_name: str, pat: str) -> str:
     """
     Download a GitHub Actions artifact from a given URL.
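Two details of the github helper above are easy to miss: the | operator merges the auth header with extra_headers (dict union, Python 3.9+), and any non-200 response exits the process instead of raising. A hypothetical call, as it appears later in this script (the URL placeholders are illustrative):

    # data = github(
    #     "https://api.github.com/repos/OWNER/REPO/actions/runs/RUN_ID/artifacts",
    #     pat,
    #     {"Accept": "application/vnd.github+json"},
    # )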
@@ -248,7 +262,7 @@ def download_artifact(url: str, artifact_name: str, pat: str) -> str:
     Returns:
         The path to the downloaded artifact file.
     """
-    headers = { "Authorization": f"token {pat}", "Accept": "application/vnd.github+json" }
+    headers = {"Authorization": f"token {pat}", "Accept": "application/vnd.github+json"}
     response = requests.get(url, stream=True, headers=headers)
     zipName = artifact_name + ".zip"
     if response.status_code == 200:
@@ -262,15 +276,20 @@ def download_artifact(url: str, artifact_name: str, pat: str) -> str:
         print(f"Failed to download file. Status code: {response.status_code}")
         sys.exit(1)
 
+
 def remove_extension(filename: str) -> str:
     while "." in filename:
         filename, _ = os.path.splitext(filename)
     return filename
 
+
 def pretty_name_from_artifact_name(artifact_name: str) -> str:
     return artifact_name.split("___")[1]
 
-def download_dca_databases(experiment_name: str, pat: str, projects) -> List[tuple[str, str | None]]:
+
+def download_dca_databases(
+    experiment_name: str, pat: str, projects
+) -> List[tuple[str, str | None]]:
     """
     Download databases from a DCA experiment.
     Args:
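Worked examples for the two helpers above (the artifact name is made up; split("___")[1] picks the second triple-underscore-separated segment):

    import os.path

    def remove_extension(filename: str) -> str:
        while "." in filename:
            filename, _ = os.path.splitext(filename)
        return filename

    # splitext strips one suffix per pass: "db.tar.gz" -> "db.tar" -> "db"
    assert remove_extension("db.tar.gz") == "db"
    assert "codeql___demo-project___x".split("___")[1] == "demo-project"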
@@ -282,58 +301,81 @@ def download_dca_databases(experiment_name: str, pat: str, projects) -> List[tup
     """
     database_results = []
     print("\n=== Finding projects ===")
-    response = github(f"https://raw.githubusercontent.com/github/codeql-dca-main/data/{experiment_name}/reports/downloads.json", pat)
+    response = github(
+        f"https://raw.githubusercontent.com/github/codeql-dca-main/data/{experiment_name}/reports/downloads.json",
+        pat,
+    )
     targets = response["targets"]
     for target, data in targets.items():
-        downloads = data["downloads"]
-        analyzed_database = downloads["analyzed_database"]
-        artifact_name = analyzed_database["artifact_name"]
-        pretty_name = pretty_name_from_artifact_name(artifact_name)
-
-        if not pretty_name in [project["name"] for project in projects]:
-            print(f"Skipping {pretty_name} as it is not in the list of projects")
-            continue
-
-        repository = analyzed_database["repository"]
-        run_id = analyzed_database["run_id"]
-        print(f"=== Finding artifact: {artifact_name} ===")
-        response = github(f"https://api.github.com/repos/{repository}/actions/runs/{run_id}/artifacts", pat, { "Accept": "application/vnd.github+json" })
-        artifacts = response["artifacts"]
-        artifact_map = {artifact["name"]: artifact for artifact in artifacts}
-        print(f"=== Downloading artifact: {artifact_name} ===")
-        archive_download_url = artifact_map[artifact_name]["archive_download_url"]
-        artifact_zip_location = download_artifact(archive_download_url, artifact_name, pat)
-        print(f"=== Extracting artifact: {artifact_name} ===")
-        # The database is in a zip file, which contains a tar.gz file with the DB
-        # First we open the zip file
-        with zipfile.ZipFile(artifact_zip_location, 'r') as zip_ref:
-            artifact_unzipped_location = os.path.join(build_dir, artifact_name)
-            # And then we extract it to build_dir/artifact_name
-            zip_ref.extractall(artifact_unzipped_location)
-            # And then we iterate over the contents of the extracted directory
-            # and extract the tar.gz files inside it
-            for entry in os.listdir(artifact_unzipped_location):
-                artifact_tar_location = os.path.join(artifact_unzipped_location, entry)
-                with tarfile.open(artifact_tar_location, "r:gz") as tar_ref:
-                    # And we just untar it to the same directory as the zip file
-                    tar_ref.extractall(artifact_unzipped_location)
-        database_results.append((pretty_name, os.path.join(artifact_unzipped_location, remove_extension(entry))))
+        downloads = data["downloads"]
+        analyzed_database = downloads["analyzed_database"]
+        artifact_name = analyzed_database["artifact_name"]
+        pretty_name = pretty_name_from_artifact_name(artifact_name)
+
+        if not pretty_name in [project["name"] for project in projects]:
+            print(f"Skipping {pretty_name} as it is not in the list of projects")
+            continue
+
+        repository = analyzed_database["repository"]
+        run_id = analyzed_database["run_id"]
+        print(f"=== Finding artifact: {artifact_name} ===")
+        response = github(
+            f"https://api.github.com/repos/{repository}/actions/runs/{run_id}/artifacts",
+            pat,
+            {"Accept": "application/vnd.github+json"},
+        )
+        artifacts = response["artifacts"]
+        artifact_map = {artifact["name"]: artifact for artifact in artifacts}
+        print(f"=== Downloading artifact: {artifact_name} ===")
+        archive_download_url = artifact_map[artifact_name]["archive_download_url"]
+        artifact_zip_location = download_artifact(
+            archive_download_url, artifact_name, pat
+        )
+        print(f"=== Extracting artifact: {artifact_name} ===")
+        # The database is in a zip file, which contains a tar.gz file with the DB
+        # First we open the zip file
+        with zipfile.ZipFile(artifact_zip_location, "r") as zip_ref:
+            artifact_unzipped_location = os.path.join(build_dir, artifact_name)
+            # And then we extract it to build_dir/artifact_name
+            zip_ref.extractall(artifact_unzipped_location)
+            # And then we iterate over the contents of the extracted directory
+            # and extract the tar.gz files inside it
+            for entry in os.listdir(artifact_unzipped_location):
+                artifact_tar_location = os.path.join(artifact_unzipped_location, entry)
+                with tarfile.open(artifact_tar_location, "r:gz") as tar_ref:
+                    # And we just untar it to the same directory as the zip file
+                    tar_ref.extractall(artifact_unzipped_location)
+        database_results.append(
+            (
+                pretty_name,
+                os.path.join(
+                    artifact_unzipped_location, remove_extension(entry)
+                ),
+            )
+        )
     print(f"\n=== Extracted {len(database_results)} databases ===")
 
     def compare(a, b):
-        a_index = next(i for i, project in enumerate(projects) if project["name"] == a[0])
-        b_index = next(i for i, project in enumerate(projects) if project["name"] == b[0])
+        a_index = next(
+            i for i, project in enumerate(projects) if project["name"] == a[0]
+        )
+        b_index = next(
+            i for i, project in enumerate(projects) if project["name"] == b[0]
+        )
         return a_index - b_index
 
     # Sort the database results based on the order in the projects file
     return sorted(database_results, key=cmp_to_key(compare))
-
+
+
 def get_destination_for_project(config, name: str) -> str:
     return os.path.join(config["destination"], name)
 
+
 def get_strategy(config) -> str:
     return config["strategy"].lower()
 
+
 def main(config, args) -> None:
     """
     Main function to handle the bulk generation of MaD models.
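The extraction above unpacks two layers: a zip artifact wrapping a tar.gz that holds the database. The same pattern in isolation, assuming an already-downloaded archive (the paths are illustrative):

    import os
    import tarfile
    import zipfile

    def unpack_nested(zip_path: str, dest: str) -> None:
        # Layer 1: the GitHub Actions artifact is a zip file
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(dest)
        # Layer 2: each member is a tar.gz containing the actual database
        for entry in os.listdir(dest):
            if entry.endswith(".tar.gz"):
                with tarfile.open(os.path.join(dest, entry), "r:gz") as tar_ref:
                    tar_ref.extractall(dest)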
@@ -371,7 +413,9 @@ def main(config, args) -> None:
     match get_strategy(config):
         case "repo":
             extractor_options = config.get("extractor_options", [])
-            database_results = build_databases_from_projects(language, extractor_options, projects)
+            database_results = build_databases_from_projects(
+                language, extractor_options, projects
+            )
         case "dca":
             experiment_name = args.dca
             if experiment_name is None:
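The match statement dispatches on config["strategy"]; a hypothetical config dict covering the keys this section reads (the full schema and the on-disk file format are assumptions):

    config = {
        "strategy": "repo",  # or "dca"
        "destination": "mad-models",  # used by get_destination_for_project
        "extractor_options": [],  # only consumed by the "repo" strategy
    }
    assert config["strategy"].lower() in ("repo", "dca")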
@@ -386,9 +430,7 @@ def main(config, args) -> None:
     # Phase 3: Generate models for all projects
     print("\n=== Phase 3: Generating models ===")
 
-    failed_builds = [
-        project for project, db_dir in database_results if db_dir is None
-    ]
+    failed_builds = [project for project, db_dir in database_results if db_dir is None]
     if failed_builds:
         print(
             f"ERROR: {len(failed_builds)} database builds failed: {', '.join(failed_builds)}"
@@ -406,15 +448,36 @@ def main(config, args) -> None:
         if database_dir is not None:
             generate_models(args, project, database_dir)
 
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--config", type=str, help="Path to the configuration file.", required=True)
-    parser.add_argument("--dca", type=str, help="Name of a DCA run that built all the projects", required=False)
-    parser.add_argument("--pat", type=str, help="PAT token to grab DCA databases (the same as the one you use for DCA)", required=False)
-    parser.add_argument("--lang", type=str, help="The language to generate models for", required=True)
-    parser.add_argument("--with-sources", action="store_true", help="Generate sources", required=False)
-    parser.add_argument("--with-sinks", action="store_true", help="Generate sinks", required=False)
-    parser.add_argument("--with-summaries", action="store_true", help="Generate sinks", required=False)
+    parser.add_argument(
+        "--config", type=str, help="Path to the configuration file.", required=True
+    )
+    parser.add_argument(
+        "--dca",
+        type=str,
+        help="Name of a DCA run that built all the projects",
+        required=False,
+    )
+    parser.add_argument(
+        "--pat",
+        type=str,
+        help="PAT token to grab DCA databases (the same as the one you use for DCA)",
+        required=False,
+    )
+    parser.add_argument(
+        "--lang", type=str, help="The language to generate models for", required=True
+    )
+    parser.add_argument(
+        "--with-sources", action="store_true", help="Generate sources", required=False
+    )
+    parser.add_argument(
+        "--with-sinks", action="store_true", help="Generate sinks", required=False
+    )
+    parser.add_argument(
+        "--with-summaries", action="store_true", help="Generate summaries", required=False
+    )
     args = parser.parse_args()
 
     # Load config file
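Hypothetical invocations, matching the flags defined above (the script name, config path, and language value are illustrative only):

    # Build databases locally from the configured projects:
    #   python bulk_generate_mad.py --config projects.yml --lang rust --with-summaries
    # Or reuse databases built by a DCA experiment (requires --pat):
    #   python bulk_generate_mad.py --config projects.yml --lang rust --dca EXPERIMENT --pat TOKEN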