Skip to content

Commit b512f6c

Browse files
fix(firehose): false positive in firehose_stream_encrypted_at_rest (#8599)
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
1 parent c4a8771 commit b512f6c

File tree

18 files changed

+721
-160
lines changed

18 files changed

+721
-160
lines changed

prowler/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
1818

1919
### Fixed
2020
- Replaced old check id with new ones for compliance files [(#8682)](https://github.com/prowler-cloud/prowler/pull/8682)
21+
- `firehose_stream_encrypted_at_rest` check false positives and new API call in Kafka service [(#8599)](https://github.com/prowler-cloud/prowler/pull/8599)
2122
- Replace defender rules policies key to use old name [(#8702)](https://github.com/prowler-cloud/prowler/pull/8702)
2223

2324
## [v5.12.0] (Prowler v5.12.0)

prowler/providers/aws/services/firehose/firehose_stream_encrypted_at_rest/firehose_stream_encrypted_at_rest.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from prowler.lib.check.models import Check, Check_Report_AWS
44
from prowler.providers.aws.services.firehose.firehose_client import firehose_client
55
from prowler.providers.aws.services.firehose.firehose_service import EncryptionStatus
6+
from prowler.providers.aws.services.kafka.kafka_client import kafka_client
67
from prowler.providers.aws.services.kinesis.kinesis_client import kinesis_client
78
from prowler.providers.aws.services.kinesis.kinesis_service import EncryptionType
89

@@ -37,7 +38,28 @@ def execute(self) -> List[Check_Report_AWS]:
3738
report.status = "PASS"
3839
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled but the source stream {source_stream.name} has at rest encryption enabled."
3940

40-
# Check if the stream has encryption enabled directly
41+
# MSK source - check if the MSK cluster has encryption at rest with CMK
42+
elif stream.delivery_stream_type == "MSKAsSource":
43+
msk_cluster_arn = stream.source.msk.msk_cluster_arn
44+
if msk_cluster_arn:
45+
msk_cluster = None
46+
for cluster in kafka_client.clusters.values():
47+
if cluster.arn == msk_cluster_arn:
48+
msk_cluster = cluster
49+
break
50+
51+
if msk_cluster:
52+
# All MSK clusters (both provisioned and serverless) always have encryption at rest enabled by AWS
53+
# AWS MSK always encrypts data at rest - either with AWS managed keys or CMK
54+
report.status = "PASS"
55+
if msk_cluster.kafka_version == "SERVERLESS":
56+
report.status_extended = f"Firehose Stream {stream.name} uses MSK serverless source which always has encryption at rest enabled by default."
57+
else:
58+
report.status_extended = f"Firehose Stream {stream.name} uses MSK provisioned source which always has encryption at rest enabled by AWS (either with AWS managed keys or CMK)."
59+
else:
60+
report.status_extended = f"Firehose Stream {stream.name} uses MSK source which always has encryption at rest enabled by AWS."
61+
62+
# Check if the stream has encryption enabled directly (DirectPut or DatabaseAsSource cases)
4163
elif stream.kms_encryption == EncryptionStatus.ENABLED:
4264
report.status = "PASS"
4365
report.status_extended = f"Firehose Stream {stream.name} does have at rest encryption enabled."

prowler/providers/aws/services/kafka/kafka_cluster_encryption_at_rest_uses_cmk/kafka_cluster_encryption_at_rest_uses_cmk.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ def execute(self):
1212
report.status = "FAIL"
1313
report.status_extended = f"Kafka cluster '{cluster.name}' does not have encryption at rest enabled with a CMK."
1414

15-
if any(
15+
# Serverless clusters always have encryption at rest enabled by default
16+
if cluster.kafka_version == "SERVERLESS":
17+
report.status = "PASS"
18+
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has encryption at rest enabled by default."
19+
# For provisioned clusters, check if they use a customer managed KMS key
20+
elif any(
1621
(
1722
cluster.data_volume_kms_key_id == key.arn
1823
and getattr(key, "manager", "") == "CUSTOMER"

prowler/providers/aws/services/kafka/kafka_cluster_enhanced_monitoring_enabled/kafka_cluster_enhanced_monitoring_enabled.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ def execute(self):
1313
f"Kafka cluster '{cluster.name}' has enhanced monitoring enabled."
1414
)
1515

16-
if cluster.enhanced_monitoring == "DEFAULT":
16+
# Serverless clusters always have enhanced monitoring enabled by default
17+
if cluster.kafka_version == "SERVERLESS":
18+
report.status = "PASS"
19+
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has enhanced monitoring enabled by default."
20+
# For provisioned clusters, check the enhanced monitoring configuration
21+
elif cluster.enhanced_monitoring == "DEFAULT":
1722
report.status = "FAIL"
1823
report.status_extended = f"Kafka cluster '{cluster.name}' does not have enhanced monitoring enabled."
1924

prowler/providers/aws/services/kafka/kafka_cluster_in_transit_encryption_enabled/kafka_cluster_in_transit_encryption_enabled.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@ def execute(self):
1111
report.status = "FAIL"
1212
report.status_extended = f"Kafka cluster '{cluster.name}' does not have encryption in transit enabled."
1313

14-
if (
14+
# Serverless clusters always have encryption in transit enabled by default
15+
if cluster.kafka_version == "SERVERLESS":
16+
report.status = "PASS"
17+
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has encryption in transit enabled by default."
18+
# For provisioned clusters, check the encryption configuration
19+
elif (
1520
cluster.encryption_in_transit.client_broker == "TLS"
1621
and cluster.encryption_in_transit.in_cluster
1722
):

prowler/providers/aws/services/kafka/kafka_cluster_is_public/kafka_cluster_is_public.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ def execute(self):
1313
f"Kafka cluster {cluster.name} is publicly accessible."
1414
)
1515

16-
if not cluster.public_access:
16+
# Serverless clusters are always private by default
17+
if cluster.kafka_version == "SERVERLESS":
18+
report.status = "PASS"
19+
report.status_extended = f"Kafka cluster {cluster.name} is serverless and always private by default."
20+
# For provisioned clusters, check the public access configuration
21+
elif not cluster.public_access:
1722
report.status = "PASS"
1823
report.status_extended = (
1924
f"Kafka cluster {cluster.name} is not publicly accessible."

prowler/providers/aws/services/kafka/kafka_cluster_mutual_tls_authentication_enabled/kafka_cluster_mutual_tls_authentication_enabled.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@ def execute(self):
1111
report.status = "FAIL"
1212
report.status_extended = f"Kafka cluster '{cluster.name}' does not have mutual TLS authentication enabled."
1313

14-
if cluster.tls_authentication:
14+
# Serverless clusters always have TLS authentication enabled by default
15+
if cluster.kafka_version == "SERVERLESS":
16+
report.status = "PASS"
17+
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has TLS authentication enabled by default."
18+
# For provisioned clusters, check the TLS configuration
19+
elif cluster.tls_authentication:
1520
report.status = "PASS"
1621
report.status_extended = f"Kafka cluster '{cluster.name}' has mutual TLS authentication enabled."
1722

prowler/providers/aws/services/kafka/kafka_cluster_unrestricted_access_disabled/kafka_cluster_unrestricted_access_disabled.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ def execute(self):
1313
f"Kafka cluster '{cluster.name}' has unrestricted access enabled."
1414
)
1515

16-
if not cluster.unauthentication_access:
16+
# Serverless clusters always require authentication by default
17+
if cluster.kafka_version == "SERVERLESS":
18+
report.status = "PASS"
19+
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always requires authentication by default."
20+
# For provisioned clusters, check the unauthenticated access configuration
21+
elif not cluster.unauthentication_access:
1722
report.status = "PASS"
1823
report.status_extended = f"Kafka cluster '{cluster.name}' does not have unrestricted access enabled."
1924

prowler/providers/aws/services/kafka/kafka_cluster_uses_latest_version/kafka_cluster_uses_latest_version.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ def execute(self):
1313
f"Kafka cluster '{cluster.name}' is using the latest version."
1414
)
1515

16-
if cluster.kafka_version != kafka_client.kafka_versions[-1].version:
16+
# Serverless clusters don't have specific Kafka versions - AWS manages them automatically
17+
if cluster.kafka_version == "SERVERLESS":
18+
report.status = "PASS"
19+
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and AWS automatically manages the Kafka version."
20+
# For provisioned clusters, check if they're using the latest version
21+
elif cluster.kafka_version != kafka_client.kafka_versions[-1].version:
1722
report.status = "FAIL"
1823
report.status_extended = (
1924
f"Kafka cluster '{cluster.name}' is not using the latest version."

prowler/providers/aws/services/kafka/kafka_service.py

Lines changed: 109 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,61 +15,133 @@ def __init__(self, provider):
1515
self.__threading_call__(self._list_kafka_versions)
1616

1717
def _list_clusters(self, regional_client):
18+
logger.info(f"Kafka - Listing clusters in region {regional_client.region}...")
1819
try:
19-
cluster_paginator = regional_client.get_paginator("list_clusters")
20+
# Use list_clusters_v2 to support both provisioned and serverless clusters
21+
cluster_paginator = regional_client.get_paginator("list_clusters_v2")
22+
logger.info(
23+
f"Kafka - Paginator created for region {regional_client.region}"
24+
)
2025

2126
for page in cluster_paginator.paginate():
27+
logger.info(
28+
f"Kafka - Processing page with {len(page.get('ClusterInfoList', []))} clusters in region {regional_client.region}"
29+
)
2230
for cluster in page["ClusterInfoList"]:
31+
logger.info(
32+
f"Kafka - Found cluster: {cluster.get('ClusterName', 'Unknown')} in region {regional_client.region}"
33+
)
2334
arn = cluster.get(
2435
"ClusterArn",
2536
f"{self.account_arn_template}/{cluster.get('ClusterName', '')}",
2637
)
38+
cluster_type = cluster.get("ClusterType", "UNKNOWN")
2739

2840
if not self.audit_resources or is_resource_filtered(
2941
arn, self.audit_resources
3042
):
31-
self.clusters[cluster.get("ClusterArn", "")] = Cluster(
32-
id=arn.split(":")[-1].split("/")[-1],
33-
name=cluster.get("ClusterName", ""),
34-
arn=arn,
35-
region=regional_client.region,
36-
tags=list(cluster.get("Tags", {})),
37-
state=cluster.get("State", ""),
38-
kafka_version=cluster.get(
39-
"CurrentBrokerSoftwareInfo", {}
40-
).get("KafkaVersion", ""),
41-
data_volume_kms_key_id=cluster.get("EncryptionInfo", {})
42-
.get("EncryptionAtRest", {})
43-
.get("DataVolumeKMSKeyId", ""),
44-
encryption_in_transit=EncryptionInTransit(
45-
client_broker=cluster.get("EncryptionInfo", {})
46-
.get("EncryptionInTransit", {})
47-
.get("ClientBroker", "PLAINTEXT"),
48-
in_cluster=cluster.get("EncryptionInfo", {})
49-
.get("EncryptionInTransit", {})
50-
.get("InCluster", False),
51-
),
52-
tls_authentication=cluster.get("ClientAuthentication", {})
53-
.get("Tls", {})
54-
.get("Enabled", False),
55-
public_access=cluster.get("BrokerNodeGroupInfo", {})
56-
.get("ConnectivityInfo", {})
57-
.get("PublicAccess", {})
58-
.get("Type", "SERVICE_PROVIDED_EIPS")
59-
!= "DISABLED",
60-
unauthentication_access=cluster.get(
61-
"ClientAuthentication", {}
43+
# Handle provisioned clusters
44+
if cluster_type == "PROVISIONED" and "Provisioned" in cluster:
45+
provisioned = cluster["Provisioned"]
46+
self.clusters[cluster.get("ClusterArn", "")] = Cluster(
47+
id=arn.split(":")[-1].split("/")[-1],
48+
name=cluster.get("ClusterName", ""),
49+
arn=arn,
50+
region=regional_client.region,
51+
tags=(
52+
list(cluster.get("Tags", {}).values())
53+
if cluster.get("Tags")
54+
else []
55+
),
56+
state=cluster.get("State", ""),
57+
kafka_version=provisioned.get(
58+
"CurrentBrokerSoftwareInfo", {}
59+
).get("KafkaVersion", ""),
60+
data_volume_kms_key_id=provisioned.get(
61+
"EncryptionInfo", {}
62+
)
63+
.get("EncryptionAtRest", {})
64+
.get("DataVolumeKMSKeyId", ""),
65+
encryption_in_transit=EncryptionInTransit(
66+
client_broker=provisioned.get("EncryptionInfo", {})
67+
.get("EncryptionInTransit", {})
68+
.get("ClientBroker", "PLAINTEXT"),
69+
in_cluster=provisioned.get("EncryptionInfo", {})
70+
.get("EncryptionInTransit", {})
71+
.get("InCluster", False),
72+
),
73+
tls_authentication=provisioned.get(
74+
"ClientAuthentication", {}
75+
)
76+
.get("Tls", {})
77+
.get("Enabled", False),
78+
public_access=provisioned.get("BrokerNodeGroupInfo", {})
79+
.get("ConnectivityInfo", {})
80+
.get("PublicAccess", {})
81+
.get("Type", "SERVICE_PROVIDED_EIPS")
82+
!= "DISABLED",
83+
unauthentication_access=provisioned.get(
84+
"ClientAuthentication", {}
85+
)
86+
.get("Unauthenticated", {})
87+
.get("Enabled", False),
88+
enhanced_monitoring=provisioned.get(
89+
"EnhancedMonitoring", "DEFAULT"
90+
),
91+
)
92+
logger.info(
93+
f"Kafka - Added provisioned cluster {cluster.get('ClusterName', 'Unknown')} to clusters dict"
94+
)
95+
96+
# Handle serverless clusters
97+
elif cluster_type == "SERVERLESS" and "Serverless" in cluster:
98+
# For serverless clusters, encryption is always enabled by default
99+
# We'll create a Cluster object with default encryption values
100+
self.clusters[cluster.get("ClusterArn", "")] = Cluster(
101+
id=arn.split(":")[-1].split("/")[-1],
102+
name=cluster.get("ClusterName", ""),
103+
arn=arn,
104+
region=regional_client.region,
105+
tags=(
106+
list(cluster.get("Tags", {}).values())
107+
if cluster.get("Tags")
108+
else []
109+
),
110+
state=cluster.get("State", ""),
111+
kafka_version="SERVERLESS", # Serverless doesn't have specific Kafka version
112+
data_volume_kms_key_id="AWS_MANAGED", # Serverless uses AWS managed keys
113+
encryption_in_transit=EncryptionInTransit(
114+
client_broker="TLS", # Serverless always has TLS enabled
115+
in_cluster=True, # Serverless always has in-cluster encryption
116+
),
117+
tls_authentication=True, # Serverless always has TLS authentication
118+
public_access=False, # Serverless clusters are always private
119+
unauthentication_access=False, # Serverless requires authentication
120+
enhanced_monitoring="DEFAULT",
121+
)
122+
logger.info(
123+
f"Kafka - Added serverless cluster {cluster.get('ClusterName', 'Unknown')} to clusters dict"
62124
)
63-
.get("Unauthenticated", {})
64-
.get("Enabled", False),
65-
enhanced_monitoring=cluster.get(
66-
"EnhancedMonitoring", "DEFAULT"
67-
),
125+
126+
else:
127+
logger.warning(
128+
f"Kafka - Unknown cluster type {cluster_type} for cluster {cluster.get('ClusterName', 'Unknown')}"
129+
)
130+
else:
131+
logger.info(
132+
f"Kafka - Cluster {cluster.get('ClusterName', 'Unknown')} filtered out by audit_resources"
68133
)
134+
135+
logger.info(
136+
f"Kafka - Total clusters found in region {regional_client.region}: {len(self.clusters)}"
137+
)
69138
except Exception as error:
70139
logger.error(
71140
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
72141
)
142+
logger.error(
143+
f"Kafka - Error details in region {regional_client.region}: {str(error)}"
144+
)
73145

74146
def _list_kafka_versions(self, regional_client):
75147
try:

0 commit comments

Comments
 (0)