Skip to content

Commit 2027309

Browse files
committed
pylint issues
1 parent 5484ca7 commit 2027309

File tree

1 file changed

+162
-96
lines changed

1 file changed

+162
-96
lines changed
Lines changed: 162 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
14
"""
2-
Python class responsible for updating the nuke generic config , based on exceptions to be filtered
3-
and also updates dynamically the region attribute passed in from the StepFunctions invocation. This should be modified to suit your needs.
5+
Python class responsible for updating the nuke generic config, based on exceptions to be filtered,
6+
and also updates dynamically the region attribute passed in from the StepFunctions invocation.
7+
This should be modified to suit your needs.
48
"""
9+
510
import argparse
611
import copy
12+
import logging
13+
from typing import Any, Dict, List, Tuple
714

815
import boto3
916
import yaml
17+
from boto3.exceptions import ClientError
1018

1119
GLOBAL_RESOURCE_EXCEPTIONS = [
1220
{"property": "tag:DoNotNuke", "value": "True"},
@@ -19,19 +27,42 @@
1927

2028

2129
class StackInfo:
22-
def __init__(self, account, target_regions):
30+
"""
31+
Class responsible for managing StackInfo operations.
32+
33+
Attributes:
34+
session (boto3.Session): AWS session object.
35+
regions (List[str]): List of target regions.
36+
resources (Dict[str, List[Dict[str, str]]]): Dictionary of resources and their exceptions.
37+
config (Dict[str, Any]): Configuration dictionary.
38+
account (str): AWS account ID.
39+
"""
40+
41+
def __init__(self, account: str, target_regions: List[str]) -> None:
42+
"""
43+
Initialize StackInfo object.
44+
45+
Args:
46+
account (str): AWS account ID.
47+
target_regions (List[str]): List of target regions.
48+
"""
2349
self.session = boto3.Session(profile_name="nuke")
24-
# Regions to be targeted set from the Stepfunctions/CodeBuild workflow
2550
self.regions = target_regions
26-
self.resources = {}
27-
self.config = {}
51+
self.resources: Dict[str, List[Dict[str, str]]] = {}
52+
self.config: Dict[str, Any] = {}
2853
self.account = account
2954

30-
def Populate(self):
31-
self.UpdateCFNStackList()
32-
self.OverrideDefaultConfig()
55+
def populate(self) -> None:
56+
"""
57+
Populate resources and override the default configuration.
58+
"""
59+
self.update_cfn_stack_list()
60+
self.override_default_config()
3361

34-
def UpdateCFNStackList(self):
62+
def update_cfn_stack_list(self) -> None:
63+
"""
64+
Update the list of CloudFormation stacks and resources.
65+
"""
3566
try:
3667
for region in self.regions:
3768
cfn_client = self.session.client("cloudformation", region_name=region)
@@ -46,22 +77,28 @@ def UpdateCFNStackList(self):
4677
]
4778
)
4879
for page in responses:
49-
for stack in page.get("StackSummaries"):
50-
self.GetCFNResources(stack, cfn_client)
51-
self.BuildIamExclusionList(region)
52-
except Exception as e:
53-
print("Error in calling UpdateCFNStackList:\n {}".format(e))
80+
for stack in page.get("StackSummaries", []):
81+
self.get_cfn_resources(stack, cfn_client)
82+
self.build_iam_exclusion_list(region)
83+
except ClientError as e:
84+
logging.error(f"Error in calling update_cfn_stack_list: {e}")
85+
86+
def get_cfn_resources(self, stack: Dict[str, Any], cfn_client) -> None:
87+
"""
88+
Get resources from a CloudFormation stack.
5489
55-
def GetCFNResources(self, stack, cfn_client):
90+
Args:
91+
stack (Dict[str, Any]): CloudFormation stack details.
92+
cfn_client: CloudFormation client object.
93+
"""
5694
try:
57-
stack_name = stack.get("StackName")
95+
stack_name = stack.get("StackName") or stack.get("PhysicalResourceId")
5896

5997
if stack_name is None:
60-
stack_name = stack.get("PhysicalResourceId")
98+
return
6199

