|
26 | 26 | field_validator,
|
27 | 27 | model_validator,
|
28 | 28 | )
|
| 29 | +from requests import RequestException, head |
29 | 30 |
|
30 | 31 | from contentctl.helper.splunk_app import SplunkApp
|
31 | 32 | from contentctl.helper.utils import Utils
|
@@ -261,6 +262,37 @@ class init(Config_Base):
|
261 | 262 | )
|
262 | 263 |
|
263 | 264 |
|
| 265 | +# There can be a number of attack data file warning mapping exceptions, or errors, |
| 266 | +# that can occur when using attack data caches. In order to avoid very complex |
| 267 | +# output, we will only emit the verbose versions of these messages once per file. |
| 268 | +# This is a non-intuitive place to put this, but it is good enough for now. |
| 269 | +ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS: set[str] = set() |
| 270 | + |
| 271 | + |
class AttackDataCache(BaseModel):
    """Associates an attack data URL prefix with a local cache directory.

    A test data URL that starts with ``base_url`` can be served from the
    folder ``base_directory_name`` instead of being downloaded.
    """

    # NOTE: the first positional argument of pydantic's Field() is the
    # *default value*, not the description. The descriptions are therefore
    # passed by keyword so that both fields are required rather than
    # silently defaulting to their own help text.
    base_url: str = Field(
        description="This is the beginning of a URL that the data must begin with to map to this cache object."
    )
    base_directory_name: str = Field(
        description="This is the root folder name where the attack data should be downloaded to. Note that this path MUST be in the external_repos/ folder",
        pattern=r"^external_repos/.+",
    )
    # Suggested checkout command for the attack_data repo:
    # curl https://attack-range-attack-data.s3.us-west-2.amazonaws.com/attack_data.tar.zstd | zstd --decompress | tar -x -C attack_data/
    # The suggested YML values are embedded in the help text below. The second
    # key of the YAML sample is indented two spaces so that it lines up with
    # `base_url` under the list item and the snippet can be pasted as-is.
    helptext: str | None = Field(
        default="This repo is set up to use test_data_caches. This can be extremely helpful in validating correct links for test attack_data and speeding up testing.\n"
        "Include the following in your contentctl.yml file to use this cache:\n\n"
        "test_data_caches:\n"
        "- base_url: https://media.githubusercontent.com/media/splunk/attack_data/master/\n"
        "  base_directory_name: external_repos/attack_data\n\n"
        "In order to check out STRT Attack Data, you can use the following command:\n"
        "mkdir -p external_repos; curl https://attack-range-attack-data.s3.us-west-2.amazonaws.com/attack_data.tar.zstd | zstd --decompress | tar -x -C external_repos/\n"
        "or\n"
        """echo "First ensure git-lfs is enabled"; git clone https://github.com/splunk/attack_data external_repos/attack_data"""
    )
| 294 | + |
| 295 | + |
264 | 296 | class validate(Config_Base):
|
265 | 297 | model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True)
|
266 | 298 | enforce_deprecation_mapping_requirement: bool = Field(
|
@@ -291,10 +323,151 @@ class validate(Config_Base):
|
291 | 323 | default=False, description="Validate latest TA information from Splunkbase"
|
292 | 324 | )
|
293 | 325 |
|
| 326 | + test_data_caches: list[AttackDataCache] = Field( |
| 327 | + default=[], |
| 328 | + description="A list of attack data that can " |
| 329 | + "be used in lieu of the HTTPS download links " |
| 330 | + "of each test data file. This cache can significantly " |
| 331 | + "increase overall test speed, ensure the correctness of " |
| 332 | + "links at 'contentctl validate' time, and reduce errors " |
| 333 | + "associated with failed responses from file servers.", |
| 334 | + ) |
| 335 | + |
@property
def external_repos_path(self) -> pathlib.Path:
    """Directory named ``external_repos`` directly under ``self.path``."""
    repos_dir = self.path / "external_repos"
    return repos_dir
