Skip to content

Commit 36afb70

Browse files
authored
chore: adding types part 2 (#2749)
1 parent 1b5a469 commit 36afb70

File tree

8 files changed

+59
-53
lines changed

8 files changed

+59
-53
lines changed

samtranslator/intrinsics/resolver.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
# Help resolve intrinsic functions
2-
from typing import Any
2+
from typing import Any, Dict, Optional
33

44
from samtranslator.intrinsics.actions import Action, SubAction, RefAction, GetAttAction
55
from samtranslator.model.exceptions import InvalidTemplateException, InvalidDocumentException
6+
from samtranslator.intrinsics.resource_refs import SupportedResourceReferences
67

78
# All intrinsics are supported by default
89
DEFAULT_SUPPORTED_INTRINSICS = {action.intrinsic_name: action() for action in [RefAction, SubAction, GetAttAction]}
910

1011

1112
class IntrinsicsResolver(object):
12-
def __init__(self, parameters, supported_intrinsics=None): # type: ignore[no-untyped-def]
13+
def __init__(self, parameters: Dict[str, Any], supported_intrinsics: Optional[Dict[str, Any]] = None) -> None:
1314
"""
1415
Instantiate the resolver
1516
:param dict parameters: Map of parameter names to their values
@@ -46,7 +47,9 @@ def resolve_parameter_refs(self, _input: Any) -> Any:
4647
"""
4748
return self._traverse(_input, self.parameters, self._try_resolve_parameter_refs) # type: ignore[no-untyped-call]
4849

49-
def resolve_sam_resource_refs(self, input, supported_resource_refs): # type: ignore[no-untyped-def]
50+
def resolve_sam_resource_refs(
51+
self, input: Dict[str, Any], supported_resource_refs: SupportedResourceReferences
52+
) -> Any:
5053
"""
5154
Customers can provide a reference to a "derived" SAM resource such as Alias of a Function or Stage of an API
5255
resource. This method recursively walks the tree, converting all derived references to the real resource name,
@@ -70,7 +73,7 @@ def resolve_sam_resource_refs(self, input, supported_resource_refs): # type: ig
7073
"""
7174
return self._traverse(input, supported_resource_refs, self._try_resolve_sam_resource_refs) # type: ignore[no-untyped-call]
7275

73-
def resolve_sam_resource_id_refs(self, input, supported_resource_id_refs): # type: ignore[no-untyped-def]
76+
def resolve_sam_resource_id_refs(self, input: Dict[str, Any], supported_resource_id_refs: Dict[str, str]) -> Any:
7477
"""
7578
Some SAM resources have their logical ids mutated from the original id that the customer writes in the
7679
template. This method recursively walks the tree and updates these logical ids from the old value

samtranslator/model/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class DuplicateLogicalIdException(ExceptionWithMessage):
4848
message -- explanation of the error
4949
"""
5050

51-
def __init__(self, logical_id, duplicate_id, type): # type: ignore[no-untyped-def]
51+
def __init__(self, logical_id: str, duplicate_id: str, type: str) -> None:
5252
self._logical_id = logical_id
5353
self._duplicate_id = duplicate_id
5454
self._type = type

samtranslator/model/preferences/deployment_preference_collection.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Dict, Optional, cast
1+
from typing import Any, Dict, Optional, cast, List, Union
22

33
from .deployment_preference import DeploymentPreference
44
from samtranslator.model.codedeploy import CodeDeployApplication
@@ -94,7 +94,7 @@ def can_skip_service_role(self): # type: ignore[no-untyped-def]
9494
"""
9595
return all(preference.role or not preference.enabled for preference in self._resource_preferences.values())
9696

97-
def needs_resource_condition(self): # type: ignore[no-untyped-def]
97+
def needs_resource_condition(self) -> Union[Dict[str, Any], bool]:
9898
"""
9999
If all preferences have a condition, all code deploy resources need to be conditionally created
100100
:return: True, if a condition needs to be created
@@ -104,7 +104,7 @@ def needs_resource_condition(self): # type: ignore[no-untyped-def]
104104
not preference.condition and preference.enabled for preference in self._resource_preferences.values()
105105
)
106106

