Skip to content

Commit 4e12a3a

Browse files
committed
Add support for KafkaSchemaRegistry for MSK and SMK
1 parent 198bd79 commit 4e12a3a

28 files changed

+4424
-37
lines changed

.cfnlintrc.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ ignore_templates:
139139
- tests/translator/output/**/managed_policies_everything.json # intentionally contains wrong arns
140140
- tests/translator/output/**/function_with_provisioned_poller_config.json
141141
- tests/translator/output/**/function_with_metrics_config.json
142+
- tests/translator/output/**/function_with_self_managed_kafka_and_schema_registry.json # cfnlint is not updated to recognize the SchemaRegistryConfig property
143+
- tests/translator/output/**/function_with_msk_with_schema_registry_config.json # cfnlint is not updated to recognize the SchemaRegistryConfig property
142144

143145
ignore_checks:
144146
- E2531 # Deprecated runtime; not relevant for transform tests

integration/combination/test_function_with_msk.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ def test_function_with_msk_trigger_and_s3_onfailure_events_destinations(self):
4141
"combination/function_with_msk_trigger_and_s3_onfailure_events_destinations", parameters
4242
)
4343

44+
def test_function_with_msk_trigger_and_confluent_schema_registry(self):
45+
companion_stack_outputs = self.companion_stack_outputs
46+
parameters = self.get_parameters(companion_stack_outputs)
47+
cluster_name = "MskCluster4-" + generate_suffix()
48+
parameters.append(self.generate_parameter("MskClusterName4", cluster_name))
49+
self._common_validations_for_MSK(
50+
"combination/function_with_msk_trigger_and_confluent_schema_registry", parameters
51+
)
52+
4453
def _common_validations_for_MSK(self, file_name, parameters):
4554
self.create_and_verify_stack(file_name, parameters)
4655

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[
2+
{
3+
"LogicalResourceId": "MyMskStreamProcessor",
4+
"ResourceType": "AWS::Lambda::Function"
5+
},
6+
{
7+
"LogicalResourceId": "MyLambdaExecutionRole",
8+
"ResourceType": "AWS::IAM::Role"
9+
},
10+
{
11+
"LogicalResourceId": "MyMskCluster",
12+
"ResourceType": "AWS::MSK::Cluster"
13+
},
14+
{
15+
"LogicalResourceId": "MyMskStreamProcessorMyMskEvent",
16+
"ResourceType": "AWS::Lambda::EventSourceMapping"
17+
},
18+
{
19+
"LogicalResourceId": "PreCreatedS3Bucket",
20+
"ResourceType": "AWS::S3::Bucket"
21+
}
22+
]

integration/resources/templates/combination/function_with_msk.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Resources:
4545
VolumeSize: 1
4646
ClusterName:
4747
Ref: MskClusterName
48-
KafkaVersion: 2.4.1.1
48+
KafkaVersion: 3.8.x
4949
NumberOfBrokerNodes: 2
5050

5151
MyMskStreamProcessor:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
Parameters:
2+
PreCreatedSubnetOne:
3+
Type: String
4+
PreCreatedSubnetTwo:
5+
Type: String
6+
MskClusterName4:
7+
Type: String
8+
9+
Resources:
10+
MyLambdaExecutionRole:
11+
Type: AWS::IAM::Role
12+
Properties:
13+
AssumeRolePolicyDocument:
14+
Version: '2012-10-17'
15+
Statement:
16+
- Action: [sts:AssumeRole]
17+
Effect: Allow
18+
Principal:
19+
Service: [lambda.amazonaws.com]
20+
Policies:
21+
- PolicyName: IntegrationTestExecution
22+
PolicyDocument:
23+
Statement:
24+
- Action: [kafka:DescribeCluster, kafka:GetBootstrapBrokers, ec2:CreateNetworkInterface,
25+
ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface,
26+
ec2:DescribeSubnets, ec2:DescribeSecurityGroups, logs:CreateLogGroup,
27+
logs:CreateLogStream, logs:PutLogEvents, s3:ListBucket, kafka:DescribeClusterV2]
28+
Effect: Allow
29+
Resource: '*'
30+
ManagedPolicyArns:
31+
- !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
32+
Tags:
33+
- {Value: SAM, Key: lambda:createdBy}
34+
35+
MyMskCluster:
36+
Type: AWS::MSK::Cluster
37+
Properties:
38+
BrokerNodeGroupInfo:
39+
ClientSubnets:
40+
- Ref: PreCreatedSubnetOne
41+
- Ref: PreCreatedSubnetTwo
42+
InstanceType: kafka.t3.small
43+
StorageInfo:
44+
EBSStorageInfo:
45+
VolumeSize: 1
46+
ClusterName:
47+
Ref: MskClusterName4
48+
KafkaVersion: 3.8.x
49+
NumberOfBrokerNodes: 2
50+
51+
MyMskStreamProcessor:
52+
Type: AWS::Serverless::Function
53+
Properties:
54+
Runtime: nodejs18.x
55+
Handler: index.handler
56+
CodeUri: ${codeuri}
57+
Role:
58+
Fn::GetAtt: [MyLambdaExecutionRole, Arn]
59+
Events:
60+
MyMskEvent:
61+
Type: MSK
62+
Properties:
63+
StartingPosition: LATEST
64+
Stream:
65+
Ref: MyMskCluster
66+
Topics:
67+
- SchemaRegistryTestTopic
68+
ProvisionedPollerConfig:
69+
MinimumPollers: 1
70+
SchemaRegistryConfig:
71+
AccessConfigs:
72+
- Type: BASIC_AUTH
73+
URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
74+
SchemaValidationConfigs:
75+
- Attribute: KEY
76+
EventRecordFormat: JSON
77+
SchemaRegistryURI: https://confluent.us-east-2.aws.confluent.cloud:9092
78+
79+
80+
81+
PreCreatedS3Bucket:
82+
Type: AWS::S3::Bucket
83+
DeletionPolicy: Delete
84+
85+
Metadata:
86+
SamTransformTest: true

