1
1
import copy
2
2
import re
3
+ from typing import Any , Dict , Optional
3
4
4
5
from samtranslator .metrics .method_decorator import cw_timer
5
6
from samtranslator .model import ResourceMacro , PropertyType
@@ -451,6 +452,7 @@ class SNS(PushEventSource):
451
452
"Region" : PropertyType (False , is_str ()),
452
453
"FilterPolicy" : PropertyType (False , dict_of (is_str (), list_of (one_of (is_str (), is_type (dict ))))),
453
454
"SqsSubscription" : PropertyType (False , one_of (is_type (bool ), is_type (dict ))),
455
+ "RedrivePolicy" : PropertyType (False , is_type (dict )),
454
456
}
455
457
456
458
@cw_timer (prefix = FUNCTION_EVETSOURCE_METRIC_PREFIX ) # type: ignore[no-untyped-call]
@@ -469,12 +471,13 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
469
471
470
472
# SNS -> Lambda
471
473
if not self .SqsSubscription : # type: ignore[attr-defined]
472
- subscription = self ._inject_subscription ( # type: ignore[no-untyped-call]
474
+ subscription = self ._inject_subscription (
473
475
"lambda" ,
474
476
function .get_runtime_attr ("arn" ),
475
477
self .Topic , # type: ignore[attr-defined]
476
478
self .Region , # type: ignore[attr-defined]
477
479
self .FilterPolicy , # type: ignore[attr-defined]
480
+ self .RedrivePolicy , # type: ignore[attr-defined]
478
481
function ,
479
482
)
480
483
return [self ._construct_permission (function , source_arn = self .Topic ), subscription ] # type: ignore[attr-defined, no-untyped-call]
@@ -487,8 +490,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
487
490
queue_url = queue .get_runtime_attr ("queue_url" )
488
491
489
492
queue_policy = self ._inject_sqs_queue_policy (self .Topic , queue_arn , queue_url , function ) # type: ignore[attr-defined, no-untyped-call]
490
- subscription = self ._inject_subscription ( # type: ignore[no-untyped-call]
491
- "sqs" , queue_arn , self .Topic , self .Region , self .FilterPolicy , function # type: ignore[attr-defined, attr-defined, attr-defined]
493
+ subscription = self ._inject_subscription (
494
+ "sqs" , queue_arn , self .Topic , self .Region , self .FilterPolicy , self . RedrivePolicy , function # type: ignore[attr-defined, attr-defined, attr-defined]
492
495
)
493
496
event_source = self ._inject_sqs_event_source_mapping (function , role , queue_arn ) # type: ignore[no-untyped-call]
494
497
@@ -512,15 +515,26 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
512
515
queue_policy = self ._inject_sqs_queue_policy ( # type: ignore[no-untyped-call]
513
516
self .Topic , queue_arn , queue_url , function , queue_policy_logical_id # type: ignore[attr-defined]
514
517
)
515
- subscription = self ._inject_subscription ("sqs" , queue_arn , self .Topic , self .Region , self .FilterPolicy , function ) # type: ignore[attr-defined, attr-defined, attr-defined, no-untyped-call]
518
+ subscription = self ._inject_subscription (
519
+ "sqs" , queue_arn , self .Topic , self .Region , self .FilterPolicy , self .RedrivePolicy , function # type: ignore[attr-defined, attr-defined, attr-defined]
520
+ )
516
521
event_source = self ._inject_sqs_event_source_mapping (function , role , queue_arn , batch_size , enabled ) # type: ignore[no-untyped-call]
517
522
518
523
resources = resources + event_source
519
524
resources .append (queue_policy )
520
525
resources .append (subscription )
521
526
return resources
522
527
523
- def _inject_subscription (self , protocol , endpoint , topic , region , filterPolicy , function ): # type: ignore[no-untyped-def]
528
+ def _inject_subscription (
529
+ self ,
530
+ protocol : str ,
531
+ endpoint : str ,
532
+ topic : str ,
533
+ region : Optional [str ],
534
+ filterPolicy : Optional [Dict [str , Any ]],
535
+ redrivePolicy : Optional [Dict [str , Any ]],
536
+ function : Any ,
537
+ ) -> SNSSubscription :
524
538
subscription = SNSSubscription (self .logical_id , attributes = function .get_passthrough_resource_attributes ())
525
539
subscription .Protocol = protocol
526
540
subscription .Endpoint = endpoint
@@ -532,6 +546,9 @@ def _inject_subscription(self, protocol, endpoint, topic, region, filterPolicy,
532
546
if filterPolicy is not None :
533
547
subscription .FilterPolicy = filterPolicy
534
548
549
+ if redrivePolicy is not None :
550
+ subscription .RedrivePolicy = redrivePolicy
551
+
535
552
return subscription
536
553
537
554
def _inject_sqs_queue (self , function ): # type: ignore[no-untyped-def]
0 commit comments