Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions api/src/backend/tasks/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,172 @@ def test_generate_outputs_happy_path(
output_location="s3://bucket/zipped.zip"
)

@patch("tasks.tasks._upload_to_s3")
@patch("tasks.tasks._compress_output_files")
@patch("tasks.tasks.get_compliance_frameworks")
@patch("tasks.tasks.Compliance.get_bulk")
@patch("tasks.tasks.initialize_prowler_provider")
@patch("tasks.tasks.Provider.objects.get")
@patch("tasks.tasks.ScanSummary.objects.filter")
@patch("tasks.tasks.Finding.all_objects.filter")
def test_generate_outputs_accepts_legacy_persisted_check_metadata(
self,
mock_finding_filter,
mock_scan_summary_filter,
mock_provider_get,
mock_initialize_provider,
mock_compliance_get_bulk,
mock_get_available_frameworks,
mock_compress,
mock_upload,
):
mock_scan_summary_filter.return_value.exists.return_value = True

mock_provider = MagicMock()
mock_provider.uid = "azure-subscription-123"
mock_provider.provider = "azure"
mock_provider_get.return_value = mock_provider

prowler_provider = MagicMock()
prowler_provider.type = "azure"
prowler_provider.identity.identity_type = "mock_identity_type"
prowler_provider.identity.identity_id = "mock_identity_id"
prowler_provider.identity.subscriptions = {
"legacy-subscription": "legacy-sub-id"
}
prowler_provider.identity.tenant_ids = ["test-ing-432a-a828-d9c965196f87"]
prowler_provider.identity.tenant_domain = "mock_tenant_domain"
prowler_provider.region_config.name = "AzureCloud"
mock_initialize_provider.return_value = prowler_provider

mock_compliance_get_bulk.return_value = {}
mock_get_available_frameworks.return_value = []

resource = MagicMock()
resource.uid = (
"/subscriptions/legacy-sub-id/providers/Microsoft.Authorization/"
"policyAssignments/legacy"
)
resource.name = "legacy-policy"
resource.region = "global"
resource.metadata = "{}"
resource.details = ""
resource.tags.all.return_value = [MagicMock(key="env", value="prod")]

dummy_finding = MagicMock()
dummy_finding.uid = "finding-uid-legacy"
dummy_finding.status = "FAIL"
dummy_finding.status_extended = "Legacy metadata finding"
dummy_finding.muted = False
dummy_finding.compliance = {}
dummy_finding.raw_result = {}
dummy_finding.check_id = (
"entra_conditional_access_policy_require_mfa_for_management_api"
)
dummy_finding.check_metadata = {
"provider": "azure",
"checkid": "entra_conditional_access_policy_require_mfa_for_management_api",
"checktitle": "Ensure Multifactor Authentication is Required for Windows Azure Service Management API",
"checktype": [],
"servicename": "entra",
"subservicename": "",
"severity": "medium",
"resourcetype": "#microsoft.graph.conditionalAccess",
"resourcegroup": "IAM",
"description": "Legacy description",
"risk": "Legacy risk",
"relatedurl": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management",
"additionalurls": [
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
],
"remediation": {
"code": {
"cli": "",
"other": "",
"nativeiac": "",
"terraform": "",
},
"recommendation": {
"text": "Legacy remediation",
"url": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps",
},
},
"resourceidtemplate": "",
"categories": [],
"dependson": [],
"relatedto": [],
"notes": "Legacy notes",
}
dummy_finding.resources.first.return_value = resource

mock_finding_filter.return_value.order_by.return_value.iterator.return_value = [
dummy_finding
]

writer_instances = []

def writer_factory(*args, **kwargs):
writer = MagicMock()
writer._data = []
writer.transform = MagicMock()
writer.batch_write_data_to_file = MagicMock()
writer.findings = kwargs["findings"]
writer_instances.append(writer)
return writer

with (
patch(
"tasks.tasks.FindingOutput._transform_findings_stats",
return_value={"some": "stats"},
),
patch(
"tasks.tasks.OUTPUT_FORMATS_MAPPING",
{
"json": {
"class": writer_factory,
"suffix": ".json",
"kwargs": {},
}
},
),
patch(
"tasks.tasks._generate_output_directory",
return_value=(
"/tmp/test/out-dir",
"/tmp/test/comp-dir",
),
),
patch("tasks.tasks.Scan.all_objects.filter") as mock_scan_update,
patch("tasks.tasks.rmtree"),
):
mock_compress.return_value = "/tmp/zipped.zip"
mock_upload.return_value = "s3://bucket/zipped.zip"

result = generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
)

assert result == {"upload": True}
assert len(writer_instances) == 1

transformed_finding = writer_instances[0].findings[0]
assert transformed_finding.metadata.CheckTitle.startswith("Ensure")
assert (
transformed_finding.metadata.RelatedUrl
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management"
)
assert (
transformed_finding.metadata.Remediation.Recommendation.Url
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
)
assert transformed_finding.metadata.Severity.value == "medium"

mock_scan_update.return_value.update.assert_called_once_with(
output_location="s3://bucket/zipped.zip"
)

def test_generate_outputs_fails_upload(self):
with (
patch("tasks.tasks.ScanSummary.objects.filter") as mock_filter,
Expand Down
97 changes: 62 additions & 35 deletions prowler/lib/outputs/finding.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Code,
Recommendation,
Remediation,
Severity,
)
from prowler.lib.logger import logger
from prowler.lib.outputs.common import Status, fill_common_finding_data
Expand Down Expand Up @@ -536,47 +537,73 @@ def transform_api_finding(cls, finding, provider) -> "Finding":
finding.zone_name = getattr(resource, "zone_name", resource.name)
finding.account_id = getattr(finding, "account_id", "")

finding.check_metadata = CheckMetadata(
Provider=finding.check_metadata["provider"],
CheckID=finding.check_metadata["checkid"],
CheckTitle=finding.check_metadata["checktitle"],
CheckType=finding.check_metadata["checktype"],
ServiceName=finding.check_metadata["servicename"],
SubServiceName=finding.check_metadata["subservicename"],
Severity=finding.check_metadata["severity"],
ResourceType=finding.check_metadata["resourcetype"],
Description=finding.check_metadata["description"],
Risk=finding.check_metadata["risk"],
RelatedUrl=finding.check_metadata["relatedurl"],
Remediation=Remediation(
metadata_kwargs = cls._get_api_check_metadata_kwargs(finding.check_metadata)
try:
finding.check_metadata = CheckMetadata(**metadata_kwargs)
except ValidationError as validation_error:
check_id = metadata_kwargs.get("CheckID", getattr(finding, "check_id", ""))
logger.warning(
"Legacy persisted check metadata failed validation during API finding transformation "
f"for {check_id}. Falling back to compatibility mode. Errors: {validation_error.errors()}"
)
finding.check_metadata = cls._construct_legacy_check_metadata(
metadata_kwargs
)
finding.resource_tags = unroll_tags(
[{"key": tag.key, "value": tag.value} for tag in resource.tags.all()]
)

return cls.generate_output(provider, finding, SimpleNamespace())

@staticmethod
def _get_api_check_metadata_kwargs(check_metadata: dict) -> dict:
remediation = check_metadata["remediation"]
remediation_code = remediation["code"]
remediation_recommendation = remediation["recommendation"]

return {
"Provider": check_metadata["provider"],
"CheckID": check_metadata["checkid"],
"CheckTitle": check_metadata["checktitle"],
"CheckType": check_metadata["checktype"],
"CheckAliases": check_metadata.get("checkaliases", []),
"ServiceName": check_metadata["servicename"],
"SubServiceName": check_metadata["subservicename"],
"Severity": check_metadata["severity"],
"ResourceType": check_metadata["resourcetype"],
"ResourceGroup": check_metadata.get("resourcegroup", ""),
"Description": check_metadata["description"],
"Risk": check_metadata["risk"],
"RelatedUrl": check_metadata["relatedurl"],
"Remediation": Remediation(
Recommendation=Recommendation(
Text=finding.check_metadata["remediation"]["recommendation"][
"text"
],
Url=finding.check_metadata["remediation"]["recommendation"]["url"],
Text=remediation_recommendation["text"],
Url=remediation_recommendation["url"],
),
Code=Code(
NativeIaC=finding.check_metadata["remediation"]["code"][
"nativeiac"
],
Terraform=finding.check_metadata["remediation"]["code"][
"terraform"
],
CLI=finding.check_metadata["remediation"]["code"]["cli"],
Other=finding.check_metadata["remediation"]["code"]["other"],
NativeIaC=remediation_code["nativeiac"],
Terraform=remediation_code["terraform"],
CLI=remediation_code["cli"],
Other=remediation_code["other"],
),
),
ResourceIdTemplate=finding.check_metadata["resourceidtemplate"],
Categories=finding.check_metadata["categories"],
DependsOn=finding.check_metadata["dependson"],
RelatedTo=finding.check_metadata["relatedto"],
Notes=finding.check_metadata["notes"],
)
finding.resource_tags = unroll_tags(
[{"key": tag.key, "value": tag.value} for tag in resource.tags.all()]
)
"ResourceIdTemplate": check_metadata["resourceidtemplate"],
"AdditionalURLs": check_metadata.get("additionalurls", []),
"Categories": check_metadata["categories"],
"DependsOn": check_metadata["dependson"],
"RelatedTo": check_metadata["relatedto"],
"Notes": check_metadata["notes"],
"Compliance": check_metadata.get("compliance", []),
}

return cls.generate_output(provider, finding, SimpleNamespace())
@staticmethod
def _construct_legacy_check_metadata(metadata_kwargs: dict) -> CheckMetadata:
severity = metadata_kwargs["Severity"]
if not isinstance(severity, Severity):
severity = Severity(severity)

legacy_metadata_kwargs = {**metadata_kwargs, "Severity": severity}
return CheckMetadata.construct(**legacy_metadata_kwargs)

def _transform_findings_stats(scan_summaries: list[dict]) -> dict:
"""
Expand Down
89 changes: 89 additions & 0 deletions tests/lib/outputs/finding_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,95 @@ def test_transform_api_finding_azure(self):
for segment in expected_segments:
assert segment in finding_obj.uid

@patch(
"prowler.lib.outputs.finding.get_check_compliance",
new=mock_get_check_compliance,
)
def test_transform_api_finding_azure_accepts_legacy_persisted_metadata(self):
provider = MagicMock()
provider.type = "azure"
provider.identity.identity_type = "mock_identity_type"
provider.identity.identity_id = "mock_identity_id"
provider.identity.subscriptions = {"legacy-subscription": "legacy-sub-id"}
provider.identity.tenant_ids = ["test-ing-432a-a828-d9c965196f87"]
provider.identity.tenant_domain = "mock_tenant_domain"
provider.region_config.name = "AzureCloud"

api_finding = DummyAPIFinding()
api_finding.uid = "finding-uid-legacy"
api_finding.status = "FAIL"
api_finding.status_extended = "Legacy metadata finding"
api_finding.check_id = (
"entra_conditional_access_policy_require_mfa_for_management_api"
)
api_finding.check_metadata = {
"provider": "azure",
"checkid": "entra_conditional_access_policy_require_mfa_for_management_api",
"checktitle": "Ensure Multifactor Authentication is Required for Windows Azure Service Management API",
"checktype": [],
"servicename": "entra",
"subservicename": "",
"severity": "medium",
"resourcetype": "#microsoft.graph.conditionalAccess",
"resourcegroup": "IAM",
"description": "Legacy description",
"risk": "Legacy risk",
"relatedurl": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management",
"additionalurls": [
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
],
"remediation": {
"code": {
"cli": "",
"other": "",
"nativeiac": "",
"terraform": "",
},
"recommendation": {
"text": "Legacy remediation",
"url": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps",
},
},
"resourceidtemplate": "",
"categories": [],
"dependson": [],
"relatedto": [],
"notes": "Legacy notes",
}
api_finding.muted = False
api_resource = DummyResource(
uid="/subscriptions/legacy-sub-id/providers/Microsoft.Authorization/policyAssignments/legacy",
name="legacy-policy",
resource_arn="arn",
region="global",
tags=[],
)
api_finding.resources = DummyResources(api_resource)

finding_obj = Finding.transform_api_finding(api_finding, provider)

assert finding_obj.account_uid == "legacy-sub-id"
assert finding_obj.resource_uid == api_resource.uid
assert finding_obj.resource_name == api_resource.name

meta = finding_obj.metadata
assert (
meta.CheckTitle
== "Ensure Multifactor Authentication is Required for Windows Azure Service Management API"
)
assert (
meta.RelatedUrl
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management"
)
assert (
meta.Remediation.Recommendation.Url
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
)
assert meta.ResourceGroup == "IAM"
assert meta.AdditionalURLs == [
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
]

@patch(
"prowler.lib.outputs.finding.get_check_compliance",
new=mock_get_check_compliance,
Expand Down
Loading