diff --git a/src/plugin/manager/ec2/security_group_manager.py b/src/plugin/manager/ec2/security_group_manager.py
index cc6e8db..c5c0dc8 100644
--- a/src/plugin/manager/ec2/security_group_manager.py
+++ b/src/plugin/manager/ec2/security_group_manager.py
@@ -80,13 +80,11 @@ def create_cloud_service(self, region, options, secret_data, schema):
 # Inbound Rules
 inbound_rules = []
- for in_rule in raw.get("IpPermissions", []):
- in_rule_copy = copy.deepcopy(in_rule)
-
- for _ip_range in in_rule.get("IpRanges", []):
+ for inbound_rule in raw.get("IpPermissions", []):
+ for _ip_range in inbound_rule.get("IpRanges", []):
 inbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=in_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=inbound_rule,
 remote=_ip_range,
 remote_type="ip_ranges",
 is_egress=False,
@@ -94,10 +92,10 @@ def create_cloud_service(self, region, options, secret_data, schema):
 )
 )
- for _user_group_pair in in_rule.get("UserIdGroupPairs", []):
+ for _user_group_pair in inbound_rule.get("UserIdGroupPairs", []):
 inbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=in_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=inbound_rule,
 remote=_user_group_pair,
 remote_type="user_id_group_pairs",
 is_egress=False,
@@ -105,10 +103,10 @@ def create_cloud_service(self, region, options, secret_data, schema):
 )
 )
- for _ip_v6_range in in_rule.get("Ipv6Ranges", []):
+ for _ip_v6_range in inbound_rule.get("Ipv6Ranges", []):
 inbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=in_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=inbound_rule,
 remote=_ip_v6_range,
 remote_type="ipv6_ranges",
 is_egress=False,
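Review bot: The four per-remote-type loops (IpRanges, UserIdGroupPairs, Ipv6Ranges, PrefixListIds) are copied verbatim for the inbound side across this hunk and hunks 2–4, and again for the outbound side in hunk 5, differing only in the raw key, the `remote_type` string and `is_egress`; a small table of `(raw_key, remote_type)` pairs driven by one nested loop would remove the eight-fold duplication, and it is exactly the shape of code where a new remote type later gets added on one side and forgotten on the other, so that direction of the group is silently under-reported. Dropping `copy.deepcopy` looks correct given that hunk 7's `_custom_security_group_rule` now builds a fresh dict instead of calling `raw_rule.update(...)` on the AWS response, but if that was the module's only use of `copy`, the `import copy` line (outside these hunks) is now dead and should go in the same commit. `create_cloud_service` starts and ends outside the hunk.
```python
# Inbound Rules
_remote_sources = (
    ("IpRanges", "ip_ranges"),
    ("UserIdGroupPairs", "user_id_group_pairs"),
    ("Ipv6Ranges", "ipv6_ranges"),
    ("PrefixListIds", "prefix_list_ids"),
)
inbound_rules = []
for inbound_rule in raw.get("IpPermissions", []):
    for raw_key, remote_type in _remote_sources:
        for remote in inbound_rule.get(raw_key, []):
            inbound_rules.append(
                self._custom_security_group_rule_info(
                    raw_rule=inbound_rule,
                    remote=remote,
                    remote_type=remote_type,
                    is_egress=False,
                    # … unchanged: any further keyword arguments of the original call …
                )
            )
# … unchanged: outbound side, same loop over _remote_sources with is_egress=True …
```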
@@ -116,10 +114,10 @@ def create_cloud_service(self, region, options, secret_data, schema):
 )
 )
- for prefix_list_id in in_rule.get("PrefixListIds", []):
+ for prefix_list_id in inbound_rule.get("PrefixListIds", []):
 inbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=in_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=inbound_rule,
 remote=prefix_list_id,
 remote_type="prefix_list_ids",
 is_egress=False,
@@ -129,43 +127,41 @@ def create_cloud_service(self, region, options, secret_data, schema):
 # Outbound Rules
 outbound_rules = []
- for out_rule in raw.get("IpPermissionsEgress", []):
- out_rule_copy = copy.deepcopy(out_rule)
-
- for _ip_range in out_rule.get("IpRanges", []):
+ for outbound_rule in raw.get("IpPermissionsEgress", []):
+ for _ip_range in outbound_rule.get("IpRanges", []):
 outbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=out_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=outbound_rule,
 remote=_ip_range,
 remote_type="ip_ranges",
 is_egress=True,
 )
 )
- for _user_group_pairs in out_rule.get("UserIdGroupPairs", []):
+ for _user_group_pairs in outbound_rule.get("UserIdGroupPairs", []):
 outbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=out_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=outbound_rule,
 remote=_user_group_pairs,
 remote_type="user_id_group_pairs",
 is_egress=True,
 )
 )
- for _ip_v6_range in out_rule.get("Ipv6Ranges", []):
+ for _ip_v6_range in outbound_rule.get("Ipv6Ranges", []):
 outbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=out_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=outbound_rule,
 remote=_ip_v6_range,
 remote_type="ipv6_ranges",
 is_egress=True,
 )
 )
- for prefix_list_id in out_rule.get("PrefixListIds", []):
+ for prefix_list_id in outbound_rule.get("PrefixListIds", []):
 outbound_rules.append(
- self._custom_security_group_inbound_rule_info(
- raw_rule=out_rule_copy,
+ self._custom_security_group_rule_info(
+ raw_rule=outbound_rule,
 remote=prefix_list_id,
 remote_type="prefix_list_ids",
 is_egress=True,
@@ -258,7 +254,7 @@ def _get_matched_security_group_rule_id(
 return None
- def _custom_security_group_inbound_rule_info(
+ def _custom_security_group_rule_info(
 self, raw_rule, remote, remote_type, is_egress, vulnerable_ports=None
 ):
 rule_id = self._get_matched_security_group_rule_id(
@@ -269,30 +265,29 @@ def _custom_security_group_inbound_rule_info(
 is_egress=is_egress,
 )
- raw_rule = self._custom_security_group_rule_info(raw_rule, remote, remote_type)
- raw_rule.update({"rule_id": rule_id})
-
- protocol_display = raw_rule.get("protocol_display")
+ custom_rule = self._custom_security_group_rule(raw_rule, remote, remote_type, rule_id)
 if vulnerable_ports:
+ protocol_display = custom_rule.get("protocol_display")
+
 ports = self._get_vulnerable_ports(
 protocol_display, raw_rule, vulnerable_ports
 )
- raw_rule.update(
+ custom_rule.update(
 {
 "vulnerable_ports": ports,
 "detected_vulnerable_ports": True if ports else False,
 }
 )
- return raw_rule
+ return custom_rule
- def _custom_security_group_rule_info(self, raw_rule, remote, remote_type):
- protocol_display = self._get_protocol_display(raw_rule.get("IpProtocol"))
- raw_rule.update(
+ def _custom_security_group_rule(self, raw_rule, remote, remote_type, rule_id):
+ return (
 {
- "protocol_display": protocol_display,
+ "rule_id": rule_id,
+ "protocol_display": self._get_protocol_display(raw_rule.get("IpProtocol")),
 "port_display": self._get_port_display(raw_rule),
 "source_display": self._get_source_display(remote),
 "description_display": self._get_description_display(remote),
@@ -300,8 +295,6 @@ def _custom_security_group_rule_info(self, raw_rule, remote, remote_type):
 }
 )
- return raw_rule
-
 def list_instances(self):
 instances = []
 filter_info = [
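Review bot: `self._get_vulnerable_ports(protocol_display, raw_rule, vulnerable_ports)` now receives the untouched AWS permission dict, whereas before this commit `raw_rule` had already been enriched with `rule_id` and the `*_display` keys by the time it was passed; if that helper (outside the hunk) reads any of those keys it will now get `None` with no error, so it is worth confirming it only uses the raw `IpProtocol`/`FromPort`/`ToPort` fields — this is the path that decides whether a rule is flagged as exposing a risky port. Related: since `_custom_security_group_rule` returns a fresh dict instead of `raw_rule.update(...)`, the raw fields are only present in the returned rule if the one unchanged line before the closing `}` carries them over; consumers that read `FromPort`/`ToPort` or the range lists off a rule would otherwise lose them. The `vulnerable_ports` and `detected_vulnerable_ports` keys are only set when a port list is supplied, so the rule schema differs between collections; putting `"vulnerable_ports": [], "detected_vulnerable_ports": False` in the builder's dict literal and overriding them in the `if` branch gives consumers a stable shape, and `bool(ports)` reads better than `True if ports else False`. Neither method has a docstring; a one-liner on `_custom_security_group_rule` such as `"""Build the display dict for one (rule, remote) pair; rule_id is the matched rule id or None."""` would help. `_custom_security_group_rule_info` starts outside the hunk and `_custom_security_group_rule` ends outside it.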