Skip to content

Commit 5324ec0

Browse files
authored
chore: Move some PullEventSource validations to subclasses (#2767)
1 parent 134ecf9 commit 5324ec0

File tree

5 files changed

+84
-63
lines changed

5 files changed

+84
-63
lines changed

samtranslator/model/eventsources/pull.py

Lines changed: 80 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from abc import ABCMeta, abstractmethod
12
from typing import Any, Dict, List, Optional
23

34
from samtranslator.metrics.method_decorator import cw_timer
@@ -15,7 +16,7 @@
1516
from samtranslator.validator.value_validator import sam_expect
1617

1718

18-
class PullEventSource(ResourceMacro):
19+
class PullEventSource(ResourceMacro, metaclass=ABCMeta):
1920
"""Base class for pull event sources for SAM Functions.
2021
2122
The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues and SQS Queues. All of these correspond to an
@@ -32,11 +33,8 @@ class PullEventSource(ResourceMacro):
3233
# line to avoid any potential behavior change.
3334
# TODO: Make `PullEventSource` an abstract class and not giving `resource_type` initial value.
3435
resource_type: str = None # type: ignore
35-
requires_stream_queue_broker = True
3636
relative_id: str # overriding the Optional[str]: for event, relative id is not None
37-
property_types = {
38-
"Stream": PropertyType(False, IS_STR),
39-
"Queue": PropertyType(False, IS_STR),
37+
property_types: Dict[str, PropertyType] = {
4038
"BatchSize": PropertyType(False, is_type(int)),
4139
"StartingPosition": PassThroughProperty(False),
4240
"StartingPositionTimestamp": PassThroughProperty(False),
@@ -48,7 +46,6 @@ class PullEventSource(ResourceMacro):
4846
"DestinationConfig": PropertyType(False, IS_DICT),
4947
"ParallelizationFactor": PropertyType(False, is_type(int)),
5048
"Topics": PropertyType(False, is_type(list)),
51-
"Broker": PropertyType(False, IS_STR),
5249
"Queues": PropertyType(False, is_type(list)),
5350
"SourceAccessConfigurations": PropertyType(False, is_type(list)),
5451
"SecretsManagerKmsKeyId": PropertyType(False, IS_STR),
@@ -59,8 +56,6 @@ class PullEventSource(ResourceMacro):
5956
"ConsumerGroupId": PropertyType(False, IS_STR),
6057
}
6158

62-
Stream: Optional[Intrinsicable[str]]
63-
Queue: Optional[Intrinsicable[str]]
6459
BatchSize: Optional[Intrinsicable[int]]
6560
StartingPosition: Optional[PassThrough]
6661
StartingPositionTimestamp: Optional[PassThrough]
@@ -72,7 +67,6 @@ class PullEventSource(ResourceMacro):
7267
DestinationConfig: Optional[Dict[str, Any]]
7368
ParallelizationFactor: Optional[Intrinsicable[int]]
7469
Topics: Optional[List[Any]]
75-
Broker: Optional[Intrinsicable[str]]
7670
Queues: Optional[List[Any]]
7771
SourceAccessConfigurations: Optional[List[Any]]
7872
SecretsManagerKmsKeyId: Optional[str]
@@ -82,11 +76,17 @@ class PullEventSource(ResourceMacro):
8276
FilterCriteria: Optional[Dict[str, Any]]
8377
ConsumerGroupId: Optional[Intrinsicable[str]]
8478

85-
def get_policy_arn(self): # type: ignore[no-untyped-def]
86-
raise NotImplementedError("Subclass must implement this method")
79+
@abstractmethod
80+
def get_policy_arn(self) -> Optional[str]:
81+
"""Policy to be added to the role (if a role applies)."""
8782

88-
def get_policy_statements(self): # type: ignore[no-untyped-def]
89-
raise NotImplementedError("Subclass must implement this method")
83+
@abstractmethod
84+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
85+
"""Inline policy statements to be added to the role (if a role applies)."""
86+
87+
@abstractmethod
88+
def get_event_source_arn(self) -> Optional[PassThrough]:
89+
"""Return the value to assign to lambda event source mapping's EventSourceArn."""
9090

9191
@cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX)
9292
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
@@ -115,17 +115,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
115115
except NotImplementedError:
116116
function_name_or_arn = function.get_runtime_attr("arn")
117117

