Skip to content

Commit 72a5e91

Browse files
authored
feat(securitycenter): Add Resource SCC Org Mgmt API SHA Custom Modules (GetEff, ListEff, ListDesc, Simulate) (#13023)
* feat(securitycenter): Add Resource SCC Mgt API Org SHA Cust Modules (GetEff, ListEff, ListDesc, Simulate) * feat(securitycenter): Add Resource SCC Org Mgt API SHA Cust Modules (GetEff, ListEff, ListDesc, Simulate) * Address comments by code review bot * Trigger CI pipeline * Refactor the cleaning up of created custom modules in the current test session * Refactor the module creation and clean up * fix the lint error * Refactor the update test method * Refactor the test file to resolve the python version error * add backoff to add custom module method * Reduce the loop count and add logs * Remove the loop and logs
1 parent 83b3051 commit 72a5e91

File tree

4 files changed

+272
-13
lines changed

4 files changed

+272
-13
lines changed

securitycenter/snippets_management_api/noxfile_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
# Default TEST_CONFIG_OVERRIDE for python repos.
1616

17+
1718
# You can copy this file into your directory, then it will be inported from
1819
# the noxfile.py.
1920

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
backoff==2.2.1
22
pytest==8.2.0
3-
google-cloud-bigquery==3.11.4
3+
google-cloud-bigquery==3.27.0
44
google-cloud-securitycentermanagement==0.1.17

securitycenter/snippets_management_api/security_health_analytics_custom_modules.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17+
1718
import uuid
1819

1920
from google.api_core.exceptions import GoogleAPICallError, NotFound
@@ -107,6 +108,7 @@ def get_security_health_analytics_custom_module(parent: str, module_id: str):
107108
- organizations/{organization_id}/locations/{location_id}
108109
- folders/{folder_id}/locations/{location_id}
109110
- projects/{project_id}/locations/{location_id}
111+
module_id: The unique identifier of the custom module.
110112
Returns:
111113
The retrieved Security Health Analytics custom module.
112114
Raises:
@@ -238,3 +240,186 @@ def update_security_health_analytics_custom_module(parent: str, module_id: str):
238240
raise
239241

240242
# [END securitycenter_update_security_health_analytics_custom_module]
243+
244+
# [START securitycenter_get_effective_security_health_analytics_custom_module]
245+
246+
247+
def get_effective_security_health_analytics_custom_module(parent: str, module_id: str):
248+
"""
249+
Retrieves a Security Health Analytics custom module using parent and module id as parameters.
250+
Args:
251+
parent: Use any one of the following options:
252+
- organizations/{organization_id}/locations/{location_id}
253+
- folders/{folder_id}/locations/{location_id}
254+
- projects/{project_id}/locations/{location_id}
255+
module_id: The unique identifier of the custom module.
256+
Returns:
257+
The retrieved Security Health Analytics custom module.
258+
Raises:
259+
NotFound: If the specified custom module does not exist.
260+
"""
261+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
262+
263+
try:
264+
request = securitycentermanagement_v1.GetEffectiveSecurityHealthAnalyticsCustomModuleRequest(
265+
name=f"{parent}/effectiveSecurityHealthAnalyticsCustomModules/{module_id}",
266+
)
267+
268+
response = client.get_effective_security_health_analytics_custom_module(request=request)
269+
print(f"Retrieved Effective Security Health Analytics Custom Module: {response.name}")
270+
return response
271+
except NotFound as e:
272+
print(f"Custom Module not found: {e}")
273+
raise e
274+
# [END securitycenter_get_effective_security_health_analytics_custom_module]
275+
276+
# [START securitycenter_list_descendant_security_health_analytics_custom_module]
277+
278+
279+
def list_descendant_security_health_analytics_custom_module(parent: str):
280+
"""
281+
Retrieves list of all resident Security Health Analytics custom modules and all of its descendants.
282+
Args:
283+
parent: Use any one of the following options:
284+
- organizations/{organization_id}/locations/{location_id}
285+
- folders/{folder_id}/locations/{location_id}
286+
- projects/{project_id}/locations/{location_id}
287+
Returns:
288+
A list of all resident Security Health Analytics custom modules and all of its descendants.
289+
Raises:
290+
NotFound: If the parent resource is not found.
291+
"""
292+
293+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
294+
295+
try:
296+
request = securitycentermanagement_v1.ListDescendantSecurityHealthAnalyticsCustomModulesRequest(
297+
parent=parent,
298+
)
299+
300+
response = client.list_descendant_security_health_analytics_custom_modules(request=request)
301+
302+
custom_modules = []
303+
for custom_module in response:
304+
print(f"Custom Module: {custom_module.name}")
305+
custom_modules.append(custom_module)
306+
return custom_modules
307+
except NotFound as e:
308+
print(f"Parent resource not found: {parent}")
309+
raise e
310+
except Exception as e:
311+
print(f"An error occurred while listing custom modules: {e}")
312+
raise e
313+
# [END securitycenter_list_descendant_security_health_analytics_custom_module]
314+
315+
# [START securitycenter_list_effective_security_health_analytics_custom_module]
316+
317+
318+
def list_effective_security_health_analytics_custom_module(parent: str):
319+
"""
320+
Retrieves list of all Security Health Analytics custom modules.
321+
This includes resident modules defined at the scope of the parent,
322+
and inherited modules, inherited from ancestor organizations, folders, and projects (no descendants).
323+
324+
Args:
325+
parent: Use any one of the following options:
326+
- organizations/{organization_id}/locations/{location_id}
327+
- folders/{folder_id}/locations/{location_id}
328+
- projects/{project_id}/locations/{location_id}
329+
Returns:
330+
List of retrieved all Security Health Analytics custom modules.
331+
Raises:
332+
NotFound: If the parent resource is not found.
333+
"""
334+
335+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
336+
337+
try:
338+
request = securitycentermanagement_v1.ListEffectiveSecurityHealthAnalyticsCustomModulesRequest(
339+
parent=parent,
340+
)
341+
342+
response = client.list_effective_security_health_analytics_custom_modules(request=request)
343+
344+
custom_modules = []
345+
for custom_module in response:
346+
print(f"Custom Module: {custom_module.name}")
347+
custom_modules.append(custom_module)
348+
return custom_modules
349+
except NotFound as e:
350+
print(f"Parent resource not found: {parent}")
351+
raise e
352+
except Exception as e:
353+
print(f"An error occurred while listing custom modules: {e}")
354+
raise e
355+
# [END securitycenter_list_effective_security_health_analytics_custom_module]
356+
357+
# [START securitycenter_simulate_security_health_analytics_custom_module]
358+
359+
360+
def simulate_security_health_analytics_custom_module(parent: str):
361+
"""
362+
Simulates the result of using a SecurityHealthAnalyticsCustomModule to check a resource.
363+
364+
Args:
365+
parent: Use any one of the following options:
366+
- organizations/{organization_id}/locations/{location_id}
367+
- folders/{folder_id}/locations/{location_id}
368+
- projects/{project_id}/locations/{location_id}
369+
Returns:
370+
The simulation response of Security Health Analytics custom module.
371+
"""
372+
373+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
374+
375+
# Define the custom config configuration
376+
custom_config = {
377+
"description": (
378+
"Sample custom module for testing purposes. This custom module evaluates "
379+
"Cloud KMS CryptoKeys to ensure their rotation period exceeds 30 days (2592000 seconds)."
380+
),
381+
"predicate": {
382+
"expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))",
383+
"title": "Cloud KMS CryptoKey Rotation Period",
384+
"description": (
385+
"Evaluates whether the rotation period of a Cloud KMS CryptoKey exceeds 30 days. "
386+
"A longer rotation period might increase the risk of exposure."
387+
),
388+
},
389+
"recommendation": (
390+
"Review and adjust the rotation period for Cloud KMS CryptoKeys to align with your security policies. "
391+
"Consider setting a shorter rotation period if possible."
392+
),
393+
"resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]},
394+
"severity": "CRITICAL",
395+
"custom_output": {
396+
"properties": [
397+
{
398+
"name": "example_property",
399+
"value_expression": {
400+
"description": "The resource name of the CryptoKey being evaluated.",
401+
"expression": "resource.name",
402+
"location": "global",
403+
"title": "CryptoKey Resource Name",
404+
},
405+
}
406+
]
407+
},
408+
}
409+
410+
# Initialize request argument(s)
411+
resource = securitycentermanagement_v1.types.SimulateSecurityHealthAnalyticsCustomModuleRequest.SimulatedResource()
412+
resource.resource_type = "cloudkms.googleapis.com/CryptoKey" # Replace with the correct resource type
413+
414+
request = securitycentermanagement_v1.SimulateSecurityHealthAnalyticsCustomModuleRequest(
415+
parent=parent,
416+
custom_config=custom_config,
417+
resource=resource,
418+
)
419+
420+
response = client.simulate_security_health_analytics_custom_module(request=request)
421+
422+
print(f"Simulated Security Health Analytics Custom Module: {response}")
423+
return response
424+
425+
# [END securitycenter_simulate_security_health_analytics_custom_module]

securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,9 @@ def teardown():
6060

6161

6262
def setup_shared_modules():
63-
for _ in range(3) :
64-
_, module_id = add_custom_module(ORGANIZATION_ID)
65-
if module_id != "" :
66-
shared_modules.append(module_id)
63+
_, module_id = add_custom_module(ORGANIZATION_ID)
64+
if module_id != "" :
65+
shared_modules.append(module_id)
6766

6867

6968
def add_module_to_cleanup(module_id):
@@ -132,7 +131,17 @@ def extract_custom_module_id(module_name):
132131
return ""
133132

134133

134+
@backoff.on_exception(
135+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
136+
)
135137
def add_custom_module(org_id: str):
138+
"""
139+
Adds a new SHA custom module.
140+
Args:
141+
org_id (str): The organization ID.
142+
Returns:
143+
Tuple[str, str]: The name and ID of the created custom module.
144+
"""
136145

137146
parent = f"organizations/{org_id}/locations/global"
138147
client = securitycentermanagement_v1.SecurityCenterManagementClient()
@@ -146,24 +155,24 @@ def add_custom_module(org_id: str):
146155
"display_name": display_name,
147156
"enablement_state": "ENABLED",
148157
"custom_config": {
149-
"description": "Sample custom module for testing purpose. Please do not delete.",
158+
"description": "Sample custom module for testing purposes. Please do not delete.",
150159
"predicate": {
151160
"expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))",
152-
"title": "GCE Instance High Severity",
153-
"description": "Custom module to detect high severity issues on GCE instances.",
161+
"title": "Cloud KMS CryptoKey Rotation Period",
162+
"description": "Custom module to detect CryptoKeys with rotation period greater than 30 days.",
154163
},
155-
"recommendation": "Ensure proper security configurations on GCE instances.",
164+
"recommendation": "Review and adjust the rotation period for Cloud KMS CryptoKeys.",
156165
"resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]},
157166
"severity": "CRITICAL",
158167
"custom_output": {
159168
"properties": [
160169
{
161170
"name": "example_property",
162171
"value_expression": {
163-
"description": "The name of the instance",
172+
"description": "The resource name of the CryptoKey",
164173
"expression": "resource.name",
165174
"location": "global",
166-
"title": "Instance Name",
175+
"title": "CryptoKey Resource Name",
167176
},
168177
}
169178
]
@@ -211,7 +220,8 @@ def test_get_security_health_analytics_custom_module():
211220

212221
assert response is not None, "Failed to retrieve the custom module."
213222
assert response.display_name.startswith(PREFIX)
214-
assert response.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
223+
response_org_id = response.name.split("/")[1] # Extract organization ID from the name field
224+
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."
215225

216226

217227
@backoff.on_exception(
@@ -229,6 +239,7 @@ def test_delete_security_health_analytics_custom_module():
229239
assert response is None
230240

231241
print(f"Custom module was deleted successfully: {module_id}")
242+
shared_modules.remove(module_id)
232243

233244

234245
@backoff.on_exception(
@@ -249,12 +260,74 @@ def test_list_security_health_analytics_custom_module():
249260
)
250261
def test_update_security_health_analytics_custom_module():
251262

252-
module_id = get_random_shared_module()
253263
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
264+
response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent)
265+
module_id = extract_custom_module_id(response.name)
266+
add_module_to_cleanup(module_id)
267+
254268
# Retrieve the custom modules
255269
updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id)
256270

257271
assert updated_custom_module is not None, "Failed to retrieve the updated custom module."
258272
response_org_id = updated_custom_module.name.split("/")[1] # Extract organization ID from the name field
259273
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."
260274
assert updated_custom_module.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.DISABLED
275+
276+
277+
@backoff.on_exception(
278+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
279+
)
280+
def test_get_effective_security_health_analytics_custom_module():
281+
"""Tests getting an effective SHA custom module."""
282+
module_id = get_random_shared_module()
283+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
284+
285+
# Retrieve the custom module
286+
response = security_health_analytics_custom_modules.get_effective_security_health_analytics_custom_module(parent, module_id)
287+
288+
assert response is not None, "Failed to retrieve the custom module."
289+
# Verify that the custom module was created
290+
assert response.display_name.startswith(PREFIX)
291+
response_org_id = response.name.split("/")[1] # Extract organization ID from the name field
292+
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."
293+
294+
295+
@backoff.on_exception(
296+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
297+
)
298+
def test_list_descendant_security_health_analytics_custom_module():
299+
"""Tests listing descendant SHA custom modules."""
300+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
301+
# Retrieve the list descendant custom modules
302+
custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent)
303+
304+
assert custom_modules is not None, "Failed to retrieve the custom modules."
305+
assert len(custom_modules) > 0, "No custom modules were retrieved."
306+
307+
308+
@backoff.on_exception(
309+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
310+
)
311+
def test_list_effective_security_health_analytics_custom_module():
312+
"""Tests listing effective SHA custom modules."""
313+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
314+
# Retrieve the list of custom modules
315+
custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent)
316+
317+
assert custom_modules is not None, "Failed to retrieve the custom modules."
318+
assert len(custom_modules) > 0, "No custom modules were retrieved."
319+
320+
321+
@backoff.on_exception(
322+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
323+
)
324+
def test_simulate_security_health_analytics_custom_module():
325+
"""Tests simulating an SHA custom module."""
326+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
327+
328+
simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent)
329+
330+
assert simulated_custom_module is not None, "Failed to retrieve the simulated custom module."
331+
assert simulated_custom_module.result.no_violation is not None, (
332+
f"Expected no_violation to be present, got {simulated_custom_module.result}."
333+
)

0 commit comments

Comments
 (0)