Skip to content

Commit ecffa7c

Browse files
authored
fix: Support creating a FIFO queue if using FIFO SNS event (#3357)
1 parent f4648d0 commit ecffa7c

File tree

8 files changed

+516
-12
lines changed

8 files changed

+516
-12
lines changed

samtranslator/model/eventsources/push.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,15 @@
1212
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
1313
from samtranslator.model.eventsources.pull import SQS
1414
from samtranslator.model.exceptions import InvalidDocumentException, InvalidEventException, InvalidResourceException
15-
from samtranslator.model.intrinsics import fnGetAtt, fnSub, is_intrinsic, make_conditional, make_shorthand, ref
15+
from samtranslator.model.intrinsics import (
16+
fnGetAtt,
17+
fnSub,
18+
get_logical_id_from_intrinsic,
19+
is_intrinsic,
20+
make_conditional,
21+
make_shorthand,
22+
ref,
23+
)
1624
from samtranslator.model.iot import IotTopicRule
1725
from samtranslator.model.lambda_ import LambdaPermission
1826
from samtranslator.model.s3 import S3Bucket
@@ -517,6 +525,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
517525
if not function:
518526
raise TypeError("Missing required keyword argument: function")
519527

528+
intrinsics_resolver: IntrinsicsResolver = kwargs["intrinsics_resolver"]
529+
520530
# SNS -> Lambda
521531
if not self.SqsSubscription:
522532
subscription = self._inject_subscription(
@@ -534,7 +544,11 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
534544
# SNS -> SQS(Create New) -> Lambda
535545
if isinstance(self.SqsSubscription, bool):
536546
resources = [] # type: ignore[var-annotated]
537-
queue = self._inject_sqs_queue(function) # type: ignore[no-untyped-call]
547+
548+
fifo_topic = self._check_fifo_topic(
549+
get_logical_id_from_intrinsic(self.Topic), kwargs.get("original_template"), intrinsics_resolver
550+
)
551+
queue = self._inject_sqs_queue(function, fifo_topic) # type: ignore[no-untyped-call]
538552
queue_arn = queue.get_runtime_attr("arn")
539553
queue_url = queue.get_runtime_attr("queue_url")
540554

@@ -591,6 +605,19 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
591605
resources.append(subscription)
592606
return resources
593607

608+
def _check_fifo_topic(
609+
self,
610+
topic_id: Optional[str],
611+
template: Optional[Dict[str, Any]],
612+
intrinsics_resolver: IntrinsicsResolver,
613+
) -> bool:
614+
if not topic_id or not template:
615+
return False
616+
617+
resources = template.get("Resources", {})
618+
properties = resources.get(topic_id, {}).get("Properties", {})
619+
return intrinsics_resolver.resolve_parameter_refs(properties.get("FifoTopic", False)) # type: ignore[no-any-return]
620+
594621
def _inject_subscription( # noqa: PLR0913
595622
self,
596623
protocol: str,
@@ -621,8 +648,12 @@ def _inject_subscription( # noqa: PLR0913
621648

622649
return subscription
623650

624-
def _inject_sqs_queue(self, function): # type: ignore[no-untyped-def]
625-
return SQSQueue(self.logical_id + "Queue", attributes=function.get_passthrough_resource_attributes())
651+
def _inject_sqs_queue(self, function, fifo_topic=False): # type: ignore[no-untyped-def]
652+
queue = SQSQueue(self.logical_id + "Queue", attributes=function.get_passthrough_resource_attributes())
653+
654+
if fifo_topic:
655+
queue.FifoQueue = fifo_topic
656+
return queue
626657

627658
def _inject_sqs_event_source_mapping(self, function, role, queue_arn, batch_size=None, enabled=None): # type: ignore[no-untyped-def]
628659
event_source = SQS(

samtranslator/model/sam_resources.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
351351
kwargs["event_resources"],
352352
intrinsics_resolver,
353353
lambda_alias=lambda_alias,
354+
original_template=kwargs.get("original_template"),
354355
)
355356
except InvalidEventException as e:
356357
raise InvalidResourceException(self.logical_id, e.message) from e
@@ -775,13 +776,14 @@ def order_events(event: Tuple[str, Any]) -> Any:
775776
return logical_id
776777
return event_dict.get("Properties", {}).get("Path", logical_id)
777778

778-
def _generate_event_resources(
779+
def _generate_event_resources( # noqa: PLR0913
779780
self,
780781
lambda_function: LambdaFunction,
781782
execution_role: Optional[IAMRole],
782783
event_resources: Any,
783784
intrinsics_resolver: IntrinsicsResolver,
784785
lambda_alias: Optional[LambdaAlias] = None,
786+
original_template: Optional[Dict[str, Any]] = None,
785787
) -> List[Any]:
786788
"""Generates and returns the resources associated with this function's events.
787789
@@ -811,6 +813,7 @@ def _generate_event_resources(
811813
"function": lambda_alias or lambda_function,
812814
"role": execution_role,
813815
"intrinsics_resolver": intrinsics_resolver,
816+
"original_template": original_template,
814817
}
815818

816819
for name, resource in event_resources[logical_id].items():

samtranslator/model/sqs.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,22 @@
22

33
from samtranslator.model import GeneratedProperty, PropertyType, Resource
44
from samtranslator.model.intrinsics import fnGetAtt, ref
5+
from samtranslator.model.types import PassThrough
56

67

78
class SQSQueue(Resource):
89
resource_type = "AWS::SQS::Queue"
9-
property_types: Dict[str, PropertyType] = {"Tags": GeneratedProperty()}
10+
property_types: Dict[str, PropertyType] = {
11+
"FifoQueue": GeneratedProperty(),
12+
"Tags": GeneratedProperty(),
13+
}
1014
runtime_attrs = {
1115
"queue_url": lambda self: ref(self.logical_id),
1216
"arn": lambda self: fnGetAtt(self.logical_id, "Arn"),
1317
}
1418

19+
FifoQueue: PassThrough
20+
1521

1622
class SQSQueuePolicy(Resource):
1723
resource_type = "AWS::SQS::QueuePolicy"

tests/model/eventsources/test_sns_event_source.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ def setUp(self):
1818
self.function.get_passthrough_resource_attributes = Mock()
1919
self.function.get_passthrough_resource_attributes.return_value = {}
2020

21+
self.kwargs = {"function": self.function, "intrinsics_resolver": Mock()}
22+
2123
def test_to_cloudformation_returns_permission_and_subscription_resources(self):
22-
resources = self.sns_event_source.to_cloudformation(function=self.function)
24+
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
2325
self.assertEqual(len(resources), 2)
2426
self.assertEqual(resources[0].resource_type, "AWS::Lambda::Permission")
2527
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
@@ -37,7 +39,7 @@ def test_to_cloudformation_passes_the_region(self):
3739
region = "us-west-2"
3840
self.sns_event_source.Region = region
3941

40-
resources = self.sns_event_source.to_cloudformation(function=self.function)
42+
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
4143
self.assertEqual(len(resources), 2)
4244
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
4345
subscription = resources[1]
@@ -51,7 +53,7 @@ def test_to_cloudformation_passes_the_filter_policy(self):
5153
}
5254
self.sns_event_source.FilterPolicy = filterPolicy
5355

54-
resources = self.sns_event_source.to_cloudformation(function=self.function)
56+
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
5557
self.assertEqual(len(resources), 2)
5658
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
5759
subscription = resources[1]
@@ -61,7 +63,7 @@ def test_to_cloudformation_passes_the_filter_policy_scope(self):
6163
filterPolicyScope = "MessageAttributes"
6264
self.sns_event_source.FilterPolicyScope = filterPolicyScope
6365

64-
resources = self.sns_event_source.to_cloudformation(function=self.function)
66+
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
6567
self.assertEqual(len(resources), 2)
6668
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
6769
subscription = resources[1]
@@ -71,7 +73,7 @@ def test_to_cloudformation_passes_the_redrive_policy(self):
7173
redrive_policy = {"deadLetterTargetArn": "arn:aws:sqs:us-east-2:123456789012:MyDeadLetterQueue"}
7274
self.sns_event_source.RedrivePolicy = redrive_policy
7375

74-
resources = self.sns_event_source.to_cloudformation(function=self.function)
76+
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
7577
self.assertEqual(len(resources), 2)
7678
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
7779
subscription = resources[1]
@@ -89,7 +91,7 @@ def test_to_cloudformation_when_sqs_subscription_disable(self):
8991
sqsSubscription = False
9092
self.sns_event_source.SqsSubscription = sqsSubscription
9193

92-
resources = self.sns_event_source.to_cloudformation(function=self.function)
94+
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
9395
self.assertEqual(len(resources), 2)
9496
self.assertEqual(resources[0].resource_type, "AWS::Lambda::Permission")
9597
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
Transform: AWS::Serverless-2016-10-31
2+
Description: SNS Fifo
3+
Globals:
4+
Function:
5+
Timeout: 3
6+
7+
Resources:
8+
MyFifoTopic:
9+
Type: AWS::SNS::Topic
10+
Properties:
11+
ContentBasedDeduplication: true
12+
FifoTopic: true
13+
TopicName: myFifoTopic.fifo
14+
15+
HelloWorldFunction:
16+
Type: AWS::Serverless::Function
17+
Properties:
18+
InlineCode: |
19+
exports.handler = async (event, context, callback) => {
20+
return {
21+
statusCode: 200,
22+
body: 'Success'
23+
}
24+
}
25+
Handler: index.handler
26+
Runtime: nodejs16.x
27+
Events:
28+
FifoTrigger:
29+
Type: SNS
30+
Properties:
31+
SqsSubscription: true
32+
Topic: !Ref MyFifoTopic
33+
Metadata:
34+
DockerTag: nodejs12.x-v1
35+
DockerContext: ./hello-world
36+
Dockerfile: Dockerfile
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
{
2+
"Description": "SNS Fifo",
3+
"Resources": {
4+
"HelloWorldFunction": {
5+
"Metadata": {
6+
"DockerContext": "./hello-world",
7+
"DockerTag": "nodejs12.x-v1",
8+
"Dockerfile": "Dockerfile"
9+
},
10+
"Properties": {
11+
"Code": {
12+
"ZipFile": "exports.handler = async (event, context, callback) => {\n return {\n statusCode: 200,\n body: 'Success'\n }\n}\n"
13+
},
14+
"Handler": "index.handler",
15+
"Role": {
16+
"Fn::GetAtt": [
17+
"HelloWorldFunctionRole",
18+
"Arn"
19+
]
20+
},
21+
"Runtime": "nodejs16.x",
22+
"Tags": [
23+
{
24+
"Key": "lambda:createdBy",
25+
"Value": "SAM"
26+
}
27+
],
28+
"Timeout": 3
29+
},
30+
"Type": "AWS::Lambda::Function"
31+
},
32+
"HelloWorldFunctionFifoTrigger": {
33+
"Properties": {
34+
"Endpoint": {
35+
"Fn::GetAtt": [
36+
"HelloWorldFunctionFifoTriggerQueue",
37+
"Arn"
38+
]
39+
},
40+
"Protocol": "sqs",
41+
"TopicArn": {
42+
"Ref": "MyFifoTopic"
43+
}
44+
},
45+
"Type": "AWS::SNS::Subscription"
46+
},
47+
"HelloWorldFunctionFifoTriggerEventSourceMapping": {
48+
"Properties": {
49+
"BatchSize": 10,
50+
"Enabled": true,
51+
"EventSourceArn": {
52+
"Fn::GetAtt": [
53+
"HelloWorldFunctionFifoTriggerQueue",
54+
"Arn"
55+
]
56+
},
57+
"FunctionName": {
58+
"Ref": "HelloWorldFunction"
59+
}
60+
},
61+
"Type": "AWS::Lambda::EventSourceMapping"
62+
},
63+
"HelloWorldFunctionFifoTriggerQueue": {
64+
"Properties": {
65+
"FifoQueue": true
66+
},
67+
"Type": "AWS::SQS::Queue"
68+
},
69+
"HelloWorldFunctionFifoTriggerQueuePolicy": {
70+
"Properties": {
71+
"PolicyDocument": {
72+
"Statement": [
73+
{
74+
"Action": "sqs:SendMessage",
75+
"Condition": {
76+
"ArnEquals": {
77+
"aws:SourceArn": {
78+
"Ref": "MyFifoTopic"
79+
}
80+
}
81+
},
82+
"Effect": "Allow",
83+
"Principal": "*",
84+
"Resource": {
85+
"Fn::GetAtt": [
86+
"HelloWorldFunctionFifoTriggerQueue",
87+
"Arn"
88+
]
89+
}
90+
}
91+
],
92+
"Version": "2012-10-17"
93+
},
94+
"Queues": [
95+
{
96+
"Ref": "HelloWorldFunctionFifoTriggerQueue"
97+
}
98+
]
99+
},
100+
"Type": "AWS::SQS::QueuePolicy"
101+
},
102+
"HelloWorldFunctionRole": {
103+
"Properties": {
104+
"AssumeRolePolicyDocument": {
105+
"Statement": [
106+
{
107+
"Action": [
108+
"sts:AssumeRole"
109+
],
110+
"Effect": "Allow",
111+
"Principal": {
112+
"Service": [
113+
"lambda.amazonaws.com"
114+
]
115+
}
116+
}
117+
],
118+
"Version": "2012-10-17"
119+
},
120+
"ManagedPolicyArns": [
121+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
122+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
123+
],
124+
"Tags": [
125+
{
126+
"Key": "lambda:createdBy",
127+
"Value": "SAM"
128+
}
129+
]
130+
},
131+
"Type": "AWS::IAM::Role"
132+
},
133+
"MyFifoTopic": {
134+
"Properties": {
135+
"ContentBasedDeduplication": true,
136+
"FifoTopic": true,
137+
"TopicName": "myFifoTopic.fifo"
138+
},
139+
"Type": "AWS::SNS::Topic"
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)