|
1 | 1 |
|
2 | 2 | import pathlib
|
| 3 | + |
| 4 | +import urllib3.util |
3 | 5 | from contentctl.input.director import Director, DirectorOutputDto
|
4 | 6 | from contentctl.objects.config import validate
|
5 | 7 | from contentctl.enrichments.attack_enrichment import AttackEnrichment
|
@@ -36,10 +38,8 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
|
36 | 38 | director.execute(input_dto)
|
37 | 39 | self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
|
38 | 40 | if input_dto.data_source_TA_validation:
|
39 |
| - if self.validate_latest_TA_information(director_output_dto.data_sources) != 1: |
40 |
| - print("All TA versions are up to date.") |
41 |
| - else: |
42 |
| - raise Exception("One or more TA versions are out of date. Please update the data source with the latest version.") |
| 41 | + self.validate_latest_TA_information(director_output_dto.data_sources) |
| 42 | + |
43 | 43 | return director_output_dto
|
44 | 44 |
|
45 | 45 |
|
@@ -81,27 +81,35 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
|
81 | 81 | return
|
82 | 82 |
|
83 | 83 |
|
84 |
| - def validate_latest_TA_information(self, data_sources: list[DataSource]) -> int: |
| 84 | + def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None: |
85 | 85 | validated_TAs: list[tuple[str, str]] = []
|
86 |
| - error_occurred = False |
| 86 | + errors:list[str] = [] |
87 | 87 | print("----------------------")
|
88 | 88 | print("Validating latest TA:")
|
89 | 89 | print("----------------------")
|
90 | 90 | for data_source in data_sources:
|
91 | 91 | for supported_TA in data_source.supported_TA:
|
92 |
| - ta_identifier = (supported_TA["name"], supported_TA["version"]) |
| 92 | + ta_identifier = (supported_TA.name, supported_TA.version) |
93 | 93 | if ta_identifier in validated_TAs:
|
94 | 94 | continue
|
95 |
| - if "url" in supported_TA: |
| 95 | + if supported_TA.url is not None: |
96 | 96 | validated_TAs.append(ta_identifier)
|
97 |
| - uid = int(supported_TA["url"].rstrip('/').split("/")[-1]) |
| 97 | + uid = int(str(supported_TA.url).rstrip('/').split("/")[-1]) |
98 | 98 | try:
|
99 | 99 | splunk_app = SplunkApp(app_uid=uid)
|
100 |
| - if splunk_app.latest_version != supported_TA["version"]: |
101 |
| - raise Exception(f"Version mismatch for TA {supported_TA['name']}: " |
102 |
| - f"Latest version on Splunkbase is {splunk_app.latest_version}, " |
103 |
| - f"but version {supported_TA['version']} is specified in the data source {data_source.name}.") |
| 100 | + if splunk_app.latest_version != supported_TA.version: |
| 101 | + errors.append(f"Version mismatch in '{data_source.file_path}' supported TA '{supported_TA.name}'" |
| 102 | + f"\n Latest version on Splunkbase : {splunk_app.latest_version}" |
| 103 | + f"\n Version specified in data source: {supported_TA.version}") |
104 | 104 | except Exception as e:
|
105 |
| - print(f"Error processing TA {supported_TA['name']}: {str(e)}") |
106 |
| - error_occurred = True |
107 |
| - return 1 if error_occurred else 0 |
| 105 | + errors.append(f"Error processing checking version of TA {supported_TA.name}: {str(e)}") |
| 106 | + |
| 107 | + if len(errors) > 0: |
| 108 | + errorString = '\n\n'.join(errors) |
| 109 | + raise Exception(f"[{len(errors)}] or more TA versions are out of date or have other errors." |
| 110 | + f"Please update the following data sources with the latest versions of " |
| 111 | + f"their supported tas:\n\n{errorString}") |
| 112 | + print("All TA versions are up to date.") |
| 113 | + |
| 114 | + |
| 115 | + |
0 commit comments