Skip to content
This repository was archived by the owner on Oct 10, 2025. It is now read-only.

Commit d252eb4

Browse files
committed
Add SNS Topic ARN to CloudFormation Output
The ARN is now output as SNSTopicArn and export as <STACK_NAME>-SNSTopicArn
1 parent 2b6d70c commit d252eb4

File tree

5 files changed

+168
-76
lines changed

5 files changed

+168
-76
lines changed

source/aws_lambda/s3_event/handler.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for #
1111
# the specific language governing permissions and limitations under the License. #
1212
# ######################################################################################################################
13-
13+
import json
1414
import os
15-
from typing import List
1615

1716
from aws_lambda_powertools import Logger, Tracer, Metrics
1817
from aws_lambda_powertools.metrics import MetricUnit
@@ -43,21 +42,50 @@ def solution_name() -> str:
4342
return os.environ["SOLUTION_NAME"]
4443

4544

46-
def send_configuration_error(errors: List[str]):
45+
def send_configuration_error(configuration: Configuration):
46+
errors = configuration.errors
4747
sns = get_service_client("sns")
48+
dataset_group = configuration.dataset_group
49+
4850
subject = f"{solution_name()} Notifications"
4951

50-
message = "There were errors detected when reading a personalization job configuration file:\n\n"
51-
for error in errors:
52-
logger.error(f"Personalization job configuration error: {error}")
53-
message += f" - {error}\n"
54-
message += "\nPlease correct these errors and upload the configuration again."
52+
def build_default_message():
53+
f"The personalization workflow for {configuration.dataset_group} completed with errors."
54+
55+
def build_json_message():
56+
return json.dumps(
57+
{
58+
"datasetGroup": dataset_group,
59+
"status": "UPDATE FAILED",
60+
"summary": "There were errors detected when reading a personalization job configuration file",
61+
"description": [error for error in errors],
62+
}
63+
)
64+
65+
def build_long_message():
66+
message = "There were errors detected when reading a personalization job configuration file:\n\n"
67+
for error in errors:
68+
logger.error(f"Personalization job configuration error: {error}")
69+
message += f" - {error}\n"
70+
message += "\nPlease correct these errors and upload the configuration again."
71+
return message
5572

73+
logger.error("publishing configuration error to SQS")
5674
sns.publish(
5775
TopicArn=topic_arn(),
58-
Message=message,
76+
Message=json.dumps(
77+
{
78+
"default": build_default_message(),
79+
"sms": build_default_message(),
80+
"email": build_long_message(),
81+
"email-json": build_json_message(),
82+
"sqs": build_json_message(),
83+
}
84+
),
85+
MessageStructure="json",
5986
Subject=subject,
6087
)
88+
logger.error("published configuration error to SQS")
6189

6290

6391
@metrics.log_metrics
@@ -86,7 +114,7 @@ def lambda_handler(event, context):
86114
configuration = Configuration()
87115
configuration.load(config_text)
88116
if configuration.errors:
89-
send_configuration_error(configuration.errors)
117+
send_configuration_error(configuration)
90118
metrics.add_metric(
91119
"ConfigurationsProcessedFailures", unit=MetricUnit.Count, value=1
92120
)
@@ -98,7 +126,7 @@ def lambda_handler(event, context):
98126
metrics.add_metric(
99127
"ConfigurationsProcessedFailures", unit=MetricUnit.Count, value=1
100128
)
101-
send_configuration_error(configuration.errors)
129+
send_configuration_error(configuration)
102130
else:
103131
config = configuration.config_dict
104132
config = set_bucket(config, bucket, key)

source/aws_lambda/shared/personalize_service.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ class Configuration:
609609
def __init__(self):
610610
self._configuration_errors = []
611611
self.config_dict = {}
612+
self.dataset_group = "UNKNOWN"
612613

613614
def load(self, content: Union[Path, str]):
614615
if isinstance(content, Path):
@@ -670,6 +671,8 @@ def _validate_dataset_group(self, path="datasetGroup.serviceConfig"):
670671
)
671672
else:
672673
self._validate_resource(DatasetGroup(), dataset_group)
674+
if isinstance(dataset_group, dict):
675+
self.dataset_group = dataset_group.get("name", self.dataset_group)
673676

674677
def _validate_event_tracker(self, path="eventTracker.serviceConfig"):
675678
event_tracker = jmespath.search(path, self.config_dict)

source/aws_lambda/sns_notification/handler.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,26 @@ def __init__(self, event: Dict, context: LambdaContext):
6767
metrics.add_metric("JobSuccess", unit=MetricUnit.Count, value=1)
6868
self.message = self._build_success_message()
6969

