diff --git a/iam_check/argument_actions.py b/iam_check/argument_actions.py index 5e0bd47..671ad18 100644 --- a/iam_check/argument_actions.py +++ b/iam_check/argument_actions.py @@ -4,8 +4,9 @@ """ import argparse -from .lib.reporter import ResourceOrCodeFindingToIgnore, ResourceAndCodeFindingToIgnore, \ - AllowedExternalArn, AllowedExternalPrincipal +from .lib.reporter import (AllowedExternalArn, AllowedExternalPrincipal, + ResourceAndCodeFindingToIgnore, + ResourceOrCodeFindingToIgnore) from .tools import regex_patterns @@ -28,8 +29,8 @@ class ParseFindingsToIgnoreFromCLI(argparse.Action): a combination of both in the form MyResource.FindingA """ - def __call__(self, _, namespace, values, option_string=None): - values = values.split(',') + def __call__(self, _, namespace, values, option_string=None): + values = values.split(",") findings_to_ignore = parse_findings_to_ignore(values) @@ -47,7 +48,9 @@ def parse_findings_to_ignore(values_as_list): if "." in value: resource_and_code = value.split(".", 1) # a split must have at least two members of the array, so no need to validate - finding_to_ignore = ResourceAndCodeFindingToIgnore(resource_and_code[0], resource_and_code[1]) + finding_to_ignore = ResourceAndCodeFindingToIgnore( + resource_and_code[0], resource_and_code[1] + ) else: finding_to_ignore = ResourceOrCodeFindingToIgnore(value) @@ -63,7 +66,7 @@ class ParseAllowExternalPrincipalsFromCLI(argparse.Action): """ def __call__(self, _, namespace, values, option_string=None): - values = values.split(',') + values = values.split(",") allowed_external_principals = parse_allow_external_principals(values) @@ -86,4 +89,4 @@ def parse_allow_external_principals(values_as_list): allowed_external_principals.append(allowed_external_principal) - return allowed_external_principals \ No newline at end of file + return allowed_external_principals diff --git a/iam_check/client.py b/iam_check/client.py index 319e7b5..1f0c0c5 100644 --- a/iam_check/client.py +++ b/iam_check/client.py @@ -6,39 +6,39 @@ from botocore.config import Config config = Config( - retries={ - # this number was chosen arbitrarily, tweak as necessary - 'max_attempts': 30, - 'mode': 'standard' - } + retries={ + # this number was chosen arbitrarily, tweak as necessary + "max_attempts": 30, + "mode": "standard", + } ) def get_account_and_partition(region): - """ - Pull the account and partition from the credentials used to execute the validator - """ + """ + Pull the account and partition from the credentials used to execute the validator + """ - sts_client = build('sts', region) - identity = sts_client.get_caller_identity() - account_id = identity['Account'] + sts_client = build("sts", region) + identity = sts_client.get_caller_identity() + account_id = identity["Account"] - parts = identity['Arn'].split(':') - partition = parts[1] + parts = identity["Arn"].split(":") + partition = parts[1] - return account_id, partition + return account_id, partition def build(service_name, region_name, client_config=None): - if client_config is None: - client_config = config - session = boto3.Session(profile_name=profile_name, region_name=region_name) - return session.client(service_name, config=client_config) + if client_config is None: + client_config = config + session = boto3.Session(profile_name=profile_name, region_name=region_name) + return session.client(service_name, config=client_config) profile_name = None def set_profile(profile): - global profile_name - profile_name = profile \ No newline at end of file + global profile_name + profile_name = profile diff --git a/iam_check/config.py b/iam_check/config.py index 64e9d47..59fb4b1 100644 --- a/iam_check/config.py +++ b/iam_check/config.py @@ -1,34 +1,36 @@ import logging -import yaml import sys +import yaml + # logging configuration -LOGGER = logging.getLogger('iam-policy-validator-for-terraform') +LOGGER = logging.getLogger("iam-policy-validator-for-terraform") # AWS Account ID to use when unknown -awsAccount = '123456789012' +awsAccount = "123456789012" -#IAM Policy checks to run +# IAM Policy checks to run # The default is to run all checks if thhe list is empty # iamChecks = [] -#IAM policy resources +# IAM policy resources iamPolicyAttributes = {} -#Generate fake ARN +# Generate fake ARN # default substitube is {?} arnServiceMap = {} validatePolicyResourceType = {} + def configure_logging(enable_logging): console_handler = logging.StreamHandler(sys.stdout) - #console_handler.setLevel(logging.DEBUG) + # console_handler.setLevel(logging.DEBUG) LOGGER.setLevel(logging.INFO) # log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") console_handler.setFormatter(log_formatter) LOGGER.propagate = False # for handler in LOGGER.handlers: @@ -36,21 +38,22 @@ def configure_logging(enable_logging): LOGGER.addHandler(console_handler) if not enable_logging: LOGGER.disabled = True + + def loadConfigYaml(file): global arnServiceMap global iamPolicyAttributes global validatePolicyResourceType - - with open(file, 'r') as fh: + + with open(file, "r") as fh: data = yaml.safe_load(fh) - arnServiceMap = data.get('arnServiceMap', arnServiceMap) - if 'arnServiceMap' in data: - arnServiceMap = data['arnServiceMap'] - - if 'iamPolicyAttributes' in data: - iamPolicyAttributes = data['iamPolicyAttributes'] - - if 'validatePolicyResourceType' in data: - validatePolicyResourceType = data['validatePolicyResourceType'] - + arnServiceMap = data.get("arnServiceMap", arnServiceMap) + if "arnServiceMap" in data: + arnServiceMap = data["arnServiceMap"] + + if "iamPolicyAttributes" in data: + iamPolicyAttributes = data["iamPolicyAttributes"] + + if "validatePolicyResourceType" in data: + validatePolicyResourceType = data["validatePolicyResourceType"] diff --git a/iam_check/iam_check.py b/iam_check/iam_check.py index 339bc24..3aa003c 100644 --- a/iam_check/iam_check.py +++ b/iam_check/iam_check.py @@ -1,21 +1,25 @@ #!/usr/bin/env python3 import argparse -import logging import json +import logging import sys import traceback -#from .config import loadConfigYaml, iamPolicyAttributes, configure_logging +# from .config import loadConfigYaml, iamPolicyAttributes, configure_logging import iam_check.config as config + +from .argument_actions import (ParseAllowExternalPrincipalsFromCLI, + ParseFindingsToIgnoreFromCLI) from .client import get_account_and_partition, set_profile -from .parameters import validate_region, validate_finding_types_from_cli, validate_credentials -from .argument_actions import ParseFindingsToIgnoreFromCLI, ParseAllowExternalPrincipalsFromCLI -from .lib import iamcheck_AccessAnalyzer, account_config, reporter, tfPlan -from .lib.reporter import default_finding_types_that_are_blocking, Reporter +from .lib import account_config, iamcheck_AccessAnalyzer, reporter, tfPlan +from .lib.reporter import Reporter, default_finding_types_that_are_blocking +from .parameters import (validate_credentials, validate_finding_types_from_cli, + validate_region) # Global Variables -LOGGER = logging.getLogger('iam-policy-validator-for-terraform') +LOGGER = logging.getLogger("iam-policy-validator-for-terraform") + def main(): global policy_checks @@ -23,28 +27,32 @@ def main(): account_id, partition = get_account_and_partition(opts.region) account = account_config.AccountConfig(partition, opts.region, account_id) - validator=iamcheck_AccessAnalyzer.Validator(account.Account, account.Region, account.Partition) + validator = iamcheck_AccessAnalyzer.Validator( + account.Account, account.Region, account.Partition + ) findings = [] - LOGGER.debug(f'Validating terraform plan file: {opts.template_path}') - with open(opts.template_path, 'r') as fh: + LOGGER.debug(f"Validating terraform plan file: {opts.template_path}") + with open(opts.template_path, "r") as fh: data = fh.read() try: data = json.loads(data) plan = tfPlan.TerraformPlan(**data) except Exception as e: - if opts.plan.endswith('.json'): - print(f'Failed to load plan file from: {opts.template_path}') + if opts.plan.endswith(".json"): + print(f"Failed to load plan file from: {opts.template_path}") raise e plan = tfPlan.plan_from_stdout(data) validator.run(plan) - - findings = validator.findings - reporter = Reporter(opts.ignore_finding, opts.treat_as_blocking, opts.allowed_external_principals) + + findings = validator.findings + reporter = Reporter( + opts.ignore_finding, opts.treat_as_blocking, opts.allowed_external_principals + ) report = reporter.build_report_from(findings) - LOGGER.info('Printing findings to the console...') + LOGGER.info("Printing findings to the console...") report.print() - + if report.has_blocking_findings(): exit(2) else: @@ -54,55 +62,88 @@ def main(): def cli_parse_opts(): parser = argparse.ArgumentParser() - parser.add_argument('--template-path', metavar="TEMPLATE_PATH", dest="template_path", required=True, - help='Terraform plan file (JSON)') - parser.add_argument('--region', dest="region", required=True, type=validate_region, - help="The region the resources will be deployed to.") - parser.add_argument('--profile', help='The named profile to use for AWS API calls.') - parser.add_argument("--config", nargs="+", help='Config file for running this script', action='append') - parser.add_argument('--enable-logging', help='Enable detailed logging.', default=False, action='store_true') - parser.add_argument('--ignore-finding', dest="ignore_finding", metavar='FINDING_CODE,RESOURCE_NAME,RESOURCE_NAME.FINDING_CODE', - help='Allow validation failures to be ignored.\n' - 'Specify as a comma separated list of findings to be ignored. Can be individual ' - 'finding codes (e.g. "PASS_ROLE_WITH_STAR_IN_RESOURCE"), a specific resource name ' - '(e.g. "MyResource"), or a combination of both separated by a period.' - '(e.g. "MyResource.PASS_ROLE_WITH_STAR_IN_RESOURCE").', - action=ParseFindingsToIgnoreFromCLI) - parser.add_argument('--treat-finding-type-as-blocking', dest="treat_as_blocking", metavar="ERROR,SECURITY_WARNING", - help='Specify which finding types should be treated as blocking. Other finding types are treated ' - 'as non-blocking. Defaults to "ERROR" and "SECURITY_WARNING". Specify as a comma separated ' - 'list of finding types that should be blocking. Possible values are "ERROR", ' - '"SECURITY_WARNING", "SUGGESTION", and "WARNING". Pass "NONE" to ignore all errors.', - default=default_finding_types_that_are_blocking, type=validate_finding_types_from_cli) - - parser.add_argument('--allow-external-principals', dest='allowed_external_principals', metavar="ACCOUNT,ARN", - help='A comma separated list of external principals that should be ignored. Specify as ' - 'a comma separated list of a 12 digit AWS account ID, a federated web identity ' - 'user, a federated SAML user, or an ARN. Specify "*" to allow anonymous access. ' - '(e.g. 123456789123,arn:aws:iam::111111111111:role/MyOtherRole,graph.facebook.com)', - action=ParseAllowExternalPrincipalsFromCLI) + parser.add_argument( + "--template-path", + metavar="TEMPLATE_PATH", + dest="template_path", + required=True, + help="Terraform plan file (JSON)", + ) + parser.add_argument( + "--region", + dest="region", + required=True, + type=validate_region, + help="The region the resources will be deployed to.", + ) + parser.add_argument("--profile", help="The named profile to use for AWS API calls.") + parser.add_argument( + "--config", + nargs="+", + help="Config file for running this script", + action="append", + ) + parser.add_argument( + "--enable-logging", + help="Enable detailed logging.", + default=False, + action="store_true", + ) + parser.add_argument( + "--ignore-finding", + dest="ignore_finding", + metavar="FINDING_CODE,RESOURCE_NAME,RESOURCE_NAME.FINDING_CODE", + help="Allow validation failures to be ignored.\n" + "Specify as a comma separated list of findings to be ignored. Can be individual " + 'finding codes (e.g. "PASS_ROLE_WITH_STAR_IN_RESOURCE"), a specific resource name ' + '(e.g. "MyResource"), or a combination of both separated by a period.' + '(e.g. "MyResource.PASS_ROLE_WITH_STAR_IN_RESOURCE").', + action=ParseFindingsToIgnoreFromCLI, + ) + parser.add_argument( + "--treat-finding-type-as-blocking", + dest="treat_as_blocking", + metavar="ERROR,SECURITY_WARNING", + help="Specify which finding types should be treated as blocking. Other finding types are treated " + 'as non-blocking. Defaults to "ERROR" and "SECURITY_WARNING". Specify as a comma separated ' + 'list of finding types that should be blocking. Possible values are "ERROR", ' + '"SECURITY_WARNING", "SUGGESTION", and "WARNING". Pass "NONE" to ignore all errors.', + default=default_finding_types_that_are_blocking, + type=validate_finding_types_from_cli, + ) + + parser.add_argument( + "--allow-external-principals", + dest="allowed_external_principals", + metavar="ACCOUNT,ARN", + help="A comma separated list of external principals that should be ignored. Specify as " + "a comma separated list of a 12 digit AWS account ID, a federated web identity " + 'user, a federated SAML user, or an ARN. Specify "*" to allow anonymous access. ' + "(e.g. 123456789123,arn:aws:iam::111111111111:role/MyOtherRole,graph.facebook.com)", + action=ParseAllowExternalPrincipalsFromCLI, + ) args = parser.parse_args() - #load yaml config + # load yaml config if args.config is not None: for conf in [fileName for arg in args.config for fileName in arg]: - LOGGER.debug(f'Config file: {conf}') + LOGGER.debug(f"Config file: {conf}") config.loadConfigYaml(conf) - #Make sure there is at least one policy to look for + # Make sure there is at least one policy to look for if len(config.iamPolicyAttributes) == 0: - raise ValueError(f'No IAM policies defined!') - + raise ValueError(f"No IAM policies defined!") + set_profile(args.profile) validate_credentials(args.region) config.configure_logging(args.enable_logging) return args -if __name__ == "__main__": +if __name__ == "__main__": try: main() except Exception as e: traceback.print_exc() - print(f'ERROR: Unexpected error occurred. {str(e)}', file=sys.stderr) - exit(1) \ No newline at end of file + print(f"ERROR: Unexpected error occurred. {str(e)}", file=sys.stderr) + exit(1) diff --git a/iam_check/lib/__init__.py b/iam_check/lib/__init__.py index 86e84d2..0a78f93 100644 --- a/iam_check/lib/__init__.py +++ b/iam_check/lib/__init__.py @@ -7,16 +7,16 @@ def default_to_json(value): - if isinstance(value, datetime.date): - return value.isoformat() - else: - return value.__dict__ + if isinstance(value, datetime.date): + return value.isoformat() + else: + return value.__dict__ class InvalidPolicyException(Exception): - def __init__(self, message, policy): - self.message = message - self.policy = policy + def __init__(self, message, policy): + self.message = message + self.policy = policy - def to_string(self): - return f'{self.message}\n{json.dumps(self.policy, default=default_to_json, indent=4)}' \ No newline at end of file + def to_string(self): + return f"{self.message}\n{json.dumps(self.policy, default=default_to_json, indent=4)}" diff --git a/iam_check/lib/account_config.py b/iam_check/lib/account_config.py index 6b97455..4d4d69e 100644 --- a/iam_check/lib/account_config.py +++ b/iam_check/lib/account_config.py @@ -2,8 +2,10 @@ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ + + class AccountConfig: - def __init__(self, partition, region, account_id): - self.Account = account_id - self.Region = region - self.Partition = partition \ No newline at end of file + def __init__(self, partition, region, account_id): + self.Account = account_id + self.Region = region + self.Partition = partition diff --git a/iam_check/lib/findings.py b/iam_check/lib/findings.py index 9810b83..1a334af 100644 --- a/iam_check/lib/findings.py +++ b/iam_check/lib/findings.py @@ -8,75 +8,77 @@ class Finding: - def __init__(self, message, finding_type, policy_name, resource_name, details, code): - self.findingType = finding_type - self.code = code - self.message = message - self.resourceName = resource_name - self.policyName = policy_name - self.details = details + def __init__( + self, message, finding_type, policy_name, resource_name, details, code + ): + self.findingType = finding_type + self.code = code + self.message = message + self.resourceName = resource_name + self.policyName = policy_name + self.details = details class Findings: - """ - Build a findings object from the Access Analyzer response, wrapping the raw findings from Access Analyzer so that - consumers can parse the output in a standardized manner. - """ + """ + Build a findings object from the Access Analyzer response, wrapping the raw findings from Access Analyzer so that + consumers can parse the output in a standardized manner. + """ - def __init__(self): - self.errors = [] - self.security_warnings = [] - self.warnings = [] - self.suggestions = [] + def __init__(self): + self.errors = [] + self.security_warnings = [] + self.warnings = [] + self.suggestions = [] - def add_trust_policy_finding(self, findings, resource_name): - for raw_finding in findings: - message = 'Trust policy allows access from external principals.' - finding = Finding( - message=message, - finding_type='SECURITY_WARNING', - policy_name='TrustPolicy', - resource_name=resource_name, - details=raw_finding, - code='EXTERNAL_PRINCIPAL' - ) + def add_trust_policy_finding(self, findings, resource_name): + for raw_finding in findings: + message = "Trust policy allows access from external principals." + finding = Finding( + message=message, + finding_type="SECURITY_WARNING", + policy_name="TrustPolicy", + resource_name=resource_name, + details=raw_finding, + code="EXTERNAL_PRINCIPAL", + ) - self.security_warnings.append(finding) + self.security_warnings.append(finding) - def add_validation_finding(self, findings, resource_name, policy_name): - for raw_finding in findings: - finding_type = raw_finding['findingType'] + def add_validation_finding(self, findings, resource_name, policy_name): + for raw_finding in findings: + finding_type = raw_finding["findingType"] - finding = Finding( - message=raw_finding['findingDetails'], - finding_type=finding_type, - policy_name=policy_name, - resource_name=resource_name, - details=raw_finding, - code=raw_finding['issueCode'] - ) + finding = Finding( + message=raw_finding["findingDetails"], + finding_type=finding_type, + policy_name=policy_name, + resource_name=resource_name, + details=raw_finding, + code=raw_finding["issueCode"], + ) - if finding_type == 'ERROR': - self.errors.append(finding) - elif finding_type == 'SECURITY_WARNING': - self.security_warnings.append(finding) - elif finding_type == 'SUGGESTION': - self.suggestions.append(finding) - elif finding_type == 'WARNING': - self.warnings.append(finding) + if finding_type == "ERROR": + self.errors.append(finding) + elif finding_type == "SECURITY_WARNING": + self.security_warnings.append(finding) + elif finding_type == "SUGGESTION": + self.suggestions.append(finding) + elif finding_type == "WARNING": + self.warnings.append(finding) - def add_external_principal_finding(self, findings, resource_name, policy_name): - for raw_finding in findings: - finding = Finding( - message='Resource policy allows access from external principals.', - finding_type='SECURITY_WARNING', - policy_name=policy_name, - resource_name=resource_name, - details=raw_finding, - code='EXTERNAL_PRINCIPAL' - ) + def add_external_principal_finding(self, findings, resource_name, policy_name): + for raw_finding in findings: + finding = Finding( + message="Resource policy allows access from external principals.", + finding_type="SECURITY_WARNING", + policy_name=policy_name, + resource_name=resource_name, + details=raw_finding, + code="EXTERNAL_PRINCIPAL", + ) - self.security_warnings.append(finding) + self.security_warnings.append(finding) - def to_json(self): - return json.dumps(self, default=default_to_json, indent=4) \ No newline at end of file + def to_json(self): + return json.dumps(self, default=default_to_json, indent=4) diff --git a/iam_check/lib/iamCheck.py b/iam_check/lib/iamCheck.py index cecaec9..c062a54 100644 --- a/iam_check/lib/iamCheck.py +++ b/iam_check/lib/iamCheck.py @@ -3,12 +3,14 @@ logger = logging.getLogger(__name__) + class IamCheck(abc.ABC): def __init__(self, *args, **kwargs): name = type(self).__name__ - logger.debug(f'Initializing check: {name}') + logger.debug(f"Initializing check: {name}") + + self.exceptions = kwargs.get("exceptions", []) - self.exceptions = kwargs.get('exceptions', []) - @abc.abstractclassmethod - def run(self, policy, **kwargs): pass + def run(self, policy, **kwargs): + pass diff --git a/iam_check/lib/iamPolicy.py b/iam_check/lib/iamPolicy.py index 5a3aaf2..8b7a371 100644 --- a/iam_check/lib/iamPolicy.py +++ b/iam_check/lib/iamPolicy.py @@ -2,7 +2,8 @@ import logging from typing import Optional, Union -LOGGER = logging.getLogger('iam-policy-validator-for-terraform') +LOGGER = logging.getLogger("iam-policy-validator-for-terraform") + class Policy: def __init__(self, **kwargs): @@ -10,14 +11,14 @@ def __init__(self, **kwargs): self._id = "" self._statement = [] - accepted =['Version', 'Id', 'Statement', 'file', 'json'] + accepted = ["Version", "Id", "Statement", "file", "json"] for key, value in kwargs.items(): - if key == 'Version': + if key == "Version": self.setVersion(value) - elif key == 'Id': + elif key == "Id": self.setId(value) - elif key =='Statement': + elif key == "Statement": if isinstance(value, Statement): self.addStatement(value) elif isinstance(value, dict): @@ -29,23 +30,27 @@ def __init__(self, **kwargs): else: self.addStatement(Statement(**statement)) else: - raise ValueError(f'Statements expects list or dictionary') + raise ValueError(f"Statements expects list or dictionary") - elif key == 'file': + elif key == "file": if len(kwargs) != 1: - raise ValueError('Invalid parameter: file can not be used with other arguments') - with open(value, 'r') as fh: + raise ValueError( + "Invalid parameter: file can not be used with other arguments" + ) + with open(value, "r") as fh: data = json.load(fh) self.__init__(**data) - elif key == 'json': + elif key == "json": if len(kwargs) != 1: - raise ValueError('Invalid parameter: file can not be used with other arguments') + raise ValueError( + "Invalid parameter: file can not be used with other arguments" + ) data = json.loads(value) self.__init__(**data) else: - raise ValueError(f'Invalid parameter: {key} not a valid parameter') + raise ValueError(f"Invalid parameter: {key} not a valid parameter") def __eq__(self, o) -> bool: """Overload = operator to test if two policies are the same""" @@ -64,19 +69,19 @@ def __str__(self) -> str: """return a JSON representative of the iam policy""" obj = {} if self._version != "": - obj['Version'] = self._version - + obj["Version"] = self._version + if self._id != "": - obj['Id'] = self._id + obj["Id"] = self._id if len(self._statement) == 1: t = json.loads(str(self._statement[0])) - obj['Statement'] = t + obj["Statement"] = t else: - obj['Statement'] = [] + obj["Statement"] = [] for statement in self._statement: t = json.loads(str(statement)) - obj['Statement'].append(t) + obj["Statement"].append(t) return json.dumps(obj, indent=4) def setId(self, id: str) -> None: @@ -86,7 +91,7 @@ def setVersion(self, version: str) -> None: valid = ["2012-10-17", "2008-10-17"] if version not in valid: - raise ValueError(f'Invalid IAM policy version: {version}') + raise ValueError(f"Invalid IAM policy version: {version}") self._version = version @@ -106,8 +111,9 @@ def getVersion(self) -> str: def getStatement(self) -> str: return self._statement + class Statement: - def __init__(self, json: str= None, **kwargs): + def __init__(self, json: str = None, **kwargs): self._sid = None self._principal = None self._effect = "" @@ -124,37 +130,37 @@ def __init__(self, json: str= None, **kwargs): self.__init__(**data) return - if 'Action' in kwargs and 'NotAction' in kwargs: - raise ValueError('Parameter: Can not use Action with NotAction') - if 'Principal' in kwargs and 'NotPrincipal' in kwargs: - raise ValueError('Parameter: Can not use Action with NotPrincipal') - if 'Resource' in kwargs and 'NotResource' in kwargs: - raise ValueError('Parameter: Can not use Action with NotResource') + if "Action" in kwargs and "NotAction" in kwargs: + raise ValueError("Parameter: Can not use Action with NotAction") + if "Principal" in kwargs and "NotPrincipal" in kwargs: + raise ValueError("Parameter: Can not use Action with NotPrincipal") + if "Resource" in kwargs and "NotResource" in kwargs: + raise ValueError("Parameter: Can not use Action with NotResource") for arg, value in kwargs.items(): - if arg == 'Sid': + if arg == "Sid": self.setSid(value) - elif arg == 'Effect': + elif arg == "Effect": self.setEffect(value) - elif arg == 'Action' or arg == 'NotAction': + elif arg == "Action" or arg == "NotAction": if isinstance(value, list): for action in value: - self.addAction(action, arg == 'NotAction') + self.addAction(action, arg == "NotAction") else: - self.addAction(value, arg == 'NotAction') - elif arg == 'Principal' or arg == 'NotPrincipal' : - self.setPrincipal(value, arg == 'NotPrincipal') - elif arg == 'Resource' or arg == 'NotResource': + self.addAction(value, arg == "NotAction") + elif arg == "Principal" or arg == "NotPrincipal": + self.setPrincipal(value, arg == "NotPrincipal") + elif arg == "Resource" or arg == "NotResource": if isinstance(value, list): for resource in value: - self.addResource(resource, arg == 'NotResource') + self.addResource(resource, arg == "NotResource") else: - self.addResource(value, arg == 'NotResource') - elif arg == 'Condition': + self.addResource(value, arg == "NotResource") + elif arg == "Condition": self._condition = value else: - raise ValueError(f'Invalid parameter: {arg} not a valid parameter') - + raise ValueError(f"Invalid parameter: {arg} not a valid parameter") + def __eq__(self, o) -> bool: if self._notAction != o._notAction: return False @@ -170,55 +176,54 @@ def __eq__(self, o) -> bool: return False if len(self._resource) != len(o._resource): return False - + me = sorted(self._resource) you = sorted(o._resource) if me != you: return False - + me = sorted(self._action) you = sorted(o._action) if me != you: - return False + return False - me = json.dumps(self._principal, sort_keys = True) - me = json.dumps(o._principal, sort_keys = True) + me = json.dumps(self._principal, sort_keys=True) + me = json.dumps(o._principal, sort_keys=True) if me != you: - return False + return False - me = json.dumps(self._condition, sort_keys = True) - me = json.dumps(o._condition, sort_keys = True) + me = json.dumps(self._condition, sort_keys=True) + me = json.dumps(o._condition, sort_keys=True) if me != you: - return False + return False return True def __str__(self) -> str: obj = {} if self._sid is not None: - obj['Sid'] = self._sid + obj["Sid"] = self._sid if self._principal is not None: - key = 'NotPrincipal' if self._notPrincipal else 'Principal' + key = "NotPrincipal" if self._notPrincipal else "Principal" obj[key] = self._principal if self._effect != "": - obj['Effect'] = self._effect + obj["Effect"] = self._effect if self._condition is not None: - obj['Condition'] = self._condition - - key = 'NotAction' if self._notAction else 'Action' + obj["Condition"] = self._condition + + key = "NotAction" if self._notAction else "Action" if len(self._action) == 1: obj[key] = self._action[0] else: obj[key] = self._action - key = 'NotResource' if self._notResource else 'Resource' + key = "NotResource" if self._notResource else "Resource" if len(self._resource) == 1: obj[key] = self._resource[0] else: obj[key] = self._resource - - return json.dumps(obj, indent=4, default=str) + return json.dumps(obj, indent=4, default=str) def setSid(self, sid: Optional[str] = None): self._sid = sid @@ -226,24 +231,26 @@ def setSid(self, sid: Optional[str] = None): def setEffect(self, effect: Optional[str] = None): valid = [None, "Allow", "Deny"] if effect not in valid: - raise ValueError(f'Invalid effect: {effect}') + raise ValueError(f"Invalid effect: {effect}") self._effect = effect - - def setPrincipal(self, principal: Union[str, dict, None]= None, NotPrincipal = False): + + def setPrincipal( + self, principal: Union[str, dict, None] = None, NotPrincipal=False + ): if self._principal is not None and self.NotPrincipal != NotPrincipal: - raise ValueError(f'Can not use Principal with NotPrincipal') + raise ValueError(f"Can not use Principal with NotPrincipal") self._notPrincipal = NotPrincipal self._principal = principal - def addAction(self, action: str, notAction = False): + def addAction(self, action: str, notAction=False): if len(self._action) > 0 and self._notAction != notAction: - raise ValueError(f'Can not use Action with NotAction') + raise ValueError(f"Can not use Action with NotAction") self._notAction = notAction self._action.append(action) - def addResource(self, resource: str, notResource = False): + def addResource(self, resource: str, notResource=False): if len(self._resource) > 0 and self._notResource != notResource: - raise ValueError(f'Can not use Resource with NotResource') + raise ValueError(f"Can not use Resource with NotResource") self._notResource = notResource if resource not in self._resource: self._resource.append(resource) @@ -253,10 +260,10 @@ def deleteAction(self, action: str): def deleteResource(self, resource: str): self._resource = [r for r in self._resource if r != resource] - + def getSid(self): return self._sid - + def getEffect(self): return self._effect @@ -264,7 +271,7 @@ def getAction(self): if self._notAction: return None return self._action - + def getNotAction(self): if self._notAction: return self._action @@ -275,7 +282,7 @@ def getPrincipal(self): return None return self._principal - def listPrincipal(self, notPrincipal = False): + def listPrincipal(self, notPrincipal=False): if notPrincipal != self._notPrincipal: return [None] principal = self._principal @@ -287,30 +294,30 @@ def listPrincipal(self, notPrincipal = False): return ["*"] principals = [] - for svc,ids in principal.items(): - if isinstance(ids,str): - principals.append(f'{svc}:{ids}') + for svc, ids in principal.items(): + if isinstance(ids, str): + principals.append(f"{svc}:{ids}") continue for id in ids: - principals.append(f'{svc}:{ids}') + principals.append(f"{svc}:{ids}") return principals def getNotPrincipal(self): if self._notPrincipal: return self._principal return None - + def getResource(self): if self._notResource: return None - return self._resource + return self._resource def getNotResource(self): if self._notResource: - return self._resource + return self._resource return None def getCondition(self): if self._condition: return self._condition - return None \ No newline at end of file + return None diff --git a/iam_check/lib/iamcheck_AccessAnalyzer.py b/iam_check/lib/iamcheck_AccessAnalyzer.py index 0c0e385..4f28b1c 100644 --- a/iam_check/lib/iamcheck_AccessAnalyzer.py +++ b/iam_check/lib/iamcheck_AccessAnalyzer.py @@ -1,58 +1,68 @@ import logging + +import iam_check.config as config + from ..client import build -from .findings import Findings from . import iamPolicy -import iam_check.config as config -LOGGER = logging.getLogger('iam-policy-validator-for-terraform') +from .findings import Findings + +LOGGER = logging.getLogger("iam-policy-validator-for-terraform") # class AccessAnalyzer(iamCheck.IamCheck): class Validator: def __init__(self, account_id, region, partition): self.findings = Findings() - self.access_analyzer_name = 'AnalyzerCreatedByCfnIAMPolicyValidator' + self.access_analyzer_name = "AnalyzerCreatedByCfnIAMPolicyValidator" self.analyzer_arn = None - self.client = build('accessanalyzer', region) - # preview builders are used to build the access preview configuration for an individual resource type - # a preview builder must be added to add support for access previews for a given resource -# self.preview_builders = { -# 'AWS::SQS::Queue': SqsQueuePreviewBuilder(account_id, region, partition), -# 'AWS::KMS::Key': KmsKeyPreviewBuilder(account_id, region, partition), -# 'AWS::S3::AccessPoint': S3SingleRegionAccessPointPreviewBuilder(account_id, region, partition), -# 'AWS::S3::MultiRegionAccessPoint': S3MultiRegionAccessPointPreviewBuilder(account_id, partition), -# 'AWS::S3::Bucket': S3BucketPreviewBuilder(region, partition), -# 'AWS::IAM::Role::TrustPolicy': RoleTrustPolicyPreviewBuilder(account_id, partition), -# 'AWS::SecretsManager::Secret': SecretsManagerSecretPreviewBuilder(account_id, region, partition) -# } - # maps the resource type to the parameter for validate_policy that enables service specific policy validation - # not all services have service specific policy validation. The names may be identical for now, but we don't - # want to rely on that - #to move this to config file + self.client = build("accessanalyzer", region) + # preview builders are used to build the access preview configuration for an individual resource type + # a preview builder must be added to add support for access previews for a given resource + # self.preview_builders = { + # 'AWS::SQS::Queue': SqsQueuePreviewBuilder(account_id, region, partition), + # 'AWS::KMS::Key': KmsKeyPreviewBuilder(account_id, region, partition), + # 'AWS::S3::AccessPoint': S3SingleRegionAccessPointPreviewBuilder(account_id, region, partition), + # 'AWS::S3::MultiRegionAccessPoint': S3MultiRegionAccessPointPreviewBuilder(account_id, partition), + # 'AWS::S3::Bucket': S3BucketPreviewBuilder(region, partition), + # 'AWS::IAM::Role::TrustPolicy': RoleTrustPolicyPreviewBuilder(account_id, partition), + # 'AWS::SecretsManager::Secret': SecretsManagerSecretPreviewBuilder(account_id, region, partition) + # } + # maps the resource type to the parameter for validate_policy that enables service specific policy validation + # not all services have service specific policy validation. The names may be identical for now, but we don't + # want to rely on that + # to move this to config file self.maximum_number_of_access_preview_attempts = 150 - + def run(self, plan): policies = plan.findPolicies() for ref, policy in policies.items(): - LOGGER.info(f'check policy at: {ref}') - policy_resource_type=ref.split('.')[0] - policy_name = '.'.join(ref.split('.')[0:-1]) + LOGGER.info(f"check policy at: {ref}") + policy_resource_type = ref.split(".")[0] + policy_name = ".".join(ref.split(".")[0:-1]) resource_name = plan.getResourceName(policy_name) p = iamPolicy.Policy(json=policy) - LOGGER.info(f'start checking policy:{p}') - policyType='IDENTITY_POLICY' + LOGGER.info(f"start checking policy:{p}") + policyType = "IDENTITY_POLICY" for statement in p.getStatement(): - if statement.getPrincipal() != None or statement.getNotPrincipal() != None: - policyType='RESOURCE_POLICY' + if ( + statement.getPrincipal() != None + or statement.getNotPrincipal() != None + ): + policyType = "RESOURCE_POLICY" continue - if policy_resource_type not in config.validatePolicyResourceType : - response = self.client.validate_policy(policyDocument=str(p),policyType=policyType) + if policy_resource_type not in config.validatePolicyResourceType: + response = self.client.validate_policy( + policyDocument=str(p), policyType=policyType + ) else: - policy_resource_type = config.validatePolicyResourceType[policy_resource_type] + policy_resource_type = config.validatePolicyResourceType[ + policy_resource_type + ] response = self.client.validate_policy( policyDocument=str(p), policyType=policyType, - validatePolicyResourceType=policy_resource_type + validatePolicyResourceType=policy_resource_type, ) - validation_findings = response['findings'] - self.findings.add_validation_finding(validation_findings, resource_name, policy_name) - - + validation_findings = response["findings"] + self.findings.add_validation_finding( + validation_findings, resource_name, policy_name + ) diff --git a/iam_check/lib/reporter.py b/iam_check/lib/reporter.py index e2c3abf..c6312ab 100644 --- a/iam_check/lib/reporter.py +++ b/iam_check/lib/reporter.py @@ -4,182 +4,207 @@ """ import json -from . import default_to_json from ..tools import regex_patterns +from . import default_to_json -default_finding_types_that_are_blocking = ['ERROR', 'SECURITY_WARNING'] +default_finding_types_that_are_blocking = ["ERROR", "SECURITY_WARNING"] class Reporter: - """ - Determines what findings should be reported to the end user based on parameters provided when starting validation. - """ - - def __init__(self, findings_to_ignore, finding_types_that_are_blocking, allowed_external_principals): - self.blocking_findings = [] - self.nonblocking_findings = [] - self.findings_to_ignore = findings_to_ignore - self.finding_types_that_are_blocking = finding_types_that_are_blocking - self.allowed_external_principals = allowed_external_principals - - def build_report_from(self, findings): - self._filter_overridden_findings(findings) - return Report(self.blocking_findings, self.nonblocking_findings) - - def _filter_overridden_findings(self, findings): - for finding in findings.errors + findings.security_warnings + findings.warnings + findings.suggestions: - overridden = self._is_finding_ignored(finding) - if overridden: - continue - - overridden = self._is_external_principal_allowed(finding) - if overridden: - continue - - self._classify_as_blocking_or_non_blocking(finding) - - def _is_finding_ignored(self, finding): - if self.findings_to_ignore is None: - return False - - is_ignored = any([finding_to_ignore.matches(finding) for finding_to_ignore in self.findings_to_ignore]) - if is_ignored: - return True - - return False - - def _is_external_principal_allowed(self, finding): - if self.allowed_external_principals is None: - return False - - is_allowed = any([principal_to_allow.matches(finding) for principal_to_allow in self.allowed_external_principals]) - if is_allowed: - return True - - return False - - def _classify_as_blocking_or_non_blocking(self, finding): - # if none is present it overrides all others - if 'NONE' in self.finding_types_that_are_blocking: - self.nonblocking_findings.append(finding) - return - - if finding.findingType.upper() in self.finding_types_that_are_blocking: - self.blocking_findings.append(finding) - else: - self.nonblocking_findings.append(finding) + """ + Determines what findings should be reported to the end user based on parameters provided when starting validation. + """ + + def __init__( + self, + findings_to_ignore, + finding_types_that_are_blocking, + allowed_external_principals, + ): + self.blocking_findings = [] + self.nonblocking_findings = [] + self.findings_to_ignore = findings_to_ignore + self.finding_types_that_are_blocking = finding_types_that_are_blocking + self.allowed_external_principals = allowed_external_principals + + def build_report_from(self, findings): + self._filter_overridden_findings(findings) + return Report(self.blocking_findings, self.nonblocking_findings) + + def _filter_overridden_findings(self, findings): + for finding in ( + findings.errors + + findings.security_warnings + + findings.warnings + + findings.suggestions + ): + overridden = self._is_finding_ignored(finding) + if overridden: + continue + + overridden = self._is_external_principal_allowed(finding) + if overridden: + continue + + self._classify_as_blocking_or_non_blocking(finding) + + def _is_finding_ignored(self, finding): + if self.findings_to_ignore is None: + return False + + is_ignored = any( + [ + finding_to_ignore.matches(finding) + for finding_to_ignore in self.findings_to_ignore + ] + ) + if is_ignored: + return True + + return False + + def _is_external_principal_allowed(self, finding): + if self.allowed_external_principals is None: + return False + + is_allowed = any( + [ + principal_to_allow.matches(finding) + for principal_to_allow in self.allowed_external_principals + ] + ) + if is_allowed: + return True + + return False + + def _classify_as_blocking_or_non_blocking(self, finding): + # if none is present it overrides all others + if "NONE" in self.finding_types_that_are_blocking: + self.nonblocking_findings.append(finding) + return + + if finding.findingType.upper() in self.finding_types_that_are_blocking: + self.blocking_findings.append(finding) + else: + self.nonblocking_findings.append(finding) class ResourceOrCodeFindingToIgnore: - def __init__(self, value): - self.value = value + def __init__(self, value): + self.value = value - def matches(self, finding): - return finding.resourceName.lower() == self.value.lower() or \ - finding.code.lower() == self.value.lower() + def matches(self, finding): + return ( + finding.resourceName.lower() == self.value.lower() + or finding.code.lower() == self.value.lower() + ) - def __eq__(self, other): - if not isinstance(other, ResourceOrCodeFindingToIgnore): - return False + def __eq__(self, other): + if not isinstance(other, ResourceOrCodeFindingToIgnore): + return False - return self.value == other.value + return self.value == other.value class ResourceAndCodeFindingToIgnore: - def __init__(self, resource_name, code): - self.resource_name = resource_name - self.code = code + def __init__(self, resource_name, code): + self.resource_name = resource_name + self.code = code - def matches(self, finding): - return finding.resourceName.lower() == self.resource_name.lower() and \ - finding.code.lower() == self.code.lower() + def matches(self, finding): + return ( + finding.resourceName.lower() == self.resource_name.lower() + and finding.code.lower() == self.code.lower() + ) - def __eq__(self, other): - if not isinstance(other, ResourceAndCodeFindingToIgnore): - return False + def __eq__(self, other): + if not isinstance(other, ResourceAndCodeFindingToIgnore): + return False - return self.resource_name == other.resource_name and \ - self.code == other.code + return self.resource_name == other.resource_name and self.code == other.code def _get_principal_from_finding(finding): - if finding.code != 'EXTERNAL_PRINCIPAL': - return None + if finding.code != "EXTERNAL_PRINCIPAL": + return None - principal = finding.details.get('principal', {}) - aws_principal = principal.get('AWS') - if aws_principal is not None: - return aws_principal + principal = finding.details.get("principal", {}) + aws_principal = principal.get("AWS") + if aws_principal is not None: + return aws_principal - federated_principal = principal.get('Federated') - if federated_principal is not None: - return federated_principal + federated_principal = principal.get("Federated") + if federated_principal is not None: + return federated_principal - return principal.get('CanonicalUser') + return principal.get("CanonicalUser") class AllowedExternalPrincipal: - def __init__(self, principal): - self.principal = principal + def __init__(self, principal): + self.principal = principal - def matches(self, finding): - principal = _get_principal_from_finding(finding) - if principal is None: - return False + def matches(self, finding): + principal = _get_principal_from_finding(finding) + if principal is None: + return False - match = regex_patterns.generic_arn_pattern.match(principal) - if match is None: - # the principal may or may not be an ARN, if it's not an ARN, compare the raw values - return principal == self.principal + match = regex_patterns.generic_arn_pattern.match(principal) + if match is None: + # the principal may or may not be an ARN, if it's not an ARN, compare the raw values + return principal == self.principal - # if principal is an ARN, grab the account ID from the ARN to compare - account_id = match.group(1) - return account_id == self.principal + # if principal is an ARN, grab the account ID from the ARN to compare + account_id = match.group(1) + return account_id == self.principal - def __eq__(self, other): - if not isinstance(other, AllowedExternalPrincipal): - return False + def __eq__(self, other): + if not isinstance(other, AllowedExternalPrincipal): + return False - return self.principal == other.principal + return self.principal == other.principal class AllowedExternalArn: - def __init__(self, arn): - self.arn = arn + def __init__(self, arn): + self.arn = arn - def matches(self, finding): - principal = _get_principal_from_finding(finding) - if principal is None: - return False + def matches(self, finding): + principal = _get_principal_from_finding(finding) + if principal is None: + return False - return principal == self.arn + return principal == self.arn - def __eq__(self, other): - if not isinstance(other, AllowedExternalArn): - return False + def __eq__(self, other): + if not isinstance(other, AllowedExternalArn): + return False - return self.arn == other.arn + return self.arn == other.arn class Report: - def __init__(self, blocking_findings, nonblocking_findings): - self.blocking_findings = blocking_findings - self.nonblocking_findings = nonblocking_findings - - def has_blocking_findings(self): - return len(self.blocking_findings) > 0 - - def to_json(self): - return { - 'BlockingFindings': [vars(finding) for finding in self.blocking_findings], - 'NonBlockingFindings': [vars(finding) for finding in self.nonblocking_findings] - } - - def print(self): - report = self.to_json() - report_as_json_string = self._to_json_string(report) - print(report_as_json_string) - - @staticmethod - def _to_json_string(obj): - return json.dumps(obj, default=default_to_json, indent=4) \ No newline at end of file + def __init__(self, blocking_findings, nonblocking_findings): + self.blocking_findings = blocking_findings + self.nonblocking_findings = nonblocking_findings + + def has_blocking_findings(self): + return len(self.blocking_findings) > 0 + + def to_json(self): + return { + "BlockingFindings": [vars(finding) for finding in self.blocking_findings], + "NonBlockingFindings": [ + vars(finding) for finding in self.nonblocking_findings + ], + } + + def print(self): + report = self.to_json() + report_as_json_string = self._to_json_string(report) + print(report_as_json_string) + + @staticmethod + def _to_json_string(obj): + return json.dumps(obj, default=default_to_json, indent=4) diff --git a/iam_check/lib/tfPlan.py b/iam_check/lib/tfPlan.py index 9a302f9..4eb6836 100644 --- a/iam_check/lib/tfPlan.py +++ b/iam_check/lib/tfPlan.py @@ -1,32 +1,35 @@ -#from ..config import iamPolicyAttributes, arnServiceMap, awsAccount -import iam_check.config as config +# from ..config import iamPolicyAttributes, arnServiceMap, awsAccount import enum -import logging import json +import logging import re +import iam_check.config as config + +LOGGER = logging.getLogger("iam-policy-validator-for-terraform") -LOGGER = logging.getLogger('iam-policy-validator-for-terraform') class TerraformState: def __init__(self, **kwargs) -> None: """Not implemented since it only supports planned changes""" self.format_version = None self.terraform_version = None - self.values = None + self.values = None for arg, value in kwargs.items(): - if arg == 'format_version': + if arg == "format_version": self.format_version = value - elif arg == 'terraform_version': + elif arg == "terraform_version": self.terraform_version = value else: setattr(self, arg, value) + def __str__(self): return json.dumps(vars(self), indent=4) - def getValue (self, key): - raise NotImplementedError('Not implemented as we only support planned changes') + def getValue(self, key): + raise NotImplementedError("Not implemented as we only support planned changes") + class TerraformPlan: def __init__(self, **kwargs) -> None: @@ -40,115 +43,118 @@ def __init__(self, **kwargs) -> None: self.output_changes = [] for arg, value in kwargs.items(): - if arg == 'format_version': + if arg == "format_version": self.format_version = value - elif arg == 'prior_state': + elif arg == "prior_state": self.prior_state = TerraformState(**value) - elif arg == 'configuration': + elif arg == "configuration": self.configuration = TerraformConfig(**value) - elif arg == 'planned_values': + elif arg == "planned_values": if isinstance(value, TerraformValues): self.planned_values = value else: self.planned_values = TerraformValues(**value) - elif arg == 'proposed_unknown': + elif arg == "proposed_unknown": self.proposed_unknown = TerraformValues(**value) - elif arg == 'variables': + elif arg == "variables": self.variables = value - elif arg == 'resource_changes': + elif arg == "resource_changes": self.resource_changes = [TerraformResourceChange(**r) for r in value] - elif arg == 'output_changes': + elif arg == "output_changes": self.output_changes = value - + def __str__(self) -> str: obj = {} - obj['format_version'] = self.format_version - obj['variables'] = self.variables - obj['resource_changes'] = [json.loads(str(r)) for r in self.resource_changes] - obj['output_changes'] = self.output_changes - + obj["format_version"] = self.format_version + obj["variables"] = self.variables + obj["resource_changes"] = [json.loads(str(r)) for r in self.resource_changes] + obj["output_changes"] = self.output_changes + if self.prior_state is not None: - obj['prior_state'] = json.loads(str(self.prior_state)) + obj["prior_state"] = json.loads(str(self.prior_state)) if self.configuration is not None: - obj['configuration'] = json.loads(str(self.configuration)) + obj["configuration"] = json.loads(str(self.configuration)) if self.planned_values is not None: - obj['planned_values'] = json.loads(str(self.planned_values)) + obj["planned_values"] = json.loads(str(self.planned_values)) if self.proposed_unknown is not None: - obj['proposed_unknown'] = json.loads(str(self.proposed_unknown)) + obj["proposed_unknown"] = json.loads(str(self.proposed_unknown)) return json.dumps(obj, indent=4) def findPolicies(self): - logging.debug('generating a list of policies in plan') + logging.debug("generating a list of policies in plan") policies = {} resources = self.listResources() for r in resources: - resourceType = r.split('.')[-2] + resourceType = r.split(".")[-2] if resourceType not in config.iamPolicyAttributes: continue - + attributes = config.iamPolicyAttributes[resourceType] if isinstance(attributes, str): attributes = [attributes] for attribute in attributes: # check if attribute is a base one or inside a block - if '.' not in attribute: - ref = f'{r}.{attribute}' + if "." not in attribute: + ref = f"{r}.{attribute}" try: policy = self.getValue(ref) if policy is None or policy == "": - LOGGER.info(f'No policy found at: {ref}') + LOGGER.info(f"No policy found at: {ref}") else: policies[ref] = policy except KeyError as e: - LOGGER.info(f'No policy found at: {ref}') + LOGGER.info(f"No policy found at: {ref}") continue else: - block, key = attribute.split('.') - block_ref = f'{r}.{block}' + block, key = attribute.split(".") + block_ref = f"{r}.{block}" try: data = self.getValue(block_ref) except KeyError as e: - LOGGER.info(f'No policy found at: {ref}') + LOGGER.info(f"No policy found at: {ref}") continue - - if not isinstance (data, list): - ref = f'{block_ref}.key' + + if not isinstance(data, list): + ref = f"{block_ref}.key" policy = data[key] if policy is None or policy == "": - LOGGER.info(f'No policy found at: {ref}') + LOGGER.info(f"No policy found at: {ref}") else: policies[ref] = policy else: for index, block in enumerate(data): - ref = f'{block_ref}.{index}.{key}' - if "policy" in block: + ref = f"{block_ref}.{index}.{key}" + if "policy" in block: policy = block[key] if policy is None or policy == "": - LOGGER.info(f'No policy found at: {ref}') + LOGGER.info(f"No policy found at: {ref}") else: policies[ref] = policy else: - LOGGER.info(f'No policy found at: {ref}') + LOGGER.info(f"No policy found at: {ref}") for ref in policies.keys(): - LOGGER.debug(f'found policy at {ref}') - + LOGGER.debug(f"found policy at {ref}") + return policies + def getResourceName(self, ref): if isinstance(ref, TerraformResource): - resource = ref + resource = ref ref = resource.address else: resource = self.getValue(ref) if resource.type not in config.arnServiceMap: - raise TypeError(f'Add resource type {resource.type} in the configuration arnServiceMap') + raise TypeError( + f"Add resource type {resource.type} in the configuration arnServiceMap" + ) key = config.arnServiceMap[resource.type] terraformKey = key default = None - if '?' in key: - terraformKey, default = key.split('?') + if "?" in key: + terraformKey, default = key.split("?") try: - name = self.getValue(f'{ref}.{terraformKey}') + name = self.getValue(f"{ref}.{terraformKey}") except KeyError as e: if default is None: raise e @@ -157,27 +163,29 @@ def getResourceName(self, ref): def getFakeArn(self, ref): if isinstance(ref, TerraformResource): - resource = ref + resource = ref ref = resource.address else: resource = self.getValue(ref) if resource.type not in config.arnServiceMap: - raise TypeError(f'Add resource type {resource.type} in the configuration arnServiceMap') + raise TypeError( + f"Add resource type {resource.type} in the configuration arnServiceMap" + ) arn_pattern = config.arnServiceMap[resource.type] - arnComponents = arn_pattern.split(':') + arnComponents = arn_pattern.split(":") arnComponents[4] = config.awsAccount - arn_pattern = ':'.join(arnComponents) - arn_keys = re.findall('{([^}]+)}', arn_pattern) + arn_pattern = ":".join(arnComponents) + arn_keys = re.findall("{([^}]+)}", arn_pattern) print(arn_keys) arn_dict = {} for key in arn_keys: terraformKey = key default = None - if '?' in key: - terraformKey, default = key.split('?') + if "?" in key: + terraformKey, default = key.split("?") try: - arn_dict[key] = self.getValue(f'{ref}.{terraformKey}') + arn_dict[key] = self.getValue(f"{ref}.{terraformKey}") print(terraformKey) except KeyError as e: if default is None: @@ -187,24 +195,25 @@ def getFakeArn(self, ref): return arn_pattern.format(**arn_dict) def getValue(self, ref): - """ interpolate a given resource address and attribute based on computed values and return the results - """ + """interpolate a given resource address and attribute based on computed values and return the results""" found = False result = None # first get the original values - try: + try: result = self.prior_state.getValue(ref) found = True - except: pass + except: + pass # then get the planned value - try: + try: result = self.planned_values.getValue(ref) found = True - except: pass + except: + pass if found is False: - raise KeyError(f'Invalid Key: {ref}') + raise KeyError(f"Invalid Key: {ref}") if isinstance(result, str): return self.parserHclString(result) @@ -213,11 +222,11 @@ def getValue(self, ref): def listResources(self): "created or updated resources in plan" return self.planned_values.listResources() - + def parserHclString(self, input): "This function will only attempt direct lookups" - interpolation = '${}' - directive = '${}' + interpolation = "${}" + directive = "${}" result = "" class strFsm(enum.Enum): @@ -226,7 +235,7 @@ class strFsm(enum.Enum): startDirective = enum.auto() parseInterpolation = enum.auto() parseDirective = enum.auto() - + state = strFsm.parseString for char in input: @@ -236,11 +245,11 @@ class strFsm(enum.Enum): elif char == directive[0]: state = strFsm.startDirective else: - result +=char - elif state is strFsm.startInterpolation: + result += char + elif state is strFsm.startInterpolation: if char == interpolation[0]: state = strFsm.parseString - result +=char + result += char elif char == interpolation[1]: state = strFsm.parseInterpolation expression = "" @@ -251,7 +260,7 @@ class strFsm(enum.Enum): elif state is strFsm.startDirective: if char == directive[0]: state = strFsm.parseString - result +=char + result += char elif char == directive[1]: state = strFsm.parseDirective expression = "" @@ -265,21 +274,25 @@ class strFsm(enum.Enum): try: result += self.getValue(expression) except: - LOGGER.warning(f'string interpolation not available: {expression}') - expression = '${' + expression + '}' + LOGGER.warning( + f"string interpolation not available: {expression}" + ) + expression = "${" + expression + "}" result += expression else: expression += char elif state is strFsm.parseDirective: if char == directive[2]: - state = strFsm.parseString - expression = '${' + expression + '}' + state = strFsm.parseString + expression = "${" + expression + "}" result += expression - LOGGER.warning(f'Cannot process string directive: {directive}') + LOGGER.warning(f"Cannot process string directive: {directive}") else: expression += char else: - raise RuntimeError(f'FSM for parsing HCL string reached an unknown state: {state}') + raise RuntimeError( + f"FSM for parsing HCL string reached an unknown state: {state}" + ) return result @@ -289,25 +302,26 @@ def __init__(self, **kwargs) -> None: self.root_module = None for arg, value in kwargs.items(): - if arg == 'outputs': + if arg == "outputs": self.outputs = value - elif arg == 'root_module': + elif arg == "root_module": if isinstance(value, TerraformModule): self.root_module = value else: self.root_module = TerraformModule(**value) else: - raise ValueError(f'TerraformVaules: Unknown parameter: {arg}') + raise ValueError(f"TerraformVaules: Unknown parameter: {arg}") + def __eq__(self, o: object) -> bool: return vars(self) == vars(o) - + def __str__(self): - obj = {k: json.loads(str(v)) for k,v in vars(self).items() if v is not None} + obj = {k: json.loads(str(v)) for k, v in vars(self).items() if v is not None} return json.dumps(obj, indent=4) - + def getValue(self, key): return self.root_module.getValue(key) - + def addResource(self, r): self.root_module.addResource(r) @@ -317,6 +331,7 @@ def addChildModule(self, m): def listResources(self): return self.root_module.listResources() + class TerraformModule: def __init__(self, **kwargs) -> None: self.address = None @@ -331,13 +346,13 @@ def __init__(self, **kwargs) -> None: if isinstance(resource, TerraformResource): self.resources.append(resource) else: - self.resources.append( TerraformResource(**resource) ) + self.resources.append(TerraformResource(**resource)) elif arg == "child_modules": for child in value: self.child_modules.append(TerraformModule(**child)) else: - raise ValueError(f'Unkown parameter: {arg}') - + raise ValueError(f"Unkown parameter: {arg}") + def __eq__(self, o: object) -> bool: if self.address != o.address: return False @@ -346,50 +361,50 @@ def __eq__(self, o: object) -> bool: if self.child_modules != o.child_modules: return False return True - + def __str__(self): obj = {} if self.address is not None: - obj['address'] = self.address + obj["address"] = self.address - obj['resources'] = [] + obj["resources"] = [] for r in self.resources: data = json.loads(str(r)) - obj['resources'].append(data) - - obj['child_modules'] = [] + obj["resources"].append(data) + + obj["child_modules"] = [] for m in self.child_modules: data = json.loads(str(m)) - obj['child_modules'].append(data) + obj["child_modules"].append(data) return json.dumps(obj, indent=4) def addResource(self, r): if not isinstance(r, TerraformResource): - raise ValueError('Must be a TerraformResource') + raise ValueError("Must be a TerraformResource") self.resources.append(r) - + def addChildModule(self, m): if not isinstance(m, TerraformModule): - raise ValueError('Must be a TerraformModule') + raise ValueError("Must be a TerraformModule") self.child_modules.append(m) def getValue(self, key): if key == self.address: return self - + for r in self.resources: if r.address == key: return r - if key.startswith(f'{r.address}.'): + if key.startswith(f"{r.address}."): return r.getValue(key) - + for m in self.child_modules: if m.address == key: return key - if key.startswith(f'{r.address}.'): + if key.startswith(f"{r.address}."): return r.getValue(key) - raise KeyError(f'Invalid terraform address: {key}') + raise KeyError(f"Invalid terraform address: {key}") def listResources(self): result = [r.address for r in self.resources] @@ -397,10 +412,11 @@ def listResources(self): result = result + m.listResources() return result + class TerraformResource: def __init__(self, **kwargs) -> None: self.address = None - self.mode = None + self.mode = None self.type = None self.name = None self.index = None @@ -411,57 +427,58 @@ def __init__(self, **kwargs) -> None: self.sensitive_values = {} for arg, value in kwargs.items(): - if arg == 'address': + if arg == "address": self.address = value - elif arg == 'mode': - if value not in ['managed', 'data']: - raise ValueError(f'TerraformResource: Invalid mode: {value}') + elif arg == "mode": + if value not in ["managed", "data"]: + raise ValueError(f"TerraformResource: Invalid mode: {value}") self.mode = value - elif arg == 'type': + elif arg == "type": self.type = value - elif arg == 'name': + elif arg == "name": self.name = value - elif arg == 'index': + elif arg == "index": self.index = value - elif arg == 'change': + elif arg == "change": self.change = TerraformChange(**value) - elif arg == 'provider_name': + elif arg == "provider_name": self.provider_name = value - elif arg == 'schema_version': + elif arg == "schema_version": self.schema_version = value - elif arg == 'values': + elif arg == "values": self.values = value - elif arg == 'sensitive_values': + elif arg == "sensitive_values": self.sensitive_values = value else: - raise ValueError(f'TerraformResource: Invalid parameter: {arg}') + raise ValueError(f"TerraformResource: Invalid parameter: {arg}") def __eq__(self, o: object) -> bool: return vars(self) == vars(o) def __str__(self): - obj = {k:v for k,v in vars(self).items() if v is not None} + obj = {k: v for k, v in vars(self).items() if v is not None} if self.change is not None: - obj['change'] = json.loads(str(self.change)) + obj["change"] = json.loads(str(self.change)) return json.dumps(obj, indent=4) def getValue(self, key): if key.startswith(self.address): - key = key[len(self.address)+1:] + key = key[len(self.address) + 1 :] if key in self.values: return self.values[key] if key in self.sensitive_values: return self.sensitive_values[key] - raise KeyError(f'TerraformResources: {self.address}.{key}') - - def setValue(self, key, value, sensitive = False): + raise KeyError(f"TerraformResources: {self.address}.{key}") + + def setValue(self, key, value, sensitive=False): if sensitive: self.sensitive_values[key] = value else: self.values[key] = value + class TerraformChange: def __init__(self, **kwargs) -> None: """Not implemented since it is not needed to find policy strings""" @@ -471,15 +488,17 @@ def __str__(self): obj = vars(self) return json.dumps(obj, indent=4) + class TerraformResourceChange: def __init__(self, **kwargs) -> None: """Not implemented since it is not needed to find policy strings""" pass def __str__(self): - obj = {k: json.loads(str(v)) for k,v in vars(self).items() if v is not None} + obj = {k: json.loads(str(v)) for k, v in vars(self).items() if v is not None} return json.dumps(obj, indent=4) - + + class TerraformConfig: def __init__(self, **kwargs) -> None: """Not implemented since it is not needed to find policy strings""" @@ -487,52 +506,52 @@ def __init__(self, **kwargs) -> None: self.root_module = None def __str__(self): - obj = {k: json.loads(str(v)) for k,v in vars(self).items() if v is not None} + obj = {k: json.loads(str(v)) for k, v in vars(self).items() if v is not None} return json.dumps(obj, indent=4) + def plan_from_stdout(input): """attempt to convert output from `terraform show` into a data structure. Only compatiable with 0.11 and earlier versions""" data = {} resource = None operation = None - operators = ['+', '-', '~', '+/-'] + operators = ["+", "-", "~", "+/-"] - for line in input.split('\n'): + for line in input.split("\n"): # remove color encoding - line = re.sub(r'\033\[[0-9;]*m', '', line) + line = re.sub(r"\033\[[0-9;]*m", "", line) line = line.strip() - #blank lines indicate done processing previous resource + # blank lines indicate done processing previous resource if len(line) == 0: resource = None operation = None continue # determine the operation on the given resource - key, value = line.split(' ', 1) + key, value = line.split(" ", 1) if operation is None: if key not in operators: - raise ValueError('Unkown operations: {line}') + raise ValueError("Unkown operations: {line}") operation = key resource = value - if operation != '-': + if operation != "-": data[resource] = {} continue - elif operation == '-': + elif operation == "-": continue - key = key.strip() - key = key.rstrip(':') + key = key.rstrip(":") value = value.strip() - if operation in ['+/-', '~']: - value = value.split(' => ')[1] - + if operation in ["+/-", "~"]: + value = value.split(" => ")[1] + # if it is a string it needs to be decoded if value[0] == '"': value = value.strip('"') - value = value.encode('utf-8').decode('unicode_escape') + value = value.encode("utf-8").decode("unicode_escape") data[resource][key] = value @@ -541,8 +560,8 @@ def plan_from_stdout(input): for resource, attributes in data.items(): r = TerraformResource() r.address = resource - r.type = resource.split('.')[-2] - r.name = resource.split('.')[-1] + r.type = resource.split(".")[-2] + r.name = resource.split(".")[-1] for k, v in attributes.items(): if v == "": @@ -551,7 +570,7 @@ def plan_from_stdout(input): root_module.addResource(r) - values = TerraformValues(root_module = root_module) - plan = TerraformPlan(planned_values = values) + values = TerraformValues(root_module=root_module) + plan = TerraformPlan(planned_values=values) return plan diff --git a/iam_check/parameters.py b/iam_check/parameters.py index 2cd65d1..9c41732 100644 --- a/iam_check/parameters.py +++ b/iam_check/parameters.py @@ -3,47 +3,62 @@ SPDX-License-Identifier: MIT-0 """ from argparse import ArgumentTypeError + from botocore.config import Config from botocore.exceptions import InvalidRegionError from botocore.utils import validate_region_name from .client import build + # from cfn_policy_validator.application_error import ApplicationError + def validate_region(region): - try: - # this call validates that the region name is valid, but does not validate that the region actually exists - validate_region_name(region) - except InvalidRegionError: - raise ArgumentTypeError(f'Invalid region name: {region}.') - return region + try: + # this call validates that the region name is valid, but does not validate that the region actually exists + validate_region_name(region) + except InvalidRegionError: + raise ArgumentTypeError(f"Invalid region name: {region}.") + return region + def validate_credentials(region): - # run a test to validate the provided credentials - # create our own config here to control retries and fail fast if credentials are invalid - sts_client = build('sts', region, client_config=Config(retries={'mode': 'standard', 'max_attempts': 2})) - sts_client.get_caller_identity + # run a test to validate the provided credentials + # create our own config here to control retries and fail fast if credentials are invalid + sts_client = build( + "sts", + region, + client_config=Config(retries={"mode": "standard", "max_attempts": 2}), + ) + sts_client.get_caller_identity + def validate_finding_types_from_cli(value): - """ - Validate that the finding types provided are valid finding types. - """ + """ + Validate that the finding types provided are valid finding types. + """ - finding_types = value.split(',') - finding_types = validate_finding_types(finding_types) + finding_types = value.split(",") + finding_types = validate_finding_types(finding_types) - return finding_types + return finding_types def validate_finding_types(finding_types): - if finding_types is None: - return finding_types + if finding_types is None: + return finding_types - finding_types = [finding_type.strip() for finding_type in finding_types] - finding_types = [finding_type.upper() for finding_type in finding_types] + finding_types = [finding_type.strip() for finding_type in finding_types] + finding_types = [finding_type.upper() for finding_type in finding_types] - for finding_type in finding_types: - if finding_type not in ['ERROR', 'SECURITY_WARNING', 'SUGGESTION', 'WARNING', 'NONE']: - raise ArgumentTypeError(f"Invalid finding type: {finding_type}.") + for finding_type in finding_types: + if finding_type not in [ + "ERROR", + "SECURITY_WARNING", + "SUGGESTION", + "WARNING", + "NONE", + ]: + raise ArgumentTypeError(f"Invalid finding type: {finding_type}.") - return finding_types \ No newline at end of file + return finding_types diff --git a/iam_check/test/test_accessAnalyzer.py b/iam_check/test/test_accessAnalyzer.py index c20bac8..3a689c3 100644 --- a/iam_check/test/test_accessAnalyzer.py +++ b/iam_check/test/test_accessAnalyzer.py @@ -1,15 +1,17 @@ -from lib.iamcheck_AccessAnalyzer import AccessAnalyzer -import pytest import json + +import pytest from lib import iamPolicy +from lib.iamcheck_AccessAnalyzer import AccessAnalyzer + class TestAccessAnalyzer: def test_a2_check(self): - policy = iamPolicy.Policy(file = "lib/tests/test_policy_accessanalyzer.json") + policy = iamPolicy.Policy(file="lib/tests/test_policy_accessanalyzer.json") arn = "arn:aws:iam::123456789012:role/test_role" check = AccessAnalyzer() result = check.run(policy, arn) file = "lib/tests/accessanalyzer_findings.json" - with open(file, 'r') as f: + with open(file, "r") as f: findings = json.loads(f) - assert(result == finding) \ No newline at end of file + assert result == finding diff --git a/iam_check/test/test_tf_plan.py b/iam_check/test/test_tf_plan.py index 2bc51b0..02f4914 100644 --- a/iam_check/test/test_tf_plan.py +++ b/iam_check/test/test_tf_plan.py @@ -1,35 +1,31 @@ -import config import json -import pytest + +import config import lib.tfPlan as tfPlan +import pytest + class TestTerraformResource: def test_core_function(self): err = [] data = { - 'address': 'foo.bar', - 'name': 'bar', - 'values': { - 'baz': 'Hello!' - }, - 'sensitive_values':{ - 'tags_all': {} - } + "address": "foo.bar", + "name": "bar", + "values": {"baz": "Hello!"}, + "sensitive_values": {"tags_all": {}}, } match = '{\n "address": "foo.bar",\n "name": "bar",\n "values": {\n "baz": "Hello!"\n },\n "sensitive_values": {\n "tags_all": {}\n }\n}' x = tfPlan.TerraformResource(**data) if str(x) != match: - err.append('Invalid JSON representation') - - if x.getValue('baz') != 'Hello!': - err.append('Can not get value of baz') - + err.append("Invalid JSON representation") - if x.getValue('tags_all') != {}: - err.append('Can not get value of tags_all') + if x.getValue("baz") != "Hello!": + err.append("Can not get value of baz") - x.setValue('big', 'shot') - if x.getValue('big') != 'shot': - error.append('Failed setting a value with setqVaule()') - assert( err == []) + if x.getValue("tags_all") != {}: + err.append("Can not get value of tags_all") + x.setValue("big", "shot") + if x.getValue("big") != "shot": + error.append("Failed setting a value with setqVaule()") + assert err == [] diff --git a/iam_check/tools/regex_patterns.py b/iam_check/tools/regex_patterns.py index 2c86ef9..1d8fd97 100644 --- a/iam_check/tools/regex_patterns.py +++ b/iam_check/tools/regex_patterns.py @@ -13,9 +13,11 @@ # the values must not start with ! (!${}) - as this represents a raw value # used by Fn::Sub # e.g. ${MyValue} -> MyValue -fn_sub_variables = re.compile(r'\$\{([^!].*?)\}') +fn_sub_variables = re.compile(r"\$\{([^!].*?)\}") # looks for dynamic ssm regex of the form {{resolve:ssm:reference-key:version}} or {{resolve:ssm:reference-key}} # captures reference-key and version (if it exists) -dynamic_ssm_reference_regex = re.compile(r'({{resolve:ssm:([a-zA-Z0-9_\.\-\/]+):?(\d+)?}})') \ No newline at end of file +dynamic_ssm_reference_regex = re.compile( + r"({{resolve:ssm:([a-zA-Z0-9_\.\-\/]+):?(\d+)?}})" +)