|
12 | 12 | from requests import Response, Session, codes
|
13 | 13 |
|
14 | 14 | from .config import DEFAULT_API_VERSION, DEFAULT_BASE_URI, DEFAULT_TIMEOUT
|
15 |
| -from .iac_models import IaCScanParameters, IaCScanParametersSchema, IaCScanResult |
| 15 | +from .iac_models import ( |
| 16 | + IaCDiffScanResult, |
| 17 | + IaCDiffScanResultSchema, |
| 18 | + IaCScanParameters, |
| 19 | + IaCScanParametersSchema, |
| 20 | + IaCScanResult, |
| 21 | +) |
16 | 22 | from .models import (
|
17 | 23 | Detail,
|
18 | 24 | Document,
|
@@ -270,6 +276,7 @@ def post(
|
270 | 276 | **kwargs: Any,
|
271 | 277 | ) -> Response:
|
272 | 278 | # Be aware that self.iac_directory_scan bypass this method and calls self.request directly.
|
| 279 | + # self.iac_diff_scan also bypass this method |
273 | 280 | return self.request(
|
274 | 281 | "post",
|
275 | 282 | endpoint=endpoint,
|
@@ -498,6 +505,41 @@ def iac_directory_scan(
|
498 | 505 |
|
499 | 506 | return result
|
500 | 507 |
|
| 508 | + # For IaC diff Scans |
| 509 | + def iac_diff_scan( |
| 510 | + self, |
| 511 | + reference: bytes, |
| 512 | + current: bytes, |
| 513 | + scan_parameters: IaCScanParameters, |
| 514 | + extra_headers: Optional[Dict[str, str]] = None, |
| 515 | + ) -> Union[Detail, IaCDiffScanResult]: |
| 516 | + result: Union[Detail, IaCDiffScanResult] |
| 517 | + try: |
| 518 | + # bypass self.post because data argument is needed in self.request and self.post use it as json |
| 519 | + resp = self.request( |
| 520 | + "post", |
| 521 | + endpoint="iac_diff_scan", |
| 522 | + extra_headers=extra_headers, |
| 523 | + files={ |
| 524 | + "reference": reference, |
| 525 | + "current": current, |
| 526 | + }, |
| 527 | + data={ |
| 528 | + "scan_parameters": IaCScanParametersSchema().dumps(scan_parameters), |
| 529 | + }, |
| 530 | + ) |
| 531 | + except requests.exceptions.ReadTimeout: |
| 532 | + result = Detail("The request timed out.") |
| 533 | + result.status_code = 504 |
| 534 | + else: |
| 535 | + if is_ok(resp): |
| 536 | + result = IaCDiffScanResultSchema.from_dict(resp.json()) # type: ignore |
| 537 | + else: |
| 538 | + result = load_detail(resp) |
| 539 | + |
| 540 | + result.status_code = resp.status_code |
| 541 | + return result |
| 542 | + |
501 | 543 | def read_metadata(self) -> Optional[Detail]:
|
502 | 544 | """
|
503 | 545 | Fetch server preferences and store them in `self.secret_scan_preferences`.
|
|
0 commit comments