1
1
import logging
2
2
import os
3
3
import pathlib
4
+ from typing import TYPE_CHECKING , List , Optional
5
+
4
6
import pygit2
5
- from pygit2 .enums import DeltaStatus
6
- from typing import List , Optional
7
7
from pydantic import BaseModel , FilePath
8
- from typing import TYPE_CHECKING
8
+ from pygit2 .enums import DeltaStatus
9
+
9
10
if TYPE_CHECKING :
10
11
from contentctl .input .director import DirectorOutputDto
11
-
12
12
13
- from contentctl .objects .macro import Macro
14
- from contentctl .objects .lookup import Lookup
15
- from contentctl .objects .detection import Detection
13
+
14
+ from contentctl .objects .config import All , Changes , Selected , test_common
16
15
from contentctl .objects .data_source import DataSource
16
+ from contentctl .objects .detection import Detection
17
+ from contentctl .objects .lookup import Lookup , Lookup_Type
18
+ from contentctl .objects .macro import Macro
17
19
from contentctl .objects .security_content_object import SecurityContentObject
18
- from contentctl .objects .config import test_common , All , Changes , Selected
19
20
20
21
# Logger
21
22
logging .basicConfig (level = os .environ .get ("LOGLEVEL" , "INFO" ))
22
23
LOGGER = logging .getLogger (__name__ )
23
24
24
25
25
-
26
26
from contentctl .input .director import DirectorOutputDto
27
27
28
28
29
-
30
29
class GitService(BaseModel):
    """Resolve which Detection objects should be tested for the configured mode.

    Three test modes are supported via ``config.mode``:
      * ``All``      -> every detection known to the director
      * ``Selected`` -> an explicit list of detection YAML file paths
      * ``Changes``  -> detections changed (directly, or indirectly via a
        changed macro, lookup, or data source they depend on) relative to a
        target git branch, including uncommitted working-tree changes
    """

    director: DirectorOutputDto
    config: test_common
    # Hash (tree id) of the diff target branch; populated by getChanges().
    gitHash: Optional[str] = None

    def getHash(self) -> str:
        """Return the git hash recorded by getChanges(); raise if it was never set."""
        if self.gitHash is None:
            raise Exception("Cannot get hash of repo, it was not set")
        return self.gitHash

    def getContent(self) -> List[Detection]:
        """Dispatch to the appropriate getter based on the configured test mode.

        Raises:
            Exception: if the configured mode is not All, Selected, or Changes.
        """
        if isinstance(self.config.mode, Selected):
            return self.getSelected(self.config.mode.files)
        elif isinstance(self.config.mode, Changes):
            return self.getChanges(self.config.mode.target_branch)
        elif isinstance(self.config.mode, All):
            return self.getAll()
        else:
            raise Exception(
                f"Could not get content to test. Unsupported test mode '{self.config.mode}'"
            )

    def getAll(self) -> List[Detection]:
        """Return every detection the director knows about."""
        return self.director.detections

    def getChanges(self, target_branch: str) -> List[Detection]:
        """Return all detections impacted by changes relative to target_branch.

        A detection is impacted if its own YAML changed, or if a macro,
        lookup (its YAML or its backing CSV), or data source it depends on
        changed. Both committed differences against the target branch and
        uncommitted working-directory changes are considered.

        Raises:
            Exception: if the target branch cannot be resolved, if a changed
                file cannot be mapped back to a content object, or if an
                unsupported file type is found in the lookups/ directory.
        """
        repo = pygit2.Repository(path=str(self.config.path))

        try:
            target_tree = repo.revparse_single(target_branch).tree
            # Convert the pygit2 Oid to str: the gitHash field is
            # Optional[str] and getHash() promises to return a str.
            self.gitHash = str(target_tree.id)
            diffs = repo.index.diff_to_tree(target_tree)
        except Exception as e:
            raise Exception(
                f"Error parsing diff target_branch '{target_branch}'. Are you certain that it exists?"
            ) from e

        # Get the uncommitted changes in the current directory
        diffs2 = repo.index.diff_to_workdir()

        # Combine the uncommitted changes with the committed changes
        all_diffs = list(diffs) + list(diffs2)

        # Make a filename to content map
        filepath_to_content_map = {
            obj.file_path: obj for (_, obj) in self.director.name_to_content_map.items()
        }

        updated_detections: set[Detection] = set()
        updated_macros: set[Macro] = set()
        updated_lookups: set[Lookup] = set()
        updated_datasources: set[DataSource] = set()

        for diff in all_diffs:
            if not isinstance(diff, pygit2.Patch):
                raise Exception(f"Unrecognized diff type {type(diff)}")

            # Deletions (and other statuses) are intentionally ignored;
            # only added, modified, or renamed files require retesting.
            if diff.delta.status not in (
                DeltaStatus.ADDED,
                DeltaStatus.MODIFIED,
                DeltaStatus.RENAMED,
            ):
                continue

            decoded_path = pathlib.Path(diff.delta.new_file.raw_path.decode("utf-8"))
            # Note that we only handle updates to detections, lookups, macros,
            # and data sources at this time. All other changes are ignored.
            if (
                decoded_path.is_relative_to(self.config.path / "detections")
                and decoded_path.suffix == ".yml"
            ):
                detectionObject = filepath_to_content_map.get(decoded_path, None)
                if isinstance(detectionObject, Detection):
                    updated_detections.add(detectionObject)
                else:
                    raise Exception(
                        f"Error getting detection object for file {str(decoded_path)}"
                    )

            elif (
                decoded_path.is_relative_to(self.config.path / "macros")
                and decoded_path.suffix == ".yml"
            ):
                macroObject = filepath_to_content_map.get(decoded_path, None)
                if isinstance(macroObject, Macro):
                    updated_macros.add(macroObject)
                else:
                    raise Exception(
                        f"Error getting macro object for file {str(decoded_path)}"
                    )

            elif (
                decoded_path.is_relative_to(self.config.path / "data_sources")
                and decoded_path.suffix == ".yml"
            ):
                datasourceObject = filepath_to_content_map.get(decoded_path, None)
                if isinstance(datasourceObject, DataSource):
                    updated_datasources.add(datasourceObject)
                else:
                    raise Exception(
                        f"Error getting data source object for file {str(decoded_path)}"
                    )

            elif decoded_path.is_relative_to(self.config.path / "lookups"):
                # We need to convert this to a yml. This means we will catch
                # both changes to a csv AND changes to the YML that uses it
                if decoded_path.suffix == ".yml":
                    updatedLookup = filepath_to_content_map.get(decoded_path, None)
                    if not isinstance(updatedLookup, Lookup):
                        # Report class names, not type(Lookup) (which would
                        # render the metaclass, not the expected type).
                        raise Exception(
                            f"Expected {decoded_path} to be type {Lookup.__name__}, "
                            f"but instead it was {type(updatedLookup).__name__}"
                        )
                    updated_lookups.add(updatedLookup)

                elif decoded_path.suffix == ".csv":
                    # If the CSV was updated, we want to make sure that we
                    # add the correct corresponding Lookup object.
                    # Filter to find the Lookup object that references this CSV.
                    matched = [
                        lookup
                        for lookup in self.director.lookups
                        if lookup.lookup_type == Lookup_Type.csv
                        and lookup.filename is not None
                        and lookup.filename == decoded_path
                    ]
                    if len(matched) == 0:
                        raise Exception(
                            f"Failed to find any lookups that reference the modified CSV file '{decoded_path}'"
                        )
                    elif len(matched) > 1:
                        raise Exception(
                            f"More than 1 Lookup reference the modified CSV file '{decoded_path}': {[l.file_path for l in matched]}"
                        )
                    else:
                        updatedLookup = matched[0]

                elif decoded_path.suffix == ".mlmodel":
                    # Detected a changed .mlmodel file. However, since we do
                    # not have testing for these detections at this time, we
                    # will ignore this change.
                    updatedLookup = None

                else:
                    raise Exception(
                        f"Detected a changed file in the lookups/ directory '{str(decoded_path)}'.\n"
                        "Only files ending in .csv, .yml, or .mlmodel are supported in this "
                        "directory. This file must be removed from the lookups/ directory."
                    )

                if updatedLookup is not None and updatedLookup not in updated_lookups:
                    # It is possible that both the CSV and YML have been
                    # modified for the same lookup, and we do not want to add
                    # it twice.
                    updated_lookups.add(updatedLookup)

            else:
                # Changed file is not a detection, macro, lookup, or data
                # source; ignore it.
                pass

        # If a detection has at least one dependency on changed content,
        # then we must test it again
        changed_macros_and_lookups_and_datasources: set[SecurityContentObject] = (
            updated_macros.union(updated_lookups, updated_datasources)
        )

        for detection in self.director.detections:
            if detection in updated_detections:
                # we are already planning to test it, don't need to add it
                # again
                continue

            for obj in changed_macros_and_lookups_and_datasources:
                if obj in detection.get_content_dependencies():
                    updated_detections.add(detection)
                    break

        # Print out the names of all modified/new content
        modifiedAndNewContentString = "\n - ".join(
            sorted([d.name for d in updated_detections])
        )

        print(
            f"[{len(updated_detections)}] Pieces of modified and new content "
            f"(this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}"
        )
        return sorted(list(updated_detections))

    def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]:
        """Return the Detection objects for an explicit list of detection files.

        Raises:
            Exception: if any path does not map to a known content object, or
                maps to one that is not a Detection. All problems are collected
                and reported together in a single exception.
        """
        filepath_to_content_map: dict[FilePath, SecurityContentObject] = {
            obj.file_path: obj
            for (_, obj) in self.director.name_to_content_map.items()
            if obj.file_path is not None
        }
        errors = []
        detections: List[Detection] = []
        for name in detectionFilenames:
            obj = filepath_to_content_map.get(name, None)
            if obj is None:
                errors.append(
                    f"There is no detection file or security_content_object at '{name}'"
                )
            elif not isinstance(obj, Detection):
                errors.append(
                    f"The security_content_object at '{name}' is of type '{type(obj).__name__}', NOT '{Detection.__name__}'"
                )
            else:
                detections.append(obj)

        if errors:
            errorsString = "\n - ".join(errors)
            raise Exception(
                f"The following errors were encountered while getting selected detections to test:\n - {errorsString}"
            )
        return detections
0 commit comments