Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions iam_check/argument_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"""
import argparse

from .lib.reporter import ResourceOrCodeFindingToIgnore, ResourceAndCodeFindingToIgnore, \
AllowedExternalArn, AllowedExternalPrincipal
from .lib.reporter import (AllowedExternalArn, AllowedExternalPrincipal,
ResourceAndCodeFindingToIgnore,
ResourceOrCodeFindingToIgnore)
from .tools import regex_patterns


Expand All @@ -28,8 +29,8 @@ class ParseFindingsToIgnoreFromCLI(argparse.Action):
a combination of both in the form MyResource.FindingA
"""

def __call__(self, _, namespace, values, option_string=None):
values = values.split(',')
def __call__(self, _, namespace, values, option_string=None):
values = values.split(",")

findings_to_ignore = parse_findings_to_ignore(values)

Expand All @@ -47,7 +48,9 @@ def parse_findings_to_ignore(values_as_list):
if "." in value:
resource_and_code = value.split(".", 1)
# a split must have at least two members of the array, so no need to validate
finding_to_ignore = ResourceAndCodeFindingToIgnore(resource_and_code[0], resource_and_code[1])
finding_to_ignore = ResourceAndCodeFindingToIgnore(
resource_and_code[0], resource_and_code[1]
)
else:
finding_to_ignore = ResourceOrCodeFindingToIgnore(value)