|
297 | 339 |
|
| 340 | + # We can't make this a validator because the constructor |
| 341 | + # is called many times - we don't want to print this out many times. |
| 342 | + def check_test_data_caches(self) -> Self: |
| 343 | + """ |
| 344 | + Check that the test data caches actually exist at the specified paths. |
| 345 | + If they do exist, then do nothing. If they do not, then emit the helpext, but |
| 346 | + do not raise an exception. They are not required, but can significantly speed up |
| 347 | + and reduce the flakiness of tests by reducing failed HTTP requests. |
| 348 | + """ |
| 349 | + if not self.verbose: |
| 350 | + # Ignore the check and error output if we are not in verbose mode |
| 351 | + return self |
| 352 | + for cache in self.test_data_caches: |
| 353 | + cache_path = self.path / cache.base_directory_name |
| 354 | + if not cache_path.is_dir(): |
| 355 | + print(cache.helptext) |
| 356 | + else: |
| 357 | + build_date_file = cache_path / "cache_build_date.txt" |
| 358 | + git_hash_file = cache_path / "git_hash.txt" |
| 359 | + |
| 360 | + if build_date_file.is_file(): |
| 361 | + # This is a cache that was built by contentctl. We can use this to |
| 362 | + # determine if the cache is out of date. |
| 363 | + with open(build_date_file, "r") as f: |
| 364 | + build_date = f.read().strip() |
| 365 | + else: |
| 366 | + build_date = "<UNKNOWN_DATE>" |
| 367 | + if git_hash_file.is_file(): |
| 368 | + # This is a cache that was built by contentctl. We can use this to |
| 369 | + # determine if the cache is out of date. |
| 370 | + with open(git_hash_file, "r") as f: |
| 371 | + git_hash = f.read().strip() |
| 372 | + else: |
| 373 | + git_hash = "<UNKNOWN_HASH>" |
| 374 | + |
| 375 | + print( |
| 376 | + f"Found attack data cache at [{cache_path}]\n**Cache Build Date: {build_date}\n**Repo Git Hash : {git_hash}\n" |
| 377 | + ) |
| 378 | + |
| 379 | + return self |
| 380 | + |
def map_to_attack_data_cache(
    self, filename: HttpUrl | FilePath, verbose: bool = False
) -> HttpUrl | FilePath:
    """
    Map an attack data URL onto a file in a local test data cache.

    Local file paths are returned unchanged, as are URLs when no caches are
    configured. Otherwise the first cache whose ``base_url`` is a prefix of
    the URL decides the result:

    * cache directory not checked out -> the original URL;
    * file present in the cache -> the path of the cached file;
    * file absent from the cache -> the original URL.

    A URL that could not be served from a cache is remembered in
    ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS so that its warning or error is
    produced at most once, however many detections reference it.

    Args:
        filename: URL or local path of the attack data file.
        verbose: When True, a cache miss is followed by an HTTP HEAD
            request to the URL and a printed explanation.

    Returns:
        The cached file path on a hit, otherwise ``filename`` unchanged.

    Raises:
        ValueError: only with ``verbose``; the file was not found in a
            cache and the HEAD request to the server failed.
    """
    source = str(filename)

    if source in ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS:
        # We have already emitted a warning or exception for this file.
        # Emitting it again would only pollute the output.
        return filename

    # A direct link to a file needs no mapping.
    if isinstance(filename, pathlib.Path):
        return filename

    if not self.test_data_caches:
        return filename

    def _require_on_server(failure_message: str) -> None:
        # Confirm the URL itself is reachable. The timeout keeps an
        # unresponsive server from hanging the run; a timeout is reported
        # the same way as any other failed request.
        try:
            head(source, timeout=30).raise_for_status()
        except RequestException as e:
            raise ValueError(failure_message) from e

    # This is a URL. See if its prefix matches one of the configured caches.
    for cache in self.test_data_caches:
        if not source.startswith(cache.base_url):
            continue

        root_folder_path = self.path / cache.base_directory_name
        if not root_folder_path.is_dir():
            # This has not been checked out. Even though we want to use this
            # cache whenever possible, we don't want to force it.
            return filename

        # Strip the prefix only (not later occurrences of the same text) and
        # drop any leading "/" - joining an absolute segment would discard
        # root_folder_path and resolve outside the cache.
        relative_name = source.removeprefix(cache.base_url).lstrip("/")
        new_file_path = root_folder_path / relative_name
        if new_file_path.is_file():
            # We found the file in the cache. Return the new path
            return new_file_path

        # Anything below here is non-standard behavior that will produce
        # either a warning message, an error, or both. We only want to do
        # this once for each file, even if it is used across multiple
        # different detections.
        ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS.add(source)

        # The cache exists, but we didn't find the file. This is not an
        # exception by itself: we fall back to using the original URL.
        if verbose:
            _require_on_server(
                f"Error resolving the attack_data file {filename}. "
                f"It was missing from the cache {cache.base_directory_name} and a download from the server failed."
            )
            print(
                f"\nFilename {filename} not found in cache {cache.base_directory_name}, but exists on the server. "
                f"Your cache {cache.base_directory_name} may be out of date."
            )
        return filename

    # No configured cache prefix matched this URL.
    if verbose:
        # Only report this once per file, as above.
        ATTACK_DATA_CACHE_MAPPING_EXCEPTIONS.add(source)

        # Give some extra context about missing attack data files/bad mapping
        url = f"Attack Data : {filename}"
        prefixes = "".join(
            f"\n Valid Prefix: {cache.base_url}" for cache in self.test_data_caches
        )
        _require_on_server(
            f"Error resolving the attack_data file {filename}. It was missing from all caches and a download from the server failed.\n"
            f"{url}{prefixes}\n"
        )
        print(
            f"\nAttack Data Missing from all caches, but present at URL:\n{url}{prefixes}"
        )

    return filename
| 470 | + |
@property
def mitre_cti_repo_path(self) -> pathlib.Path:
    """Directory named ``cti`` directly under ``self.external_repos_path``."""
    cti_dir = self.external_repos_path / "cti"
    return cti_dir
|
|
0 commit comments