Skip to content

Fixed issue for s3-require-ssl, s3-bucket-block-public-write-acces… #309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 175 additions & 66 deletions policies/s3/s3-bucket-block-public-read-access.sentinel
Original file line number Diff line number Diff line change
Expand Up @@ -5,113 +5,222 @@

# Imports

import "tfconfig/v2" as tfconfig
import "tfstate/v2" as tfstate
import "tfplan/v2" as tfplan
import "tfresources" as tf
import "report" as report
import "collection" as collection
import "collection/maps" as maps
import "strings"
import "types"

# Constants

const = {
"policy_name": "s3-bucket-block-public-read-access",
"message": "S3 general purpose buckets should block public read access. Refer to https://docs.aws.amazon.com/securityhub/latest/userguide/s3-controls.html#s3-2 for more details.",
"resource_aws_s3_bucket": "aws_s3_bucket",
"resource_aws_s3_bucket_policy": "aws_s3_bucket_policy",
"resource_aws_iam_policy_document": "aws_iam_policy_document",
"resource_aws_s3_public_access_block": "aws_s3_bucket_public_access_block",
"resource_aws_s3_bucket_acl": "aws_s3_bucket_acl",
"Allow": "Allow",
"address": "address",
"module_address": "module_address",
"module_prefix": "module.",
"values": "values",
"variable": "variable",
"references": "references",
"acl_not_valid_values": ["public-read", "public-read-write", "authenticated-read", "aws-exec-read"],
"access_control_policy_not_valid_values": ["FULL_CONTROL", "READ", "READ_ACP"],
}

# Functions