Expand All @@ -63,7 +66,7 @@ class ParseAllowExternalPrincipalsFromCLI(argparse.Action):
"""

def __call__(self, _, namespace, values, option_string=None):
values = values.split(',')
values = values.split(",")

allowed_external_principals = parse_allow_external_principals(values)

Expand All @@ -86,4 +89,4 @@ def parse_allow_external_principals(values_as_list):

allowed_external_principals.append(allowed_external_principal)

return allowed_external_principals
return allowed_external_principals
40 changes: 20 additions & 20 deletions iam_check/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,39 @@
from botocore.config import Config

config = Config(
retries={
# this number was chosen arbitrarily, tweak as necessary
'max_attempts': 30,
'mode': 'standard'
}
retries={
# this number was chosen arbitrarily, tweak as necessary
"max_attempts": 30,
"mode": "standard",
}
)


def get_account_and_partition(region):
"""
Pull the account and partition from the credentials used to execute the validator
"""
"""
Pull the account and partition from the credentials used to execute the validator
"""

sts_client = build('sts', region)
identity = sts_client.get_caller_identity()
account_id = identity['Account']
sts_client = build("sts", region)
identity = sts_client.get_caller_identity()
account_id = identity["Account"]

parts = identity['Arn'].split(':')
partition = parts[1]
parts = identity["Arn"].split(":")
partition = parts[1]

return account_id, partition
return account_id, partition


def build(service_name, region_name, client_config=None):
if client_config is None:
client_config = config
session = boto3.Session(profile_name=profile_name, region_name=region_name)
return session.client(service_name, config=client_config)
if client_config is None:
client_config = config
session = boto3.Session(profile_name=profile_name, region_name=region_name)
return session.client(service_name, config=client_config)


profile_name = None


def set_profile(profile):
global profile_name
profile_name = profile
global profile_name
profile_name = profile
43 changes: 23 additions & 20 deletions iam_check/config.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,59 @@
import logging
import yaml
import sys

import yaml

# logging configuration
LOGGER = logging.getLogger('iam-policy-validator-for-terraform')
LOGGER = logging.getLogger("iam-policy-validator-for-terraform")

# AWS Account ID to use when unknown
awsAccount = '123456789012'
awsAccount = "123456789012"

#IAM Policy checks to run
# IAM Policy checks to run
# The default is to run all checks if thhe list is empty
# iamChecks = []

#IAM policy resources
# IAM policy resources
iamPolicyAttributes = {}

#Generate fake ARN
# Generate fake ARN
# default substitube is {<key>?<default>}
arnServiceMap = {}

validatePolicyResourceType = {}


def configure_logging(enable_logging):
console_handler = logging.StreamHandler(sys.stdout)
#console_handler.setLevel(logging.DEBUG)
# console_handler.setLevel(logging.DEBUG)

LOGGER.setLevel(logging.INFO)

# log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler.setFormatter(log_formatter)
LOGGER.propagate = False
# for handler in LOGGER.handlers:
# LOGGER.removeHandler(handler)
LOGGER.addHandler(console_handler)
if not enable_logging:
LOGGER.disabled = True


def loadConfigYaml(file):
global arnServiceMap
global iamPolicyAttributes
global validatePolicyResourceType
with open(file, 'r') as fh:

with open(file, "r") as fh:
data = yaml.safe_load(fh)

arnServiceMap = data.get('arnServiceMap', arnServiceMap)
if 'arnServiceMap' in data:
arnServiceMap = data['arnServiceMap']

if 'iamPolicyAttributes' in data:
iamPolicyAttributes = data['iamPolicyAttributes']

if 'validatePolicyResourceType' in data:
validatePolicyResourceType = data['validatePolicyResourceType']

arnServiceMap = data.get("arnServiceMap", arnServiceMap)
if "arnServiceMap" in data:
arnServiceMap = data["arnServiceMap"]

if "iamPolicyAttributes" in data:
iamPolicyAttributes = data["iamPolicyAttributes"]

if "validatePolicyResourceType" in data:
validatePolicyResourceType = data["validatePolicyResourceType"]
145 changes: 93 additions & 52 deletions iam_check/iam_check.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,58 @@
#!/usr/bin/env python3

import argparse
import logging
import json
import logging
import sys
import traceback

#from .config import loadConfigYaml, iamPolicyAttributes, configure_logging
# from .config import loadConfigYaml, iamPolicyAttributes, configure_logging
import iam_check.config as config

from .argument_actions import (ParseAllowExternalPrincipalsFromCLI,
ParseFindingsToIgnoreFromCLI)
from .client import get_account_and_partition, set_profile
from .parameters import validate_region, validate_finding_types_from_cli, validate_credentials
from .argument_actions import ParseFindingsToIgnoreFromCLI, ParseAllowExternalPrincipalsFromCLI
from .lib import iamcheck_AccessAnalyzer, account_config, reporter, tfPlan
from .lib.reporter import default_finding_types_that_are_blocking, Reporter
from .lib import account_config, iamcheck_AccessAnalyzer, reporter, tfPlan
from .lib.reporter import Reporter, default_finding_types_that_are_blocking
from .parameters import (validate_credentials, validate_finding_types_from_cli,
validate_region)

# Global Variables
LOGGER = logging.getLogger('iam-policy-validator-for-terraform')
LOGGER = logging.getLogger("iam-policy-validator-for-terraform")


def main():
global policy_checks
opts = cli_parse_opts()

account_id, partition = get_account_and_partition(opts.region)
account = account_config.AccountConfig(partition, opts.region, account_id)
validator=iamcheck_AccessAnalyzer.Validator(account.Account, account.Region, account.Partition)
validator = iamcheck_AccessAnalyzer.Validator(
account.Account, account.Region, account.Partition
)

findings = []
LOGGER.debug(f'Validating terraform plan file: {opts.template_path}')
with open(opts.template_path, 'r') as fh:
LOGGER.debug(f"Validating terraform plan file: {opts.template_path}")
with open(opts.template_path, "r") as fh:
data = fh.read()
try:
data = json.loads(data)
plan = tfPlan.TerraformPlan(**data)
except Exception as e:
if opts.plan.endswith('.json'):
print(f'Failed to load plan file from: {opts.template_path}')
if opts.plan.endswith(".json"):
print(f"Failed to load plan file from: {opts.template_path}")
raise e
plan = tfPlan.plan_from_stdout(data)
validator.run(plan)

findings = validator.findings
reporter = Reporter(opts.ignore_finding, opts.treat_as_blocking, opts.allowed_external_principals)

findings = validator.findings
reporter = Reporter(
opts.ignore_finding, opts.treat_as_blocking, opts.allowed_external_principals
)
report = reporter.build_report_from(findings)
LOGGER.info('Printing findings to the console...')
LOGGER.info("Printing findings to the console...")
report.print()

if report.has_blocking_findings():
exit(2)
else:
Expand All @@ -54,55 +62,88 @@ def main():
def cli_parse_opts():
parser = argparse.ArgumentParser()

parser.add_argument('--template-path', metavar="TEMPLATE_PATH", dest="template_path", required=True,
help='Terraform plan file (JSON)')
parser.add_argument('--region', dest="region", required=True, type=validate_region,
help="The region the resources will be deployed to.")
parser.add_argument('--profile', help='The named profile to use for AWS API calls.')
parser.add_argument("--config", nargs="+", help='Config file for running this script', action='append')
parser.add_argument('--enable-logging', help='Enable detailed logging.', default=False, action='store_true')
parser.add_argument('--ignore-finding', dest="ignore_finding", metavar='FINDING_CODE,RESOURCE_NAME,RESOURCE_NAME.FINDING_CODE',
help='Allow validation failures to be ignored.\n'
'Specify as a comma separated list of findings to be ignored. Can be individual '
'finding codes (e.g. "PASS_ROLE_WITH_STAR_IN_RESOURCE"), a specific resource name '
'(e.g. "MyResource"), or a combination of both separated by a period.'
'(e.g. "MyResource.PASS_ROLE_WITH_STAR_IN_RESOURCE").',
action=ParseFindingsToIgnoreFromCLI)
parser.add_argument('--treat-finding-type-as-blocking', dest="treat_as_blocking", metavar="ERROR,SECURITY_WARNING",
help='Specify which finding types should be treated as blocking. Other finding types are treated '
'as non-blocking. Defaults to "ERROR" and "SECURITY_WARNING". Specify as a comma separated '
'list of finding types that should be blocking. Possible values are "ERROR", '
'"SECURITY_WARNING", "SUGGESTION", and "WARNING". Pass "NONE" to ignore all errors.',
default=default_finding_types_that_are_blocking, type=validate_finding_types_from_cli)

parser.add_argument('--allow-external-principals', dest='allowed_external_principals', metavar="ACCOUNT,ARN",
help='A comma separated list of external principals that should be ignored. Specify as '
'a comma separated list of a 12 digit AWS account ID, a federated web identity '
'user, a federated SAML user, or an ARN. Specify "*" to allow anonymous access. '
'(e.g. 123456789123,arn:aws:iam::111111111111:role/MyOtherRole,graph.facebook.com)',
action=ParseAllowExternalPrincipalsFromCLI)
parser.add_argument(
"--template-path",
metavar="TEMPLATE_PATH",
dest="template_path",
required=True,
help="Terraform plan file (JSON)",
)
parser.add_argument(
"--region",
dest="region",
required=True,
type=validate_region,
help="The region the resources will be deployed to.",
)
parser.add_argument("--profile", help="The named profile to use for AWS API calls.")
parser.add_argument(
"--config",
nargs="+",
help="Config file for running this script",
action="append",
)
parser.add_argument(
"--enable-logging",
help="Enable detailed logging.",
default=False,
action="store_true",
)
parser.add_argument(
"--ignore-finding",
dest="ignore_finding",
metavar="FINDING_CODE,RESOURCE_NAME,RESOURCE_NAME.FINDING_CODE",
help="Allow validation failures to be ignored.\n"
"Specify as a comma separated list of findings to be ignored. Can be individual "
'finding codes (e.g. "PASS_ROLE_WITH_STAR_IN_RESOURCE"), a specific resource name '
'(e.g. "MyResource"), or a combination of both separated by a period.'
'(e.g. "MyResource.PASS_ROLE_WITH_STAR_IN_RESOURCE").',
action=ParseFindingsToIgnoreFromCLI,
)
parser.add_argument(
"--treat-finding-type-as-blocking",
dest="treat_as_blocking",
metavar="ERROR,SECURITY_WARNING",
help="Specify which finding types should be treated as blocking. Other finding types are treated "
'as non-blocking. Defaults to "ERROR" and "SECURITY_WARNING". Specify as a comma separated '
'list of finding types that should be blocking. Possible values are "ERROR", '
'"SECURITY_WARNING", "SUGGESTION", and "WARNING". Pass "NONE" to ignore all errors.',
default=default_finding_types_that_are_blocking,
type=validate_finding_types_from_cli,
)

parser.add_argument(
"--allow-external-principals",
dest="allowed_external_principals",
metavar="ACCOUNT,ARN",
help="A comma separated list of external principals that should be ignored. Specify as "
"a comma separated list of a 12 digit AWS account ID, a federated web identity "
'user, a federated SAML user, or an ARN. Specify "*" to allow anonymous access. '
"(e.g. 123456789123,arn:aws:iam::111111111111:role/MyOtherRole,graph.facebook.com)",
action=ParseAllowExternalPrincipalsFromCLI,
)
args = parser.parse_args()

#load yaml config
# load yaml config
if args.config is not None:
for conf in [fileName for arg in args.config for fileName in arg]:
LOGGER.debug(f'Config file: {conf}')
LOGGER.debug(f"Config file: {conf}")
config.loadConfigYaml(conf)

#Make sure there is at least one policy to look for
# Make sure there is at least one policy to look for
if len(config.iamPolicyAttributes) == 0:
raise ValueError(f'No IAM policies defined!')
raise ValueError(f"No IAM policies defined!")

set_profile(args.profile)
validate_credentials(args.region)
config.configure_logging(args.enable_logging)
return args


if __name__ == "__main__":
if __name__ == "__main__":
try:
main()
except Exception as e:
traceback.print_exc()
print(f'ERROR: Unexpected error occurred. {str(e)}', file=sys.stderr)
exit(1)
print(f"ERROR: Unexpected error occurred. {str(e)}", file=sys.stderr)
exit(1)
Loading