1
1
from __future__ import annotations
2
+ from typing import TYPE_CHECKING
3
+ if TYPE_CHECKING :
4
+ from contentctl .objects .config import validate
5
+
2
6
from contentctl .input .yml_reader import YmlReader
3
7
from pydantic import BaseModel , model_validator , ConfigDict , FilePath , UUID4
8
+ import dataclasses
4
9
from typing import List , Optional , Dict , Union , Self
5
10
import pathlib
6
-
7
-
8
11
from enum import StrEnum , auto
9
-
12
+ import uuid
10
13
11
14
class SupportedPlatform (StrEnum ):
12
15
windows = auto ()
@@ -84,47 +87,23 @@ class AtomicTest(BaseModel):
84
87
dependencies : Optional [List [AtomicDependency ]] = None
85
88
dependency_executor_name : Optional [DependencyExecutorType ] = None
86
89
87
- @staticmethod
88
- def AtomicTestWhenEnrichmentIsDisabled (auto_generated_guid : UUID4 ) -> AtomicTest :
89
- return AtomicTest (name = "Placeholder Atomic Test (enrichment disabled)" ,
90
- auto_generated_guid = auto_generated_guid ,
91
- description = "This is a placeholder AtomicTest. Because enrichments were not enabled, it has not been validated against the real Atomic Red Team Repo." ,
92
- supported_platforms = [],
93
- executor = AtomicExecutor (name = "Placeholder Executor (enrichment disabled)" ,
94
- command = "Placeholder command (enrichment disabled)" ))
95
-
96
90
@staticmethod
97
91
def AtomicTestWhenTestIsMissing (auto_generated_guid : UUID4 ) -> AtomicTest :
98
92
return AtomicTest (name = "Missing Atomic" ,
99
93
auto_generated_guid = auto_generated_guid ,
100
94
description = "This is a placeholder AtomicTest. Either the auto_generated_guid is incorrect or it there was an exception while parsing its AtomicFile." ,
101
95
supported_platforms = [],
102
96
executor = AtomicExecutor (name = "Placeholder Executor (failed to find auto_generated_guid)" ,
103
- command = "Placeholder command (failed to find auto_generated_guid)" ))
104
-
105
-
106
- @classmethod
107
- def getAtomicByAtomicGuid (cls , guid : UUID4 , all_atomics :list [AtomicTest ] | None )-> AtomicTest :
108
- if all_atomics is None :
109
- return AtomicTest .AtomicTestWhenEnrichmentIsDisabled (guid )
110
- matching_atomics = [atomic for atomic in all_atomics if atomic .auto_generated_guid == guid ]
111
- if len (matching_atomics ) == 0 :
112
- raise ValueError (f"Unable to find atomic_guid { guid } in { len (all_atomics )} atomic_tests from ART Repo" )
113
- elif len (matching_atomics ) > 1 :
114
- raise ValueError (f"Found { len (matching_atomics )} matching tests for atomic_guid { guid } in { len (all_atomics )} atomic_tests from ART Repo" )
115
-
116
- return matching_atomics [0 ]
97
+ command = "Placeholder command (failed to find auto_generated_guid)" ))
117
98
118
99
@classmethod
119
- def parseArtRepo (cls , repo_path :pathlib .Path )-> List [AtomicFile ]:
120
- if not repo_path .is_dir ():
121
- print (f"WARNING: Atomic Red Team repo does NOT exist at { repo_path .absolute ()} . You can check it out with:\n * git clone --single-branch https://github.com/redcanaryco/atomic-red-team. This will ONLY throw a validation error if you reference atomid_guids in your detection(s)." )
122
- return []
100
+ def parseArtRepo (cls , repo_path :pathlib .Path )-> dict [uuid .UUID , AtomicTest ]:
101
+ test_mapping : dict [uuid .UUID , AtomicTest ] = {}
123
102
atomics_path = repo_path / "atomics"
124
103
if not atomics_path .is_dir ():
125
- print (f"WARNING: Atomic Red Team repo exists at { repo_path . absolute } , but atomics directory does NOT exist at { atomics_path . absolute () } . Was it deleted or renamed? This will ONLY throw a validation error if you reference atomid_guids in your detection(s)." )
126
- return []
127
-
104
+ raise FileNotFoundError (f"WARNING: Atomic Red Team repo exists at { repo_path } , "
105
+ f"but atomics directory does NOT exist at { atomics_path } . "
106
+ "Was it deleted or renamed?" )
128
107
129
108
atomic_files :List [AtomicFile ] = []
130
109
error_messages :List [str ] = []
@@ -133,45 +112,36 @@ def parseArtRepo(cls, repo_path:pathlib.Path)->List[AtomicFile]:
133
112
atomic_files .append (cls .constructAtomicFile (obj_path ))
134
113
except Exception as e :
135
114
error_messages .append (f"File [{ obj_path } ]\n { str (e )} " )
115
+
136
116
if len (error_messages ) > 0 :
137
117
exceptions_string = '\n \n ' .join (error_messages )
138
118
print (f"WARNING: The following [{ len (error_messages )} ] ERRORS were generated when parsing the Atomic Red Team Repo.\n "
139
119
"Please raise an issue so that they can be fixed at https://github.com/redcanaryco/atomic-red-team/issues.\n "
140
120
"Note that this is only a warning and contentctl will ignore Atomics contained in these files.\n "
141
121
f"However, if you have written a detection that references them, 'contentctl build --enrichments' will fail:\n \n { exceptions_string } " )
142
122
143
- return atomic_files
123
+ # Now iterate over all the files, collect all the tests, and return the dict mapping
124
+ redefined_guids :set [uuid .UUID ] = set ()
125
+ for atomic_file in atomic_files :
126
+ for atomic_test in atomic_file .atomic_tests :
127
+ if atomic_test .auto_generated_guid in test_mapping :
128
+ redefined_guids .add (atomic_test .auto_generated_guid )
129
+ else :
130
+ test_mapping [atomic_test .auto_generated_guid ] = atomic_test
131
+ if len (redefined_guids ) > 0 :
132
+ guids_string = '\n \t ' .join ([str (guid ) for guid in redefined_guids ])
133
+ raise Exception (f"The following [{ len (redefined_guids )} ] Atomic Test"
134
+ " auto_generated_guid(s) were defined more than once. "
135
+ f"auto_generated_guids MUST be unique:\n \t { guids_string } " )
136
+
137
+ print (f"Successfully parsed [{ len (test_mapping )} ] Atomic Red Team Tests!" )
138
+ return test_mapping
144
139
145
140
@classmethod
146
141
def constructAtomicFile (cls , file_path :pathlib .Path )-> AtomicFile :
147
142
yml_dict = YmlReader .load_file (file_path )
148
143
atomic_file = AtomicFile .model_validate (yml_dict )
149
144
return atomic_file
150
-
151
- @classmethod
152
- def getAtomicTestsFromArtRepo (cls , repo_path :pathlib .Path , enabled :bool = True )-> list [AtomicTest ] | None :
153
- # Get all the atomic files. Note that if the ART repo is not found, we will not throw an error,
154
- # but will not have any atomics. This means that if atomic_guids are referenced during validation,
155
- # validation for those detections will fail
156
- if not enabled :
157
- return None
158
-
159
- atomic_files = cls .getAtomicFilesFromArtRepo (repo_path )
160
-
161
- atomic_tests :List [AtomicTest ] = []
162
- for atomic_file in atomic_files :
163
- atomic_tests .extend (atomic_file .atomic_tests )
164
- print (f"Found [{ len (atomic_tests )} ] Atomic Simulations in the Atomic Red Team Repo!" )
165
- return atomic_tests
166
-
167
-
168
- @classmethod
169
- def getAtomicFilesFromArtRepo (cls , repo_path :pathlib .Path )-> List [AtomicFile ]:
170
- return cls .parseArtRepo (repo_path )
171
-
172
-
173
-
174
-
175
145
176
146
177
147
class AtomicFile (BaseModel ):
@@ -182,27 +152,31 @@ class AtomicFile(BaseModel):
182
152
atomic_tests : List [AtomicTest ]
183
153
184
154
155
+ class AtomicEnrichment (BaseModel ):
156
+ data : dict [uuid .UUID ,AtomicTest ] = dataclasses .field (default_factory = dict )
157
+ use_enrichment : bool = False
185
158
159
+ @classmethod
160
+ def getAtomicEnrichment (cls , config :validate )-> AtomicEnrichment :
161
+ enrichment = AtomicEnrichment (use_enrichment = config .enrichments )
162
+ if config .enrichments :
163
+ enrichment .data = AtomicTest .parseArtRepo (config .atomic_red_team_repo_path )
164
+
165
+ return enrichment
166
+
167
+ def getAtomic (self , atomic_guid : uuid .UUID )-> AtomicTest :
168
+ if self .use_enrichment :
169
+ if atomic_guid in self .data :
170
+ return self .data [atomic_guid ]
171
+ else :
172
+ raise Exception (f"Atomic with GUID { atomic_guid } not found." )
173
+ else :
174
+ # If enrichment is not enabled, for the sake of compatability
175
+ # return a stub test with no useful or meaningful information.
176
+ return AtomicTest .AtomicTestWhenTestIsMissing (atomic_guid )
186
177
187
- # ATOMICS_PATH = pathlib.Path("./atomics")
188
- # atomic_objects = []
189
- # atomic_simulations = []
190
- # for obj_path in ATOMICS_PATH.glob("**/T*.yaml"):
191
- # try:
192
- # with open(obj_path, 'r', encoding="utf-8") as obj_handle:
193
- # obj_data = yaml.load(obj_handle, Loader=yaml.CSafeLoader)
194
- # atomic_obj = AtomicFile.model_validate(obj_data)
195
- # except Exception as e:
196
- # print(f"Error parsing object at path {obj_path}: {str(e)}")
197
- # print(f"We have successfully parsed {len(atomic_objects)}, however!")
198
- # sys.exit(1)
199
-
200
- # print(f"Successfully parsed {obj_path}!")
201
- # atomic_objects.append(atomic_obj)
202
- # atomic_simulations += atomic_obj.atomic_tests
178
+
203
179
204
- # print(f"Successfully parsed all {len(atomic_objects)} files!")
205
- # print(f"Successfully parsed all {len(atomic_simulations)} simulations!")
206
180
207
181
208
182
0 commit comments