Skip to content

Commit 7cb53ab

Browse files
author
Connor Robertson
authored
fix: Kafka Auth Mechanism "SERVER_ROOT_CA_CERTIFICATE" and other Auth can't be used together (#3341)
1 parent 263fb5b commit 7cb53ab

5 files changed

+436
-5
lines changed

samtranslator/model/eventsources/pull.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABCMeta, abstractmethod
2-
from typing import Any, Dict, List, Optional
2+
from typing import Any, Dict, List, Optional, Tuple
33

44
from samtranslator.internal.deprecation_control import deprecated
55
from samtranslator.metrics.method_decorator import cw_timer
@@ -520,7 +520,6 @@ class SelfManagedKafka(PullEventSource):
520520
"SASL_SCRAM_512_AUTH",
521521
"BASIC_AUTH",
522522
"CLIENT_CERTIFICATE_TLS_AUTH",
523-
"SERVER_ROOT_CA_CERTIFICATE",
524523
]
525524

526525
def get_event_source_arn(self) -> Optional[PassThrough]:
@@ -558,11 +557,15 @@ def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
558557

559558
def generate_policy_document(self, source_access_configurations: List[Any]): # type: ignore[no-untyped-def]
560559
statements = []
561-
authentication_uri, has_vpc_config = self.get_secret_key(source_access_configurations)
560+
authentication_uri, authentication_uri_2, has_vpc_config = self.get_secret_key(source_access_configurations)
562561
if authentication_uri:
563562
secret_manager = self.get_secret_manager_secret(authentication_uri) # type: ignore[no-untyped-call]
564563
statements.append(secret_manager)
565564

565+
if authentication_uri_2:
566+
secret_manager = self.get_secret_manager_secret(authentication_uri) # type: ignore[no-untyped-call]
567+
statements.append(secret_manager)
568+
566569
if has_vpc_config:
567570
vpc_permissions = self.get_vpc_permission()
568571
statements.append(vpc_permissions)
@@ -580,10 +583,11 @@ def generate_policy_document(self, source_access_configurations: List[Any]): #
580583
"PolicyName": "SelfManagedKafkaExecutionRolePolicy",
581584
}
582585

583-
def get_secret_key(self, source_access_configurations: List[Any]): # type: ignore[no-untyped-def]
586+
def get_secret_key(self, source_access_configurations: List[Any]) -> Tuple[Optional[str], Optional[str], bool]:
584587
authentication_uri = None
585588
has_vpc_subnet = False
586589
has_vpc_security_group = False
590+
authentication_uri_2 = None
587591