70+
self.default = self._build_default_message()
71+
self.sms = self._build_sms_message()
72+
self.json = self._build_json_message()
73+
74+
def _build_json_message(self):
75+
return json.dumps(
76+
{
77+
"datasetGroup": self.dataset_group,
78+
"status": "UPDATE FAILED" if self.error else "UPDATE COMPLETE",
79+
"summary": self._build_default_message(),
80+
"description": self.message,
81+
}
82+
)
83+
84+
def _build_default_message(self) -> str:
85+
return f"The personalization workflow for {self.dataset_group} completed {'with errors' if self.error else 'successfully'}"
86+
87+
def _build_sms_message(self) -> str:
88+
return self._build_default_message()
89+
7090
def _build_error_message(self) -> str:
7191
"""
7292
Build the error message
@@ -116,12 +136,21 @@ def lambda_handler(event, context):
116136
:return: None
117137
"""
118138
sns = get_service_client("sns")
119-
message = MessageBuilder(event, context).message
139+
message_builder = MessageBuilder(event, context)
120140
subject = f"{solution_name()} Notifications"
121141

122142
logger.info("publishing message for event", extra={"event": event})
123143
sns.publish(
124144
TopicArn=topic_arn(),
125-
Message=message,
145+
Message=json.dumps(
146+
{
147+
"default": message_builder.default,
148+
"sms": message_builder.sms,
149+
"email": message_builder.message,
150+
"email-json": message_builder.json,
151+
"sqs": message_builder.json,
152+
}
153+
),
154+
MessageStructure="json",
126155
Subject=subject,
127156
)

source/infrastructure/personalize/stack.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,25 @@ def __init__(
6868
super().__init__(scope, construct_id, *args, **kwargs)
6969

7070
# CloudFormation Parameters
71+
self.email = cdk.CfnParameter(
72+
self,
73+
id="Email",
74+
type="String",
75+
description="Email to notify with personalize workflow results",
76+
default="",
77+
max_length=50,
78+
allowed_pattern=r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$|^$)",
79+
constraint_description="Must be a valid email address or blank",
80+
)
81+
self.solutions_template_options.add_parameter(
82+
self.email, "Email", "Solution Configuration"
83+
)
84+
self.email_provided = CfnCondition(
85+
self,
86+
"EmailProvided",
87+
expression=Fn.condition_not(Fn.condition_equals(self.email, "")),
88+
)
89+
7190
self.personalize_kms_key_arn = cdk.CfnParameter(
7291
self,
7392
id="PersonalizeKmsKeyArn",
@@ -88,25 +107,6 @@ def __init__(
88107
),
89108
)
90109

91-
self.email = cdk.CfnParameter(
92-
self,
93-
id="Email",
94-
type="String",
95-
description="Email to notify with personalize workflow results",
96-
default="",
97-
max_length=50,
98-
allowed_pattern=r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$|^$)",
99-
constraint_description="Must be a valid email address or blank",
100-
)
101-
self.solutions_template_options.add_parameter(
102-
self.email, "Email", "Solution Configuration"
103-
)
104-
self.email_provided = CfnCondition(
105-
self,
106-
"EmailProvided",
107-
expression=Fn.condition_not(Fn.condition_equals(self.email, "")),
108-
)
109-
110110
# layers
111111
layer_powertools = PowertoolsLayer.get_or_create(self)
112112
layer_solutions = SolutionsLayer.get_or_create(self)
@@ -413,3 +413,9 @@ def __init__(
413413
value=self.dashboard.name,
414414
export_name=f"{Aws.STACK_NAME}-Dashboard",
415415
)
416+
cdk.CfnOutput(
417+
self,
418+
"SNSTopicArn",
419+
value=notifications.topic.topic_arn,
420+
export_name=f"{Aws.STACK_NAME}-SNSTopicArn",
421+
)

source/tests/aws_lambda/sns_notification/test_sns_notification.py

Lines changed: 70 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,40 @@
1010
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for #
1111
# the specific language governing permissions and limitations under the License. #
1212
# ######################################################################################################################
13+
import json
1314
import os
1415
from collections import namedtuple
1516

17+
import boto3
1618
import pytest
17-
from botocore.stub import Stubber
19+
from moto import mock_sns, mock_sqs
1820

1921
from aws_lambda.sns_notification.handler import lambda_handler
20-
from aws_solutions.core import get_service_client
2122

2223
TRACE_ID = "1-57f5498f-d91047849216d0f2ea3b6442"
2324

2425

2526
@pytest.fixture
26-
def sns_stubber():
27-
sns_client = get_service_client("sns")
28-
with Stubber(sns_client) as stubber:
29-
yield stubber
27+
def sqs_mock():
28+
topic_arn = os.environ.get("SNS_TOPIC_ARN")
29+
topic_name = topic_arn.split(":")[-1]
30+
31+
with mock_sqs():
32+
with mock_sns():
33+
34+
cli = boto3.client("sns")
35+
cli.create_topic(Name=topic_name)
36+
37+
sqs = boto3.client("sqs")
38+
sqs.create_queue(QueueName="TestQueue")
39+
40+
cli.subscribe(
41+
TopicArn=topic_arn,
42+
Protocol="sqs",
43+
Endpoint=f"arn:aws:sqs:us-east-1:{'1'*12}:TestQueue",
44+
)
45+
46+
yield sqs
3047

