Skip to content

Commit bdfcc0a

Browse files
mbfrederaahung
andauthored
Added support for mTLS auth for MSK and Kafka (#2690)
Co-authored-by: _sam <[email protected]>
1 parent 28489f2 commit bdfcc0a

14 files changed

+846
-2
lines changed

samtranslator/model/eventsources/pull.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,28 @@ def get_policy_arn(self): # type: ignore[no-untyped-def]
291291
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole")
292292

293293
def get_policy_statements(self): # type: ignore[no-untyped-def]
294-
return None
294+
if self.SourceAccessConfigurations:
295+
for conf in self.SourceAccessConfigurations:
296+
# Lambda does not support multiple CLIENT_CERTIFICATE_TLS_AUTH configurations
297+
if isinstance(conf, dict) and conf.get("Type") == "CLIENT_CERTIFICATE_TLS_AUTH" and conf.get("URI"):
298+
return [
299+
{
300+
"PolicyName": "MSKExecutionRolePolicy",
301+
"PolicyDocument": {
302+
"Statement": [
303+
{
304+
"Action": [
305+
"secretsmanager:GetSecretValue",
306+
],
307+
"Effect": "Allow",
308+
"Resource": conf.get("URI"),
309+
}
310+
]
311+
},
312+
}
313+
]
314+
315+
return None
295316

296317

297318
class MQ(PullEventSource):
@@ -384,7 +405,13 @@ class SelfManagedKafka(PullEventSource):
384405

385406
resource_type = "SelfManagedKafka"
386407
requires_stream_queue_broker = False
387-
AUTH_MECHANISM = ["SASL_SCRAM_256_AUTH", "SASL_SCRAM_512_AUTH", "BASIC_AUTH"]
408+
AUTH_MECHANISM = [
409+
"SASL_SCRAM_256_AUTH",
410+
"SASL_SCRAM_512_AUTH",
411+
"BASIC_AUTH",
412+
"CLIENT_CERTIFICATE_TLS_AUTH",
413+
"SERVER_ROOT_CA_CERTIFICATE",
414+
]
388415

389416
def get_policy_arn(self): # type: ignore[no-untyped-def]
390417
return None

samtranslator/schema/aws_serverless_function.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ class MSKEventProperties(BaseModel):
358358
StartingPositionTimestamp: PassThrough # TODO: add documentation
359359
Stream: PassThrough = mskeventproperties("Stream")
360360
Topics: PassThrough = mskeventproperties("Topics")
361+
SourceAccessConfigurations: Optional[PassThrough] # TODO: update docs when live
361362

362363

363364
class MSKEvent(BaseModel):

samtranslator/schema/schema.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3472,6 +3472,9 @@
34723472
"title": "Topics",
34733473
"description": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
34743474
"markdownDescription": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\."
3475+
},
3476+
"SourceAccessConfigurations": {
3477+
"title": "Sourceaccessconfigurations"
34753478
}
34763479
},
34773480
"additionalProperties": false
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from unittest import TestCase
2+
from samtranslator.model.eventsources.pull import MSK
3+
4+
5+
class MSKEventSource(TestCase):
6+
def setUp(self):
7+
self.logical_id = "MSKEvent"
8+
self.kafka_event_source = MSK(self.logical_id)
9+
10+
def test_get_policy_arn(self):
11+
arn = self.kafka_event_source.get_policy_arn()
12+
expected_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole"
13+
self.assertEqual(arn, expected_arn)
14+
15+
def test_get_policy_statements(self):
16+
self.kafka_event_source.SourceAccessConfigurations = [
17+
{"Type": "CLIENT_CERTIFICATE_TLS_AUTH", "URI": "SECRET_URI"},
18+
]
19+
20+
policy_statements = self.kafka_event_source.get_policy_statements()
21+
expected_policy_document = [
22+
{
23+
"PolicyName": "MSKExecutionRolePolicy",
24+
"PolicyDocument": {
25+
"Statement": [
26+
{
27+
"Action": [
28+
"secretsmanager:GetSecretValue",
29+
],
30+
"Effect": "Allow",
31+
"Resource": "SECRET_URI",
32+
}
33+
]
34+
},
35+
}
36+
]
37+
38+
self.assertEqual(policy_statements, expected_policy_document)
39+
40+
def test_get_policy_statements_with_no_auth_mechanism(self):
41+
self.kafka_event_source.SourceAccessConfigurations = []
42+
43+
policy_statements = self.kafka_event_source.get_policy_statements()
44+
expected_policy_document = None
45+
46+
self.assertEqual(policy_statements, expected_policy_document)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Parameters: {}
3+
4+
Resources:
5+
MyMskStreamProcessor:
6+
Type: AWS::Serverless::Function
7+
Properties:
8+
Runtime: nodejs12.x
9+
Handler: index.handler
10+
CodeUri: s3://sam-demo-bucket/kafka.zip
11+
Events:
12+
MyMskEvent:
13+
Type: MSK
14+
Properties:
15+
StartingPosition: LATEST
16+
Stream: !Sub arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2
17+
Topics:
18+
- MyDummyTestTopic
19+
ConsumerGroupId: consumergroup1
20+
SourceAccessConfigurations: This should be a list
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Parameters: {}
3+
4+
Resources:
5+
MyMskStreamProcessor:
6+
Type: AWS::Serverless::Function
7+
Properties:
8+
Runtime: nodejs12.x
9+
Handler: index.handler
10+
CodeUri: s3://sam-demo-bucket/kafka.zip
11+
Events:
12+
MyMskEvent:
13+
Type: MSK
14+
Properties:
15+
StartingPosition: LATEST
16+
Stream: !Sub arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2
17+
Topics:
18+
- MyDummyTestTopic
19+
ConsumerGroupId: consumergroup1
20+
SourceAccessConfigurations:
21+
- Type: CLIENT_CERTIFICATE_TLS_AUTH
22+
URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Parameters: {}
3+
Resources:
4+
KafkaFunction:
5+
Type: AWS::Serverless::Function
6+
Properties:
7+
CodeUri: s3://sam-demo-bucket/kafka.zip
8+
Handler: index.kafka_handler
9+
Runtime: python3.9
10+
Events:
11+
MyKafkaCluster:
12+
Type: SelfManagedKafka
13+
Properties:
14+
KafkaBootstrapServers:
15+
- abc.xyz.com:9092
16+
- 123.45.67.89:9096
17+
Topics:
18+
- Topic1
19+
SourceAccessConfigurations:
20+
- Type: CLIENT_CERTIFICATE_TLS_AUTH
21+
URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
22+
- Type: VPC_SUBNET
23+
URI: subnet:subnet-12345
24+
- Type: VPC_SECURITY_GROUP
25+
URI: security_group:sg-67890
26+
ConsumerGroupId: consumergroup1
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
{
2+
"AWSTemplateFormatVersion": "2010-09-09",
3+
"Parameters": {},
4+
"Resources": {
5+
"MyMskStreamProcessor": {
6+
"Properties": {
7+
"Code": {
8+
"S3Bucket": "sam-demo-bucket",
9+
"S3Key": "kafka.zip"
10+
},
11+
"Handler": "index.handler",
12+
"Role": {
13+
"Fn::GetAtt": [
14+
"MyMskStreamProcessorRole",
15+
"Arn"
16+
]
17+
},
18+
"Runtime": "nodejs12.x",
19+
"Tags": [
20+
{
21+
"Key": "lambda:createdBy",
22+
"Value": "SAM"
23+
}
24+
]
25+
},
26+
"Type": "AWS::Lambda::Function"
27+
},
28+
"MyMskStreamProcessorMyMskEvent": {
29+
"Properties": {
30+
"AmazonManagedKafkaEventSourceConfig": {
31+
"ConsumerGroupId": "consumergroup1"
32+
},
33+
"EventSourceArn": {
34+
"Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2"
35+
},
36+
"FunctionName": {
37+
"Ref": "MyMskStreamProcessor"
38+
},
39+
"SourceAccessConfigurations": [
40+
{
41+
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
42+
"URI": {
43+
"Fn::Sub": "arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
44+
}
45+
}
46+
],
47+
"StartingPosition": "LATEST",
48+
"Topics": [
49+
"MyDummyTestTopic"
50+
]
51+
},
52+
"Type": "AWS::Lambda::EventSourceMapping"
53+
},
54+
"MyMskStreamProcessorRole": {
55+
"Properties": {
56+
"AssumeRolePolicyDocument": {
57+
"Statement": [
58+
{
59+
"Action": [
60+
"sts:AssumeRole"
61+
],
62+
"Effect": "Allow",
63+
"Principal": {
64+
"Service": [
65+
"lambda.amazonaws.com"
66+
]
67+
}
68+
}
69+
],
70+
"Version": "2012-10-17"
71+
},
72+
"ManagedPolicyArns": [
73+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
74+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole"
75+
],
76+
"Policies": [
77+
{
78+
"PolicyDocument": {
79+
"Statement": [
80+
{
81+
"Action": [
82+
"secretsmanager:GetSecretValue"
83+
],
84+
"Effect": "Allow",
85+
"Resource": {
86+
"Fn::Sub": "arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
87+
}
88+
}
89+
]
90+
},
91+
"PolicyName": "MSKExecutionRolePolicy"
92+
}
93+
],
94+
"Tags": [
95+
{
96+
"Key": "lambda:createdBy",
97+
"Value": "SAM"
98+
}
99+
]
100+
},
101+
"Type": "AWS::IAM::Role"
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)