|
9 | 9 | import requests
|
10 | 10 | from requests import Response, Session, codes
|
11 | 11 |
|
12 |
| -from .config import ( |
13 |
| - DEFAULT_API_VERSION, |
14 |
| - DEFAULT_BASE_URI, |
15 |
| - DEFAULT_TIMEOUT, |
16 |
| - MULTI_DOCUMENT_LIMIT, |
17 |
| -) |
| 12 | +from .config import DEFAULT_API_VERSION, DEFAULT_BASE_URI, DEFAULT_TIMEOUT |
18 | 13 | from .iac_models import (
|
19 | 14 | IaCScanParameters,
|
20 | 15 | IaCScanParametersSchema,
|
|
29 | 24 | MultiScanResult,
|
30 | 25 | QuotaResponse,
|
31 | 26 | ScanResult,
|
| 27 | + SecretScanPreferences, |
| 28 | + ServerMetadata, |
32 | 29 | )
|
33 | 30 |
|
34 | 31 |
|
@@ -121,6 +118,7 @@ class GGClient:
|
121 | 118 | timeout: Optional[float]
|
122 | 119 | user_agent: str
|
123 | 120 | extra_headers: Dict
|
| 121 | + secret_scan_preferences: SecretScanPreferences |
124 | 122 |
|
125 | 123 | def __init__(
|
126 | 124 | self,
|
@@ -178,6 +176,7 @@ def __init__(
|
178 | 176 | "Authorization": f"Token {api_key}",
|
179 | 177 | },
|
180 | 178 | )
|
| 179 | + self.secret_scan_preferences = SecretScanPreferences() |
181 | 180 |
|
182 | 181 | def request(
|
183 | 182 | self,
|
@@ -308,6 +307,9 @@ def content_scan(
|
308 | 307 | doc_dict["filename"] = filename
|
309 | 308 |
|
310 | 309 | request_obj = Document.SCHEMA.load(doc_dict)
|
| 310 | + Document.SCHEMA.validate_size( |
| 311 | + request_obj, self.secret_scan_preferences.maximum_document_size |
| 312 | + ) |
311 | 313 |
|
312 | 314 | resp = self.post(
|
313 | 315 | endpoint="scan",
|
@@ -344,16 +346,22 @@ def multi_content_scan(
|
344 | 346 | :param ignore_known_secrets: indicates whether known secrets should be ignored
|
345 | 347 | :return: Detail or ScanResult response and status code
|
346 | 348 | """
|
347 |
| - if len(documents) > MULTI_DOCUMENT_LIMIT: |
| 349 | + max_documents = self.secret_scan_preferences.maximum_documents_per_scan |
| 350 | + if len(documents) > max_documents: |
348 | 351 | raise ValueError(
|
349 |
| - f"too many documents submitted for scan (max={MULTI_DOCUMENT_LIMIT})" |
| 352 | + f"too many documents submitted for scan (max={max_documents})" |
350 | 353 | )
|
351 | 354 |
|
352 | 355 | if all(isinstance(doc, dict) for doc in documents):
|
353 | 356 | request_obj = Document.SCHEMA.load(documents, many=True)
|
354 | 357 | else:
|
355 | 358 | raise TypeError("each document must be a dict")
|
356 | 359 |
|
| 360 | + for document in request_obj: |
| 361 | + Document.SCHEMA.validate_size( |
| 362 | + document, self.secret_scan_preferences.maximum_document_size |
| 363 | + ) |
| 364 | + |
357 | 365 | params = (
|
358 | 366 | {"ignore_known_secrets": ignore_known_secrets}
|
359 | 367 | if ignore_known_secrets
|
@@ -472,3 +480,23 @@ def iac_directory_scan(
|
472 | 480 | result.status_code = resp.status_code
|
473 | 481 |
|
474 | 482 | return result
|
| 483 | + |
| 484 | + def read_metadata(self) -> Optional[Detail]: |
| 485 | + """ |
| 486 | + Fetch server preferences and store them in `self.secret_scan_preferences`. |
| 487 | + These preferences are then used by all future secret scans. |
| 488 | +
|
| 489 | + Note that the call fails if the API key is not valid. |
| 490 | +
|
| 491 | + :return: a Detail instance in case of error, None otherwise |
| 492 | + """ |
| 493 | + resp = self.get("metadata") |
| 494 | + |
| 495 | + if not is_ok(resp): |
| 496 | + result = load_detail(resp) |
| 497 | + result.status_code = resp.status_code |
| 498 | + return result |
| 499 | + metadata = ServerMetadata.SCHEMA.load(resp.json()) |
| 500 | + |
| 501 | + self.secret_scan_preferences = metadata.secret_scan_preferences |
| 502 | + return None |
0 commit comments