Skip to content

Commit c6ea664

Browse files
authored
feat(lambda): support for schema registry for kafka (#34746)
### Reason for this change Lambda is introducing a new property in Event Sources named `SchemaRegistryConfig` in `SelfManagedKafkaEventSourceConfig` and `AmazonManagedKafkaEventSourceConfig` to set configuration settings for a schema registry that will be used to de-serialize the event read from these Kafka event sources. When specified, it allows de-serialization events before they are passed to target function and validation of their format. The users may use a Confluent registry, a self managed registry or AWS Glue Registry. Note, the even source mapping must have `ProvisionedPollerConfig` set (be on provisioned mode) for this feature to be used. This feature is currently supported for MSK and Self-managed Kafka event sources. ### Description of changes This new property can be opted in by setting `SchemaRegistryConfig` in `SelfManagedKafkaEventSourceConfig` or `AmazonManagedKafkaEventSourceConfig`. An example is shown bellow: ``` myFunction.addEventSource(new ManagedKafkaEventSource({ clusterArn, topic, startingPosition: lambda.StartingPosition.TRIM_HORIZON, provisionedPollerConfig: { minimumPollers: 1, maximumPollers: 3, }, schemaRegistryConfig: { schemaRegistryUri: 'https://example.com', eventRecordFormat: lambda.EventRecordFormat.JSON, accessConfigs: [ { type: lambda.SchemaRegistryAccessConfigType.BASIC_AUTH, uri: 'https://example.com', }, ], schemaValidationConfigs: [{ attribute: lambda.SchemaValidationAttribute.KEY }], }, })); ``` ### Describe any new or updated permissions being added Following IAM permissions will be added to the target function execution role **only if user passed a Glue registry**. ``` { Action: 'glue:GetRegistry', Effect: 'Allow', Resource: { 'Fn::GetAtt': ['Registry', 'Arn'], // Glue registry ARN }, }, { Action: [ 'glue:GetSchemaVersion', 'glue:GetSchema', ], Effect: 'Allow', Resource: [ { 'Fn::GetAtt': ['Registry', 'Arn'], }, 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/lambda-gp-test-glue-schema-registry/*', ], }, ``` ### Description of how you validated changes Unit tests for each case have been added in the PR. Note, MSK and SMK validations follow the same path so for validations there are only unit tests for MSK cases which should apply for both. Integration test for both Glue and confluent case have been added for SMK. Since, MSK requires a Kafka cluster in VPC that we typically do not add integration tests for it. ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 817a21a commit c6ea664

20 files changed

+2460
-4
lines changed

packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-schema-registry.js.snapshot/SchemaRegistryIntegDefaultTestDeployAssertC190A9FD.assets.json

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-schema-registry.js.snapshot/SchemaRegistryIntegDefaultTestDeployAssertC190A9FD.template.json

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-schema-registry.js.snapshot/cdk.out

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-schema-registry.js.snapshot/integ.json

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-schema-registry.js.snapshot/lambda-event-source-confluent-schema-registry.assets.json

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
{
2+
"Resources": {
3+
"ConfluentFunctionServiceRole116DBB1D": {
4+
"Type": "AWS::IAM::Role",
5+
"Properties": {
6+
"AssumeRolePolicyDocument": {
7+
"Statement": [
8+
{
9+
"Action": "sts:AssumeRole",
10+
"Effect": "Allow",
11+
"Principal": {
12+
"Service": "lambda.amazonaws.com"
13+
}
14+
}
15+
],
16+
"Version": "2012-10-17"
17+
},
18+
"ManagedPolicyArns": [
19+
{
20+
"Fn::Join": [
21+
"",
22+
[
23+
"arn:",
24+
{
25+
"Ref": "AWS::Partition"
26+
},
27+
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
28+
]
29+
]
30+
}
31+
]
32+
}
33+
},
34+
"ConfluentFunctionServiceRoleDefaultPolicy7BF57A6E": {
35+
"Type": "AWS::IAM::Policy",
36+
"Properties": {
37+
"PolicyDocument": {
38+
"Statement": [
39+
{
40+
"Action": [
41+
"secretsmanager:DescribeSecret",
42+
"secretsmanager:GetSecretValue"
43+
],
44+
"Effect": "Allow",
45+
"Resource": [
46+
{
47+
"Ref": "ConfluentClientCertSecret23688D94"
48+
},
49+
{
50+
"Ref": "ConfluentRootCASecret99CAE53B"
51+
}
52+
]
53+
}
54+
],
55+
"Version": "2012-10-17"
56+
},
57+
"PolicyName": "ConfluentFunctionServiceRoleDefaultPolicy7BF57A6E",
58+
"Roles": [
59+
{
60+
"Ref": "ConfluentFunctionServiceRole116DBB1D"
61+
}
62+
]
63+
}
64+
},
65+
"ConfluentFunction5D29A9A6": {
66+
"Type": "AWS::Lambda::Function",
67+
"Properties": {
68+
"Code": {
69+
"ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}"
70+
},
71+
"Handler": "index.handler",
72+
"Role": {
73+
"Fn::GetAtt": [
74+
"ConfluentFunctionServiceRole116DBB1D",
75+
"Arn"
76+
]
77+
},
78+
"Runtime": "nodejs18.x"
79+
},
80+
"DependsOn": [
81+
"ConfluentFunctionServiceRoleDefaultPolicy7BF57A6E",
82+
"ConfluentFunctionServiceRole116DBB1D"
83+
]
84+
},
85+
"ConfluentFunctionKafkaEventSource46eb851822d1a9f28d63dc4ec210952etesttopicsmkconfluent90EB60AA": {
86+
"Type": "AWS::Lambda::EventSourceMapping",
87+
"Properties": {
88+
"BatchSize": 100,
89+
"FunctionName": {
90+
"Ref": "ConfluentFunction5D29A9A6"
91+
},
92+
"ProvisionedPollerConfig": {
93+
"MaximumPollers": 3,
94+
"MinimumPollers": 1
95+
},
96+
"SelfManagedEventSource": {
97+
"Endpoints": {
98+
"KafkaBootstrapServers": [
99+
"kafka-broker-1:9092",
100+
"kafka-broker-2:9092",
101+
"kafka-broker-3:9092"
102+
]
103+
}
104+
},
105+
"SelfManagedKafkaEventSourceConfig": {
106+
"ConsumerGroupId": "test-consumer-group-smk-confluent",
107+
"SchemaRegistryConfig": {
108+
"AccessConfigs": [
109+
{
110+
"Type": "BASIC_AUTH",
111+
"URI": {
112+
"Ref": "ConfluentClientCertSecret23688D94"
113+
}
114+
}
115+
],
116+
"EventRecordFormat": "JSON",
117+
"SchemaRegistryURI": "https://schema-registry.example.com",
118+
"SchemaValidationConfigs": [
119+
{
120+
"Attribute": "KEY"
121+
}
122+
]
123+
}
124+
},
125+
"SourceAccessConfigurations": [
126+
{
127+
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
128+
"URI": {
129+
"Ref": "ConfluentClientCertSecret23688D94"
130+
}
131+
},
132+
{
133+
"Type": "SERVER_ROOT_CA_CERTIFICATE",
134+
"URI": {
135+
"Ref": "ConfluentRootCASecret99CAE53B"
136+
}
137+
}
138+
],
139+
"StartingPosition": "TRIM_HORIZON",
140+
"Topics": [
141+
"test-topic-smk-confluent"
142+
]
143+
}
144+
},
145+
"ConfluentRootCASecret99CAE53B": {
146+
"Type": "AWS::SecretsManager::Secret",
147+
"Properties": {
148+
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\n MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\n cmUuiAii9R0=\\n -----END CERTIFICATE-----\\n -----BEGIN CERTIFICATE-----\\n MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\n c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n -----END CERTIFICATE-----\\\"\\n \"}"
149+
},
150+
"UpdateReplacePolicy": "Delete",
151+
"DeletionPolicy": "Delete"
152+
},
153+
"ConfluentClientCertSecret23688D94": {
154+
"Type": "AWS::SecretsManager::Secret",
155+
"Properties": {
156+
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\n MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\n cmUuiAii9R0=\\n -----END CERTIFICATE-----\\n -----BEGIN CERTIFICATE-----\\n MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\n c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n -----END CERTIFICATE-----\\\"\\n \",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\n zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n -----END ENCRYPTED PRIVATE KEY-----\"}"
157+
},
158+
"UpdateReplacePolicy": "Delete",
159+
"DeletionPolicy": "Delete"
160+
}
161+
},
162+
"Parameters": {
163+
"BootstrapVersion": {
164+
"Type": "AWS::SSM::Parameter::Value<String>",
165+
"Default": "/cdk-bootstrap/hnb659fds/version",
166+
"Description": "Version of the CDK Bootstrap resources in this environment, automatically retrieved from SSM Parameter Store. [cdk:skip]"
167+
}
168+
},
169+
"Rules": {
170+
"CheckBootstrapVersion": {
171+
"Assertions": [
172+
{
173+
"Assert": {
174+
"Fn::Not": [
175+
{
176+
"Fn::Contains": [
177+
[
178+
"1",
179+
"2",
180+
"3",
181+
"4",
182+
"5"
183+
],
184+
{
185+
"Ref": "BootstrapVersion"
186+
}
187+
]
188+
}
189+
]
190+
},
191+
"AssertDescription": "CDK bootstrap stack version 6 required. Please run 'cdk bootstrap' with a recent version of the CDK CLI."
192+
}
193+
]
194+
}
195+
}
196+
}

packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-schema-registry.js.snapshot/lambda-event-source-glue-schema-registry.assets.json

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)