588592
if not isinstance(source_access_configurations, list):
589593
raise InvalidEventException(
@@ -609,6 +613,10 @@ def get_secret_key(self, source_access_configurations: List[Any]): # type: igno
609613
self.validate_uri(config.get("URI"), "auth mechanism")
610614
authentication_uri = config.get("URI")
611615

616+
elif config.get("Type") == "SERVER_ROOT_CA_CERTIFICATE":
617+
self.validate_uri(config.get("URI"), "SERVER_ROOT_CA_CERTIFICATE")
618+
authentication_uri_2 = config.get("URI")
619+
612620
else:
613621
raise InvalidEventException(
614622
self.relative_id,
@@ -620,7 +628,7 @@ def get_secret_key(self, source_access_configurations: List[Any]): # type: igno
620628
self.relative_id,
621629
"VPC_SUBNET and VPC_SECURITY_GROUP in SourceAccessConfigurations for SelfManagedKafka must be both provided.",
622630
)
623-
return authentication_uri, (has_vpc_subnet and has_vpc_security_group)
631+
return authentication_uri, authentication_uri_2, (has_vpc_subnet and has_vpc_security_group)
624632

625633
def validate_uri(self, uri: Optional[Any], msg: str) -> None:
626634
if not uri:
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Parameters: {}
3+
Resources:
4+
KafkaFunction:
5+
Type: AWS::Serverless::Function
6+
Properties:
7+
CodeUri: s3://sam-demo-bucket/kafka.zip
8+
Handler: index.kafka_handler
9+
Runtime: python3.9
10+
Events:
11+
MyKafkaCluster:
12+
Type: SelfManagedKafka
13+
Properties:
14+
KafkaBootstrapServers:
15+
- abc.xyz.com:9092
16+
- 123.45.67.89:9096
17+
Topics:
18+
- Topic1
19+
SourceAccessConfigurations:
20+
- Type: SASL_SCRAM_512_AUTH
21+
URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
22+
- Type: SERVER_ROOT_CA_CERTIFICATE
23+
URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:test-root-certificate-abcdef
24+
- Type: VPC_SUBNET
25+
URI: subnet:subnet-12345
26+
- Type: VPC_SECURITY_GROUP
27+
URI: security_group:sg-67890
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
{
2+
"AWSTemplateFormatVersion": "2010-09-09",
3+
"Parameters": {},
4+
"Resources": {
5+
"KafkaFunction": {
6+
"Properties": {
7+
"Code": {
8+
"S3Bucket": "sam-demo-bucket",
9+
"S3Key": "kafka.zip"
10+
},
11+
"Handler": "index.kafka_handler",
12+
"Role": {
13+
"Fn::GetAtt": [
14+
"KafkaFunctionRole",
15+
"Arn"
16+
]
17+
},
18+
"Runtime": "python3.9",
19+
"Tags": [
20+
{
21+
"Key": "lambda:createdBy",
22+
"Value": "SAM"
23+
}
24+
]
25+
},
26+
"Type": "AWS::Lambda::Function"
27+
},
28+
"KafkaFunctionMyKafkaCluster": {
29+
"Properties": {
30+
"FunctionName": {
31+
"Ref": "KafkaFunction"
32+
},
33+
"SelfManagedEventSource": {
34+
"Endpoints": {
35+
"KafkaBootstrapServers": [
36+
"abc.xyz.com:9092",
37+
"123.45.67.89:9096"
38+
]
39+
}
40+
},
41+
"SourceAccessConfigurations": [
42+
{
43+
"Type": "SASL_SCRAM_512_AUTH",
44+
"URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
45+
},
46+
{
47+
"Type": "SERVER_ROOT_CA_CERTIFICATE",
48+
"URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:test-root-certificate-abcdef"
49+
},
50+
{
51+
"Type": "VPC_SUBNET",
52+
"URI": "subnet:subnet-12345"
53+
},
54+
{
55+
"Type": "VPC_SECURITY_GROUP",
56+
"URI": "security_group:sg-67890"
57+
}
58+
],
59+
"Topics": [
60+
"Topic1"
61+
]
62+
},
63+
"Type": "AWS::Lambda::EventSourceMapping"
64+
},
65+
"KafkaFunctionRole": {
66+
"Properties": {
67+
"AssumeRolePolicyDocument": {
68+
"Statement": [
69+
{
70+
"Action": [
71+
"sts:AssumeRole"
72+
],
73+
"Effect": "Allow",
74+
"Principal": {
75+
"Service": [
76+
"lambda.amazonaws.com"
77+
]
78+
}
79+
}
80+
],
81+
"Version": "2012-10-17"
82+
},
83+
"ManagedPolicyArns": [
84+
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
85+
],
86+
"Policies": [
87+
{
88+
"PolicyDocument": {
89+
"Statement": [
90+
{
91+
"Action": [
92+
"secretsmanager:GetSecretValue"
93+
],
94+
"Effect": "Allow",
95+
"Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
96+
},
97+
{
98+
"Action": [
99+
"secretsmanager:GetSecretValue"
100+
],
101+
"Effect": "Allow",
102+
"Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
103+
},
104+
{
105+
"Action": [
106+
"ec2:CreateNetworkInterface",
107+
"ec2:DescribeNetworkInterfaces",
108+
"ec2:DeleteNetworkInterface",
109+
"ec2:DescribeVpcs",
110+
"ec2:DescribeSubnets",
111+
"ec2:DescribeSecurityGroups"
112+
],
113+
"Effect": "Allow",
114+
"Resource": "*"
115+
}
116+
],
117+
"Version": "2012-10-17"
118+
},
119+
"PolicyName": "SelfManagedKafkaExecutionRolePolicy"
120+
}
121+
],
122+
"Tags": [
123+
{
124+
"Key": "lambda:createdBy",
125+
"Value": "SAM"
126+
}
127+
]
128+
},
129+
"Type": "AWS::IAM::Role"
130+
}
131+
}
132+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
{
2+
"AWSTemplateFormatVersion": "2010-09-09",
3+
"Parameters": {},
4+
"Resources": {
5+
"KafkaFunction": {
6+
"Properties": {
7+
"Code": {
8+
"S3Bucket": "sam-demo-bucket",
9+
"S3Key": "kafka.zip"
10+
},
11+
"Handler": "index.kafka_handler",
12+
"Role": {
13+
"Fn::GetAtt": [
14+
"KafkaFunctionRole",
15+
"Arn"
16+
]
17+
},
18+
"Runtime": "python3.9",
19+
"Tags": [
20+
{
21+
"Key": "lambda:createdBy",
22+
"Value": "SAM"
23+
}
24+
]
25+
},
26+
"Type": "AWS::Lambda::Function"
27+
},
28+
"KafkaFunctionMyKafkaCluster": {
29+
"Properties": {
30+
"FunctionName": {
31+
"Ref": "KafkaFunction"
32+
},
33+
"SelfManagedEventSource": {
34+
"Endpoints": {
35+
"KafkaBootstrapServers": [
36+
"abc.xyz.com:9092",
37+
"123.45.67.89:9096"
38+
]
39+
}
40+
},
41+
"SourceAccessConfigurations": [
42+
{
43+
"Type": "SASL_SCRAM_512_AUTH",
44+
"URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
45+
},
46+
{
47+
"Type": "SERVER_ROOT_CA_CERTIFICATE",
48+
"URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:test-root-certificate-abcdef"
49+
},
50+
{
51+
"Type": "VPC_SUBNET",
52+
"URI": "subnet:subnet-12345"
53+
},
54+
{
55+
"Type": "VPC_SECURITY_GROUP",
56+
"URI": "security_group:sg-67890"
57+
}
58+
],
59+
"Topics": [
60+
"Topic1"
61+
]
62+
},
63+
"Type": "AWS::Lambda::EventSourceMapping"
64+
},
65+
"KafkaFunctionRole": {
66+
"Properties": {
67+
"AssumeRolePolicyDocument": {
68+
"Statement": [
69+
{
70+
"Action": [
71+
"sts:AssumeRole"
72+
],
73+
"Effect": "Allow",
74+
"Principal": {
75+
"Service": [
76+
"lambda.amazonaws.com"
77+
]
78+
}
79+
}
80+
],
81+
"Version": "2012-10-17"
82+
},
83+
"ManagedPolicyArns": [
84+
"arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
85+
],
86+
"Policies": [
87+
{
88+
"PolicyDocument": {
89+
"Statement": [
90+
{
91+
"Action": [
92+
"secretsmanager:GetSecretValue"
93+
],
94+
"Effect": "Allow",
95+
"Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
96+
},
97+
{
98+
"Action": [
99+
"secretsmanager:GetSecretValue"
100+
],
101+
"Effect": "Allow",
102+
"Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c"
103+
},
104+
{
105+
"Action": [
106+
"ec2:CreateNetworkInterface",
107+
"ec2:DescribeNetworkInterfaces",
108+
"ec2:DeleteNetworkInterface",
109+
"ec2:DescribeVpcs",
110+
"ec2:DescribeSubnets",
111+
"ec2:DescribeSecurityGroups"
112+
],
113+
"Effect": "Allow",
114+
"Resource": "*"
115+
}
116+
],
117+
"Version": "2012-10-17"
118+
},
119+
"PolicyName": "SelfManagedKafkaExecutionRolePolicy"
120+
}
121+
],
122+
"Tags": [
123+
{
124+
"Key": "lambda:createdBy",
125+
"Value": "SAM"
126+
}
127+
]
128+
},
129+
"Type": "AWS::IAM::Role"
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)