1
1
"""Git-aware content selection for contentctl test runs.

Provides the imports and module-level logging setup for GitService, which
resolves the set of Detections to test from the configured test mode.
"""

import logging
import os
import pathlib
from typing import TYPE_CHECKING, List, Optional

import pygit2
from pydantic import BaseModel, FilePath
from pygit2.enums import DeltaStatus

if TYPE_CHECKING:
    from contentctl.input.director import DirectorOutputDto

from contentctl.objects.config import All, Changes, Selected, test_common
from contentctl.objects.data_source import DataSource
from contentctl.objects.detection import Detection
from contentctl.objects.lookup import CSVLookup, Lookup
from contentctl.objects.macro import Macro
from contentctl.objects.security_content_object import SecurityContentObject

# Logger
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
LOGGER = logging.getLogger(__name__)


# Runtime (non-TYPE_CHECKING) import so pydantic can resolve the
# DirectorOutputDto annotation on GitService; deferred to after the other
# imports — presumably to avoid a circular import. TODO(review): confirm.
from contentctl.input.director import DirectorOutputDto
class GitService(BaseModel):
    """Resolve which Detections should be tested based on the configured mode.

    Three strategies, dispatched by ``getContent`` on ``config.mode``:
      * ``All``      - every detection known to the director
      * ``Selected`` - an explicit list of detection YML file paths
      * ``Changes``  - detections changed relative to a git target branch
                       (including uncommitted working-directory changes),
                       plus detections depending on changed macros, lookups,
                       or data sources
    """

    # Parsed content repository (detections, macros, lookups, data sources, ...).
    director: DirectorOutputDto
    # Active test configuration; ``config.mode`` selects the strategy and
    # ``config.path`` is the content repo root.
    config: test_common
    # Hash (hex string) of the diff target tree; set only by getChanges().
    gitHash: Optional[str] = None

    def getHash(self) -> str:
        """Return the git hash recorded by getChanges(), or raise if unset."""
        if self.gitHash is None:
            raise Exception("Cannot get hash of repo, it was not set")
        return self.gitHash

    def getContent(self) -> List[Detection]:
        """Dispatch to the content-selection strategy named by config.mode."""
        if isinstance(self.config.mode, Selected):
            return self.getSelected(self.config.mode.files)
        elif isinstance(self.config.mode, Changes):
            return self.getChanges(self.config.mode.target_branch)
        if isinstance(self.config.mode, All):
            return self.getAll()
        else:
            raise Exception(
                f"Could not get content to test. Unsupported test mode '{self.config.mode}'"
            )

    def getAll(self) -> List[Detection]:
        """Return every detection the director knows about."""
        return self.director.detections

    def getChanges(self, target_branch: str) -> List[Detection]:
        """Return detections affected by changes relative to ``target_branch``.

        A detection is selected when its own YML changed, or when a macro,
        lookup (its YML or its backing CSV), or data source it depends on
        changed. Both committed differences against the target branch and
        uncommitted working-directory changes are considered.

        Raises:
            Exception: if the target branch cannot be resolved, if a changed
                file cannot be mapped back to a content object, or if an
                unsupported file type is found under lookups/.
        """
        repo = pygit2.Repository(path=str(self.config.path))

        try:
            target_tree = repo.revparse_single(target_branch).tree
            # Store as str: the field is Optional[str] and getHash() promises
            # a str, but pygit2 returns an Oid object for tree.id.
            self.gitHash = str(target_tree.id)
            diffs = repo.index.diff_to_tree(target_tree)
        except Exception:
            raise Exception(
                f"Error parsing diff target_branch '{target_branch}'. Are you certain that it exists?"
            )

        # Get the uncommitted changes in the current directory
        diffs2 = repo.index.diff_to_workdir()

        # Combine the uncommitted changes with the committed changes
        all_diffs = list(diffs) + list(diffs2)

        # Make a filename to content map
        filepath_to_content_map = {
            obj.file_path: obj for (_, obj) in self.director.name_to_content_map.items()
        }

        updated_detections: set[Detection] = set()
        updated_macros: set[Macro] = set()
        updated_lookups: set[Lookup] = set()
        updated_datasources: set[DataSource] = set()

        for diff in all_diffs:
            if isinstance(diff, pygit2.Patch):
                # Deleted files are intentionally skipped: removed content
                # cannot be tested.
                if diff.delta.status in (
                    DeltaStatus.ADDED,
                    DeltaStatus.MODIFIED,
                    DeltaStatus.RENAMED,
                ):
                    # print(f"{DeltaStatus(diff.delta.status).name:<8}:{diff.delta.new_file.raw_path}")
                    decoded_path = pathlib.Path(
                        diff.delta.new_file.raw_path.decode("utf-8")
                    )
                    # Note that we only handle updates to detections, lookups, and macros at this time. All other changes are ignored.
                    if (
                        decoded_path.is_relative_to(self.config.path / "detections")
                        and decoded_path.suffix == ".yml"
                    ):
                        detectionObject = filepath_to_content_map.get(
                            decoded_path, None
                        )
                        if isinstance(detectionObject, Detection):
                            updated_detections.add(detectionObject)
                        else:
                            raise Exception(
                                f"Error getting detection object for file {str(decoded_path)}"
                            )

                    elif (
                        decoded_path.is_relative_to(self.config.path / "macros")
                        and decoded_path.suffix == ".yml"
                    ):
                        macroObject = filepath_to_content_map.get(decoded_path, None)
                        if isinstance(macroObject, Macro):
                            updated_macros.add(macroObject)
                        else:
                            raise Exception(
                                f"Error getting macro object for file {str(decoded_path)}"
                            )

                    elif (
                        decoded_path.is_relative_to(self.config.path / "data_sources")
                        and decoded_path.suffix == ".yml"
                    ):
                        datasourceObject = filepath_to_content_map.get(
                            decoded_path, None
                        )
                        if isinstance(datasourceObject, DataSource):
                            updated_datasources.add(datasourceObject)
                        else:
                            raise Exception(
                                f"Error getting data source object for file {str(decoded_path)}"
                            )

                    elif decoded_path.is_relative_to(self.config.path / "lookups"):
                        # We need to convert this to a yml. This means we will catch
                        # both changes to a csv AND changes to the YML that uses it
                        if decoded_path.suffix == ".yml":
                            updatedLookup = filepath_to_content_map.get(
                                decoded_path, None
                            )
                            if not isinstance(updatedLookup, Lookup):
                                raise Exception(
                                    f"Expected {decoded_path} to be type {type(Lookup)}, but instead if was {(type(updatedLookup))}"
                                )
                            updated_lookups.add(updatedLookup)

                        elif decoded_path.suffix == ".csv":
                            # If the CSV was updated, we want to make sure that we
                            # add the correct corresponding Lookup object.
                            # Filter to find the Lookup Object the references this CSV
                            matched = list(
                                filter(
                                    lambda x: isinstance(x, CSVLookup)
                                    and x.filename == decoded_path,
                                    self.director.lookups,
                                )
                            )
                            if len(matched) == 0:
                                raise Exception(
                                    f"Failed to find any lookups that reference the modified CSV file '{decoded_path}'"
                                )
                            elif len(matched) > 1:
                                raise Exception(
                                    f"More than 1 Lookup reference the modified CSV file '{decoded_path}': {[match.file_path for match in matched]}"
                                )
                            else:
                                updatedLookup = matched[0]
                        elif decoded_path.suffix == ".mlmodel":
                            # Detected a changed .mlmodel file. However, since we do not have testing for these detections at
                            # this time, we will ignore this change.
                            updatedLookup = None

                        else:
                            raise Exception(
                                f"Detected a changed file in the lookups/ directory '{str(decoded_path)}'.\n"
                                "Only files ending in .csv, .yml, or .mlmodel are supported in this "
                                "directory. This file must be removed from the lookups/ directory."
                            )

                        if (
                            updatedLookup is not None
                            and updatedLookup not in updated_lookups
                        ):
                            # It is possible that both the CSV and YML have been modified for the same lookup,
                            # and we do not want to add it twice.
                            updated_lookups.add(updatedLookup)

                    else:
                        pass
                        # print(f"Ignore changes to file {decoded_path} since it is not a detection, macro, or lookup.")
            else:
                raise Exception(f"Unrecognized diff type {type(diff)}")

        # If a detection has at least one dependency on changed content,
        # then we must test it again

        changed_macros_and_lookups_and_datasources: set[Macro | Lookup | DataSource] = (
            updated_macros.union(updated_lookups, updated_datasources)
        )

        for detection in self.director.detections:
            if detection in updated_detections:
                # we are already planning to test it, don't need
                # to add it again
                continue

            for obj in changed_macros_and_lookups_and_datasources:
                if obj in detection.get_content_dependencies():
                    updated_detections.add(detection)
                    break

        # Print out the names of all modified/new content
        modifiedAndNewContentString = "\n - ".join(
            sorted([d.name for d in updated_detections])
        )

        print(
            f"[{len(updated_detections)}] Pieces of modifed and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}"
        )
        return sorted(list(updated_detections))

    def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]:
        """Return the Detection objects for an explicit list of YML file paths.

        Raises:
            Exception: listing every supplied path that does not map to a
                Detection (missing file, or a non-Detection content object).
        """
        filepath_to_content_map: dict[FilePath, SecurityContentObject] = {
            obj.file_path: obj
            for (_, obj) in self.director.name_to_content_map.items()
            if obj.file_path is not None
        }
        errors = []
        detections: List[Detection] = []
        for name in detectionFilenames:
            obj = filepath_to_content_map.get(name, None)
            if obj is None:
                errors.append(
                    f"There is no detection file or security_content_object at '{name}'"
                )
            elif not isinstance(obj, Detection):
                errors.append(
                    f"The security_content_object at '{name}' is of type '{type(obj).__name__}', NOT '{Detection.__name__}'"
                )
            else:
                detections.append(obj)

        if errors:
            errorsString = "\n - ".join(errors)
            raise Exception(
                f"The following errors were encountered while getting selected detections to test:\n - {errorsString}"
            )
        return detections
0 commit comments