Skip to content

Commit 289c3ec

Browse files
committed
Improve how we determine what file extensions are
allowed in the lookups folder. A new function is introduced that improves checking of which file extensions are allowed in any folder, which should supersede most usage of the Utils.get_all_yml_files_from_directory function in the future.
1 parent a8de0b7 commit 289c3ec

File tree

2 files changed

+81
-52
lines changed

2 files changed

+81
-52
lines changed

contentctl/actions/validate.py

Lines changed: 38 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,11 @@
1-
import sys
2-
import pathlib
3-
from dataclasses import dataclass
4-
5-
from pydantic import ValidationError
6-
from typing import Union
71

8-
from contentctl.objects.enums import SecurityContentProduct
9-
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import (
10-
SecurityContentObject_Abstract,
11-
)
2+
import pathlib
123
from contentctl.input.director import Director, DirectorOutputDto
13-
144
from contentctl.objects.config import validate
155
from contentctl.enrichments.attack_enrichment import AttackEnrichment
166
from contentctl.enrichments.cve_enrichment import CveEnrichment
177
from contentctl.objects.atomic import AtomicTest
8+
from contentctl.helper.utils import Utils
189

1910

2011
class Validate:
@@ -42,49 +33,44 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
4233

4334
director = Director(director_output_dto)
4435
director.execute(input_dto)
45-
self.ensure_no_orphaned_lookup_files(input_dto.path, director_output_dto)
36+
self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
4637
return director_output_dto
4738

48-
def ensure_no_orphaned_lookup_files(self, repo_path:pathlib.Path, director_output_dto:DirectorOutputDto):
49-
# get all files in the lookup folder
50-
usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if lookup.filename is not None] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None]
39+
40+
def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_output_dto:DirectorOutputDto):
    """
    Verify that the lookups folder contains only files that belong to a lookup.

    A file in the lookups folder is acceptable when it is one of:
        1. A lookup YML (.yml)
        2. A lookup CSV (.csv) that is referenced by a lookup YML
        3. A lookup MLMODEL (.mlmodel) that is referenced by a lookup YML

    Keeping unused files out of this folder prevents them from being copied
    into the app when it is built (which can cause appinspect errors or a
    larger app size).

    Args:
        repo_path (pathlib.Path): path to the root of the app
        director_output_dto (DirectorOutputDto): director object with all constructed content

    Raises:
        Exception: if one or more .csv or .mlmodel files exist in the lookups
            folder without being referenced (via filename or file_path) by a
            lookup in director_output_dto. Exceptions raised by
            Utils.get_security_content_files_from_directory while enumerating
            the folder propagate unchanged.
    """
    lookups_root = repo_path/"lookups"

    # Collect every path a lookup points at: first all filenames, then all file_paths.
    referenced_files:list[pathlib.Path] = []
    for lookup in director_output_dto.lookups:
        if lookup.filename is not None:
            referenced_files.append(lookup.filename)
    for lookup in director_output_dto.lookups:
        if lookup.file_path is not None:
            referenced_files.append(lookup.file_path)

    # Enumerate the csv/mlmodel files on disk; yml files are allowed to be
    # present in the folder but are not candidates for being "orphaned".
    data_files = Utils.get_security_content_files_from_directory(
        lookups_root,
        allowedFileExtensions=[".yml",".csv",".mlmodel"],
        fileExtensionsToReturn=[".csv",".mlmodel"],
    )

    orphaned_files:list[pathlib.Path] = []
    for candidate in data_files:
        if candidate not in referenced_files:
            orphaned_files.append(candidate)

    if not orphaned_files:
        return

    raise Exception(f"The following .csv or .mlmodel files exist in '{lookups_root}', but are not referenced by a lookup file: {[str(path) for path in orphaned_files]}")
