1
1
from abc import ABCMeta , abstractmethod
2
2
from typing import Any , Dict , List , Optional
3
3
4
+ from samtranslator .internal .deprecation_control import deprecated
4
5
from samtranslator .metrics .method_decorator import cw_timer
5
6
from samtranslator .model import PassThroughProperty , PropertyType , ResourceMacro
6
7
from samtranslator .model .eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
17
18
class PullEventSource (ResourceMacro , metaclass = ABCMeta ):
18
19
"""Base class for pull event sources for SAM Functions.
19
20
20
- The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues and SQS Queues. All of these correspond to an
21
+ The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues, SQS Queues, and DocumentDB Clusters . All of these correspond to an
21
22
EventSourceMapping in Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB
22
23
Streams, or SQS Queues, respectively.
23
24
24
25
:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
25
26
"""
26
27
27
28
# Event types that support `FilterCriteria`, stored as a list to keep the alphabetical order
28
- RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DynamoDB" , "Kinesis" , "MQ" , "MSK" , "SelfManagedKafka" , "SQS" ]
29
+ RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DocumentDB" , " DynamoDB" , "Kinesis" , "MQ" , "MSK" , "SelfManagedKafka" , "SQS" ]
29
30
30
31
# Note(xinhol): `PullEventSource` should have been an abstract class. Disabling the type check for the next
31
32
# line to avoid any potential behavior change.
@@ -88,6 +89,14 @@ def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
88
89
def get_event_source_arn (self ) -> Optional [PassThrough ]:
89
90
"""Return the value to assign to lambda event source mapping's EventSourceArn."""
90
91
92
+ def add_extra_eventsourcemapping_fields (self , _lambda_eventsourcemapping : LambdaEventSourceMapping ) -> None :
93
+ """Adds extra fields to the CloudFormation ESM resource.
94
+ This method can be overriden by a subclass if it has extra fields specific to that subclass.
95
+
96
+ :param LambdaEventSourceMapping lambda_eventsourcemapping: the Event source mapping resource to add the fields to.
97
+ """
98
+ return
99
+
91
100
@cw_timer (prefix = FUNCTION_EVETSOURCE_METRIC_PREFIX )
92
101
def to_cloudformation (self , ** kwargs ): # type: ignore[no-untyped-def] # noqa: too-many-branches
93
102
"""Returns the Lambda EventSourceMapping to which this pull event corresponds. Adds the appropriate managed
@@ -183,6 +192,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: t
183
192
184
193
lambda_eventsourcemapping .DestinationConfig = self .DestinationConfig
185
194
195
+ self .add_extra_eventsourcemapping_fields (lambda_eventsourcemapping )
196
+
186
197
if "role" in kwargs :
187
198
self ._link_policy (kwargs ["role" ], destination_config_policy ) # type: ignore[no-untyped-call]
188
199
@@ -232,13 +243,71 @@ def _validate_filter_criteria(self) -> None:
232
243
if list (self .FilterCriteria .keys ()) not in [[], ["Filters" ]]:
233
244
raise InvalidEventException (self .relative_id , "FilterCriteria field has a wrong format" )
234
245
235
- def validate_secrets_manager_kms_key_id (self ): # type: ignore[no-untyped-def]
236
- if self .SecretsManagerKmsKeyId and not isinstance (self .SecretsManagerKmsKeyId , str ):
246
+ def validate_secrets_manager_kms_key_id (self ) -> None :
247
+ if self .SecretsManagerKmsKeyId :
248
+ sam_expect (
249
+ self .SecretsManagerKmsKeyId , self .relative_id , "SecretsManagerKmsKeyId" , is_sam_event = True
250
+ ).to_be_a_string ()
251
+
252
+ def _validate_source_access_configurations (self , supported_types : List [str ], required_type : str ) -> str :
253
+ """
254
+ Validate the SourceAccessConfigurations parameter and return the URI to
255
+ be used for policy statement creation.
256
+ """
257
+
258
+ if not self .SourceAccessConfigurations :
237
259
raise InvalidEventException (
238
260
self .relative_id ,
239
- "Provided SecretsManagerKmsKeyId should be of type str." ,
261
+ f"No SourceAccessConfigurations for Amazon { self .resource_type } event provided." ,
262
+ )
263
+ if not isinstance (self .SourceAccessConfigurations , list ):
264
+ raise InvalidEventException (
265
+ self .relative_id ,
266
+ "Provided SourceAccessConfigurations cannot be parsed into a list." ,
240
267
)
241
268
269
+ required_type_uri : Optional [str ] = None
270
+ for index , conf in enumerate (self .SourceAccessConfigurations ):
271
+ sam_expect (conf , self .relative_id , f"SourceAccessConfigurations[{ index } ]" , is_sam_event = True ).to_be_a_map ()
272
+ event_type : str = sam_expect (
273
+ conf .get ("Type" ), self .relative_id , f"SourceAccessConfigurations[{ index } ].Type" , is_sam_event = True
274
+ ).to_be_a_string ()
275
+ if event_type not in supported_types :
276
+ raise InvalidEventException (
277
+ self .relative_id ,
278
+ f"Invalid property Type specified in SourceAccessConfigurations. The supported values are: { supported_types } ." ,
279
+ )
280
+ if event_type == required_type :
281
+ if required_type_uri :
282
+ raise InvalidEventException (
283
+ self .relative_id ,
284
+ f"Multiple { required_type } properties specified in SourceAccessConfigurations." ,
285
+ )
286
+ required_type_uri = conf .get ("URI" )
287
+ if not required_type_uri :
288
+ raise InvalidEventException (
289
+ self .relative_id ,
290
+ f"No { required_type } URI property specified in SourceAccessConfigurations." ,
291
+ )
292
+
293
+ if not required_type_uri :
294
+ raise InvalidEventException (
295
+ self .relative_id ,
296
+ f"No { required_type } property specified in SourceAccessConfigurations." ,
297
+ )
298
+ return required_type_uri
299
+
300
+ @staticmethod
301
+ def _get_kms_decrypt_policy (secrets_manager_kms_key_id : str ) -> Dict [str , Any ]:
302
+ return {
303
+ "Action" : ["kms:Decrypt" ],
304
+ "Effect" : "Allow" ,
305
+ "Resource" : {
306
+ "Fn::Sub" : "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
307
+ + secrets_manager_kms_key_id
308
+ },
309
+ }
310
+
242
311
243
312
class Kinesis (PullEventSource ):
244
313
"""Kinesis event source."""
@@ -366,45 +435,8 @@ def get_policy_arn(self) -> Optional[str]:
366
435
return None
367
436
368
437
def get_policy_statements (self ) -> Optional [List [Dict [str , Any ]]]:
369
- if not self .SourceAccessConfigurations :
370
- raise InvalidEventException (
371
- self .relative_id ,
372
- "No SourceAccessConfigurations for Amazon MQ event provided." ,
373
- )
374
- if not isinstance (self .SourceAccessConfigurations , list ):
375
- raise InvalidEventException (
376
- self .relative_id ,
377
- "Provided SourceAccessConfigurations cannot be parsed into a list." ,
378
- )
379
- basic_auth_uri = None
380
- for index , conf in enumerate (self .SourceAccessConfigurations ):
381
- sam_expect (conf , self .relative_id , f"SourceAccessConfigurations[{ index } ]" , is_sam_event = True ).to_be_a_map ()
382
- event_type : str = sam_expect (
383
- conf .get ("Type" ), self .relative_id , f"SourceAccessConfigurations[{ index } ].Type" , is_sam_event = True
384
- ).to_be_a_string ()
385
- if event_type not in ("BASIC_AUTH" , "VIRTUAL_HOST" ):
386
- raise InvalidEventException (
387
- self .relative_id ,
388
- "Invalid property specified in SourceAccessConfigurations for Amazon MQ event." ,
389
- )
390
- if event_type == "BASIC_AUTH" :
391
- if basic_auth_uri :
392
- raise InvalidEventException (
393
- self .relative_id ,
394
- "Multiple BASIC_AUTH properties specified in SourceAccessConfigurations for Amazon MQ event." ,
395
- )
396
- basic_auth_uri = conf .get ("URI" )
397
- if not basic_auth_uri :
398
- raise InvalidEventException (
399
- self .relative_id ,
400
- "No BASIC_AUTH URI property specified in SourceAccessConfigurations for Amazon MQ event." ,
401
- )
438
+ basic_auth_uri = self ._validate_source_access_configurations (["BASIC_AUTH" , "VIRTUAL_HOST" ], "BASIC_AUTH" )
402
439
403
- if not basic_auth_uri :
404
- raise InvalidEventException (
405
- self .relative_id ,
406
- "No BASIC_AUTH property specified in SourceAccessConfigurations for Amazon MQ event." ,
407
- )
408
440
document = {
409
441
"PolicyName" : "SamAutoGeneratedAMQPolicy" ,
410
442
"PolicyDocument" : {
@@ -427,7 +459,7 @@ def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
427
459
},
428
460
}
429
461
if self .SecretsManagerKmsKeyId :
430
- self .validate_secrets_manager_kms_key_id () # type: ignore[no-untyped-call]
462
+ self .validate_secrets_manager_kms_key_id ()
431
463
kms_policy = {
432
464
"Action" : "kms:Decrypt" ,
433
465
"Effect" : "Allow" ,
@@ -499,8 +531,8 @@ def generate_policy_document(self, source_access_configurations: List[Any]): #
499
531
statements .append (vpc_permissions )
500
532
501
533
if self .SecretsManagerKmsKeyId :
502
- self .validate_secrets_manager_kms_key_id () # type: ignore[no-untyped-call]
503
- kms_policy = self .get_kms_policy (self .SecretsManagerKmsKeyId )
534
+ self .validate_secrets_manager_kms_key_id ()
535
+ kms_policy = self ._get_kms_decrypt_policy (self .SecretsManagerKmsKeyId )
504
536
statements .append (kms_policy )
505
537
506
538
return {
@@ -590,6 +622,7 @@ def get_vpc_permission(self) -> Dict[str, Any]:
590
622
}
591
623
592
624
@staticmethod
625
+ @deprecated (None )
593
626
def get_kms_policy (secrets_manager_kms_key_id : str ) -> Dict [str , Any ]:
594
627
return {
595
628
"Action" : ["kms:Decrypt" ],
@@ -599,3 +632,94 @@ def get_kms_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]:
599
632
+ secrets_manager_kms_key_id
600
633
},
601
634
}
635
+
636
+
637
+ class DocumentDB (PullEventSource ):
638
+ """DocumentDB event source."""
639
+
640
+ resource_type = "DocumentDB"
641
+ property_types : Dict [str , PropertyType ] = {
642
+ ** PullEventSource .property_types ,
643
+ "Cluster" : PassThroughProperty (True ),
644
+ "DatabaseName" : PassThroughProperty (True ),
645
+ "CollectionName" : PassThroughProperty (False ),
646
+ "FullDocument" : PassThroughProperty (False ),
647
+ }
648
+
649
+ Cluster : PassThrough
650
+ DatabaseName : PassThrough
651
+ CollectionName : Optional [PassThrough ]
652
+ FullDocument : Optional [PassThrough ]
653
+
654
+ def add_extra_eventsourcemapping_fields (self , lambda_eventsourcemapping : LambdaEventSourceMapping ) -> None :
655
+ lambda_eventsourcemapping .DocumentDBEventSourceConfig = {
656
+ "DatabaseName" : self .DatabaseName ,
657
+ }
658
+ if self .CollectionName :
659
+ lambda_eventsourcemapping .DocumentDBEventSourceConfig ["CollectionName" ] = self .CollectionName # type: ignore[attr-defined]
660
+ if self .FullDocument :
661
+ lambda_eventsourcemapping .DocumentDBEventSourceConfig ["FullDocument" ] = self .FullDocument # type: ignore[attr-defined]
662
+
663
+ def get_event_source_arn (self ) -> Optional [PassThrough ]:
664
+ return self .Cluster
665
+
666
+ def get_policy_arn (self ) -> Optional [str ]:
667
+ return None
668
+
669
+ def get_policy_statements (self ) -> List [Dict [str , Any ]]:
670
+ basic_auth_uri = self ._validate_source_access_configurations (["BASIC_AUTH" ], "BASIC_AUTH" )
671
+
672
+ statements = [
673
+ {
674
+ "Action" : [
675
+ "secretsmanager:GetSecretValue" ,
676
+ ],
677
+ "Effect" : "Allow" ,
678
+ "Resource" : basic_auth_uri ,
679
+ },
680
+ {
681
+ "Action" : [
682
+ "rds:DescribeDBClusterParameters" ,
683
+ ],
684
+ "Effect" : "Allow" ,
685
+ "Resource" : {"Fn::Sub" : "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster-pg:*" },
686
+ },
687
+ {
688
+ "Action" : [
689
+ "rds:DescribeDBSubnetGroups" ,
690
+ ],
691
+ "Effect" : "Allow" ,
692
+ "Resource" : {"Fn::Sub" : "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:subgrp:*" },
693
+ },
694
+ {
695
+ "Action" : [
696
+ "rds:DescribeDBClusters" ,
697
+ ],
698
+ "Effect" : "Allow" ,
699
+ "Resource" : self .Cluster ,
700
+ },
701
+ {
702
+ "Action" : [
703
+ "ec2:CreateNetworkInterface" ,
704
+ "ec2:DescribeNetworkInterfaces" ,
705
+ "ec2:DeleteNetworkInterface" ,
706
+ "ec2:DescribeVpcs" ,
707
+ "ec2:DescribeSubnets" ,
708
+ "ec2:DescribeSecurityGroups" ,
709
+ ],
710
+ "Effect" : "Allow" ,
711
+ "Resource" : "*" ,
712
+ },
713
+ ]
714
+
715
+ if self .SecretsManagerKmsKeyId :
716
+ self .validate_secrets_manager_kms_key_id ()
717
+ kms_policy = self ._get_kms_decrypt_policy (self .SecretsManagerKmsKeyId )
718
+ statements .append (kms_policy )
719
+
720
+ document = {
721
+ "PolicyName" : "SamAutoGeneratedDocumentDBPolicy" ,
722
+ "PolicyDocument" : {"Statement" : statements },
723
+ }
724
+
725
+ return [document ]
0 commit comments