107-
def get_all_deployment_conditions(self): # type: ignore[no-untyped-def]
107+
def get_all_deployment_conditions(self) -> List[str]:
108108
"""
109109
Returns a list of all conditions associated with the deployment preference resources
110110
:return: List of condition names
@@ -115,14 +115,14 @@ def get_all_deployment_conditions(self): # type: ignore[no-untyped-def]
115115
conditions_set.remove(None)
116116
return list(conditions_set)
117117

118-
def create_aggregate_deployment_condition(self): # type: ignore[no-untyped-def]
118+
def create_aggregate_deployment_condition(self) -> Union[None, Dict[str, Dict[str, List[Dict[str, Any]]]]]:
119119
"""
120120
Creates an aggregate deployment condition if necessary
121121
:return: None if <2 conditions are found, otherwise a dictionary of new conditions to add to template
122122
"""
123-
return make_combined_condition(self.get_all_deployment_conditions(), CODE_DEPLOY_CONDITION_NAME) # type: ignore[no-untyped-call, no-untyped-call]
123+
return make_combined_condition(self.get_all_deployment_conditions(), CODE_DEPLOY_CONDITION_NAME)
124124

125-
def enabled_logical_ids(self): # type: ignore[no-untyped-def]
125+
def enabled_logical_ids(self) -> List[str]:
126126
"""
127127
:return: only the logical id's for the deployment preferences in this collection which are enabled
128128
"""
@@ -131,8 +131,8 @@ def enabled_logical_ids(self): # type: ignore[no-untyped-def]
131131
def get_codedeploy_application(self): # type: ignore[no-untyped-def]
132132
codedeploy_application_resource = CodeDeployApplication(CODEDEPLOY_APPLICATION_LOGICAL_ID)
133133
codedeploy_application_resource.ComputePlatform = "Lambda"
134-
if self.needs_resource_condition(): # type: ignore[no-untyped-call]
135-
conditions = self.get_all_deployment_conditions() # type: ignore[no-untyped-call]
134+
if self.needs_resource_condition():
135+
conditions = self.get_all_deployment_conditions()
136136
condition_name = CODE_DEPLOY_CONDITION_NAME
137137
if len(conditions) <= 1:
138138
condition_name = conditions.pop()
@@ -163,8 +163,8 @@ def get_codedeploy_iam_role(self): # type: ignore[no-untyped-def]
163163
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSCodeDeployRoleForLambda")
164164
]
165165

166-
if self.needs_resource_condition(): # type: ignore[no-untyped-call]
167-
conditions = self.get_all_deployment_conditions() # type: ignore[no-untyped-call]
166+
if self.needs_resource_condition():
167+
conditions = self.get_all_deployment_conditions()
168168
condition_name = CODE_DEPLOY_CONDITION_NAME
169169
if len(conditions) <= 1:
170170
condition_name = conditions.pop()

samtranslator/plugins/application/serverless_app_plugin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ def _replace_value(self, input_dict, key, intrinsic_resolvers): # type: ignore[
165165

166166
def _get_intrinsic_resolvers(self, mappings): # type: ignore[no-untyped-def]
167167
return [
168-
IntrinsicsResolver(self._parameters), # type: ignore[no-untyped-call]
169-
IntrinsicsResolver(mappings, {FindInMapAction.intrinsic_name: FindInMapAction()}), # type: ignore[no-untyped-call, no-untyped-call]
168+
IntrinsicsResolver(self._parameters),
169+
IntrinsicsResolver(mappings, {FindInMapAction.intrinsic_name: FindInMapAction()}),
170170
]
171171

172172
def _resolve_location_value(self, value, intrinsic_resolvers): # type: ignore[no-untyped-def]

samtranslator/plugins/sam_plugins.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from typing import Optional, Any, List, Union
23
from samtranslator.model.exceptions import InvalidResourceException, InvalidDocumentException, InvalidTemplateException
34
from samtranslator.plugins import BasePlugin, LifeCycleEvents
45

@@ -45,13 +46,13 @@ class SamPlugins(object):
4546
set by the plugin. SAM translator will convert this into a nice error message and display to the user.
4647
"""
4748

