|
| 1 | +from enum import Enum, auto |
| 2 | +from typing import Any, Dict, List, Optional, Tuple, Union, cast |
| 3 | + |
| 4 | +from samtranslator.metrics.method_decorator import cw_timer |
| 5 | +from samtranslator.model import PropertyType, Resource, ResourceMacro |
| 6 | +from samtranslator.model.iam import IAMRole |
| 7 | +from samtranslator.model.sqs import SQSQueue |
| 8 | +from samtranslator.model.types import is_str, is_type |
| 9 | +from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX |
| 10 | +from samtranslator.model.eventbridge_utils import EventBridgeRuleUtils |
| 11 | +from samtranslator.model.exceptions import InvalidEventException |
| 12 | +from samtranslator.model.iam import IAMRolePolicies |
| 13 | +from samtranslator.model.scheduler import SchedulerSchedule |
| 14 | +from samtranslator.translator.logical_id_generator import LogicalIdGenerator |
| 15 | + |
| 16 | + |
| 17 | +class _SchedulerScheduleTargetType(Enum): |
| 18 | + FUNCTION = auto() |
| 19 | + STATE_MACHINE = auto() |
| 20 | + |
| 21 | + |
| 22 | +class SchedulerEventSource(ResourceMacro): |
| 23 | + """ |
| 24 | + Scheduler event source for SAM Functions and SAM State Machine. |
| 25 | +
|
| 26 | + It will translate into an "AWS::Scheduler::Schedule." |
| 27 | + Because a Scheduler Schedule resource requires an execution role, |
| 28 | + this macro will also create an IAM role with permissions to invoke |
| 29 | + the function/state machine. |
| 30 | + """ |
| 31 | + |
| 32 | + resource_type = "ScheduleV2" |
| 33 | + |
| 34 | + # As the first version, the properties of Scheduler schedule event will be the |
| 35 | + # same as the original "Schedule" event. |
| 36 | + # See class "Schedule" in samtranslator.model.eventsources.push and samtranslator.model.stepfunctions.events. |
| 37 | + property_types = { |
| 38 | + "PermissionsBoundary": PropertyType(False, is_str()), |
| 39 | + "ScheduleExpression": PropertyType(True, is_str()), |
| 40 | + "FlexibleTimeWindow": PropertyType(False, is_type(dict)), |
| 41 | + "Name": PropertyType(False, is_str()), |
| 42 | + "State": PropertyType(False, is_str()), |
| 43 | + "Description": PropertyType(False, is_str()), |
| 44 | + "StartDate": PropertyType(False, is_str()), |
| 45 | + "EndDate": PropertyType(False, is_str()), |
| 46 | + "ScheduleExpressionTimezone": PropertyType(False, is_str()), |
| 47 | + "GroupName": PropertyType(False, is_str()), |
| 48 | + "KmsKeyArn": PropertyType(False, is_str()), |
| 49 | + "Input": PropertyType(False, is_str()), |
| 50 | + "RoleArn": PropertyType(False, is_str()), |
| 51 | + "DeadLetterConfig": PropertyType(False, is_type(dict)), |
| 52 | + "RetryPolicy": PropertyType(False, is_type(dict)), |
| 53 | + } |
| 54 | + |
| 55 | + # Below are type hints, must maintain consistent with properties_types |
| 56 | + # - pass-through to generated IAM role |
| 57 | + PermissionsBoundary: Optional[str] |
| 58 | + # - pass-through to AWS::Scheduler::Schedule |
| 59 | + ScheduleExpression: str |
| 60 | + FlexibleTimeWindow: Optional[Dict[str, Any]] |
| 61 | + Name: Optional[str] |
| 62 | + State: Optional[str] |
| 63 | + Description: Optional[str] |
| 64 | + StartDate: Optional[str] |
| 65 | + EndDate: Optional[str] |
| 66 | + ScheduleExpressionTimezone: Optional[str] |
| 67 | + GroupName: Optional[str] |
| 68 | + KmsKeyArn: Optional[str] |
| 69 | + # - pass-through to AWS::Scheduler::Schedule's Target |
| 70 | + Input: Optional[str] |
| 71 | + RoleArn: Optional[str] |
| 72 | + DeadLetterConfig: Optional[Dict[str, Any]] |
| 73 | + RetryPolicy: Optional[Dict[str, Any]] |
| 74 | + |
| 75 | + DEFAULT_FLEXIBLE_TIME_WINDOW = {"Mode": "OFF"} |
| 76 | + |
| 77 | + @cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX) # type: ignore |
| 78 | + def to_cloudformation(self, **kwargs: Dict[str, Any]) -> List[Resource]: |
| 79 | + """Returns the Scheduler Schedule and an IAM role. |
| 80 | +
|
| 81 | + :param dict kwargs: no existing resources need to be modified |
| 82 | + :returns: a list of vanilla CloudFormation Resources, to which this push event expands |
| 83 | + :rtype: list |
| 84 | + """ |
| 85 | + |
| 86 | + target: Resource |
| 87 | + |
| 88 | + # For SAM statemachine, the resource object is passed using kwargs["resource"], |
| 89 | + # https://github.com/aws/serverless-application-model/blob/a25933379e1cad3d0df4b35729ee2ec335402fdf/samtranslator/model/stepfunctions/generators.py#L266 |
| 90 | + if kwargs.get("resource"): |
| 91 | + target_type = _SchedulerScheduleTargetType.STATE_MACHINE |
| 92 | + target = cast(Resource, kwargs["resource"]) |
| 93 | + # for SAM function, the resource object is passed using kwargs["function"], |
| 94 | + # unlike SFN using "resource" keyword argument: |
| 95 | + # https://github.com/aws/serverless-application-model/blob/a25933379e1cad3d0df4b35729ee2ec335402fdf/samtranslator/model/sam_resources.py#L681 |
| 96 | + elif kwargs.get("function"): |
| 97 | + target_type = _SchedulerScheduleTargetType.FUNCTION |
| 98 | + target = cast(Resource, kwargs["function"]) |
| 99 | + else: |
| 100 | + raise TypeError("Missing required keyword argument: function/resource") |
| 101 | + |
| 102 | + passthrough_resource_attributes = target.get_passthrough_resource_attributes() # type: ignore[no-untyped-call] |
| 103 | + |
| 104 | + resources: List[Resource] = [] |
| 105 | + |
| 106 | + scheduler_schedule = self._construct_scheduler_schedule_without_target(passthrough_resource_attributes) |
| 107 | + resources.append(scheduler_schedule) |
| 108 | + |
| 109 | + dlq_queue_arn: Optional[str] = None |
| 110 | + if self.DeadLetterConfig is not None: |
| 111 | + # The dql config spec is the same as normal "Schedule" event, |
| 112 | + # so continue to use EventBridgeRuleUtils for validation. |
| 113 | + # However, Scheduler doesn't use AWS::SQS::QueuePolicy to grant permissions. |
| 114 | + # so we cannot use EventBridgeRuleUtils.get_dlq_queue_arn_and_resources() here. |
| 115 | + EventBridgeRuleUtils.validate_dlq_config(self.logical_id, self.DeadLetterConfig) # type: ignore[no-untyped-call] |
| 116 | + dlq_queue_arn, dlq_resources = self._get_dlq_queue_arn_and_resources( |
| 117 | + self.DeadLetterConfig, passthrough_resource_attributes |
| 118 | + ) |
| 119 | + resources.extend(dlq_resources) |
| 120 | + |
| 121 | + execution_role_arn: Union[str, Dict[str, Any]] = self.RoleArn # type: ignore[assignment] |
| 122 | + if not execution_role_arn: |
| 123 | + execution_role = self._construct_execution_role( |
| 124 | + target, target_type, passthrough_resource_attributes, dlq_queue_arn, self.PermissionsBoundary |
| 125 | + ) |
| 126 | + resources.append(execution_role) |
| 127 | + execution_role_arn = execution_role.get_runtime_attr("arn") # type: ignore[no-untyped-call] |
| 128 | + |
| 129 | + scheduler_schedule.Target = self._construct_scheduler_schedule_target(target, execution_role_arn, dlq_queue_arn) |
| 130 | + |
| 131 | + return resources |
| 132 | + |
| 133 | + def _construct_scheduler_schedule_without_target( |
| 134 | + self, passthrough_resource_attributes: Dict[str, Any] |
| 135 | + ) -> SchedulerSchedule: |
| 136 | + scheduler_schedule = SchedulerSchedule(self.logical_id, attributes=passthrough_resource_attributes) |
| 137 | + scheduler_schedule.ScheduleExpression = self.ScheduleExpression |
| 138 | + |
| 139 | + if self.State: |
| 140 | + scheduler_schedule.State = self.State |
| 141 | + |
| 142 | + # Scheduler schedule's Name is a required property |
| 143 | + scheduler_schedule.Name = self.Name or self.logical_id |
| 144 | + |
| 145 | + # pass-through other properties |
| 146 | + scheduler_schedule.Description = self.Description |
| 147 | + scheduler_schedule.FlexibleTimeWindow = self.FlexibleTimeWindow or self.DEFAULT_FLEXIBLE_TIME_WINDOW |
| 148 | + scheduler_schedule.StartDate = self.StartDate |
| 149 | + scheduler_schedule.EndDate = self.EndDate |
| 150 | + scheduler_schedule.ScheduleExpressionTimezone = self.ScheduleExpressionTimezone |
| 151 | + scheduler_schedule.GroupName = self.GroupName |
| 152 | + scheduler_schedule.KmsKeyArn = self.KmsKeyArn |
| 153 | + |
| 154 | + return scheduler_schedule |
| 155 | + |
| 156 | + def _construct_execution_role( |
| 157 | + self, |
| 158 | + target: Resource, |
| 159 | + target_type: _SchedulerScheduleTargetType, |
| 160 | + passthrough_resource_attributes: Dict[str, Any], |
| 161 | + dlq_queue_arn: Optional[str], |
| 162 | + permissions_boundary: Optional[str], |
| 163 | + ) -> IAMRole: |
| 164 | + """Constructs the execution role for Scheduler Schedule.""" |
| 165 | + if target_type == _SchedulerScheduleTargetType.FUNCTION: |
| 166 | + policy = IAMRolePolicies.lambda_invoke_function_role_policy(target.get_runtime_attr("arn"), self.logical_id) # type: ignore[no-untyped-call, no-untyped-call] |
| 167 | + elif target_type == _SchedulerScheduleTargetType.STATE_MACHINE: |
| 168 | + policy = IAMRolePolicies.step_functions_start_execution_role_policy( # type: ignore[no-untyped-call] |
| 169 | + target.get_runtime_attr("arn"), self.logical_id # type: ignore[no-untyped-call] |
| 170 | + ) |
| 171 | + else: |
| 172 | + raise RuntimeError(f"Unexpected target type {target_type.name}") |
| 173 | + |
| 174 | + role_logical_id = LogicalIdGenerator(self.logical_id + "Role").gen() # type: ignore[no-untyped-call, no-untyped-call] |
| 175 | + execution_role = IAMRole(role_logical_id, attributes=passthrough_resource_attributes) |
| 176 | + execution_role.AssumeRolePolicyDocument = IAMRolePolicies.scheduler_assume_role_policy() |
| 177 | + |
| 178 | + policies = [policy] |
| 179 | + if dlq_queue_arn: |
| 180 | + policies.append(IAMRolePolicies.sqs_send_message_role_policy(dlq_queue_arn, self.logical_id)) |
| 181 | + execution_role.Policies = policies |
| 182 | + |
| 183 | + if permissions_boundary: |
| 184 | + execution_role.PermissionsBoundary = permissions_boundary |
| 185 | + return execution_role |
| 186 | + |
| 187 | + def _construct_scheduler_schedule_target( |
| 188 | + self, target: Resource, execution_role_arn: Union[str, Dict[str, Any]], dead_letter_queue_arn: Optional[Any] |
| 189 | + ) -> Dict[str, Any]: |
| 190 | + """Constructs the Target property for the Scheduler Schedule. |
| 191 | +
|
| 192 | + :returns: the Target property |
| 193 | + :rtype: dict |
| 194 | +
|
| 195 | + Inspired by https://github.com/aws/serverless-application-model/blob/a25933379e1cad3d0df4b35729ee2ec335402fdf/samtranslator/model/eventsources/push.py#L157 |
| 196 | + """ |
| 197 | + target_dict: Dict[str, Any] = { |
| 198 | + "Arn": target.get_runtime_attr("arn"), # type: ignore[no-untyped-call] |
| 199 | + "RoleArn": execution_role_arn, |
| 200 | + } |
| 201 | + if self.Input is not None: |
| 202 | + target_dict["Input"] = self.Input |
| 203 | + |
| 204 | + if self.DeadLetterConfig is not None: |
| 205 | + target_dict["DeadLetterConfig"] = {"Arn": dead_letter_queue_arn} |
| 206 | + |
| 207 | + if self.RetryPolicy is not None: |
| 208 | + target_dict["RetryPolicy"] = self.RetryPolicy |
| 209 | + |
| 210 | + return target_dict |
| 211 | + |
| 212 | + def _get_dlq_queue_arn_and_resources( |
| 213 | + self, dlq_config: Dict[str, Any], passthrough_resource_attributes: Optional[Dict[str, Any]] |
| 214 | + ) -> Tuple[Any, List[Resource]]: |
| 215 | + """ |
| 216 | + Returns dlq queue arn and dlq_resources, assuming self.DeadLetterConfig has been validated. |
| 217 | +
|
| 218 | + Inspired by https://github.com/aws/serverless-application-model/blob/a25933379e1cad3d0df4b35729ee2ec335402fdf/samtranslator/model/eventbridge_utils.py#L44 |
| 219 | + """ |
| 220 | + dlq_queue_arn = dlq_config.get("Arn") |
| 221 | + if dlq_queue_arn is not None: |
| 222 | + return dlq_queue_arn, [] |
| 223 | + queue_logical_id = dlq_config.get("QueueLogicalId") |
| 224 | + if queue_logical_id is not None and not isinstance(queue_logical_id, str): |
| 225 | + raise InvalidEventException( |
| 226 | + self.logical_id, |
| 227 | + "QueueLogicalId must be a string", |
| 228 | + ) |
| 229 | + dlq_resources: List[Resource] = [] |
| 230 | + queue = SQSQueue(queue_logical_id or self.logical_id + "Queue", attributes=passthrough_resource_attributes) |
| 231 | + dlq_resources.append(queue) |
| 232 | + |
| 233 | + dlq_queue_arn = queue.get_runtime_attr("arn") # type: ignore[no-untyped-call] |
| 234 | + return dlq_queue_arn, dlq_resources |
0 commit comments