From ba032ab3cb77dcb543d3430541b252f38c09e27b Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 12:56:04 +0000 Subject: [PATCH 01/26] update: start format aws security hub --- functions/notify_slack.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index d7e7c864..095a319c 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -123,6 +123,19 @@ def format_cloudwatch_alarm(message: Dict[str, Any], region: str) -> Dict[str, A } +def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any] + """ + Format AWS Secuirty Hub finding event into Slack message format + + :params message: SNS message body containing SecurityHub finding event + :params region: AWS region where the event originated from + :returns: formatted Slack message payload + """ + + service_url = get_service_url(region=region, service="security_hub") + findings = message["detail"]["findings"] + + class GuardDutyFindingSeverity(Enum): """Maps GuardDuty finding severity to Slack message format color""" @@ -130,7 +143,6 @@ class GuardDutyFindingSeverity(Enum): Medium = "warning" High = "danger" - def format_guardduty_finding(message: Dict[str, Any], region: str) -> Dict[str, Any]: """ Format GuardDuty finding event into Slack message format @@ -401,6 +413,9 @@ def get_slack_message_payload( ) attachment = notification + elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": + notification = format_aws_security_hub(message=message, region=message["region"]) + elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": notification = format_aws_health(message=message, region=message["region"]) attachment = notification From 4a229057e2eb7560234093bbe58102634b169928 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 12:58:05 +0000 Subject: [PATCH 02/26] chore --- functions/notify_slack.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 095a319c..2d7ddc54 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -123,18 +123,18 @@ def format_cloudwatch_alarm(message: Dict[str, Any], region: str) -> Dict[str, A } -def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any] - """ - Format AWS Secuirty Hub finding event into Slack message format - - :params message: SNS message body containing SecurityHub finding event - :params region: AWS region where the event originated from - :returns: formatted Slack message payload - """ - - service_url = get_service_url(region=region, service="security_hub") - findings = message["detail"]["findings"] - +#def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any] +# """ +# Format AWS Secuirty Hub finding event into Slack message format +# +# :params message: SNS message body containing SecurityHub finding event +# :params region: AWS region where the event originated from +# :returns: formatted Slack message payload +# """ +# +# service_url = get_service_url(region=region, service="security_hub") +# findings = message["detail"]["findings"] +# class GuardDutyFindingSeverity(Enum): """Maps GuardDuty finding severity to Slack message format color""" @@ -413,8 +413,8 @@ def get_slack_message_payload( ) attachment = notification - elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": - notification = format_aws_security_hub(message=message, region=message["region"]) + # elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": + # notification = format_aws_security_hub(message=message, region=message["region"]) elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": notification = format_aws_health(message=message, region=message["region"]) @@ -468,8 +468,7 @@ def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> str: :param context: lambda expected context object :returns: none """ - if os.environ.get("LOG_EVENTS", "False") == "True": - logging.info(f"Event logging enabled: `{json.dumps(event)}`") + logging.warning(f"Event logging enabled: `{json.dumps(event)}`") for record in event["Records"]: sns = record["Sns"] From 667cbe6a029d9a80d033f725fcb7d483cc0a6d60 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 14:24:52 +0000 Subject: [PATCH 03/26] feat: security hub --- functions/notify_slack.py | 103 ++++++++++++++++++++++++++++++++------ 1 file changed, 89 insertions(+), 14 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 2d7ddc54..2d962333 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -123,18 +123,93 @@ def format_cloudwatch_alarm(message: Dict[str, Any], region: str) -> Dict[str, A } -#def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any] -# """ -# Format AWS Secuirty Hub finding event into Slack message format -# -# :params message: SNS message body containing SecurityHub finding event -# :params region: AWS region where the event originated from -# :returns: formatted Slack message payload -# """ -# -# service_url = get_service_url(region=region, service="security_hub") -# findings = message["detail"]["findings"] -# +def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any]: + """ + Format AWS Security Hub finding event into Slack message format + + :params message: SNS message body containing SecurityHub finding event + :params region: AWS region where the event originated from + :returns: formatted Slack message payload + """ + + finding = message["detail"]["findings"][0] # Get first finding + + # Check if the finding is from Security Hub + if finding.get("ProductName") != "Security Hub": + return format_default(message=message) + + severity = finding["Severity"]["Label"] + compliance_status = finding["Compliance"]["Status"] + + status_emoji = "✅" if compliance_status == "PASSED" else "⚠️" + + return { + "color": SecurityHubSeverity[severity].value, + "fallback": f"Security Hub Finding: {finding.get('Title')}", + "fields": [ + { + "title": "Title", + "value": f"{status_emoji} `{finding['Title']}`", + "short": False, + }, + { + "title": "Description", + "value": f"`{finding['Description']}`", + "short": False, + }, + { + "title": "Compliance Status", + "value": f"`{compliance_status}`", + "short": True, + }, + { + "title": "Severity", + "value": f"`{severity}`", + "short": True, + }, + { + "title": "Control ID", + "value": f"`{finding['ProductFields'].get('ControlId', 'N/A')}`", + "short": True, + }, + { + "title": "Account ID", + "value": f"`{finding['AwsAccountId']}`", + "short": True, + }, + { + "title": "First Observed", + "value": f"`{finding['FirstObservedAt']}`", + "short": True, + }, + { + "title": "Last Updated", + "value": f"`{finding['UpdatedAt']}`", + "short": True, + }, + { + "title": "Affected Resource", + "value": f"`{finding['Resources'][0]['Id']}`", + "short": False, + }, + { + "title": "Remediation", + "value": f"<{finding['Remediation']['Recommendation']['Url']}|Click here for remediation steps>", + "short": False, + } + ], + "text": f"AWS Security Hub Finding - {finding.get('Title')}", + } + + +class SecurityHubSeverity(Enum): + """Maps Security Hub finding severity to Slack message format color""" + + CRITICAL = "danger" + HIGH = "danger" + MEDIUM = "warning" + LOW = "#777777" + INFORMATIONAL = "#439FE0" class GuardDutyFindingSeverity(Enum): """Maps GuardDuty finding severity to Slack message format color""" @@ -413,8 +488,8 @@ def get_slack_message_payload( ) attachment = notification - # elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": - # notification = format_aws_security_hub(message=message, region=message["region"]) + elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": + notification = format_aws_security_hub(message=message, region=message["region"]) elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": notification = format_aws_health(message=message, region=message["region"]) From 1725f62b5246fcb0b2fda3180366f965a64f5f94 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 14:41:28 +0000 Subject: [PATCH 04/26] feat: security hub --- functions/notify_slack.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 2d962333..40d08e9e 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -489,7 +489,8 @@ def get_slack_message_payload( attachment = notification elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": - notification = format_aws_security_hub(message=message, region=message["region"]) + notification = format_aws_security_hub(message=message, region=message["region"]) + attachment = notification elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": notification = format_aws_health(message=message, region=message["region"]) From c97dfb8d02e1408d3a9ec0c7be7b93df8d610baa Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 15:04:28 +0000 Subject: [PATCH 05/26] feat: security hub --- functions/notify_slack.py | 90 ++++++++++++++------------------------- 1 file changed, 32 insertions(+), 58 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 40d08e9e..4402bf59 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -132,76 +132,43 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A :returns: formatted Slack message payload """ - finding = message["detail"]["findings"][0] # Get first finding + finding = message["detail"]["findings"][0] - # Check if the finding is from Security Hub if finding.get("ProductName") != "Security Hub": return format_default(message=message) - severity = finding["Severity"]["Label"] - compliance_status = finding["Compliance"]["Status"] + severity = finding["Severity"].get("Label", "INFORMATIONAL") + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") status_emoji = "✅" if compliance_status == "PASSED" else "⚠️" + title = finding.get("Title", "No Title Provided") + description = finding.get("Description", "No Description Provided") + control_id = finding['ProductFields'].get('ControlId', 'N/A') + aws_account_id = finding.get('AwsAccountId', 'Unknown Account') + first_observed = finding.get('FirstObservedAt', 'Unknown Date') + last_updated = finding.get('UpdatedAt', 'Unknown Date') + affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') + remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") + return { - "color": SecurityHubSeverity[severity].value, - "fallback": f"Security Hub Finding: {finding.get('Title')}", + "color": SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value, + "fallback": f"Security Hub Finding: {title}", "fields": [ - { - "title": "Title", - "value": f"{status_emoji} `{finding['Title']}`", - "short": False, - }, - { - "title": "Description", - "value": f"`{finding['Description']}`", - "short": False, - }, - { - "title": "Compliance Status", - "value": f"`{compliance_status}`", - "short": True, - }, - { - "title": "Severity", - "value": f"`{severity}`", - "short": True, - }, - { - "title": "Control ID", - "value": f"`{finding['ProductFields'].get('ControlId', 'N/A')}`", - "short": True, - }, - { - "title": "Account ID", - "value": f"`{finding['AwsAccountId']}`", - "short": True, - }, - { - "title": "First Observed", - "value": f"`{finding['FirstObservedAt']}`", - "short": True, - }, - { - "title": "Last Updated", - "value": f"`{finding['UpdatedAt']}`", - "short": True, - }, - { - "title": "Affected Resource", - "value": f"`{finding['Resources'][0]['Id']}`", - "short": False, - }, - { - "title": "Remediation", - "value": f"<{finding['Remediation']['Recommendation']['Url']}|Click here for remediation steps>", - "short": False, - } + {"title": "Title", "value": f"{status_emoji} `{title}`", "short": False}, + {"title": "Description", "value": f"`{description}`", "short": False}, + {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, + {"title": "Severity", "value": f"`{severity}`", "short": True}, + {"title": "Control ID", "value": f"`{control_id}`", "short": True}, + {"title": "Account ID", "value": f"`{aws_account_id}`", "short": True}, + {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, + {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, + {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, + {"title": "Remediation", "value": f"<{remediation_url}|Click here for remediation steps>", "short": False}, ], - "text": f"AWS Security Hub Finding - {finding.get('Title')}", + "text": f"AWS Security Hub Finding - {title}", } - class SecurityHubSeverity(Enum): """Maps Security Hub finding severity to Slack message format color""" @@ -211,6 +178,13 @@ class SecurityHubSeverity(Enum): LOW = "#777777" INFORMATIONAL = "#439FE0" + @staticmethod + def get(name, default): + try: + return SecurityHubSeverity[name] + except KeyError: + return default + class GuardDutyFindingSeverity(Enum): """Maps GuardDuty finding severity to Slack message format color""" From 355524ba98ef968176b4183ca46173be2ed3e5ed Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 15:15:52 +0000 Subject: [PATCH 06/26] feat: security hub --- functions/notify_slack.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 4402bf59..08d33565 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -151,7 +151,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") - return { + slack_message = { "color": SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value, "fallback": f"Security Hub Finding: {title}", "fields": [ @@ -169,6 +169,9 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A "text": f"AWS Security Hub Finding - {title}", } + logging.warning(f"slack message: `{json.dumps(slack_message)}`") + return slack_message + class SecurityHubSeverity(Enum): """Maps Security Hub finding severity to Slack message format color""" From e978494cd49e8020642001bea04628462cd6b8e1 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 16:22:10 +0000 Subject: [PATCH 07/26] feat: security hub --- functions/notify_slack.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 08d33565..58d9625e 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -132,6 +132,8 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A :returns: formatted Slack message payload """ + + service_url = get_service_url(region=region, service="securityhub") finding = message["detail"]["findings"][0] if finding.get("ProductName") != "Security Hub": @@ -142,14 +144,21 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A status_emoji = "✅" if compliance_status == "PASSED" else "⚠️" + Id = finding.get("Id", "No ID Provided") title = finding.get("Title", "No Title Provided") description = finding.get("Description", "No Description Provided") control_id = finding['ProductFields'].get('ControlId', 'N/A') + control_url = service_url + f"#/control/{control_id}" aws_account_id = finding.get('AwsAccountId', 'Unknown Account') first_observed = finding.get('FirstObservedAt', 'Unknown Date') last_updated = finding.get('UpdatedAt', 'Unknown Date') affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") + generator_id = finding.get("GeneratorId", "Unknown Generator") + + finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" + encoded_id = urllib.parse.quote(Id, safe='') + finding_url = f"{service_url}{finding_base_path}{encoded_id}" slack_message = { "color": SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value, @@ -164,12 +173,14 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, - {"title": "Remediation", "value": f"<{remediation_url}|Click here for remediation steps>", "short": False}, + {"title": "Generator", "value": f"`{generator_id}`", "short": False}, + {"title": "Control Url", "value": f"`{control_url}`", "short": False}, + {"title": "Finding Url", "value": f"`{finding_url}`", "short": False}, + {"title": "Remediation", "value": f"`{remediation_url}`", "short": False}, ], "text": f"AWS Security Hub Finding - {title}", } - logging.warning(f"slack message: `{json.dumps(slack_message)}`") return slack_message class SecurityHubSeverity(Enum): From 550268aefecf4492a14361f082608cbb0f85080a Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 16:30:04 +0000 Subject: [PATCH 08/26] feat: security hub --- functions/notify_slack.py | 1 + 1 file changed, 1 insertion(+) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 58d9625e..39b239ff 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -32,6 +32,7 @@ class AwsService(Enum): cloudwatch = "cloudwatch" guardduty = "guardduty" + securityhub = "securityhub" def decrypt_url(encrypted_url: str) -> str: From bf19f81550f9f60590287c97739dc81873feddd6 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 16:39:10 +0000 Subject: [PATCH 09/26] feat: security hub --- functions/notify_slack.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 39b239ff..7264300e 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -149,7 +149,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A title = finding.get("Title", "No Title Provided") description = finding.get("Description", "No Description Provided") control_id = finding['ProductFields'].get('ControlId', 'N/A') - control_url = service_url + f"#/control/{control_id}" + control_url = service_url + f"#/controls/{control_id}" aws_account_id = finding.get('AwsAccountId', 'Unknown Account') first_observed = finding.get('FirstObservedAt', 'Unknown Date') last_updated = finding.get('UpdatedAt', 'Unknown Date') @@ -158,8 +158,8 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A generator_id = finding.get("GeneratorId", "Unknown Generator") finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" - encoded_id = urllib.parse.quote(Id, safe='') - finding_url = f"{service_url}{finding_base_path}{encoded_id}" + double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') + finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" slack_message = { "color": SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value, From abf7972adfae8c840d33815b5df7514405c228e6 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 16:48:44 +0000 Subject: [PATCH 10/26] feat: security hub --- functions/notify_slack.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 7264300e..2ad4ee87 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -161,8 +161,12 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" + color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value + if compliance_status == "PASSED": + color = "#4BB543" + slack_message = { - "color": SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value, + "color": color, "fallback": f"Security Hub Finding: {title}", "fields": [ {"title": "Title", "value": f"{status_emoji} `{title}`", "short": False}, From ef9e59ab9ed6b519748b9724e2fa5ec39a047dc1 Mon Sep 17 00:00:00 2001 From: Kay Date: Thu, 14 Nov 2024 16:49:33 +0000 Subject: [PATCH 11/26] feat: security hub --- functions/notify_slack.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 2ad4ee87..43d08b7a 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -143,8 +143,6 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A severity = finding["Severity"].get("Label", "INFORMATIONAL") compliance_status = finding["Compliance"].get("Status", "UNKNOWN") - status_emoji = "✅" if compliance_status == "PASSED" else "⚠️" - Id = finding.get("Id", "No ID Provided") title = finding.get("Title", "No Title Provided") description = finding.get("Description", "No Description Provided") @@ -169,7 +167,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A "color": color, "fallback": f"Security Hub Finding: {title}", "fields": [ - {"title": "Title", "value": f"{status_emoji} `{title}`", "short": False}, + {"title": "Title", "value": f"`{title}`", "short": False}, {"title": "Description", "value": f"`{description}`", "short": False}, {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, {"title": "Severity", "value": f"`{severity}`", "short": True}, From a0c9c53fd58c505a7dec5f06e99b6cddec7f7deb Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 11:34:57 +0000 Subject: [PATCH 12/26] feat: security hub --- functions/notify_slack.py | 144 +++++++++++++++++++++++++------------- 1 file changed, 96 insertions(+), 48 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 43d08b7a..68e983f5 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -137,54 +137,102 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A service_url = get_service_url(region=region, service="securityhub") finding = message["detail"]["findings"][0] - if finding.get("ProductName") != "Security Hub": - return format_default(message=message) - - severity = finding["Severity"].get("Label", "INFORMATIONAL") - compliance_status = finding["Compliance"].get("Status", "UNKNOWN") - - Id = finding.get("Id", "No ID Provided") - title = finding.get("Title", "No Title Provided") - description = finding.get("Description", "No Description Provided") - control_id = finding['ProductFields'].get('ControlId', 'N/A') - control_url = service_url + f"#/controls/{control_id}" - aws_account_id = finding.get('AwsAccountId', 'Unknown Account') - first_observed = finding.get('FirstObservedAt', 'Unknown Date') - last_updated = finding.get('UpdatedAt', 'Unknown Date') - affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') - remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") - generator_id = finding.get("GeneratorId", "Unknown Generator") - - finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" - double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') - finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" - - color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value - if compliance_status == "PASSED": - color = "#4BB543" - - slack_message = { - "color": color, - "fallback": f"Security Hub Finding: {title}", - "fields": [ - {"title": "Title", "value": f"`{title}`", "short": False}, - {"title": "Description", "value": f"`{description}`", "short": False}, - {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, - {"title": "Severity", "value": f"`{severity}`", "short": True}, - {"title": "Control ID", "value": f"`{control_id}`", "short": True}, - {"title": "Account ID", "value": f"`{aws_account_id}`", "short": True}, - {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, - {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, - {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, - {"title": "Generator", "value": f"`{generator_id}`", "short": False}, - {"title": "Control Url", "value": f"`{control_url}`", "short": False}, - {"title": "Finding Url", "value": f"`{finding_url}`", "short": False}, - {"title": "Remediation", "value": f"`{remediation_url}`", "short": False}, - ], - "text": f"AWS Security Hub Finding - {title}", - } - - return slack_message + if finding.get("ProductName") == "Inspector": + severity = finding["Severity"].get("Label", "INFORMATIONAL") + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") + + Id = finding.get("Id", "No ID Provided") + title = finding.get("Title", "No Title Provided") + description = finding.get("Description", "No Description Provided") + control_id = finding['ProductFields'].get('ControlId', 'N/A') + control_url = service_url + f"#/controls/{control_id}" + aws_account_id = finding.get('AwsAccountId', 'Unknown Account') + first_observed = finding.get('FirstObservedAt', 'Unknown Date') + last_updated = finding.get('UpdatedAt', 'Unknown Date') + affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') + remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") + + finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" + double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') + finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" + generator_id = finding.get("GeneratorId", "Unknown Generator") + + color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value + if compliance_status == "PASSED": + color = "#4BB543" + + slack_message = { + "color": color, + "fallback": f"Inspector Finding: {title}", + "fields": [ + {"title": "Title", "value": f"`{title}`", "short": False}, + {"title": "Description", "value": f"`{description}`", "short": False}, + {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, + {"title": "Severity", "value": f"`{severity}`", "short": True}, + {"title": "Control ID", "value": f"`{control_id}`", "short": True}, + {"title": "Account ID", "value": f"`{aws_account_id}`", "short": True}, + {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, + {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, + {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, + {"title": "Generator", "value": f"`{generator_id}`", "short": False}, + {"title": "Control Url", "value": f"`{control_url}`", "short": False}, + {"title": "Finding Url", "value": f"`{finding_url}`", "short": False}, + {"title": "Remediation", "value": f"`{remediation_url}`", "short": False}, + ], + "text": f"AWS Inspector Finding - {title}", + } + + return slack_message + + elif finding.get("ProductName") == "Security Hub": + severity = finding["Severity"].get("Label", "INFORMATIONAL") + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") + + Id = finding.get("Id", "No ID Provided") + title = finding.get("Title", "No Title Provided") + description = finding.get("Description", "No Description Provided") + control_id = finding['ProductFields'].get('ControlId', 'N/A') + control_url = service_url + f"#/controls/{control_id}" + aws_account_id = finding.get('AwsAccountId', 'Unknown Account') + first_observed = finding.get('FirstObservedAt', 'Unknown Date') + last_updated = finding.get('UpdatedAt', 'Unknown Date') + affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') + remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") + generator_id = finding.get("GeneratorId", "Unknown Generator") + + finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" + double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') + finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" + + color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value + if compliance_status == "PASSED": + color = "#4BB543" + + slack_message = { + "color": color, + "fallback": f"Security Hub Finding: {title}", + "fields": [ + {"title": "Title", "value": f"`{title}`", "short": False}, + {"title": "Description", "value": f"`{description}`", "short": False}, + {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, + {"title": "Severity", "value": f"`{severity}`", "short": True}, + {"title": "Control ID", "value": f"`{control_id}`", "short": True}, + {"title": "Account ID", "value": f"`{aws_account_id}`", "short": True}, + {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, + {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, + {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, + {"title": "Generator", "value": f"`{generator_id}`", "short": False}, + {"title": "Control Url", "value": f"`{control_url}`", "short": False}, + {"title": "Finding Url", "value": f"`{finding_url}`", "short": False}, + {"title": "Remediation", "value": f"`{remediation_url}`", "short": False}, + ], + "text": f"AWS Security Hub Finding - {title}", + } + + return slack_message + + + return format_default(message=message) class SecurityHubSeverity(Enum): """Maps Security Hub finding severity to Slack message format color""" From 3e98f0a41dcd7ce756ae708d8ec5a73d9e6782d7 Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 12:56:09 +0000 Subject: [PATCH 13/26] feat: security hub --- functions/notify_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 68e983f5..89e2f37a 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -184,7 +184,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A return slack_message - elif finding.get("ProductName") == "Security Hub": + if finding.get("ProductName") == "Security Hub": severity = finding["Severity"].get("Label", "INFORMATIONAL") compliance_status = finding["Compliance"].get("Status", "UNKNOWN") From 78ce6cfd353a54d1c819785ec102ed6dbe794f4e Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 13:51:45 +0000 Subject: [PATCH 14/26] update: automatically switch status from new to notified --- functions/notify_slack.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 89e2f37a..495d6b39 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -26,6 +26,7 @@ # Create client so its cached/frozen between invocations KMS_CLIENT = boto3.client("kms", region_name=REGION) +SECURITY_HUB_CLIENT = boto3.client('securityhub', region_name=REGION) class AwsService(Enum): """AWS service supported by function""" @@ -133,6 +134,17 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A :returns: formatted Slack message payload """ + # Switch Status From New To Notified To Prevent Repeated Messaages + notified = SECURITY_HUB_CLIENT.update_findings( + FindingIdentifiers=[{ + 'Id': message["details"]["findings"][0].get("Id"), + 'ProductArn': message["details"]["findings"][0].get("ProductArn")}], + Workflow={"Status": "NOTIFIED"} + ) + + logging.warning(f"Update Notified Status: `{json.dumps(notified)}`") + print(notified) + service_url = get_service_url(region=region, service="securityhub") finding = message["detail"]["findings"][0] From 04623a3f74aa9a9b2386bc62484e3d59478cd9db Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 14:09:27 +0000 Subject: [PATCH 15/26] update: automatically switch status from new to notified --- functions/notify_slack.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 495d6b39..070e05f6 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -134,16 +134,19 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A :returns: formatted Slack message payload """ - # Switch Status From New To Notified To Prevent Repeated Messaages - notified = SECURITY_HUB_CLIENT.update_findings( - FindingIdentifiers=[{ - 'Id': message["details"]["findings"][0].get("Id"), - 'ProductArn': message["details"]["findings"][0].get("ProductArn")}], - Workflow={"Status": "NOTIFIED"} - ) - - logging.warning(f"Update Notified Status: `{json.dumps(notified)}`") - print(notified) + # Switch Status From New To Notified To Prevent Repeated Messages + try: + notified = SECURITY_HUB_CLIENT.update_findings( + FindingIdentifiers=[{ + 'Id': message["detail"]["findings"][0]["Id"], + 'ProductArn': message["detail"]["findings"][0]["ProductArn"] + }], + Workflow={"Status": "NOTIFIED"} + ) + logging.info(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") + except Exception as e: + logging.error(f"Failed to update finding status: {str(e)}") + pass service_url = get_service_url(region=region, service="securityhub") From 20f55fd9e17bc5f72c849f86609bb542d4a0fe67 Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 14:30:03 +0000 Subject: [PATCH 16/26] update: automatically switch status from new to notified --- functions/notify_slack.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 070e05f6..8b1b891e 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -133,24 +133,27 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A :params region: AWS region where the event originated from :returns: formatted Slack message payload """ + service_url = get_service_url(region=region, service="securityhub") + finding = message["detail"]["findings"][0] # Switch Status From New To Notified To Prevent Repeated Messages try: - notified = SECURITY_HUB_CLIENT.update_findings( - FindingIdentifiers=[{ - 'Id': message["detail"]["findings"][0]["Id"], - 'ProductArn': message["detail"]["findings"][0]["ProductArn"] - }], - Workflow={"Status": "NOTIFIED"} - ) - logging.info(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") + severity = finding["Severity"].get("Label", "INFORMATIONAL") + if severity == "FAILED": + notified = SECURITY_HUB_CLIENT.batch_update_findings( + FindingIdentifiers=[{ + 'Id': finding.get('Id'), + 'ProductArn': finding.get("ProductArn") + }], + Workflow={"Status": "NOTIFIED"} + ) + logging.info(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") except Exception as e: logging.error(f"Failed to update finding status: {str(e)}") pass - service_url = get_service_url(region=region, service="securityhub") - finding = message["detail"]["findings"][0] + if finding.get("ProductName") == "Inspector": severity = finding["Severity"].get("Label", "INFORMATIONAL") From ec5111496b0b3f7efea49d460d72070cf489bf2b Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 14:37:36 +0000 Subject: [PATCH 17/26] update: automatically switch status from new to notified --- functions/notify_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 8b1b891e..2262093f 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -147,7 +147,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A }], Workflow={"Status": "NOTIFIED"} ) - logging.info(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") + logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") except Exception as e: logging.error(f"Failed to update finding status: {str(e)}") pass From d0dcb6c2da8a4a2adc843deed97fad3342f05a3b Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 14:43:53 +0000 Subject: [PATCH 18/26] update: automatically switch status from new to notified --- functions/notify_slack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 2262093f..4139725a 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -138,8 +138,8 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A # Switch Status From New To Notified To Prevent Repeated Messages try: - severity = finding["Severity"].get("Label", "INFORMATIONAL") - if severity == "FAILED": + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") + if compliance_status == "FAILED": notified = SECURITY_HUB_CLIENT.batch_update_findings( FindingIdentifiers=[{ 'Id': finding.get('Id'), From 06106406759e458f0f5df270c29a87c80183b4a9 Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 14:54:28 +0000 Subject: [PATCH 19/26] update: automatically switch status from new to notified --- main.tf | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/main.tf b/main.tf index 401d33ce..9a25178c 100644 --- a/main.tf +++ b/main.tf @@ -26,6 +26,13 @@ locals { resources = [var.kms_key_arn] } + lambda_policy_document_securityhub = { + sid = "AllowSecurityHub" + effect = "Allow" + actions = ["securityhub:BatchImportFindings"] + resources = ["*"] + } + lambda_handler = try(split(".", basename(var.lambda_source_path))[0], "notify_slack") } @@ -33,7 +40,8 @@ data "aws_iam_policy_document" "lambda" { count = var.create ? 1 : 0 dynamic "statement" { - for_each = concat([local.lambda_policy_document], var.kms_key_arn != "" ? [local.lambda_policy_document_kms] : []) + for_each = concat([local.lambda_policy_document, + local.lambda_policy_document_securityhub], var.kms_key_arn != "" ? [local.lambda_policy_document_kms] : []) content { sid = statement.value.sid effect = statement.value.effect From 47efa948b4710a11ce144f2a0dcc9dfafe9a5607 Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 15:01:27 +0000 Subject: [PATCH 20/26] update: automatically switch status from new to notified --- main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.tf b/main.tf index 9a25178c..601cf51a 100644 --- a/main.tf +++ b/main.tf @@ -29,7 +29,7 @@ locals { lambda_policy_document_securityhub = { sid = "AllowSecurityHub" effect = "Allow" - actions = ["securityhub:BatchImportFindings"] + actions = ["securityhub:BatchUpdateFindings"] resources = ["*"] } From eb26ec4649e42d5e0fa6986d1a77144d8edf8d2f Mon Sep 17 00:00:00 2001 From: Kay Date: Fri, 15 Nov 2024 15:45:53 +0000 Subject: [PATCH 21/26] update: automatically switch status from new to notified# --- functions/notify_slack.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 4139725a..66929a96 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -139,7 +139,8 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A # Switch Status From New To Notified To Prevent Repeated Messages try: compliance_status = finding["Compliance"].get("Status", "UNKNOWN") - if compliance_status == "FAILED": + workflow_status = finding["Workflow"].get("Status", "UNKNOWN") + if compliance_status == "FAILED" and workflow_status == "NEW": notified = SECURITY_HUB_CLIENT.batch_update_findings( FindingIdentifiers=[{ 'Id': finding.get('Id'), From 0357c203370ab8d67f3b7441bec2a1b8b490865d Mon Sep 17 00:00:00 2001 From: Kay Date: Tue, 7 Jan 2025 08:05:05 +0000 Subject: [PATCH 22/26] fix: lint --- functions/notify_slack.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 66929a96..1ad4fb36 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -28,6 +28,7 @@ SECURITY_HUB_CLIENT = boto3.client('securityhub', region_name=REGION) + class AwsService(Enum): """AWS service supported by function""" @@ -141,21 +142,18 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A compliance_status = finding["Compliance"].get("Status", "UNKNOWN") workflow_status = finding["Workflow"].get("Status", "UNKNOWN") if compliance_status == "FAILED" and workflow_status == "NEW": - notified = SECURITY_HUB_CLIENT.batch_update_findings( - FindingIdentifiers=[{ - 'Id': finding.get('Id'), - 'ProductArn': finding.get("ProductArn") - }], - Workflow={"Status": "NOTIFIED"} - ) - logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") + notified = SECURITY_HUB_CLIENT.batch_update_findings( + FindingIdentifiers=[{ + 'Id': finding.get('Id'), + 'ProductArn': finding.get("ProductArn") + }], + Workflow={"Status": "NOTIFIED"} + ) + logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") except Exception as e: logging.error(f"Failed to update finding status: {str(e)}") pass - - - if finding.get("ProductName") == "Inspector": severity = finding["Severity"].get("Label", "INFORMATIONAL") compliance_status = finding["Compliance"].get("Status", "UNKNOWN") @@ -178,7 +176,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value if compliance_status == "PASSED": - color = "#4BB543" + color = "#4BB543" slack_message = { "color": color, @@ -225,7 +223,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value if compliance_status == "PASSED": - color = "#4BB543" + color = "#4BB543" slack_message = { "color": color, @@ -250,9 +248,9 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A return slack_message - return format_default(message=message) + class SecurityHubSeverity(Enum): """Maps Security Hub finding severity to Slack message format color""" @@ -269,6 +267,7 @@ def get(name, default): except KeyError: return default + class GuardDutyFindingSeverity(Enum): """Maps GuardDuty finding severity to Slack message format color""" @@ -276,6 +275,7 @@ class GuardDutyFindingSeverity(Enum): Medium = "warning" High = "danger" + def format_guardduty_finding(message: Dict[str, Any], region: str) -> Dict[str, Any]: """ Format GuardDuty finding event into Slack message format From d48acab158f926ef71b18d56a11fcbb42b497eaa Mon Sep 17 00:00:00 2001 From: Kay Date: Tue, 11 Mar 2025 15:01:05 +0000 Subject: [PATCH 23/26] refactor: parsing of each notification into its own function --- functions/notify_slack.py | 51 ++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 1ad4fb36..182e9630 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -503,6 +503,27 @@ def format_default( return attachments +def parse_notification(message: Dict[str, Any], subject: Optional[str], region: str) -> Optional[Dict]: + """ + Parse notification message and format into Slack message payload + + :params message: SNS message body notification payload + :params subject: Optional subject line for Slack notification + :params region: AWS region where the event originated from + :returns: Slack message payload + """ + if "AlarmName" in message: + return format_cloudwatch_alarm(message=message, region=region) + if message.get("detail-type") == "GuardDuty Finding": + return format_guardduty_finding(message=message, region=message["region"]) + if message.get("detail-type") == "Security Hub Findings - Imported": + return format_aws_security_hub(message=message, region=message["region"]) + if message.get("detail-type") == "AWS Health Event": + return format_aws_health(message=message, region=message["region"]) + if subject == "Notification from AWS Backup": + return format_aws_backup(message=str(message)) + return format_default(message=message, subject=subject) + def get_slack_message_payload( message: Union[str, Dict], region: str, subject: Optional[str] = None ) -> Dict: @@ -534,35 +555,10 @@ def get_slack_message_payload( message = cast(Dict[str, Any], message) - if "AlarmName" in message: - notification = format_cloudwatch_alarm(message=message, region=region) - attachment = notification - - elif ( - isinstance(message, Dict) and message.get("detail-type") == "GuardDuty Finding" - ): - notification = format_guardduty_finding( - message=message, region=message["region"] - ) - attachment = notification - - elif isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": - notification = format_aws_security_hub(message=message, region=message["region"]) - attachment = notification - - elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": - notification = format_aws_health(message=message, region=message["region"]) - attachment = notification - - elif subject == "Notification from AWS Backup": - notification = format_aws_backup(message=str(message)) - attachment = notification - - elif "attachments" in message or "text" in message: + if "attachments" in message or "text" in message: payload = {**payload, **message} - else: - attachment = format_default(message=message, subject=subject) + attachment = parse_notification(message, subject, region) if attachment: payload["attachments"] = [attachment] # type: ignore @@ -602,7 +598,6 @@ def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> str: :param context: lambda expected context object :returns: none """ - logging.warning(f"Event logging enabled: `{json.dumps(event)}`") for record in event["Records"]: sns = record["Sns"] From e140b8f92d239e4090cf77859a594e43ed1d0202 Mon Sep 17 00:00:00 2001 From: Kay Date: Tue, 11 Mar 2025 17:46:01 +0000 Subject: [PATCH 24/26] fix: lint --- functions/notify_slack.py | 1 + 1 file changed, 1 insertion(+) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 182e9630..937a3c9a 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -524,6 +524,7 @@ def parse_notification(message: Dict[str, Any], subject: Optional[str], region: return format_aws_backup(message=str(message)) return format_default(message=message, subject=subject) + def get_slack_message_payload( message: Union[str, Dict], region: str, subject: Optional[str] = None ) -> Dict: From b017387d4edb89783340a9fcaad0cf143e257b01 Mon Sep 17 00:00:00 2001 From: Kay Date: Tue, 11 Mar 2025 17:48:51 +0000 Subject: [PATCH 25/26] fix: lint --- functions/notify_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 68466e38..491a756e 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -599,7 +599,7 @@ def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> str: :param context: lambda expected context object :returns: none """ - + if os.environ.get("LOG_EVENTS", "False") == "True": logging.info("Event logging enabled: %s", json.dumps(event)) From 2b128e442fef3bbcd989e97996113cce1ad92240 Mon Sep 17 00:00:00 2001 From: Kay Date: Wed, 12 Mar 2025 16:13:31 +0000 Subject: [PATCH 26/26] fix: unit tests --- functions/notify_slack.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 491a756e..b68e0543 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -514,11 +514,11 @@ def parse_notification(message: Dict[str, Any], subject: Optional[str], region: """ if "AlarmName" in message: return format_cloudwatch_alarm(message=message, region=region) - if message.get("detail-type") == "GuardDuty Finding": + if isinstance(message, Dict) and message.get("detail-type") == "GuardDuty Finding": return format_guardduty_finding(message=message, region=message["region"]) - if message.get("detail-type") == "Security Hub Findings - Imported": + if isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": return format_aws_security_hub(message=message, region=message["region"]) - if message.get("detail-type") == "AWS Health Event": + if isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": return format_aws_health(message=message, region=message["region"]) if subject == "Notification from AWS Backup": return format_aws_backup(message=str(message))