Skip to content

Commit 66e66a2

Browse files
committed
feat: support for Provisioned Poller (MSK and SelfManagedKafka)
1 parent 158c674 commit 66e66a2

File tree

10 files changed

+512
-0
lines changed

10 files changed

+512
-0
lines changed

.cfnlintrc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ ignore_templates:
136136
- tests/translator/output/**/function_with_intrinsics_resource_attribute.json # CFN now supports intrinsics in DeletionPolicy
137137
- tests/translator/output/**/function_with_snapstart.json # Snapstart intentionally not attached to a lambda version which causes lint issues
138138
- tests/translator/output/**/managed_policies_everything.json # intentionally contains wrong arns
139+
- tests/translator/output/**/function_with_provisioned_poller_config.json
139140
ignore_checks:
140141
- E2531 # Deprecated runtime; not relevant for transform tests
141142
- E2533 # Another deprecated runtime; not relevant for transform tests

samtranslator/internal/schema_source/aws_serverless_function.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ class MSKEventProperties(BaseModel):
417417
Topics: PassThroughProp = mskeventproperties("Topics")
418418
SourceAccessConfigurations: Optional[PassThroughProp] = mskeventproperties("SourceAccessConfigurations")
419419
DestinationConfig: Optional[PassThroughProp] # TODO: add documentation
420+
ProvisionedPollerConfig: Optional[PassThroughProp]
420421

421422

422423
class MSKEvent(BaseModel):
@@ -455,6 +456,7 @@ class SelfManagedKafkaEventProperties(BaseModel):
455456
StartingPosition: Optional[PassThroughProp] # TODO: add documentation
456457
StartingPositionTimestamp: Optional[PassThroughProp] # TODO: add documentation
457458
Topics: PassThroughProp = selfmanagedkafkaeventproperties("Topics")
459+
ProvisionedPollerConfig: Optional[PassThroughProp]
458460

459461

460462
class SelfManagedKafkaEvent(BaseModel):

samtranslator/model/eventsources/pull.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class PullEventSource(ResourceMacro, metaclass=ABCMeta):
5555
"KmsKeyArn": PassThroughProperty(False),
5656
"ConsumerGroupId": PropertyType(False, IS_STR),
5757
"ScalingConfig": PropertyType(False, IS_DICT),
58+
"ProvisionedPollerConfig": PropertyType(False, IS_DICT),
5859
}
5960

6061
BatchSize: Optional[Intrinsicable[int]]
@@ -78,6 +79,7 @@ class PullEventSource(ResourceMacro, metaclass=ABCMeta):
7879
KmsKeyArn: Optional[Intrinsicable[str]]
7980
ConsumerGroupId: Optional[Intrinsicable[str]]
8081
ScalingConfig: Optional[Dict[str, Any]]
82+
ProvisionedPollerConfig: Optional[Dict[str, Any]]
8183

8284
@abstractmethod
8385
def get_policy_arn(self) -> Optional[str]:
@@ -145,6 +147,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
145147
lambda_eventsourcemapping.FilterCriteria = self.FilterCriteria
146148
lambda_eventsourcemapping.KmsKeyArn = self.KmsKeyArn
147149
lambda_eventsourcemapping.ScalingConfig = self.ScalingConfig
150+
lambda_eventsourcemapping.ProvisionedPollerConfig = self.ProvisionedPollerConfig
148151
self._validate_filter_criteria()
149152

150153
if self.KafkaBootstrapServers:

samtranslator/model/lambda_.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class LambdaEventSourceMapping(Resource):
122122
"AmazonManagedKafkaEventSourceConfig": GeneratedProperty(),
123123
"SelfManagedKafkaEventSourceConfig": GeneratedProperty(),
124124
"ScalingConfig": GeneratedProperty(),
125+
"ProvisionedPollerConfig": GeneratedProperty(),
125126
}
126127

127128
runtime_attrs = {"name": lambda self: ref(self.logical_id)}

