Skip to content

Commit e43f7f8

Browse files
committed
further simplification, logging, result output
Signed-off-by: Matthias Büchse <[email protected]>
1 parent c1df74c commit e43f7f8

File tree

1 file changed

+68
-47
lines changed

1 file changed

+68
-47
lines changed

Tests/iaas/security-groups/default-security-group-rules.py

Lines changed: 68 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,45 +4,44 @@
44
except for ingress rules from the same Security Group. Furthermore the
55
presence of default rules for egress traffic is checked.
66
"""
7-
87
import argparse
8+
from collections import Counter
9+
import logging
910
import os
11+
import sys
1012

1113
import openstack
1214
from openstack.exceptions import ResourceNotFound
1315

14-
SG_NAME = "default-test-sg"
15-
DESCRIPTION = "default-test-sg"
16+
logger = logging.getLogger(__name__)
17+
18+
SG_NAME = "scs-test-default-sg"
19+
DESCRIPTION = "scs-test-default-sg"
1620

1721

18-
def count_ingress_egress(rules, short=False):
22+
def check_default_rules(rules, short=False):
1923
"""
2024
counts all verall ingress rules and egress rules, depending on the requested testing mode
21-
:param object rules
25+
2226
:param bool short
23-
if short is true, the testing mode is set on short for older os versions
24-
:returns:
25-
ingress_rules integer count
26-
egress_rules integer count
27+
if short is True, the testing mode is set on short for older OpenStack versions
2728
"""
28-
ingress_rules = 0
29-
egress_rules = 0
29+
ingress_rules = egress_rules = 0
3030
egress_vars = {'IPv4': {}, 'IPv6': {}}
3131
for key, value in egress_vars.items():
3232
value['default'] = 0
3333
if not short:
3434
value['custom'] = 0
3535
if not rules:
36-
print("No default security group rules defined.")
36+
logger.info("No default security group rules defined.")
3737
for rule in rules:
3838
direction = rule["direction"]
3939
ethertype = rule["ethertype"]
4040
if direction == "ingress":
4141
if not short:
4242
# we allow ingress from the same security group
4343
# but only for the default security group
44-
r_group_id = rule.remote_group_id
45-
if r_group_id == "PARENT" and not rule["used_in_non_default_sg"]:
44+
if rule.remote_group_id == "PARENT" and not rule["used_in_non_default_sg"]:
4645
continue
4746
ingress_rules += 1
4847
elif direction == "egress" and ethertype in egress_vars:
@@ -60,35 +59,23 @@ def count_ingress_egress(rules, short=False):
6059
egress_vars[ethertype]['custom'] += 1
6160
# test whether there are no unallowed ingress rules
6261
if ingress_rules:
63-
raise ValueError(
64-
f"Expected no default ingress rules for security groups, "
65-
f"But there are {ingress_rules} ingress rules. "
66-
)
62+
logger.error(f"Expected no default ingress rules, found {ingress_rules}.")
6763
# test whether all expected egress rules are present
6864
missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2]
6965
if missing:
70-
raise ValueError(
71-
"Expected rules for egress for IPv4 and IPv6 "
72-
"both for default and custom security groups. "
66+
logger.error(
67+
"Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. "
7368
f"Missing rule types: {', '.join(str(x) for x in missing)}"
7469
)
75-
return {
70+
logger.info(str({
7671
"Unallowed Ingress Rules": ingress_rules,
7772
"Egress Rules": egress_rules,
78-
}
79-
80-
81-
def test_rules(connection: openstack.connection.Connection):
82-
rules = connection.network.default_security_group_rules()
83-
return count_ingress_egress(rules)
73+
}))
8474

8575

8676
def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION):
8777
"""Create security group in openstack
8878
89-
:param sec_group_name (str): Name of security group
90-
:param description (str): Description of security group
91-
9279
:returns:
9380
~openstack.network.v2.security_group.SecurityGroup: The new security group or None
9481
"""
@@ -102,20 +89,44 @@ def delete_security_group(conn, sg_id):
10289
try:
10390
conn.network.find_security_group(name_or_id=sg_id)
10491
except ResourceNotFound:
105-
print(f"Security group {sg_id} was deleted successfully.")
92+
logger.debug(f"Security group {sg_id} was deleted successfully.")
10693
except Exception as e:
107-
print(f"Security group {sg_id} was not deleted successfully" f"Exception: {e}")
94+
logger.critical(f"Security group {sg_id} was not deleted successfully")
95+
raise
10896

10997

11098
def altern_test_rules(connection: openstack.connection.Connection):
11199
sg_id = create_security_group(connection)
112100
try:
113101
sg = connection.network.find_security_group(name_or_id=sg_id)
114-
return count_ingress_egress(sg.security_group_rules, True)
102+
check_default_rules(sg.security_group_rules, short=True)
115103
finally:
116104
delete_security_group(connection, sg_id)
117105

118106

107+
def test_rules(connection: openstack.connection.Connection):
108+
try:
109+
rules = list(connection.network.default_security_group_rules())
110+
except ResourceNotFound as e:
111+
logger.info(
112+
"API call failed. OpenStack components might not be up to date. "
113+
"Falling back to old-style test method. "
114+
)
115+
logger.debug(f"traceback", exc_info=True)
116+
altern_test_rules(connection)
117+
else:
118+
check_default_rules(rules)
119+
120+
121+
class CountingHandler(logging.Handler):
122+
def __init__(self, level=logging.NOTSET):
123+
super().__init__(level=level)
124+
self.bylevel = Counter()
125+
126+
def handle(self, record):
127+
self.bylevel[record.levelno] += 1
128+
129+
119130
def main():
120131
parser = argparse.ArgumentParser(
121132
description="SCS Default Security Group Rules Checker"
@@ -131,6 +142,14 @@ def main():
131142
)
132143
args = parser.parse_args()
133144
openstack.enable_logging(debug=args.debug)
145+
logging.basicConfig(
146+
format="%(levelname)s: %(message)s",
147+
level=logging.DEBUG if args.debug else logging.INFO,
148+
)
149+
150+
# count the number of log records per level (used for summary and return code)
151+
counting_handler = CountingHandler(level=logging.INFO)
152+
logger.addHandler(counting_handler)
134153

135154
# parse cloud name for lookup in clouds.yaml
136155
cloud = args.os_cloud or os.environ.get("OS_CLOUD", None)
@@ -141,19 +160,21 @@ def main():
141160
)
142161

143162
with openstack.connect(cloud) as conn:
144-
try:
145-
print(test_rules(conn))
146-
except ResourceNotFound as e:
147-
print(
148-
"Resource could not be found. OpenStack components might not be up to date. "
149-
"Falling back to old-style test method. "
150-
f"Error: {e}"
151-
)
152-
print(altern_test_rules(conn))
153-
except Exception as e:
154-
print(f"Error occured: {e}")
155-
raise
163+
test_rules(conn)
164+
165+
c = counting_handler.bylevel
166+
logger.debug(f"Total critical / error / warning: {c[logging.CRITICAL]} / {c[logging.ERROR]} / {c[logging.WARNING]}")
167+
if not c[logging.CRITICAL]:
168+
print("security-groups-default-rules-check: " + ('PASS', 'FAIL')[min(1, c[logging.ERROR])])
169+
return min(127, c[logging.CRITICAL] + c[logging.ERROR]) # cap at 127 due to OS restrictions
156170

157171

158172
if __name__ == "__main__":
159-
main()
173+
try:
174+
sys.exit(main())
175+
except SystemExit:
176+
raise
177+
except BaseException as exc:
178+
logging.debug("traceback", exc_info=True)
179+
logging.critical(str(exc))
180+
sys.exit(1)

0 commit comments

Comments
 (0)