62100
stack_description = cfn_client.describe_stacks(StackName=stack_name)
63-
print("Stack Description: ", stack_description)
64-
tags = stack_description.get("Stacks")[0].get("Tags")
101+
tags = stack_description.get("Stacks", [{}])[0].get("Tags", [])
65102
for tag in tags:
66103
key = tag.get("Key")
67104
value = tag.get("Value")
@@ -75,11 +112,11 @@ def GetCFNResources(self, stack, cfn_client):
75112
stack_resources = cfn_client.list_stack_resources(
76113
StackName=stack_name
77114
)
78-
for resource in stack_resources.get("StackResourceSummaries"):
115+
for resource in stack_resources.get("StackResourceSummaries", []):
79116
if resource.get("ResourceType") == "AWS::CloudFormation::Stack":
80-
self.GetCFNResources(resource, cfn_client)
117+
self.get_cfn_resources(resource, cfn_client)
81118
else:
82-
nuke_type = self.UpdateResourceName(
119+
nuke_type = self.update_resource_name(
83120
resource["ResourceType"]
84121
)
85122
if nuke_type in self.resources:
@@ -96,99 +133,128 @@ def GetCFNResources(self, stack, cfn_client):
96133
"value": resource["PhysicalResourceId"],
97134
}
98135
]
99-
except Exception as e:
100-
print("Error calling GetCFNResources:\n {}".format(e))
136+
except ClientError as e:
137+
logging.error(f"Error calling get_cfn_resources: {e}")
101138

102-
def UpdateResourceName(self, resource):
103-
nuke_type = str.replace(resource, "AWS::", "")
104-
nuke_type = str.replace(nuke_type, "::", "")
105-
nuke_type = str.replace(nuke_type, "Config", "ConfigService", 1)
139+
def update_resource_name(self, resource: str) -> str:
140+
"""
141+
Update the resource name to match the nuke resource type.
142+
143+
Args:
144+
resource (str): Resource name.
145+
146+
Returns:
147+
str: Updated resource name.
148+
"""
149+
nuke_type = resource.replace("AWS::", "")
150+
nuke_type = nuke_type.replace("::", "")
151+
nuke_type = nuke_type.replace("Config", "ConfigService", 1)
106152
return nuke_type
107153

108-
def BuildIamExclusionList(self, region):
109-
# This excludes and appends to the config IAMRole resources , the roles that are federated principals
110-
# You can add any other custom filterting logic based on regions for IAM/Global roles that should be excluded
111-
iam_client = self.session.client("iam", region_name=region)
112-
iam_paginator = iam_client.get_paginator("list_roles")
113-
responses = iam_paginator.paginate()
114-
for page in responses:
115-
for role in page["Roles"]:
116-
apd = role.get("AssumeRolePolicyDocument")
117-
if apd is not None:
118-
for item in apd.get("Statement"):
119-
if item is not None:
120-
for principal in item.get("Principal"):
121-
if principal == "Federated":
122-
if "IAMRole" in self.resources:
123-
self.resources["IAMRole"].append(
124-
role.get("RoleName")
125-
)
126-
else:
127-
self.resources["IAMRole"] = [
128-
role.get("RoleName")
129-
]
130-
131-
def OverrideDefaultConfig(self):
132-
# Open the nuke_generic_config.yaml and merge the captured resources/exclusions with it
154+
def build_iam_exclusion_list(self, region: str) -> None:
155+
"""
156+
Build the IAM role exclusion list for the given region.
157+
158+
Args:
159+
region (str): AWS region.
160+
"""
161+
try:
162+
iam_client = self.session.client("iam", region_name=region)
163+
iam_paginator = iam_client.get_paginator("list_roles")
164+
responses = iam_paginator.paginate()
165+
for page in responses:
166+
for role in page.get("Roles", []):
167+
assume_role_policy_document = role.get("AssumeRolePolicyDocument")
168+
if assume_role_policy_document:
169+
for statement in assume_role_policy_document.get(
170+
"Statement", []
171+
):
172+
if statement.get("Principal", {}).get("Federated"):
173+
if "IAMRole" in self.resources:
174+
self.resources["IAMRole"].append(
175+
role.get("RoleName")
176+
)
177+
else:
178+
self.resources["IAMRole"] = [role.get("RoleName")]
179+
except ClientError as e:
180+
logging.error(f"Error building IAM exclusion list: {e}")
181+
182+
def override_default_config(self) -> None:
183+
"""
184+
Override the default configuration with captured resources and exclusions.
185+
"""
133186
try:
134-
with open(r"nuke_generic_config.yaml") as config_file:
135-
self.config = yaml.load(config_file)
187+
with open("nuke_generic_config.yaml") as config_file:
188+
self.config = yaml.safe_load(config_file)
189+
136190
# Not all resources handled by the tool, but we will add them to the exclusion anyhow.
137-
for resource in self.resources:
138-
if resource in self.config["accounts"]["ACCOUNT"]["filters"]:
139-
self.config["accounts"]["ACCOUNT"]["filters"][resource].extend(
140-
self.resources[resource]
141-
)
191+
for resource, exceptions in self.resources.items():
192+
account_filters = self.config["accounts"]["ACCOUNT"]["filters"]
193+
if resource in account_filters:
194+
account_filters[resource].extend(exceptions)
142195
else:
143-
self.config["accounts"]["ACCOUNT"]["filters"][
144-
resource
145-
] = self.resources[resource]
196+
account_filters[resource] = exceptions
197+
146198
self.config["accounts"][self.account] = copy.deepcopy(
147199
self.config["accounts"]["ACCOUNT"]
148200
)
149-
if "ACCOUNT" in self.config["accounts"]:
150-
self.config["accounts"].pop("ACCOUNT", None)
201+
self.config["accounts"].pop("ACCOUNT", None)
202+
151203
# Global exclusions apply to every type of resource
152-
for resource in self.config["accounts"][self.account]["filters"]:
204+
for resource, exceptions in self.config["accounts"][self.account][
205+
"filters"
206+
].items():
153207
for exception in GLOBAL_RESOURCE_EXCEPTIONS:
154-
self.config["accounts"][self.account]["filters"][resource].append(
155-
exception.copy()
156-
)
157-
config_file.close()
158-
except Exception as e:
159-
print("Failed merging nuke-config-test.yaml with error {}".format(e))
160-
exit(1)
208+
exceptions.append(exception.copy())
209+
except ClientError as e:
210+
logging.error(f"Failed merging nuke-config-test.yaml with error {e}")
161211