48-
def __init__(self, initial_plugins=None): # type: ignore[no-untyped-def]
49+
def __init__(self, initial_plugins: Optional[Union[BasePlugin, List[BasePlugin]]] = None) -> None:
4950
"""
5051
Initialize the plugins class with an optional list of plugins
5152
5253
:param BasePlugin or list initial_plugins: Single plugin or a List of plugins to initialize with
5354
"""
54-
self._plugins = []
55+
self._plugins: List[BasePlugin] = []
5556

5657
if initial_plugins is None:
5758
initial_plugins = []
@@ -75,12 +76,12 @@ def register(self, plugin): # type: ignore[no-untyped-def]
7576
if not plugin or not isinstance(plugin, BasePlugin):
7677
raise ValueError("Plugin must be implemented as a subclass of BasePlugin class")
7778

78-
if self.is_registered(plugin.name): # type: ignore[no-untyped-call]
79+
if self.is_registered(plugin.name):
7980
raise ValueError("Plugin with name {} is already registered".format(plugin.name))
8081

8182
self._plugins.append(plugin)
8283

83-
def is_registered(self, plugin_name): # type: ignore[no-untyped-def]
84+
def is_registered(self, plugin_name: str) -> bool:
8485
"""
8586
Checks if a plugin with given name is already registered
8687
@@ -90,7 +91,7 @@ def is_registered(self, plugin_name): # type: ignore[no-untyped-def]
9091

9192
return plugin_name in [p.name for p in self._plugins]
9293

93-
def _get(self, plugin_name): # type: ignore[no-untyped-def]
94+
def _get(self, plugin_name: str) -> Union[Any, None]:
9495
"""
9596
Retrieves the plugin with given name
9697
@@ -104,7 +105,7 @@ def _get(self, plugin_name): # type: ignore[no-untyped-def]
104105

105106
return None
106107

107-
def act(self, event, *args, **kwargs): # type: ignore[no-untyped-def]
108+
def act(self, event: LifeCycleEvents, *args: Any, **kwargs: Any) -> None:
108109
"""
109110
Act on the specific life cycle event. The action here is to invoke the hook function on all registered plugins.
110111
*args and **kwargs will be passed directly to the plugin's hook functions
@@ -137,7 +138,7 @@ def act(self, event, *args, **kwargs): # type: ignore[no-untyped-def]
137138
LOG.exception("Plugin '%s' raised an exception: %s", plugin.name, ex)
138139
raise ex
139140

140-
def __len__(self): # type: ignore[no-untyped-def]
141+
def __len__(self) -> int:
141142
"""
142143
Returns the number of plugins registered with this class
143144

samtranslator/policy_template_processor/processor.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import jsonschema
33
from samtranslator import policy_templates_data
4-
4+
from typing import Dict, Any, Optional
55
from jsonschema.exceptions import ValidationError
66
from samtranslator.policy_template_processor.template import Template
77
from samtranslator.policy_template_processor.exceptions import TemplateNotFoundException
@@ -48,15 +48,15 @@ class PolicyTemplatesProcessor(object):
4848
# ./policy_templates.json
4949
DEFAULT_POLICY_TEMPLATES_FILE = policy_templates_data.POLICY_TEMPLATES_FILE
5050

51-
def __init__(self, policy_templates_dict, schema=None): # type: ignore[no-untyped-def]
51+
def __init__(self, policy_templates_dict: Dict[str, Any], schema: Optional[Dict[str, Any]] = None):
5252
"""
5353
Initialize the class
5454
5555
:param policy_templates_dict: Dictionary containing the policy templates definition
5656
:param dict schema: Dictionary containing the JSON Schema of policy templates
5757
:raises ValueError: If policy templates does not match up with the schema
5858
"""
59-
PolicyTemplatesProcessor._is_valid_templates_dict(policy_templates_dict, schema) # type: ignore[no-untyped-call]
59+
PolicyTemplatesProcessor._is_valid_templates_dict(policy_templates_dict, schema)
6060

