|
12 | 12 | from requests import Response, Session, codes
|
13 | 13 |
|
14 | 14 | from .config import DEFAULT_API_VERSION, DEFAULT_BASE_URI, DEFAULT_TIMEOUT
|
15 |
| -from .iac_models import IaCScanParameters, IaCScanParametersSchema, IaCScanResult |
| 15 | +from .iac_models import ( |
| 16 | + IaCDiffScanResult, |
| 17 | + IaCScanParameters, |
| 18 | + IaCScanParametersSchema, |
| 19 | + IaCScanResult, |
| 20 | +) |
16 | 21 | from .models import (
|
17 | 22 | Detail,
|
18 | 23 | Document,
|
@@ -270,6 +275,7 @@ def post(
|
270 | 275 | **kwargs: Any,
|
271 | 276 | ) -> Response:
|
272 | 277 | # Be aware that self.iac_directory_scan bypass this method and calls self.request directly.
|
| 278 | + # self.iac_diff_scan also bypass this method |
273 | 279 | return self.request(
|
274 | 280 | "post",
|
275 | 281 | endpoint=endpoint,
|
@@ -462,14 +468,23 @@ def create_honeytoken(
|
462 | 468 | result.status_code = resp.status_code
|
463 | 469 | return result
|
464 | 470 |
|
465 |
| - # For IaC Scans |
466 | 471 | def iac_directory_scan(
|
467 | 472 | self,
|
468 | 473 | directory: Path,
|
469 | 474 | filenames: List[str],
|
470 | 475 | scan_parameters: IaCScanParameters,
|
471 | 476 | extra_headers: Optional[Dict[str, str]] = None,
|
472 | 477 | ) -> Union[Detail, IaCScanResult]:
|
| 478 | + """ |
| 479 | + iac_directory_scan handles the /iac_scan endpoint of the API. |
| 480 | +
|
| 481 | + :param directory: path to the directory to scan |
| 482 | + :param filenames: filenames of the directory to include in the scan |
| 483 | + :param scan_parameters: minimum severities wanted and policies to ignore |
| 484 | + example: {"ignored_policies":["GG_IAC_0003"],"minimum_severity":"HIGH"} |
| 485 | + :param extra_headers: optional extra headers to add to the request |
| 486 | + :return: ScanResult response and status code |
| 487 | + """ |
473 | 488 | tar = _create_tar(directory, filenames)
|
474 | 489 | result: Union[Detail, IaCScanResult]
|
475 | 490 | try:
|
@@ -498,6 +513,55 @@ def iac_directory_scan(
|
498 | 513 |
|
499 | 514 | return result
|
500 | 515 |
|
| 516 | + def iac_diff_scan( |
| 517 | + self, |
| 518 | + reference: bytes, |
| 519 | + current: bytes, |
| 520 | + scan_parameters: IaCScanParameters, |
| 521 | + extra_headers: Optional[Dict[str, str]] = None, |
| 522 | + ) -> Union[Detail, IaCDiffScanResult]: |
| 523 | + """ |
| 524 | + iac_diff_scan handles the /iac_diff_scan endpoint of the API. |
| 525 | +
|
| 526 | + Scan two directories and compare their vulnerabilities. |
| 527 | + Vulnerabilities in reference but not in current are considered "new". |
| 528 | + Vulnerabilities in both reference and current are considered "unchanged". |
| 529 | + Vulnerabilities in current but not in reference are considered "deleted". |
| 530 | +
|
| 531 | + :param reference: tar file containing the reference directory. Usually an incoming commit |
| 532 | + :param current: tar file of the current directory. Usually HEAD |
| 533 | + :param scan_parameters: minimum severities wanted and policies to ignore |
| 534 | + example: {"ignored_policies":["GG_IAC_0003"],"minimum_severity":"HIGH"} |
| 535 | + :param extra_headers: optional extra headers to add to the request |
| 536 | + :return: ScanResult response and status code |
| 537 | + """ |
| 538 | + result: Union[Detail, IaCDiffScanResult] |
| 539 | + try: |
| 540 | + # bypass self.post because data argument is needed in self.request and self.post use it as json |
| 541 | + resp = self.request( |
| 542 | + "post", |
| 543 | + endpoint="iac_diff_scan", |
| 544 | + extra_headers=extra_headers, |
| 545 | + files={ |
| 546 | + "reference": reference, |
| 547 | + "current": current, |
| 548 | + }, |
| 549 | + data={ |
| 550 | + "scan_parameters": IaCScanParametersSchema().dumps(scan_parameters), |
| 551 | + }, |
| 552 | + ) |
| 553 | + except requests.exceptions.ReadTimeout: |
| 554 | + result = Detail("The request timed out.") |
| 555 | + result.status_code = 504 |
| 556 | + else: |
| 557 | + if is_ok(resp): |
| 558 | + result = IaCDiffScanResult.from_dict(resp.json()) |
| 559 | + else: |
| 560 | + result = load_detail(resp) |
| 561 | + |
| 562 | + result.status_code = resp.status_code |
| 563 | + return result |
| 564 | + |
501 | 565 | def read_metadata(self) -> Optional[Detail]:
|
502 | 566 | """
|
503 | 567 | Fetch server preferences and store them in `self.secret_scan_preferences`.
|
|
0 commit comments