|
1 | 1 | import pathlib
|
2 | 2 |
|
3 |
| -from contentctl.input.director import Director, DirectorOutputDto |
4 |
| -from contentctl.objects.config import validate |
5 | 3 | from contentctl.enrichments.attack_enrichment import AttackEnrichment
|
6 | 4 | from contentctl.enrichments.cve_enrichment import CveEnrichment
|
7 |
| -from contentctl.objects.atomic import AtomicEnrichment |
8 |
| -from contentctl.objects.lookup import FileBackedLookup |
| 5 | +from contentctl.helper.splunk_app import SplunkApp |
9 | 6 | from contentctl.helper.utils import Utils
|
| 7 | +from contentctl.input.director import Director, DirectorOutputDto, ValidationFailedError |
| 8 | +from contentctl.objects.atomic import AtomicEnrichment |
| 9 | +from contentctl.objects.config import validate |
10 | 10 | from contentctl.objects.data_source import DataSource
|
11 |
| -from contentctl.helper.splunk_app import SplunkApp |
| 11 | +from contentctl.objects.lookup import FileBackedLookup, RuntimeCSV |
12 | 12 |
|
13 | 13 |
|
14 | 14 | class Validate:
|
15 | 15 | def execute(self, input_dto: validate) -> DirectorOutputDto:
|
16 |
| - director_output_dto = DirectorOutputDto( |
17 |
| - AtomicEnrichment.getAtomicEnrichment(input_dto), |
18 |
| - AttackEnrichment.getAttackEnrichment(input_dto), |
19 |
| - CveEnrichment.getCveEnrichment(input_dto), |
20 |
| - [], |
21 |
| - [], |
22 |
| - [], |
23 |
| - [], |
24 |
| - [], |
25 |
| - [], |
26 |
| - [], |
27 |
| - [], |
28 |
| - [], |
29 |
| - [], |
30 |
| - ) |
| 16 | + try: |
| 17 | + director_output_dto = DirectorOutputDto( |
| 18 | + AtomicEnrichment.getAtomicEnrichment(input_dto), |
| 19 | + AttackEnrichment.getAttackEnrichment(input_dto), |
| 20 | + CveEnrichment.getCveEnrichment(input_dto), |
| 21 | + ) |
| 22 | + |
| 23 | + director = Director(director_output_dto) |
| 24 | + director.execute(input_dto) |
| 25 | + self.ensure_no_orphaned_files_in_lookups( |
| 26 | + input_dto.path, director_output_dto |
| 27 | + ) |
| 28 | + if input_dto.data_source_TA_validation: |
| 29 | + self.validate_latest_TA_information(director_output_dto.data_sources) |
31 | 30 |
|
32 |
| - director = Director(director_output_dto) |
33 |
| - director.execute(input_dto) |
34 |
| - self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto) |
35 |
| - if input_dto.data_source_TA_validation: |
36 |
| - self.validate_latest_TA_information(director_output_dto.data_sources) |
| 31 | + return director_output_dto |
37 | 32 |
|
38 |
| - return director_output_dto |
| 33 | + except ValidationFailedError: |
| 34 | + # Just re-raise without additional output since we already formatted everything |
| 35 | + raise SystemExit(1) |
39 | 36 |
|
40 | 37 | def ensure_no_orphaned_files_in_lookups(
|
41 | 38 | self, repo_path: pathlib.Path, director_output_dto: DirectorOutputDto
|
@@ -64,11 +61,14 @@ def ensure_no_orphaned_files_in_lookups(
|
64 | 61 | """
|
65 | 62 | lookupsDirectory = repo_path / "lookups"
|
66 | 63 |
|
67 |
| - # Get all of the files referneced by Lookups |
| 64 | + # Get all of the files referenced by Lookups |
68 | 65 | usedLookupFiles: list[pathlib.Path] = [
|
69 | 66 | lookup.filename
|
70 | 67 | for lookup in director_output_dto.lookups
|
| 68 | + # Of course Runtime CSVs do not have underlying CSV files, so make |
| 69 | + # sure that we do not check for that existence. |
71 | 70 | if isinstance(lookup, FileBackedLookup)
|
| 71 | + and not isinstance(lookup, RuntimeCSV) |
72 | 72 | ] + [
|
73 | 73 | lookup.file_path
|
74 | 74 | for lookup in director_output_dto.lookups
|
|
0 commit comments