Skip to content

Commit 5d3ea53

Browse files
mndevecivalerena
andauthored
feat: event source filtering (#2241)
* feat: add support for event filtering (DDB, Kinesis, SQS) (#90) * Add support for Event Filtering (DDB, Kinesis, SQS) * PR feedback Rename test file. Move to a variable the list of event types that have event filtering. * chore: change a test case that CFN doesn't support (#98) Co-authored-by: Renato Valenzuela <[email protected]>
1 parent f7a5eb8 commit 5d3ea53

14 files changed

+598
-0
lines changed

samtranslator/model/eventsources/pull.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ class PullEventSource(ResourceMacro):
2121
:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
2222
"""
2323

24+
# Event types that support `FilterCriteria`, stored as a list to keep the alphabetical order
25+
RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DynamoDB", "Kinesis", "SQS"]
26+
2427
resource_type = None
2528
requires_stream_queue_broker = True
2629
property_types = {
@@ -43,6 +46,7 @@ class PullEventSource(ResourceMacro):
4346
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
4447
"FunctionResponseTypes": PropertyType(False, is_type(list)),
4548
"KafkaBootstrapServers": PropertyType(False, is_type(list)),
49+
"FilterCriteria": PropertyType(False, is_type(dict)),
4650
}
4751

4852
def get_policy_arn(self):
@@ -102,6 +106,8 @@ def to_cloudformation(self, **kwargs):
102106
lambda_eventsourcemapping.SourceAccessConfigurations = self.SourceAccessConfigurations
103107
lambda_eventsourcemapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds
104108
lambda_eventsourcemapping.FunctionResponseTypes = self.FunctionResponseTypes
109+
lambda_eventsourcemapping.FilterCriteria = self.FilterCriteria
110+
self._validate_filter_criteria()
105111

106112
if self.KafkaBootstrapServers:
107113
lambda_eventsourcemapping.SelfManagedEventSource = {
@@ -169,6 +175,20 @@ def _link_policy(self, role, destination_config_policy=None):
169175
if not destination_config_policy.get("PolicyDocument") in [d["PolicyDocument"] for d in role.Policies]:
170176
role.Policies.append(destination_config_policy)
171177

178+
def _validate_filter_criteria(self):
179+
if not self.FilterCriteria or is_intrinsic(self.FilterCriteria):
180+
return
181+
if self.resource_type not in self.RESOURCE_TYPES_WITH_EVENT_FILTERING:
182+
raise InvalidEventException(
183+
self.relative_id,
184+
"FilterCriteria is only available for {} events.".format(
185+
", ".join(self.RESOURCE_TYPES_WITH_EVENT_FILTERING)
186+
),
187+
)
188+
# FilterCriteria is either empty or only has "Filters"
189+
if list(self.FilterCriteria.keys()) not in [[], ["Filters"]]:
190+
raise InvalidEventException(self.relative_id, "FilterCriteria field has a wrong format")
191+
172192

173193
class Kinesis(PullEventSource):
174194
"""Kinesis event source."""

samtranslator/model/lambda_.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class LambdaEventSourceMapping(Resource):
7979
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
8080
"FunctionResponseTypes": PropertyType(False, is_type(list)),
8181
"SelfManagedEventSource": PropertyType(False, is_type(dict)),
82+
"FilterCriteria": PropertyType(False, is_type(dict)),
8283
}
8384

8485
runtime_attrs = {"name": lambda self: ref(self.logical_id)}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
Resources:
2+
WrongFilterName:
3+
Type: AWS::Serverless::Function
4+
Properties:
5+
CodeUri: s3://sam-demo-bucket/filtered_events.zip
6+
Handler: index.handler
7+
Runtime: nodejs16.x
8+
Events:
9+
DynamoDBStreamEvent:
10+
Type: DynamoDB
11+
Properties:
12+
Stream: !GetAtt DynamoDBTable.StreamArn
13+
StartingPosition: TRIM_HORIZON
14+
FilterCriteria:
15+
FiltersToUse:
16+
- Pattern: '{"name": "value"}'
17+
18+
NotSupportedPullEvent:
19+
Type: AWS::Serverless::Function
20+
Properties:
21+
CodeUri: s3://sam-demo-bucket/filtered_events.zip
22+
Handler: index.handler
23+
Runtime: nodejs16.x
24+
Events:
25+
KafkaEvent:
26+
Type: MSK
27+
Properties:
28+
StartingPosition: LATEST
29+
Stream: arn:aws:kafka:us-east-1:012345678012:cluster/clusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
30+
Topics:
31+
- MyTopic
32+
FilterCriteria:
33+
Filters:
34+
- Pattern: '{"name": "value"}'
35+
36+
37+
NotSupportedPushEvent:
38+
Type: AWS::Serverless::Function
39+
Properties:
40+
CodeUri: s3://sam-demo-bucket/filtered_events.zip
41+
Handler: index.handler
42+
Runtime: nodejs16.x
43+
Events:
44+
SNSEvent:
45+
Type: SNS
46+
Properties:
47+
Topic: !GetAtt MySnsTopic.Arn
48+
FilterCriteria:
49+
Filters:
50+
- Pattern: '{"name": "value"}'
51+
52+
DynamoDBTable:
53+
Type: AWS::DynamoDB::Table
54+
55+
MySnsTopic:
56+
Type: AWS::SNS::Topic
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
Resources:
2+
FilteredEventsFunction:
3+
Type: AWS::Serverless::Function
4+
Properties:
5+
CodeUri: s3://sam-demo-bucket/filtered_events.zip
6+
Handler: index.handler
7+
Runtime: nodejs16.x
8+
Events:
9+
KinesisStream:
10+
Type: Kinesis
11+
Properties:
12+
Stream: !GetAtt KinesisStream.Arn
13+
StartingPosition: LATEST
14+
FilterCriteria:
15+
Filters:
16+
- Pattern: '{"name": "value"}'
17+
- Pattern: '{"name2": "value2"}'
18+
DynamoDBStreamEvent:
19+
Type: DynamoDB
20+
Properties:
21+
Stream: !GetAtt DynamoDBTable.StreamArn
22+
StartingPosition: TRIM_HORIZON
23+
FilterCriteria:
24+
Filters:
25+
- Pattern: '{
26+
"dynamodb": {
27+
"NewImage": {
28+
"value": { "S": ["test"] }
29+
}
30+
}
31+
}'
32+
MySqsQueue:
33+
Type: SQS
34+
Properties:
35+
Queue: !GetAtt MySqsQueue.Arn
36+
FilterCriteria:
37+
Filters:
38+
- Pattern: '{"name": "value"}'
39+
40+
KinesisStream:
41+
Type: AWS::Kinesis::Stream
42+
Properties:
43+
ShardCount: 1
44+
45+
DynamoDBTable:
46+
Type: AWS::DynamoDB::Table
47+
48+
MySqsQueue:
49+
Type: AWS::SQS::Queue

tests/translator/input/intrinsic_functions.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ Resources:
108108
Fn::GetAtt: [MyTable, "StreamArn"]
109109
BatchSize: 200
110110
StartingPosition: LATEST
111+
FilterCriteria:
112+
Fn::Select:
113+
- "1"
114+
-
115+
- {}
116+
- { "Filters": { "Pattern": "{\"value\": \"b\"}" }}
111117

112118
MyTable:
113119
Type: AWS::DynamoDB::Table
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
{
2+
"Resources": {
3+
"KinesisStream": {
4+
"Type": "AWS::Kinesis::Stream",
5+
"Properties": {
6+
"ShardCount": 1
7+
}
8+
},
9+
"DynamoDBTable": {
10+
"Type": "AWS::DynamoDB::Table"
11+
},
12+
"MySqsQueue": {
13+
"Type": "AWS::SQS::Queue"
14+
},
15+
"FilteredEventsFunction": {
16+
"Type": "AWS::Lambda::Function",
17+
"Properties": {
18+
"Code": {
19+
"S3Bucket": "sam-demo-bucket",
20+
"S3Key": "filtered_events.zip"
21+
},
22+
"Handler": "index.handler",
23+
"Role": {
24+
"Fn::GetAtt": [
25+
"FilteredEventsFunctionRole",
26+
"Arn"
27+
]
28+
},
29+
"Runtime": "nodejs16.x",
30+
"Tags": [
31+
{
32+
"Key": "lambda:createdBy",
33+
"Value": "SAM"
34+
}
35+
]
36+
}
37+
},
38+
"FilteredEventsFunctionRole": {
39+
"Type": "AWS::IAM::Role",
40+
"Properties": {
41+
"AssumeRolePolicyDocument": {
42+
"Version": "2012-10-17",
43+
"Statement": [
44+
{
45+
"Action": [
46+
"sts:AssumeRole"
47+
],
48+
"Effect": "Allow",
49+
"Principal": {
50+
"Service": [
51+
"lambda.amazonaws.com"
52+
]
53+
}
54+
}
55+
]
56+
},
57+
"ManagedPolicyArns": [
58+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
59+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole",
60+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
61+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
62+
],
63+
"Tags": [
64+
{
65+
"Key": "lambda:createdBy",
66+
"Value": "SAM"
67+
}
68+
]
69+
}
70+
},
71+
"FilteredEventsFunctionDynamoDBStreamEvent": {
72+
"Type": "AWS::Lambda::EventSourceMapping",
73+
"Properties": {
74+
"EventSourceArn": {
75+
"Fn::GetAtt": [
76+
"DynamoDBTable",
77+
"StreamArn"
78+
]
79+
},
80+
"FunctionName": {
81+
"Ref": "FilteredEventsFunction"
82+
},
83+
"StartingPosition": "TRIM_HORIZON",
84+
"FilterCriteria": {
85+
"Filters": [
86+
{
87+
"Pattern": "{ \"dynamodb\": { \"NewImage\": { \"value\": { \"S\": [\"test\"] } } } }"
88+
}
89+
]
90+
}
91+
}
92+
},
93+
"FilteredEventsFunctionKinesisStream": {
94+
"Type": "AWS::Lambda::EventSourceMapping",
95+
"Properties": {
96+
"EventSourceArn": {
97+
"Fn::GetAtt": [
98+
"KinesisStream",
99+
"Arn"
100+
]
101+
},
102+
"FunctionName": {
103+
"Ref": "FilteredEventsFunction"
104+
},
105+
"StartingPosition": "LATEST",
106+
"FilterCriteria": {
107+
"Filters": [
108+
{
109+
"Pattern": "{\"name\": \"value\"}"
110+
},
111+
{
112+
"Pattern": "{\"name2\": \"value2\"}"
113+
}
114+
]
115+
}
116+
}
117+
},
118+
"FilteredEventsFunctionMySqsQueue": {
119+
"Type": "AWS::Lambda::EventSourceMapping",
120+
"Properties": {
121+
"EventSourceArn": {
122+
"Fn::GetAtt": [
123+
"MySqsQueue",
124+
"Arn"
125+
]
126+
},
127+
"FunctionName": {
128+
"Ref": "FilteredEventsFunction"
129+
},
130+
"FilterCriteria": {
131+
"Filters": [
132+
{
133+
"Pattern": "{\"name\": \"value\"}"
134+
}
135+
]
136+
}
137+
}
138+
}
139+
}
140+
}

tests/translator/output/aws-cn/intrinsic_functions.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,19 @@
605605
"MyTable",
606606
"StreamArn"
607607
]
608+
},
609+
"FilterCriteria": {
610+
"Fn::Select": [
611+
"1",
612+
[
613+
{},
614+
{
615+
"Filters": {
616+
"Pattern": "{\"value\": \"b\"}"
617+
}
618+
}
619+
]
620+
]
608621
}
609622
}
610623
},

0 commit comments

Comments
 (0)