Skip to content

Commit e038b2f

Browse files
MrCloudSecandoniaf
andauthored
chore(sdk): add validation for invalid checks, services, and categories (#8971)
Co-authored-by: Andoni Alonso <14891798+andoniaf@users.noreply.github.com>
1 parent 2e5f175 commit e038b2f

File tree

3 files changed

+214
-7
lines changed

3 files changed

+214
-7
lines changed

prowler/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
1414
- C5 compliance framework for Azure provider [(#9081)](https://github.com/prowler-cloud/prowler/pull/9081)
1515
- C5 compliance framework for the GCP provider [(#9097)](https://github.com/prowler-cloud/prowler/pull/9097)
1616
- HIPAA compliance framework for the GCP provider [(#8955)](https://github.com/prowler-cloud/prowler/pull/8955)
17+
- Added validation for invalid checks, services, and categories in `load_checks_to_execute` function [(#8971)](https://github.com/prowler-cloud/prowler/pull/8971)
1718

1819
### Changed
1920
- Update AWS Direct Connect service metadata to new format [(#8855)](https://github.com/prowler-cloud/prowler/pull/8855)

prowler/lib/check/checks_loader.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import sys
2+
13
from colorama import Fore, Style
24

35
from prowler.lib.check.check import parse_checks_from_file
@@ -57,15 +59,48 @@ def load_checks_to_execute(
5759

5860
# Handle if there are checks passed using -c/--checks
5961
if check_list:
62+
# Validate that all checks exist
63+
available_checks = set(bulk_checks_metadata.keys())
64+
available_checks.update(check_aliases.keys())
65+
invalid_checks = []
6066
for check_name in check_list:
61-
checks_to_execute.add(check_name)
67+
if check_name not in available_checks:
68+
invalid_checks.append(check_name)
69+
else:
70+
checks_to_execute.add(check_name)
71+
72+
if invalid_checks:
73+
logger.critical(
74+
f"Invalid check(s) specified: {', '.join(invalid_checks)}"
75+
)
76+
logger.critical(
77+
f"Please provide valid check names. Use 'prowler {provider} --list-checks' to see available checks."
78+
)
79+
sys.exit(1)
6280

6381
# Handle if there are some severities passed using --severity
6482
elif severities:
6583
for severity in severities:
6684
checks_to_execute.update(check_severities[severity])
6785

6886
if service_list:
87+
# Validate that all services exist
88+
available_services = set()
89+
for metadata in bulk_checks_metadata.values():
90+
available_services.add(metadata.ServiceName)
91+
92+
invalid_services = [
93+
s for s in service_list if s not in available_services
94+
]
95+
if invalid_services:
96+
logger.critical(
97+
f"Invalid service(s) specified: {', '.join(invalid_services)}"
98+
)
99+
logger.critical(
100+
f"Please provide valid service names. Use 'prowler {provider} --list-services' to see available services."
101+
)
102+
sys.exit(1)
103+
69104
checks_from_services = set()
70105
for service in service_list:
71106
service_checks = CheckMetadata.list(
@@ -81,6 +116,21 @@ def load_checks_to_execute(
81116

82117
# Handle if there are services passed using -s/--services
83118
elif service_list:
119+
# Validate that all services exist
120+
available_services = set()
121+
for metadata in bulk_checks_metadata.values():
122+
available_services.add(metadata.ServiceName)
123+
124+
invalid_services = [s for s in service_list if s not in available_services]
125+
if invalid_services:
126+
logger.critical(
127+
f"Invalid service(s) specified: {', '.join(invalid_services)}"
128+
)
129+
logger.critical(
130+
f"Please provide valid service names. Use 'prowler {provider} --list-services' to see available services."
131+
)
132+
sys.exit(1)
133+
84134
for service in service_list:
85135
checks_to_execute.update(
86136
CheckMetadata.list(
@@ -103,6 +153,20 @@ def load_checks_to_execute(
103153

104154
# Handle if there are categories passed using --categories
105155
elif categories:
156+
# Validate that all categories exist
157+
available_categories = set(check_categories.keys())
158+
invalid_categories = [
159+
c for c in categories if c not in available_categories
160+
]
161+
if invalid_categories:
162+
logger.critical(
163+
f"Invalid category(ies) specified: {', '.join(invalid_categories)}"
164+
)
165+
logger.critical(
166+
f"Please provide valid category names. Use 'prowler {provider} --list-categories' to see available categories."
167+
)
168+
sys.exit(1)
169+
106170
for category in categories:
107171
checks_to_execute.update(check_categories[category])
108172

tests/lib/check/check_loader_test.py

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pytest
12
from mock import patch
23

34
from prowler.lib.check.checks_loader import (
@@ -190,18 +191,22 @@ def test_load_checks_to_execute_with_severities_and_services_multiple(self):
190191
def test_load_checks_to_execute_with_severities_and_services_not_within_severity(
191192
self,
192193
):
194+
"""Test that service not in metadata causes sys.exit(1) when used with severities"""
193195
bulk_checks_metatada = {
194196
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
195197
}
196198
service_list = ["ec2"]
197199
severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY]
198200

199-
assert set() == load_checks_to_execute(
200-
bulk_checks_metadata=bulk_checks_metatada,
201-
service_list=service_list,
202-
severities=severities,
203-
provider=self.provider,
204-
)
201+
# ec2 service doesn't exist in the metadata, so it should exit with error
202+
with pytest.raises(SystemExit) as exc_info:
203+
load_checks_to_execute(
204+
bulk_checks_metadata=bulk_checks_metatada,
205+
service_list=service_list,
206+
severities=severities,
207+
provider=self.provider,
208+
)
209+
assert exc_info.value.code == 1
205210

206211
def test_load_checks_to_execute_with_checks_file(
207212
self,
@@ -382,3 +387,140 @@ def test_threat_detection_single_check(self):
382387
categories=categories,
383388
provider=self.provider,
384389
)
390+
391+
def test_load_checks_to_execute_with_invalid_check(self):
392+
"""Test that invalid check names cause sys.exit(1)"""
393+
bulk_checks_metatada = {
394+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
395+
}
396+
check_list = ["invalid_check_name"]
397+
398+
with pytest.raises(SystemExit) as exc_info:
399+
load_checks_to_execute(
400+
bulk_checks_metadata=bulk_checks_metatada,
401+
check_list=check_list,
402+
provider=self.provider,
403+
)
404+
assert exc_info.value.code == 1
405+
406+
def test_load_checks_to_execute_with_multiple_invalid_checks(self):
407+
"""Test that multiple invalid check names cause sys.exit(1)"""
408+
bulk_checks_metatada = {
409+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
410+
}
411+
check_list = ["invalid_check_1", "invalid_check_2", "invalid_check_3"]
412+
413+
with pytest.raises(SystemExit) as exc_info:
414+
load_checks_to_execute(
415+
bulk_checks_metadata=bulk_checks_metatada,
416+
check_list=check_list,
417+
provider=self.provider,
418+
)
419+
assert exc_info.value.code == 1
420+
421+
def test_load_checks_to_execute_with_mixed_valid_invalid_checks(self):
422+
"""Test that mix of valid and invalid checks cause sys.exit(1)"""
423+
bulk_checks_metatada = {
424+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
425+
}
426+
check_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME, "invalid_check"]
427+
428+
with pytest.raises(SystemExit) as exc_info:
429+
load_checks_to_execute(
430+
bulk_checks_metadata=bulk_checks_metatada,
431+
check_list=check_list,
432+
provider=self.provider,
433+
)
434+
assert exc_info.value.code == 1
435+
436+
def test_load_checks_to_execute_with_invalid_service(self):
437+
"""Test that invalid service names cause sys.exit(1)"""
438+
bulk_checks_metatada = {
439+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
440+
}
441+
service_list = ["invalid_service"]
442+
443+
with pytest.raises(SystemExit) as exc_info:
444+
load_checks_to_execute(
445+
bulk_checks_metadata=bulk_checks_metatada,
446+
service_list=service_list,
447+
provider=self.provider,
448+
)
449+
assert exc_info.value.code == 1
450+
451+
def test_load_checks_to_execute_with_invalid_service_and_severity(self):
452+
"""Test that invalid service names with severity cause sys.exit(1)"""
453+
bulk_checks_metatada = {
454+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
455+
}
456+
service_list = ["invalid_service"]
457+
severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY]
458+
459+
with pytest.raises(SystemExit) as exc_info:
460+
load_checks_to_execute(
461+
bulk_checks_metadata=bulk_checks_metatada,
462+
service_list=service_list,
463+
severities=severities,
464+
provider=self.provider,
465+
)
466+
assert exc_info.value.code == 1
467+
468+
def test_load_checks_to_execute_with_multiple_invalid_services(self):
469+
"""Test that multiple invalid service names cause sys.exit(1)"""
470+
bulk_checks_metatada = {
471+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
472+
}
473+
service_list = ["invalid_service_1", "invalid_service_2"]
474+
475+
with pytest.raises(SystemExit) as exc_info:
476+
load_checks_to_execute(
477+
bulk_checks_metadata=bulk_checks_metatada,
478+
service_list=service_list,
479+
provider=self.provider,
480+
)
481+
assert exc_info.value.code == 1
482+
483+
def test_load_checks_to_execute_with_invalid_category(self):
484+
"""Test that invalid category names cause sys.exit(1)"""
485+
bulk_checks_metatada = {
486+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
487+
}
488+
categories = {"invalid_category"}
489+
490+
with pytest.raises(SystemExit) as exc_info:
491+
load_checks_to_execute(
492+
bulk_checks_metadata=bulk_checks_metatada,
493+
categories=categories,
494+
provider=self.provider,
495+
)
496+
assert exc_info.value.code == 1
497+
498+
def test_load_checks_to_execute_with_multiple_invalid_categories(self):
499+
"""Test that multiple invalid category names cause sys.exit(1)"""
500+
bulk_checks_metatada = {
501+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
502+
}
503+
categories = {"invalid_category_1", "invalid_category_2"}
504+
505+
with pytest.raises(SystemExit) as exc_info:
506+
load_checks_to_execute(
507+
bulk_checks_metadata=bulk_checks_metatada,
508+
categories=categories,
509+
provider=self.provider,
510+
)
511+
assert exc_info.value.code == 1
512+
513+
def test_load_checks_to_execute_with_mixed_valid_invalid_categories(self):
514+
"""Test that mix of valid and invalid categories cause sys.exit(1)"""
515+
bulk_checks_metatada = {
516+
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_s3_metadata()
517+
}
518+
categories = {"internet-exposed", "invalid_category"}
519+
520+
with pytest.raises(SystemExit) as exc_info:
521+
load_checks_to_execute(
522+
bulk_checks_metadata=bulk_checks_metatada,
523+
categories=categories,
524+
provider=self.provider,
525+
)
526+
assert exc_info.value.code == 1

0 commit comments

Comments
 (0)