118-
if self.requires_stream_queue_broker and not self.Stream and not self.Queue and not self.Broker:
119-
raise InvalidEventException(
120-
self.relative_id,
121-
"No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
122-
)
123-
124-
if self.Stream and not self.StartingPosition:
125-
raise InvalidEventException(self.relative_id, "StartingPosition is required for Kinesis, DynamoDB and MSK.")
126-
127118
lambda_eventsourcemapping.FunctionName = function_name_or_arn
128-
lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue or self.Broker
119+
lambda_eventsourcemapping.EventSourceArn = self.get_event_source_arn()
129120
lambda_eventsourcemapping.StartingPosition = self.StartingPosition
130121
lambda_eventsourcemapping.StartingPositionTimestamp = self.StartingPositionTimestamp
131122
lambda_eventsourcemapping.BatchSize = self.BatchSize
@@ -202,8 +193,8 @@ def _link_policy(self, role, destination_config_policy=None): # type: ignore[no
202193
203194
:param model.iam.IAMRole role: the execution role generated for the function
204195
"""
205-
policy_arn = self.get_policy_arn() # type: ignore[no-untyped-call]
206-
policy_statements = self.get_policy_statements() # type: ignore[no-untyped-call]
196+
policy_arn = self.get_policy_arn()
197+
policy_statements = self.get_policy_statements()
207198
if role is not None:
208199
if policy_arn is not None and policy_arn not in role.ManagedPolicyArns:
209200
role.ManagedPolicyArns.append(policy_arn)
@@ -250,47 +241,86 @@ class Kinesis(PullEventSource):
250241
"""Kinesis event source."""
251242

252243
resource_type = "Kinesis"
244+
property_types: Dict[str, PropertyType] = {
245+
**PullEventSource.property_types,
246+
"Stream": PassThroughProperty(True),
247+
"StartingPosition": PassThroughProperty(True),
248+
}
249+
250+
Stream: PassThrough
251+
252+
def get_event_source_arn(self) -> Optional[PassThrough]:
253+
return self.Stream
253254

254-
def get_policy_arn(self): # type: ignore[no-untyped-def]
255+
def get_policy_arn(self) -> Optional[str]:
255256
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole")
256257

257-
def get_policy_statements(self): # type: ignore[no-untyped-def]
258+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
258259
return None
259260

260261

261262
class DynamoDB(PullEventSource):
262263
"""DynamoDB Streams event source."""
263264

264265
resource_type = "DynamoDB"
266+
property_types: Dict[str, PropertyType] = {
267+
**PullEventSource.property_types,
268+
"Stream": PassThroughProperty(True),
269+
"StartingPosition": PassThroughProperty(True),
270+
}
271+
272+
Stream: PassThrough
273+
274+
def get_event_source_arn(self) -> Optional[PassThrough]:
275+
return self.Stream
265276

266-
def get_policy_arn(self): # type: ignore[no-untyped-def]
277+
def get_policy_arn(self) -> Optional[str]:
267278
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole")
268279

269-
def get_policy_statements(self): # type: ignore[no-untyped-def]
280+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
270281
return None
271282

272283

273284
class SQS(PullEventSource):
274285
"""SQS Queue event source."""
275286

276287
resource_type = "SQS"
288+
property_types: Dict[str, PropertyType] = {
289+
**PullEventSource.property_types,
290+
"Queue": PassThroughProperty(True),
291+
}
292+
293+
Queue: PassThrough
294+
295+
def get_event_source_arn(self) -> Optional[PassThrough]:
296+
return self.Queue
277297

278-
def get_policy_arn(self): # type: ignore[no-untyped-def]
298+
def get_policy_arn(self) -> Optional[str]:
279299
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole")
280300

281-
def get_policy_statements(self): # type: ignore[no-untyped-def]
301+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
282302
return None
283303

284304

285305
class MSK(PullEventSource):
286306
"""MSK event source."""
287307

288308
resource_type = "MSK"
309+
property_types: Dict[str, PropertyType] = {
310+
**PullEventSource.property_types,
311+
"Stream": PassThroughProperty(True),
312+
"StartingPosition": PassThroughProperty(True),
313+
}
314+
315+
Stream: PassThrough
316+
317+
def get_event_source_arn(self) -> Optional[PassThrough]:
318+
return self.Stream
289319

290-
def get_policy_arn(self): # type: ignore[no-untyped-def]
320+
def get_policy_arn(self) -> Optional[str]:
291321
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole")
292322

293-
def get_policy_statements(self): # type: ignore[no-untyped-def]
323+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
294324
if self.SourceAccessConfigurations:
295325
for conf in self.SourceAccessConfigurations:
296326
# Lambda does not support multiple CLIENT_CERTIFICATE_TLS_AUTH configurations
@@ -312,18 +342,27 @@ def get_policy_statements(self): # type: ignore[no-untyped-def]
312342
}
313343
]
314344

315-
return None
345+
return None
316346

317347

318348
class MQ(PullEventSource):
319349
"""MQ event source."""
320350

321351
resource_type = "MQ"
352+
property_types: Dict[str, PropertyType] = {
353+
**PullEventSource.property_types,
354+
"Broker": PassThroughProperty(True),
355+
}
356+
357+
Broker: PassThrough
322358

323-
def get_policy_arn(self): # type: ignore[no-untyped-def]
359+
def get_event_source_arn(self) -> Optional[PassThrough]:
360+
return self.Broker
361+
362+
def get_policy_arn(self) -> Optional[str]:
324363
return None
325364

326-
def get_policy_statements(self): # type: ignore[no-untyped-def]
365+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
327366
if not self.SourceAccessConfigurations:
328367
raise InvalidEventException(
329368
self.relative_id,
@@ -404,7 +443,6 @@ class SelfManagedKafka(PullEventSource):
404443
"""
405444