integration/resources/templates/combination/function_with_msk_trigger_and_s3_onfailure_events_destinations.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Resources:
4545
VolumeSize: 1
4646
ClusterName:
4747
Ref: MskClusterName3
48-
KafkaVersion: 2.4.1.1
48+
KafkaVersion: 3.8.x
4949
NumberOfBrokerNodes: 2
5050

5151
MyMskStreamProcessor:

integration/resources/templates/combination/function_with_msk_using_managed_policy.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Resources:
2020
VolumeSize: 1
2121
ClusterName:
2222
Ref: MskClusterName2
23-
KafkaVersion: 2.4.1.1
23+
KafkaVersion: 3.8.x
2424
NumberOfBrokerNodes: 2
2525

2626
MyMskStreamProcessor:

requirements/base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
boto3>=1.19.5,==1.*
1+
boto3>=1.34.0,<2.0.0
22
jsonschema<5,>=3.2 # TODO: evaluate risk of removing jsonschema 3.x support
33
typing_extensions>=4.4 # 3.8 doesn't have Required, TypeGuard and ParamSpec
44

requirements/dev.txt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ parameterized~=0.7
1212

1313
# Integration tests
1414
dateparser~=1.1
15-
boto3>=1.23,<2
16-
tenacity~=8.0
15+
boto3>=1.34.0,<2.0.0
16+
tenacity~=9.0
1717

1818
# Requirements for examples
1919
requests~=2.28
@@ -26,6 +26,9 @@ ruamel.yaml==0.17.21 # It can parse yaml while perserving comments
2626
mypy~=1.3.0
2727

2828
# types
29-
boto3-stubs[appconfig,serverlessrepo]>=1.19.5,==1.*
29+
boto3-stubs[appconfig,serverlessrepo]>=1.34.0,<2.0.0
3030
types-PyYAML~=6.0
3131
types-jsonschema~=3.2
32+
33+
# CloudFormation CLI tools
34+
cloudformation-cli>=0.2.39,<0.3.0

samtranslator/internal/schema_source/aws_serverless_function.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ class MSKEventProperties(BaseModel):
421421
SourceAccessConfigurations: Optional[PassThroughProp] = mskeventproperties("SourceAccessConfigurations")
422422
DestinationConfig: Optional[PassThroughProp] # TODO: add documentation
423423
ProvisionedPollerConfig: Optional[PassThroughProp]
424+
SchemaRegistryConfig: Optional[PassThroughProp]
424425

425426

426427
class MSKEvent(BaseModel):
@@ -460,6 +461,7 @@ class SelfManagedKafkaEventProperties(BaseModel):
460461
StartingPositionTimestamp: Optional[PassThroughProp] # TODO: add documentation
461462
Topics: PassThroughProp = selfmanagedkafkaeventproperties("Topics")
462463
ProvisionedPollerConfig: Optional[PassThroughProp]
464+
SchemaRegistryConfig: Optional[PassThroughProp]
463465

464466

465467
class SelfManagedKafkaEvent(BaseModel):

0 commit comments

Comments
 (0)