Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions src/plugin/manager/ec2/security_group_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ def create_cloud_service(self, region, options, secret_data, schema):
for _ip_range in in_rule.get("IpRanges", []):
in_rule_copy = copy.deepcopy(in_rule)
inbound_rules.append(
self.custom_security_group_rule_info(
self.custom_security_group_inbound_rule_info(
in_rule_copy, _ip_range, "ip_ranges",vulnerable_ports
)
)

for _user_group_pairs in in_rule.get("UserIdGroupPairs", []):
in_rule_copy = copy.deepcopy(in_rule)
inbound_rules.append(
self.custom_security_group_rule_info(
self.custom_security_group_inbound_rule_info(
in_rule_copy,
_user_group_pairs,
"user_id_group_pairs",
Expand All @@ -84,7 +84,7 @@ def create_cloud_service(self, region, options, secret_data, schema):
for _ip_v6_range in in_rule.get("Ipv6Ranges", []):
in_rule_copy = copy.deepcopy(in_rule)
inbound_rules.append(
self.custom_security_group_rule_info(
self.custom_security_group_inbound_rule_info(
in_rule_copy, _ip_v6_range, "ipv6_ranges",vulnerable_ports
)
)
Expand All @@ -96,7 +96,7 @@ def create_cloud_service(self, region, options, secret_data, schema):
out_rule_copy = copy.deepcopy(out_rule)
outbound_rules.append(
self.custom_security_group_rule_info(
out_rule_copy, _ip_range, "ip_ranges",vulnerable_ports
out_rule_copy, _ip_range, "ip_ranges"
)
)

Expand All @@ -106,15 +106,15 @@ def create_cloud_service(self, region, options, secret_data, schema):
self.custom_security_group_rule_info(
out_rule_copy,
_user_group_pairs,
"user_id_group_pairs",vulnerable_ports,
"user_id_group_pairs",
)
)

for _ip_v6_range in out_rule.get("Ipv6Ranges", []):
out_rule_copy = copy.deepcopy(out_rule)
outbound_rules.append(
self.custom_security_group_rule_info(
out_rule_copy, _ip_v6_range, "ipv6_ranges",vulnerable_ports
out_rule_copy, _ip_v6_range, "ipv6_ranges"
)
)

Expand Down Expand Up @@ -165,7 +165,16 @@ def create_cloud_service(self, region, options, secret_data, schema):
region_name=region,
)

def custom_security_group_rule_info(self, raw_rule, remote, remote_type, vulnerable_ports):
def custom_security_group_inbound_rule_info(self, raw_rule, remote, remote_type, vulnerable_ports):
raw_rule = self.custom_security_group_rule_info(raw_rule, remote, remote_type)

protocol_display = raw_rule.get("protocol_display")

raw_rule.update({"vulnerable_ports": self._get_vulnerable_ports(protocol_display, raw_rule, vulnerable_ports)})

return raw_rule

def custom_security_group_rule_info(self, raw_rule, remote, remote_type):
protocol_display = self._get_protocol_display(raw_rule.get("IpProtocol"))
raw_rule.update(
{
Expand All @@ -174,7 +183,6 @@ def custom_security_group_rule_info(self, raw_rule, remote, remote_type, vulnera
"source_display": self._get_source_display(remote),
"description_display": self._get_description_display(remote),
remote_type: remote,
"vulnerable_ports": self._get_vulnerable_ports(protocol_display, raw_rule, vulnerable_ports)
}
)

Expand Down Expand Up @@ -296,19 +304,17 @@ def get_instance_name_from_tags(instance):
@staticmethod
def _get_vulnerable_ports(protocol_display: str, raw_rule: dict, vulnerable_ports: str):
try:
ports = [int(port.strip()) for port in vulnerable_ports.split(',')]

if protocol_display == "ALL":
return [int(port.strip()) for port in vulnerable_ports.split(',')]
return ports

to_port = raw_rule.get("ToPort")
from_port = raw_rule.get("FromPort")

if to_port is None or from_port is None:
return []

return [
int(port.strip())
for port in vulnerable_ports.split(',')
if from_port <= int(port.strip()) <= to_port
]
return [port for port in ports if from_port <= port <= to_port]
except ValueError:
raise ERROR_VULNERABLE_PORTS(vulnerable_ports)
Loading