406445
resource_type = "SelfManagedKafka"
407-
requires_stream_queue_broker = False
408446
AUTH_MECHANISM = [
409447
"SASL_SCRAM_256_AUTH",
410448
"SASL_SCRAM_512_AUTH",
@@ -413,10 +451,13 @@ class SelfManagedKafka(PullEventSource):
413451
"SERVER_ROOT_CA_CERTIFICATE",
414452
]
415453

416-
def get_policy_arn(self): # type: ignore[no-untyped-def]
454+
def get_event_source_arn(self) -> Optional[PassThrough]:
455+
return None
456+
457+
def get_policy_arn(self) -> Optional[str]:
417458
return None
418459

419-
def get_policy_statements(self): # type: ignore[no-untyped-def]
460+
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
420461
if not self.KafkaBootstrapServers:
421462
raise InvalidEventException(
422463
self.relative_id,
Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
11
{
2-
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MQFunction] is invalid. Event with id [MyMQQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
3-
"errors": [
4-
{
5-
"errorMessage": "Resource with id [MQFunction] is invalid. Event with id [MyMQQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided."
6-
}
7-
]
2+
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [MQFunctionMyMQQueue] is invalid. Missing required property 'Broker'."
83
}
Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
11
{
2-
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
3-
"errors": [
4-
{
5-
"errorMessage": "Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided."
6-
}
7-
]
2+
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunctionMySqsQueue] is invalid. Missing required property 'Queue'."
83
}
Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
11
{
2-
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [KinesisFunction] is invalid. Event with id [MyKinesisStream] is invalid. StartingPosition is required for Kinesis, DynamoDB and MSK.",
3-
"errors": [
4-
{
5-
"errorMessage": "Resource with id [KinesisFunction] is invalid. Event with id [MyKinesisStream] is invalid. StartingPosition is required for Kinesis, DynamoDB and MSK."
6-
}
7-
]
2+
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [KinesisFunctionMyKinesisStream] is invalid. Missing required property 'StartingPosition'."
83
}
Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
11
{
2-
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
3-
"errors": [
4-
{
5-
"errorMessage": "Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided."
6-
}
7-
]
2+
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunctionMyDDBStream] is invalid. Missing required property 'Stream'."
83
}

0 commit comments

Comments
 (0)