2020
2121import time
2222
23+ import uuid
24+
2325import backoff
2426
2527from google .api_core .exceptions import InternalServerError , NotFound , ServiceUnavailable
3638# GCLOUD_ORGANIZATION: The organization ID.
3739ORGANIZATION_ID = os .environ ["GCLOUD_ORGANIZATION" ]
3840LOCATION = "global"
39- PREFIX = "python_sample_etd_custom_module" # Prefix used for identifying test modules
41+ PREFIX = "python_sample_etd_custom_module"
4042
41- # Global list to track created modules
42- created_modules = []
43+ # Global list to track created shared modules
44+ shared_modules = []
4345
4446
4547@pytest .fixture (scope = "session" , autouse = True )
4648def setup_environment ():
4749 if not ORGANIZATION_ID :
4850 pytest .fail ("GCLOUD_ORGANIZATION environment variable is not set." )
4951
52+ setup_shared_modules ()
53+
5054
5155@pytest .fixture (scope = "session" , autouse = True )
5256def cleanup_after_tests (request ):
5357 """Fixture to clean up created custom modules after the test session."""
5458 def teardown ():
55- print ("\n Created Custom Modules:" )
56- print_all_created_modules ()
57- print ("Cleaning up created custom modules..." )
58- cleanup_created_custom_modules ()
59+ print_all_shared_modules ()
60+ cleanup_shared_modules ()
5961
6062 request .addfinalizer (teardown )
6163
6264
65+ def setup_shared_modules ():
66+ for _ in range (3 ) :
67+ _ , module_id = add_custom_module (ORGANIZATION_ID )
68+ if module_id != "" :
69+ shared_modules .append (module_id )
70+
71+
72+ def add_module_to_cleanup (module_id ):
73+ shared_modules .append (module_id )
74+
75+
76+ def print_all_shared_modules ():
77+ """Print all created custom modules."""
78+ if not shared_modules :
79+ print ("No custom modules were created." )
80+ else :
81+ print ("\n Created Custom Modules:" )
82+ for module_id in shared_modules :
83+ print (module_id )
84+
85+
86+ def cleanup_shared_modules ():
87+ """
88+ Deletes all created custom modules in this test session.
89+ """
90+ client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
91+
92+ print ("Cleaning up created custom modules..." )
93+
94+ for module_id in list (shared_modules ):
95+ if not custom_module_exists (module_id ):
96+ print (f"Module not found (already deleted): { module_id } " )
97+ shared_modules .remove (module_id )
98+ continue
99+ try :
100+ client .delete_event_threat_detection_custom_module (
101+ request = {"name" : f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } /eventThreatDetectionCustomModules/{ module_id } " }
102+ )
103+ print (f"Deleted custom module: { module_id } " )
104+ shared_modules .remove (module_id )
105+ except Exception as e :
106+ print (f"Failed to delete module { module_id } : { e } " )
107+ raise
108+
109+
110+ def custom_module_exists (module_id ):
111+ client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
112+ try :
113+ client .get_event_threat_detection_custom_module (
114+ request = {"name" : f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } /eventThreatDetectionCustomModules/{ module_id } " }
115+ )
116+ return True
117+ except Exception as e :
118+ if "404" in str (e ):
119+ return False
120+ raise
121+
122+
123+ def get_random_shared_module ():
124+ if not shared_modules :
125+ return ""
126+ random .seed (int (time .time () * 1000000 ))
127+ return shared_modules [random .randint (0 , len (shared_modules ) - 1 )]
128+
129+
130+ def extract_custom_module_id (module_name ):
131+ trimmed_full_name = module_name .strip ()
132+ parts = trimmed_full_name .split ("/" )
133+ if parts :
134+ return parts [- 1 ]
135+ return ""
136+
137+
63138def add_custom_module (org_id : str ):
64139
65140 parent = f"organizations/{ org_id } /locations/global"
66141 client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
67142
68143 # Generate a unique display name
69- unique_suffix = f" { int ( time . time ())} _ { random . randint ( 0 , 999 ) } "
144+ unique_suffix = str ( uuid . uuid4 ()). replace ( "-" , "_" )
70145 display_name = f"python_sample_etd_custom_module_test_{ unique_suffix } "
71146
72147 # Define the metadata and other config parameters as a dictionary
@@ -98,7 +173,7 @@ def add_custom_module(org_id: str):
98173 response = client .create_event_threat_detection_custom_module (request = request )
99174 print (f"Created Event Threat Detection Custom Module: { response .name } " )
100175 module_name = response .name
101- module_id = module_name . split ( "/" )[ - 1 ]
176+ module_id = extract_custom_module_id ( module_name )
102177 return module_name , module_id
103178
104179
@@ -110,7 +185,7 @@ def test_create_event_threat_detection_custom_module():
110185
111186 # Run the function to create the custom module
112187 response = event_threat_detection_custom_modules .create_event_threat_detection_custom_module (parent )
113- created_modules . append ( response .name )
188+ add_module_to_cleanup ( extract_custom_module_id ( response .name ) )
114189
115190 assert response is not None , "Custom module creation failed."
116191 # Verify that the custom module was created
@@ -123,27 +198,24 @@ def test_create_event_threat_detection_custom_module():
123198)
124199def test_get_event_threat_detection_custom_module ():
125200
126- module_name , module_id = add_custom_module (ORGANIZATION_ID )
127- created_modules .append (module_name )
201+ module_id = get_random_shared_module ()
128202 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
129203
130204 # Retrieve the custom module
131205 response = event_threat_detection_custom_modules .get_event_threat_detection_custom_module (parent , module_id )
132206
133207 assert response is not None , "Failed to retrieve the custom module."
134- # Verify that the custom module was created
135208 assert response .display_name .startswith (PREFIX )
136209 assert response .enablement_state == securitycentermanagement_v1 .EventThreatDetectionCustomModule .EnablementState .ENABLED
137- print (f"Retrieved Custom Module: { response .name } " )
138210
139211
140212@backoff .on_exception (
141213 backoff .expo , (InternalServerError , ServiceUnavailable , NotFound ), max_tries = 3
142214)
143215def test_list_event_threat_detection_custom_module ():
144216
145- module_name , module_id = add_custom_module ( ORGANIZATION_ID )
146- created_modules . append ( module_name )
217+ module_id = get_random_shared_module ( )
218+
147219 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
148220 # Retrieve the custom modules
149221 custom_modules = event_threat_detection_custom_modules .list_event_threat_detection_custom_module (parent )
@@ -153,7 +225,7 @@ def test_list_event_threat_detection_custom_module():
153225
154226 # Verify the created module is in the list
155227 created_module = next (
156- (module for module in custom_modules if module .name == module_name ), None
228+ (module for module in custom_modules if extract_custom_module_id ( module .name ) == module_id ), None
157229 )
158230 assert created_module is not None , "Created custom module not found in the list."
159231 assert created_module .display_name .startswith (PREFIX )
@@ -168,15 +240,14 @@ def test_list_event_threat_detection_custom_module():
168240)
169241def test_update_event_threat_detection_custom_module ():
170242
171- module_name , module_id = add_custom_module ( ORGANIZATION_ID )
172- created_modules . append ( module_name )
243+ module_id = get_random_shared_module ( )
244+
173245 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
174246
175247 # Retrieve the custom module
176248 response = event_threat_detection_custom_modules .update_event_threat_detection_custom_module (parent , module_id )
177249
178250 assert response is not None , "Failed to retrieve the custom module."
179- # Verify that the custom module was created
180251 assert response .display_name .startswith (PREFIX )
181252 assert response .enablement_state == securitycentermanagement_v1 .EventThreatDetectionCustomModule .EnablementState .DISABLED
182253
@@ -186,8 +257,8 @@ def test_update_event_threat_detection_custom_module():
186257)
187258def test_delete_event_threat_detection_custom_module ():
188259
189- module_name , module_id = add_custom_module ( ORGANIZATION_ID )
190- created_modules . append ( module_name )
260+ module_id = get_random_shared_module ( )
261+
191262 parent = f"organizations/{ ORGANIZATION_ID } /locations/{ LOCATION } "
192263 try :
193264 response = event_threat_detection_custom_modules .delete_event_threat_detection_custom_module (parent , module_id )
@@ -196,47 +267,3 @@ def test_delete_event_threat_detection_custom_module():
196267 assert response is None
197268
198269 print (f"Custom module was deleted successfully: { module_id } " )
199-
200-
201- def print_all_created_modules ():
202- """Print all created custom modules."""
203- if not created_modules :
204- print ("No custom modules were created." )
205- else :
206- for module in created_modules :
207- print (module )
208-
209-
210- def cleanup_created_custom_modules ():
211- """
212- Deletes all created custom modules in this test session.
213- """
214- client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
215-
216- for module in list (created_modules ):
217- if not custom_module_exists (module ):
218- print (f"Module not found (already deleted): { module } " )
219- created_modules .remove (module )
220- continue
221- try :
222- client .delete_event_threat_detection_custom_module (
223- request = {"name" : module }
224- )
225- print (f"Deleted custom module: { module } " )
226- created_modules .remove (module )
227- except Exception as e :
228- print (f"Failed to delete module { module } : { e } " )
229- raise
230-
231-
232- def custom_module_exists (module_name ):
233- client = securitycentermanagement_v1 .SecurityCenterManagementClient ()
234- try :
235- client .get_event_threat_detection_custom_module (
236- request = {"name" : module_name }
237- )
238- return True
239- except Exception as e :
240- if "404" in str (e ):
241- return False
242- raise
0 commit comments