samtranslator/schema/schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276146,6 +276146,9 @@
276146276146
"markdownDescription": "The maximum amount of time to gather records before invoking the function, in seconds\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumBatchingWindowInSeconds`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumbatchingwindowinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
276147276147
"title": "MaximumBatchingWindowInSeconds"
276148276148
},
276149+
"ProvisionedPollerConfig": {
276150+
"$ref": "#/definitions/PassThroughProp"
276151+
},
276149276152
"SourceAccessConfigurations": {
276150276153
"allOf": [
276151276154
{
@@ -276985,6 +276988,9 @@
276985276988
"KmsKeyArn": {
276986276989
"$ref": "#/definitions/PassThroughProp"
276987276990
},
276991+
"ProvisionedPollerConfig": {
276992+
"$ref": "#/definitions/PassThroughProp"
276993+
},
276988276994
"SourceAccessConfigurations": {
276989276995
"allOf": [
276990276996
{

schema_source/sam.schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2413,6 +2413,9 @@
24132413
"markdownDescription": "The maximum amount of time to gather records before invoking the function, in seconds\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumBatchingWindowInSeconds`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumbatchingwindowinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
24142414
"title": "MaximumBatchingWindowInSeconds"
24152415
},
2416+
"ProvisionedPollerConfig": {
2417+
"$ref": "#/definitions/PassThroughProp"
2418+
},
24162419
"SourceAccessConfigurations": {
24172420
"allOf": [
24182421
{
@@ -3183,6 +3186,9 @@
31833186
"KmsKeyArn": {
31843187
"$ref": "#/definitions/PassThroughProp"
31853188
},
3189+
"ProvisionedPollerConfig": {
3190+
"$ref": "#/definitions/PassThroughProp"
3191+
},
31863192
"SourceAccessConfigurations": {
31873193
"allOf": [
31883194
{
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
Resources:
2+
KafkaEventsFunction:
3+
Type: AWS::Serverless::Function
4+
Properties:
5+
CodeUri: s3://sam-demo-bucket/metricsConfig.zip
6+
Handler: index.handler
7+
Runtime: nodejs16.x
8+
Events:
9+
MyMskEvent:
10+
Type: MSK
11+
Properties:
12+
StartingPosition: LATEST
13+
Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2
14+
Topics:
15+
- MyDummyTestTopic
16+
ConsumerGroupId: consumergroup1
17+
ProvisionedPollerConfig:
18+
MinimumPollers: 5
19+
MaximumPollers: 10
20+
MyKafkaCluster:
21+
Type: SelfManagedKafka
22+
Properties:
23+
KafkaBootstrapServers:
24+
- abc.xyz.com:9092
25+
- 123.45.67.89:9096
26+
Topics:
27+
- Topic1
28+
SourceAccessConfigurations:
29+
- Type: SASL_SCRAM_512_AUTH
30+
URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
31+
- Type: VPC_SUBNET
32+
URI: subnet:subnet-12345
33+
- Type: VPC_SECURITY_GROUP
34+
URI: security_group:sg-67890
35+
ConsumerGroupId: consumergroup1
36+
StartingPosition: AT_TIMESTAMP
37+
StartingPositionTimestamp: 1672560000
38+
ProvisionedPollerConfig:
39+
MinimumPollers: 2
40+
MaximumPollers: 100
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
{
2+
"Resources": {
3+
"KafkaEventsFunction": {
4+
"Properties": {
5+
"Code": {
6+
"S3Bucket": "sam-demo-bucket",
7+
"S3Key": "metricsConfig.zip"
8+
},
9+
"Handler": "index.handler",
10+
"Role": {
11+
"Fn::GetAtt": [
12+
"KafkaEventsFunctionRole",
13+
"Arn"
14+
]
15+
},
16+
"Runtime": "nodejs16.x",
17+
"Tags": [
18+
{
19+
"Key": "lambda:createdBy",
20+
"Value": "SAM"
21+
}
22+
]
23+
},
24+
"Type": "AWS::Lambda::Function"
25+
},
26+
"KafkaEventsFunctionMyKafkaCluster": {
27+
"Properties": {
28+
"FunctionName": {
29+
"Ref": "KafkaEventsFunction"
30+
},
31+
"ProvisionedPollerConfig": {
32+
"MaximumPollers": 100,
33+
"MinimumPollers": 2
34+
},
35+
"SelfManagedEventSource": {
36+
"Endpoints": {
37+
"KafkaBootstrapServers": [
38+
"abc.xyz.com:9092",
39+
"123.45.67.89:9096"
40+
]
41+
}
42+
},
43+
"SelfManagedKafkaEventSourceConfig": {
44+
"ConsumerGroupId": "consumergroup1"
45+
},
46+
"SourceAccessConfigurations": [
47+
{
48+
"Type": "SASL_SCRAM_512_AUTH",
49+
"URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
50+
},
51+
{
52+
"Type": "VPC_SUBNET",
53+
"URI": "subnet:subnet-12345"
54+
},
55+
{
56+
"Type": "VPC_SECURITY_GROUP",
57+
"URI": "security_group:sg-67890"
58+
}
59+
],
60+
"StartingPosition": "AT_TIMESTAMP",
61+
"StartingPositionTimestamp": 1672560000,
62+
"Topics": [
63+
"Topic1"
64+
]
65+
},
66+
"Type": "AWS::Lambda::EventSourceMapping"
67+
},
68+
"KafkaEventsFunctionMyMskEvent": {
69+
"Properties": {
70+
"AmazonManagedKafkaEventSourceConfig": {
71+
"ConsumerGroupId": "consumergroup1"
72+
},
73+
"EventSourceArn": {
74+
"Fn::Sub": "arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2"
75+
},
76+
"FunctionName": {
77+
"Ref": "KafkaEventsFunction"
78+
},
79+
"ProvisionedPollerConfig": {
80+
"MaximumPollers": 10,
81+
"MinimumPollers": 5
82+
},
83+
"StartingPosition": "LATEST",
84+
"Topics": [
85+
"MyDummyTestTopic"
86+
]
87+
},
88+
"Type": "AWS::Lambda::EventSourceMapping"
89+
},
90+
"KafkaEventsFunctionRole": {
91+
"Properties": {
92+
"AssumeRolePolicyDocument": {
93+
"Statement": [
94+
{
95+
"Action": [
96+
"sts:AssumeRole"
97+
],
98+
"Effect": "Allow",
99+
"Principal": {
100+
"Service": [
101+
"lambda.amazonaws.com"
102+
]
103+
}
104+
}
105+
],
106+
"Version": "2012-10-17"
107+
},
108+
"ManagedPolicyArns": [
109+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
110+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole"
111+
],
112+
"Policies": [
113+
{
114+
"PolicyDocument": {
115+
"Statement": [
116+
{
117+
"Action": [
118+
"secretsmanager:GetSecretValue"
119+
],
120+
"Effect": "Allow",
121+
"Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
122+
},
123+
{
124+
"Action": [
125+
"ec2:CreateNetworkInterface",
126+
"ec2:DescribeNetworkInterfaces",
127+
"ec2:DeleteNetworkInterface",
128+
"ec2:DescribeVpcs",
129+
"ec2:DescribeSubnets",
130+
"ec2:DescribeSecurityGroups"
131+
],
132+
"Effect": "Allow",
133+
"Resource": "*"
134+
}
135+
],
136+
"Version": "2012-10-17"
137+
},
138+
"PolicyName": "SelfManagedKafkaExecutionRolePolicy"
139+
}
140+
],
141+
"Tags": [
142+
{
143+
"Key": "lambda:createdBy",
144+
"Value": "SAM"
145+
}
146+
]
147+
},
148+
"Type": "AWS::IAM::Role"
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)