6161
self.policy_templates = {}
6262
for template_name, template_value_dict in policy_templates_dict["Templates"].items():
@@ -81,7 +81,7 @@ def get(self, template_name): # type: ignore[no-untyped-def]
8181
"""
8282
return self.policy_templates.get(template_name, None)
8383

84-
def convert(self, template_name, parameter_values): # type: ignore[no-untyped-def]
84+
def convert(self, template_name: str, parameter_values: str) -> Any:
8585
"""
8686
Converts the given template to IAM-ready policy statement by substituting template parameters with the given
8787
values.
@@ -100,7 +100,9 @@ def convert(self, template_name, parameter_values): # type: ignore[no-untyped-d
100100
return template.to_statement(parameter_values)
101101

102102
@staticmethod
103-
def _is_valid_templates_dict(policy_templates_dict, schema=None): # type: ignore[no-untyped-def]
103+
def _is_valid_templates_dict(
104+
policy_templates_dict: Dict[Any, Any], schema: Optional[Dict[Any, Any]] = None
105+
) -> bool:
104106
"""
105107
Is this a valid policy template dictionary
106108
@@ -111,7 +113,7 @@ def _is_valid_templates_dict(policy_templates_dict, schema=None): # type: ignor
111113
"""
112114

113115
if not schema:
114-
schema = PolicyTemplatesProcessor._read_schema() # type: ignore[no-untyped-call]
116+
schema = PolicyTemplatesProcessor._read_schema()
115117

116118
try:
117119
jsonschema.validate(policy_templates_dict, schema)
@@ -122,17 +124,17 @@ def _is_valid_templates_dict(policy_templates_dict, schema=None): # type: ignor
122124
return True
123125

124126
@staticmethod
125-
def get_default_policy_templates_json(): # type: ignore[no-untyped-def]
127+
def get_default_policy_templates_json() -> Any:
126128
"""
127129
Reads and returns the default policy templates JSON data from file.
128130
129131
:return dict: Dictionary containing data read from default policy templates JSON file
130132
"""
131133

132-
return PolicyTemplatesProcessor._read_json(PolicyTemplatesProcessor.DEFAULT_POLICY_TEMPLATES_FILE) # type: ignore[no-untyped-call]
134+
return PolicyTemplatesProcessor._read_json(PolicyTemplatesProcessor.DEFAULT_POLICY_TEMPLATES_FILE)
133135

134136
@staticmethod
135-
def _read_schema(): # type: ignore[no-untyped-def]
137+
def _read_schema() -> Any:
136138
"""
137139
Reads the JSON Schema at given file path
138140
@@ -141,10 +143,10 @@ def _read_schema(): # type: ignore[no-untyped-def]
141143
:return dict: JSON Schema of the policy template
142144
"""
143145

144-
return PolicyTemplatesProcessor._read_json(PolicyTemplatesProcessor.SCHEMA_LOCATION) # type: ignore[no-untyped-call]
146+
return PolicyTemplatesProcessor._read_json(PolicyTemplatesProcessor.SCHEMA_LOCATION)
145147

146148
@staticmethod
147-
def _read_json(filepath): # type: ignore[no-untyped-def]
149+
def _read_json(filepath: str) -> Any:
148150
"""
149151
Helper method to read a JSON file
150152
:param filepath: Path to the file

samtranslator/policy_template_processor/template.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def to_statement(self, parameter_values): # type: ignore[no-untyped-def]
6161
# Only "Ref" is supported
6262
supported_intrinsics = {RefAction.intrinsic_name: RefAction()}
6363

64-
resolver = IntrinsicsResolver(necessary_parameter_values, supported_intrinsics) # type: ignore[no-untyped-call]
64+
resolver = IntrinsicsResolver(necessary_parameter_values, supported_intrinsics)
6565
definition_copy = self._disambiguate_policy_parameter(self.definition)
6666

6767
return resolver.resolve_parameter_refs(definition_copy)

0 commit comments

Comments
 (0)