3148

3249
@pytest.fixture
@@ -37,23 +54,6 @@ def trace_enabled():
3754

3855

3956
DATASET_GROUP_NAME = "DATASET_GROUP_NAME"
40-
EXPECTED_MESSAGE = """
41-
There was an error running the personalization job for dataset group DATASET_GROUP_NAME
42-
43-
Message: ERROR_MESSAGE
44-
45-
""".lstrip(
46-
"\n"
47-
)
48-
EXPECTED_MESSAGE_TRACE = f"""
49-
There was an error running the personalization job for dataset group DATASET_GROUP_NAME
50-
51-
Message: ERROR_MESSAGE
52-
53-
Traces: https://console.aws.amazon.com/xray/home?region=us-east-1#/traces/{TRACE_ID}
54-
""".strip(
55-
"\n"
56-
)
5757

5858

5959
@pytest.fixture
@@ -62,18 +62,8 @@ def context():
6262
return ctx(f"arn:aws:lambda:us-east-1:{'1' * 12}:function:my-function:1")
6363

6464

65-
def test_sns_notification(sns_stubber, context):
65+
def test_sns_notification(context, sqs_mock):
6666
"""Test without traces"""
67-
sns_stubber.add_response(
68-
"publish",
69-
{},
70-
expected_params={
71-
"TopicArn": os.environ.get("SNS_TOPIC_ARN"),
72-
"Subject": "Maintaining Personalized Experiences with Machine Learning Notifications",
73-
"Message": EXPECTED_MESSAGE,
74-
},
75-
)
76-
7767
lambda_handler(
7868
{
7969
"datasetGroup": DATASET_GROUP_NAME,
@@ -84,19 +74,32 @@ def test_sns_notification(sns_stubber, context):
8474
context,
8575
)
8676

77+
url = sqs_mock.get_queue_url(QueueName="TestQueue")["QueueUrl"]
78+
msg = json.loads(
79+
json.loads(
80+
sqs_mock.receive_message(QueueUrl=url, MaxNumberOfMessages=1,)["Messages"][
81+
0
82+
]["Body"]
83+
)["Message"]
84+
)
8785

88-
def test_sns_notification_trace(sns_stubber, trace_enabled, context):
89-
"""Test with traces"""
90-
sns_stubber.add_response(
91-
"publish",
92-
{},
93-
expected_params={
94-
"TopicArn": os.environ.get("SNS_TOPIC_ARN"),
95-
"Subject": "Maintaining Personalized Experiences with Machine Learning Notifications",
96-
"Message": EXPECTED_MESSAGE_TRACE,
97-
},
86+
error_default = (
87+
f"The personalization workflow for {DATASET_GROUP_NAME} completed with errors"
9888
)
89+
error_json = {
90+
"datasetGroup": DATASET_GROUP_NAME,
91+
"status": "UPDATE FAILED",
92+
"summary": f"The personalization workflow for {DATASET_GROUP_NAME} completed with errors",
93+
"description": f"There was an error running the personalization job for dataset group {DATASET_GROUP_NAME}\n\nMessage: ERROR_MESSAGE\n\n",
94+
}
95+
96+
assert msg["default"] == error_default
97+
assert msg["sms"] == error_default
98+
assert json.loads(msg["sqs"]) == error_json
99+
99100

101+
def test_sns_notification_trace(sqs_mock, trace_enabled, context):
102+
"""Test with traces"""
100103
lambda_handler(
101104
{
102105
"datasetGroup": DATASET_GROUP_NAME,
@@ -106,3 +109,26 @@ def test_sns_notification_trace(sns_stubber, trace_enabled, context):
106109
},
107110
context,
108111
)
112+
113+
url = sqs_mock.get_queue_url(QueueName="TestQueue")["QueueUrl"]
114+
msg = json.loads(
115+
json.loads(
116+
sqs_mock.receive_message(QueueUrl=url, MaxNumberOfMessages=1,)["Messages"][
117+
0
118+
]["Body"]
119+
)["Message"]
120+
)
121+
122+
error_default = (
123+
f"The personalization workflow for {DATASET_GROUP_NAME} completed with errors"
124+
)
125+
error_json = {
126+
"datasetGroup": f"{DATASET_GROUP_NAME}",
127+
"status": "UPDATE FAILED",
128+
"summary": f"The personalization workflow for {DATASET_GROUP_NAME} completed with errors",
129+
"description": f"There was an error running the personalization job for dataset group {DATASET_GROUP_NAME}\n\nMessage: ERROR_MESSAGE\n\nTraces: https://console.aws.amazon.com/xray/home?region=us-east-1#/traces/1-57f5498f-d91047849216d0f2ea3b6442",
130+
}
131+
132+
assert msg["default"] == error_default
133+
assert msg["sms"] == error_default
134+
assert json.loads(msg["sqs"]) == error_json

0 commit comments

Comments
 (0)