# Function to get violations for data source aws_iam_policy_document for s3 bucket policy
get_policy_document_violations = func(resources) {
return collection.reject(resources, func(res) {
statements = maps.get(res, "values.statement", [])
if statements is null {
return true
# Removes module address prefix from a resource
# and returns back the localized address for a module.
resource_address_without_module_address = func(res) {
resource_addr = res[const.address]

# Check for root module
if not strings.has_prefix(resource_addr, const.module_prefix) {
return resource_addr
}

module_addr_prefix = res[const.module_address] + "."
return strings.trim_prefix(resource_addr, module_addr_prefix)
}

# Function to check if policy document has public read violations
has_public_read_policy_violation = func(res) {
policy = res.config.policy
if policy[const.references] is not defined or policy[const.references][1] not matches "^data.aws_iam_policy_document.(.*)$" {
return false
}
reference = policy[const.references][1]

address = strings.trim_prefix(reference, "data.")
// Append the module address to the data source's local address
// in case of nested modules
if strings.has_prefix(res.module_address, const.module_prefix) {
address = res.module_address + "." + address
}
datasource = tf.state(tfstate.resources).mode("data").address(address).resources
if datasource is null or datasource is not defined or datasource is empty {
address = "data." + address
datasource = tf.config(tfconfig.resources).mode("data").address(address).resources
if datasource is null or datasource is not defined {
return false
}
full_access = collection.find(statements, func(statement) {
actions = maps.get(statement, "actions", [])
return collection.find(actions, func(action) {
return action contains ":*" or action contains "s3:GetObject" or action contains "s3:GetBucket"
}) is defined and
maps.get(statement, "effect", "") == const.Allow
})
return full_access is not defined or full_access is empty
})
}

# Function to get violations for resource aws_s3_bucket_public_access_block for s3 bucket
get_public_access_block_violations = func(resources) {
return collection.reject(resources, func(res) {
block_public_acls = maps.get(res, "values.block_public_acls", false)
ignore_public_acls = maps.get(res, "values.ignore_public_acls", false)
block_public_policy = maps.get(res, "values.block_public_policy", false)
restrict_public_buckets = maps.get(res, "values.restrict_public_buckets", false)
return block_public_acls and ignore_public_acls and block_public_policy and restrict_public_buckets
})
}

# Function to get violations for resource aws_s3_bucket_acl for s3 bucket
get_bucket_acl_violations = func(resources) {
return collection.reject(resources, func(res) {
acl_complaint = false
access_control_policy_complaint = false

acl = maps.get(res, "values.acl", null)
if acl is not null {
if acl in const.acl_not_valid_values {
acl_complaint = false
} else {
acl_complaint = true
statements = datasource[0].config.statement
if statements is not defined {
return false
}
for statements as _, statement {
actions = maps.get(statement, "actions", {})
effect = maps.get(statement, "effect", {})
if (types.type_of(effect) is not "string" and effect.constant_value is const.Allow) or
(types.type_of(effect) is "string" and effect is const.Allow) {
if types.type_of(actions) is not "string" {
action_values = actions.constant_value
} else {
action_values = [actions]
}
if action_values is not defined {
continue
}
for action_values as _, action {
if action contains ":*" or action contains "s3:GetObject" or action contains "s3:GetBucket" {
return true
}
}
}
}

access_control_policy = maps.get(res, "values.access_control_policy", [])
if access_control_policy is empty {
return acl_complaint
return false
}

statements = datasource[0].values.statement
if statements is undefined {
return false
}
for statements as _, statement {
actions = maps.get(statement, "actions", [])
effect = maps.get(statement, "effect", "")
if effect is const.Allow {
for actions as _, action {
if action contains ":*" or action contains "s3:GetObject" or action contains "s3:GetBucket" {
return true
}
}
}
}
return false
}

# Function to check if public access block has violations
has_public_access_block_violation = func(config) {
block_public_acls = maps.get(maps.get(config, "block_public_acls", {}), "constant_value", false)
ignore_public_acls = maps.get(maps.get(config, "ignore_public_acls", {}), "constant_value", false)
block_public_policy = maps.get(maps.get(config, "block_public_policy", {}), "constant_value", false)
restrict_public_buckets = maps.get(maps.get(config, "restrict_public_buckets", {}), "constant_value", false)
return not (block_public_acls and ignore_public_acls and block_public_policy and restrict_public_buckets)
}

# Function to check if bucket ACL has violations
has_bucket_acl_violation = func(config) {
acl = maps.get(maps.get(config, "acl", {}), "constant_value", null)
if acl is not null and acl in const.acl_not_valid_values {
return true
}
access_control_policy = maps.get(maps.get(config, "access_control_policy", {}), "constant_value", [])
if access_control_policy is not empty {
grant = maps.get(access_control_policy[0], "grant", [])
if grant is empty {
return acl_complaint
if grant is not empty {
permission = maps.get(grant[0], "permission", "")
if permission is not "" and permission in const.access_control_policy_not_valid_values {
return true
}
}
}
return false
}

permission = maps.get(grant[0], "permission", "")
if permission is not "" and permission not in const.access_control_policy_not_valid_values {
access_control_policy_complaint = true
}
# Prefixes the referenced s3 bucket's address with
# the module address. This is done because resource
# addresses comprise of module addresses
sanitize_referenced_s3_bucket_address = func(res) {
module_addr = res[const.module_address]
if res.config.bucket.constant_value is defined {
return ""
}

bucket_reference = res.config.bucket.references[1]
# Check for root module
if not strings.has_prefix(res[const.address], const.module_prefix) {
return bucket_reference
}

return module_addr + "." + bucket_reference
}

build_violation_object = func(resource_addr, module_addr, message) {
return {
"address": resource_addr,
"module_address": module_addr,
"message": message,
}
}

# Variables

config_resources = tf.config(tfconfig.resources)
s3_bucket_resources = config_resources.type(const.resource_aws_s3_bucket).resources

# Get bucket policy resources that have violations
bucket_policy_violations = filter config_resources.type(const.resource_aws_s3_bucket_policy).resources as _, res {
has_public_read_policy_violation(res)
}

return acl_complaint or access_control_policy_complaint
})
# Get public access block resources that have violations
public_access_block_violations = filter config_resources.type(const.resource_aws_s3_public_access_block).resources as _, res {
has_public_access_block_violation(res.config)
}

iam_policy_document_resources = tf.state(tfstate.resources).type(const.resource_aws_iam_policy_document).resources
public_access_block_resources = tf.plan(tfplan.planned_values.resources).type(const.resource_aws_s3_public_access_block).resources
bucket_acl_resources = tf.plan(tfplan.planned_values.resources).type(const.resource_aws_s3_bucket_acl).resources
# Get bucket ACL resources that have violations
bucket_acl_violations = filter config_resources.type(const.resource_aws_s3_bucket_acl).resources as _, res {
has_bucket_acl_violation(res.config)
}

violations = []
violations += get_policy_document_violations(iam_policy_document_resources)
violations += get_public_access_block_violations(public_access_block_resources)
violations += get_bucket_acl_violations(bucket_acl_resources)
# Get bucket addresses that have policy violations
bucket_addresses_with_policy_violations = map bucket_policy_violations as _, res {
sanitize_referenced_s3_bucket_address(res)
}

# Get bucket addresses that have public access block violations
bucket_addresses_with_access_block_violations = map public_access_block_violations as _, res {
sanitize_referenced_s3_bucket_address(res)
}

# Get bucket addresses that have ACL violations
bucket_addresses_with_acl_violations = map bucket_acl_violations as _, res {
sanitize_referenced_s3_bucket_address(res)
}

# Find violations: buckets that have policy violations OR have access block violations OR have ACL violations
violations = filter s3_bucket_resources as _, res {
res.address in bucket_addresses_with_policy_violations or
res.address in bucket_addresses_with_access_block_violations or
res.address in bucket_addresses_with_acl_violations
}

summary = {
"policy_name": const.policy_name,
"violations": map violations as _, v {
{
"address": v.address,
"module_address": v.module_address,
"message": const.message,
}
build_violation_object(v.address, v.module_address, const.message)
},
}

print(report.generate_policy_report(summary))

main = rule {
violations is empty
}
Loading