162-
def WriteConfig(self):
163-
# CodeBuild script updates the target region in the generic config and is validated here.
212+
def write_config(self) -> None:
213+
"""
214+
Write the configuration to separate files for each target region.
215+
"""
164216
try:
165217
for region in self.config["regions"]:
166-
local_config = stackInfo.config.copy()
218+
local_config = self.config.copy()
167219
local_config["regions"] = [region]
168-
filename = "nuke_config_{}.yaml".format(region)
220+
filename = f"nuke_config_{region}.yaml"
169221
with open(filename, "w") as output_file:
170-
output = yaml.dump(local_config, output_file)
171-
output_file.close()
172-
except Exception as e:
173-
print("Failed opening nuke_config.yaml for writing with error {}".format(e))
222+
yaml.safe_dump(local_config, output_file)
223+
logging.info(f"Successfully wrote config to {filename}")
224+
except KeyError:
225+
logging.error("No 'regions' key found in the config dictionary")
226+
except ClientError as e:
227+
logging.error(f"An unexpected error occurred: {e}")
174228

175229

176-
try:
230+
def parse_arguments() -> Tuple[str, str]:
231+
"""
232+
Parse command-line arguments.
233+
234+
Returns:
235+
Tuple[str, str]: AWS account ID and target region.
236+
"""
177237
parser = argparse.ArgumentParser()
178238
parser.add_argument(
179-
"--account", dest="account", help="Account to nuke"
180-
) # Account and Region from StepFunctions - CodeBuild overridden params
181-
parser.add_argument("--region", dest="region", help="Region to target for nuke")
239+
"--account", dest="account", help="Account to nuke", required=True
240+
)
241+
parser.add_argument(
242+
"--region", dest="region", help="Region to target for nuke", required=True
243+
)
182244
args = parser.parse_args()
183-
if not args.account or not args.region:
184-
parser.print_help()
185-
exit(1)
186-
except Exception as e:
187-
print(e)
188-
exit(1)
245+
return args.account, args.region
246+
247+
248+
def main() -> None:
249+
"""
250+
Main entry point of the script.
251+
"""
252+
account, region = parse_arguments()
253+
stack_info = StackInfo(account, [region])
254+
stack_info.populate()
255+
stack_info.write_config()
256+
189257

190258
if __name__ == "__main__":
191-
print("Incoming Args: ", args)
192-
stackInfo = StackInfo(args.account, [args.region])
193-
stackInfo.Populate()
194-
stackInfo.WriteConfig()
259+
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
260+
main()

0 commit comments

Comments
 (0)