|
| 1 | +from collections import Counter |
| 2 | +import logging |
| 3 | +import os |
| 4 | +import sys |
| 5 | + |
| 6 | +import openstack |
| 7 | +from openstack.exceptions import ResourceNotFound |
| 8 | + |
| 9 | +logger = logging.getLogger(__name__) |
| 10 | + |
| 11 | +SG_NAME = "scs-test-default-sg" |
| 12 | +DESCRIPTION = "scs-test-default-sg" |
| 13 | + |
| 14 | + |
| 15 | +def check_default_rules(rules, short=False): |
| 16 | + """ |
| 17 | + counts all verall ingress rules and egress rules, depending on the requested testing mode |
| 18 | +
|
| 19 | + :param bool short |
| 20 | + if short is True, the testing mode is set on short for older OpenStack versions |
| 21 | + """ |
| 22 | + ingress_rules = egress_rules = 0 |
| 23 | + egress_vars = {'IPv4': {}, 'IPv6': {}} |
| 24 | + for key, value in egress_vars.items(): |
| 25 | + value['default'] = 0 |
| 26 | + if not short: |
| 27 | + value['custom'] = 0 |
| 28 | + if not rules: |
| 29 | + logger.info("No default security group rules defined.") |
| 30 | + for rule in rules: |
| 31 | + direction = rule["direction"] |
| 32 | + ethertype = rule["ethertype"] |
| 33 | + if direction == "ingress": |
| 34 | + if not short: |
| 35 | + # we allow ingress from the same security group |
| 36 | + # but only for the default security group |
| 37 | + if rule.remote_group_id == "PARENT" and not rule["used_in_non_default_sg"]: |
| 38 | + continue |
| 39 | + ingress_rules += 1 |
| 40 | + elif direction == "egress" and ethertype in egress_vars: |
| 41 | + egress_rules += 1 |
| 42 | + if short: |
| 43 | + egress_vars[ethertype]['default'] += 1 |
| 44 | + continue |
| 45 | + if rule.remote_ip_prefix: |
| 46 | + # this rule does not allow traffic to all external ips |
| 47 | + continue |
| 48 | + # note: these two are not mutually exclusive |
| 49 | + if rule["used_in_default_sg"]: |
| 50 | + egress_vars[ethertype]['default'] += 1 |
| 51 | + if rule["used_in_non_default_sg"]: |
| 52 | + egress_vars[ethertype]['custom'] += 1 |
| 53 | + # test whether there are no unallowed ingress rules |
| 54 | + if ingress_rules: |
| 55 | + logger.error(f"Expected no default ingress rules, found {ingress_rules}.") |
| 56 | + # test whether all expected egress rules are present |
| 57 | + missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2] |
| 58 | + if missing: |
| 59 | + logger.error( |
| 60 | + "Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. " |
| 61 | + f"Missing rule types: {', '.join(str(x) for x in missing)}" |
| 62 | + ) |
| 63 | + logger.info(str({ |
| 64 | + "Unallowed Ingress Rules": ingress_rules, |
| 65 | + "Egress Rules": egress_rules, |
| 66 | + })) |
| 67 | + return not ingress_rules and not missing |
| 68 | + |
| 69 | + |
| 70 | +def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION): |
| 71 | + """Create security group in openstack |
| 72 | +
|
| 73 | + :returns: |
| 74 | + ~openstack.network.v2.security_group.SecurityGroup: The new security group or None |
| 75 | + """ |
| 76 | + sg = conn.network.create_security_group(name=sg_name, description=description) |
| 77 | + return sg.id |
| 78 | + |
| 79 | + |
| 80 | +def delete_security_group(conn, sg_id): |
| 81 | + conn.network.delete_security_group(sg_id) |
| 82 | + # in case of a successful delete finding the sg will throw an exception |
| 83 | + try: |
| 84 | + conn.network.find_security_group(name_or_id=sg_id) |
| 85 | + except ResourceNotFound: |
| 86 | + logger.debug(f"Security group {sg_id} was deleted successfully.") |
| 87 | + except Exception: |
| 88 | + logger.critical(f"Security group {sg_id} was not deleted successfully") |
| 89 | + raise |
| 90 | + |
| 91 | + |
| 92 | +def altern_test_rules(connection: openstack.connection.Connection): |
| 93 | + sg_id = create_security_group(connection) |
| 94 | + try: |
| 95 | + sg = connection.network.find_security_group(name_or_id=sg_id) |
| 96 | + return check_default_rules(sg.security_group_rules, short=True) |
| 97 | + finally: |
| 98 | + delete_security_group(connection, sg_id) |
| 99 | + |
| 100 | + |
| 101 | +def compute_scs_0115_default_rules(conn: openstack.connection.Connection): |
| 102 | + try: |
| 103 | + rules = list(conn.network.default_security_group_rules()) |
| 104 | + except (ResourceNotFound, AttributeError) as exc: |
| 105 | + # older versions of OpenStack don't have the endpoint and give ResourceNotFound |
| 106 | + if isinstance(exc, ResourceNotFound) and 'default-security-group-rules' not in str(exc): |
| 107 | + raise |
| 108 | + # why we see the AttributeError in some environments is a mystery |
| 109 | + if isinstance(exc, AttributeError) and 'default_security_group_rules' not in str(exc): |
| 110 | + raise |
| 111 | + logger.info( |
| 112 | + "API call failed. OpenStack components might not be up to date. " |
| 113 | + "Falling back to old-style test method. " |
| 114 | + ) |
| 115 | + return altern_test_rules(conn) |
| 116 | + else: |
| 117 | + return check_default_rules(rules) |
0 commit comments