56-
57-
58-
def validate_duplicate_uuids(
59-
self, security_content_objects: list[SecurityContentObject_Abstract]
60-
):
61-
all_uuids = set()
62-
duplicate_uuids = set()
63-
for elem in security_content_objects:
64-
if elem.id in all_uuids:
65-
# The uuid has been found more than once
66-
duplicate_uuids.add(elem.id)
67-
else:
68-
# This is the first time the uuid has been found
69-
all_uuids.add(elem.id)
70-
71-
if len(duplicate_uuids) == 0:
72-
return
73-
74-
# At least once duplicate uuid has been found. Enumerate all
75-
# the pieces of content that use duplicate uuids
76-
duplicate_messages = []
77-
for uuid in duplicate_uuids:
78-
duplicate_uuid_content = [
79-
str(content.file_path)
80-
for content in security_content_objects
81-
if content.id in duplicate_uuids
82-
]
83-
duplicate_messages.append(
84-
f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}"
85-
)
86-
87-
raise ValueError(
88-
"ERROR: Duplicate ID(s) found in objects:\n"
89-
+ "\n - ".join(duplicate_messages)
90-
)
76+

contentctl/helper/utils.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,49 @@ def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]:
3434
listOfFiles.append(pathlib.Path(os.path.join(dirpath, file)))
3535

3636
return sorted(listOfFiles)
37+
38+
@staticmethod
def get_security_content_files_from_directory(
    path: pathlib.Path,
    allowedFileExtensions:list[str]=[".yml"],
    fileExtensionsToReturn:list[str]=[".yml"],
) -> list[pathlib.Path]:
    """
    Get all of the Security Content Object files rooted in a given directory. These will
    almost certainly be YML files, but could be other file types as specified by the caller.

    Args:
        path (pathlib.Path): The root path at which to enumerate all Security Content Files.
            All subdirectories are traversed.
        allowedFileExtensions (list[str], optional): File extensions (including the leading
            dot) which are allowed to be present in this directory. In most cases, we do not
            want to allow the presence of non-YML files. Defaults to [".yml"].
        fileExtensionsToReturn (list[str], optional): Extensions of the files that should be
            returned from this function. For example, the lookups/ directory contains YML,
            CSV, and MLMODEL files, but only the YMLs are Security Content Objects for
            constructing Lookups. Must be a subset of allowedFileExtensions.
            Defaults to [".yml"].

    Raises:
        Exception: if fileExtensionsToReturn is not a subset of allowedFileExtensions.
        Exception: if the path passed to the function does not exist or is not a directory.
        Exception: if any file rooted in the directory (including files that have no
            extension at all) has an extension which is not in allowedFileExtensions.

    Returns:
        list[pathlib.Path]: sorted list of the files found under path whose extension is
        in fileExtensionsToReturn
    """
    # NOTE: the default argument lists are only ever read, never mutated, in this function.
    if not set(fileExtensionsToReturn).issubset(allowedFileExtensions):
        raise Exception(f"fileExtensionsToReturn {fileExtensionsToReturn} MUST be a subset of allowedFileExtensions {allowedFileExtensions}, but it is not")

    # is_dir() is also False when the path does not exist
    if not path.is_dir():
        raise Exception(f"Unable to get security_content files, required directory '{str(path)}' does not exist or is not a directory")

    allowedFiles:list[pathlib.Path] = []
    erroneousFiles:list[pathlib.Path] = []
    # Walk every entry below path. A "*.*" pattern is deliberately NOT used here:
    # it would skip files that have no extension (letting them evade the check)
    # and would match directories whose name happens to contain a dot.
    # Sorting keeps both the return value and any error message deterministic.
    for filePath in sorted(path.rglob("*")):
        if not filePath.is_file():
            # Directories are traversed, not validated
            continue
        if filePath.suffix in allowedFileExtensions:
            allowedFiles.append(filePath)
        else:
            erroneousFiles.append(filePath)

    if erroneousFiles:
        raise Exception(f"The following files are not allowed in the directory '{path}'. Only files with the extensions {allowedFileExtensions} are allowed:{[str(filePath) for filePath in erroneousFiles]}")

    # There were no erroneous files, so return the requested files
    return [filePath for filePath in allowedFiles if filePath.suffix in fileExtensionsToReturn]
3780

3881
@staticmethod
3982
def get_all_yml_files_from_directory_one_layer_deep(path: str) -> list[pathlib.Path]:

0 commit comments

Comments
 (0)