Skip to content

Commit da50bd7

Browse files
committed
refracturing
Signed-off-by: Katharina Trentau <[email protected]>
1 parent 7cf0800 commit da50bd7

File tree

1 file changed

+200
-100
lines changed

1 file changed

+200
-100
lines changed

Tests/iaas/security-groups/default-security-group-rules.py

Lines changed: 200 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -27,82 +27,180 @@ def connect(cloud_name: str) -> openstack.connection.Connection:
2727
)
2828

2929

30-
def test_rules(cloud_name: str):
31-
try:
32-
connection = connect(cloud_name)
33-
rules = connection.network.default_security_group_rules()
34-
except Exception as e:
35-
print(str(e))
36-
raise Exception(
37-
f"Connection to cloud '{cloud_name}' was not successful. "
38-
f"The default Security Group Rules could not be accessed. "
39-
f"Please check your cloud connection and authorization."
40-
)
41-
30+
def count_ingress_egress(rules, short=False):
4231
# count all overall ingress rules and egress rules.
4332
ingress_rules = 0
44-
ingress_from_same_sg = 0
4533
egress_rules = 0
46-
egress_ipv4_default_sg = 0
47-
egress_ipv4_custom_sg = 0
48-
egress_ipv6_default_sg = 0
49-
egress_ipv6_custom_sg = 0
34+
if short:
35+
egress_ipv4 = 0
36+
egress_ipv6 = 0
37+
else:
38+
ingress_from_same_sg = 0
39+
egress_ipv4_default_sg = 0
40+
egress_ipv4_custom_sg = 0
41+
egress_ipv6_default_sg = 0
42+
egress_ipv6_custom_sg = 0
43+
5044
if not rules:
5145
print("No default security group rules defined.")
5246
else:
5347
for rule in rules:
5448
direction = rule["direction"]
5549
ethertype = rule["ethertype"]
56-
r_custom_sg = rule["used_in_non_default_sg"]
57-
r_default_sg = rule["used_in_default_sg"]
50+
if not short:
51+
r_custom_sg = rule["used_in_non_default_sg"]
52+
r_default_sg = rule["used_in_default_sg"]
5853
if direction == "ingress":
5954
ingress_rules += 1
60-
# we allow ingress from the same security group
61-
# but only for the default security group
62-
r_group_id = rule.remote_group_id
63-
if r_group_id == "PARENT" and not r_custom_sg:
64-
ingress_from_same_sg += 1
55+
if not short:
56+
# we allow ingress from the same security group
57+
# but only for the default security group
58+
r_group_id = rule.remote_group_id
59+
if r_group_id == "PARENT" and not r_custom_sg:
60+
ingress_from_same_sg += 1
6561
elif direction == "egress" and ethertype == "IPv4":
6662
egress_rules += 1
67-
if rule.remote_ip_prefix:
68-
# this rule does not allow traffic to all external ips
69-
continue
70-
if r_custom_sg:
71-
egress_ipv4_custom_sg += 1
72-
if r_default_sg:
73-
egress_ipv4_default_sg += 1
63+
if not short:
64+
if rule.remote_ip_prefix:
65+
# this rule does not allow traffic to all external ips
66+
continue
67+
if r_custom_sg:
68+
egress_ipv4_custom_sg += 1
69+
if r_default_sg:
70+
egress_ipv4_default_sg += 1
71+
else:
72+
egress_ipv4 += 1
7473
elif direction == "egress" and ethertype == "IPv6":
7574
egress_rules += 1
76-
if rule.remote_ip_prefix:
77-
# this rule does not allow traffic to all external ips
78-
continue
79-
if r_custom_sg:
80-
egress_ipv6_custom_sg += 1
81-
if r_default_sg:
82-
egress_ipv6_default_sg += 1
75+
if not short:
76+
if rule.remote_ip_prefix:
77+
# this rule does not allow traffic to all external ips
78+
continue
79+
if r_custom_sg:
80+
egress_ipv6_custom_sg += 1
81+
if r_default_sg:
82+
egress_ipv6_default_sg += 1
83+
else:
84+
egress_ipv6 += 1
85+
if not short:
86+
assert ingress_rules == ingress_from_same_sg, (
87+
f"Expected only ingress rules for default security groups, "
88+
f"that allow ingress traffic from the same group. "
89+
f"But there are more - in total {ingress_rules} ingress rules. "
90+
f"There should be only {ingress_from_same_sg} ingress rules."
91+
)
92+
assert (
93+
egress_rules > 0
94+
), f"Expected to have more than {egress_rules} egress rules present."
95+
var_list = [
96+
egress_ipv4_default_sg,
97+
egress_ipv4_custom_sg,
98+
egress_ipv6_default_sg,
99+
egress_ipv6_custom_sg,
100+
]
101+
assert all([var > 0 for var in var_list]), (
102+
"Not all expected egress rules are present. "
103+
"Expected rules for egress for IPv4 and IPv6 "
104+
"both for default and custom security groups."
105+
)
106+
else:
107+
# test whether there are no ingress rules
108+
assert ingress_rules == 0, (
109+
f"Expected no default ingress rules for security groups, "
110+
f"But there are {ingress_rules} ingress rules. "
111+
f"There should be only none."
112+
)
113+
assert (
114+
egress_rules > 0
115+
), f"Expected to have more than {egress_rules} egress rules present."
116+
var_list = [
117+
egress_ipv4,
118+
egress_ipv6,
119+
]
120+
assert all([var > 0 for var in var_list]), (
121+
"Not all expected egress rules are present. "
122+
"Expected rules for egress for IPv4 and IPv6 "
123+
"both for default and custom security groups."
124+
)
125+
ingress_rules, egress_rules
83126

84-
# test whether there are no other than the allowed ingress rules
85-
assert ingress_rules == ingress_from_same_sg, (
86-
f"Expected only ingress rules for default security groups, "
87-
f"that allow ingress traffic from the same group. "
88-
f"But there are more - in total {ingress_rules} ingress rules. "
89-
f"There should be only {ingress_from_same_sg} ingress rules."
90-
)
91-
assert (
92-
egress_rules > 0
93-
), f"Expected to have more than {egress_rules} egress rules present."
94-
var_list = [
95-
egress_ipv4_default_sg,
96-
egress_ipv4_custom_sg,
97-
egress_ipv6_default_sg,
98-
egress_ipv6_custom_sg,
99-
]
100-
assert all([var > 0 for var in var_list]), (
101-
"Not all expected egress rules are present. "
102-
"Expected rules for egress for IPv4 and IPv6 "
103-
"both for default and custom security groups."
104-
)
105127

128+
def test_rules(cloud_name: str):
129+
try:
130+
connection = connect(cloud_name)
131+
rules = connection.network.default_security_group_rules()
132+
except Exception as e:
133+
print(str(e))
134+
raise Exception(
135+
f"Connection to cloud '{cloud_name}' was not successful. "
136+
f"The default Security Group Rules could not be accessed. "
137+
f"Please check your cloud connection and authorization."
138+
)
139+
140+
# # count all overall ingress rules and egress rules.
141+
# ingress_rules = 0
142+
# ingress_from_same_sg = 0
143+
# egress_rules = 0
144+
# egress_ipv4_default_sg = 0
145+
# egress_ipv4_custom_sg = 0
146+
# egress_ipv6_default_sg = 0
147+
# egress_ipv6_custom_sg = 0
148+
# if not rules:
149+
# print("No default security group rules defined.")
150+
# else:
151+
# for rule in rules:
152+
# direction = rule["direction"]
153+
# ethertype = rule["ethertype"]
154+
# r_custom_sg = rule["used_in_non_default_sg"]
155+
# r_default_sg = rule["used_in_default_sg"]
156+
# if direction == "ingress":
157+
# ingress_rules += 1
158+
# # we allow ingress from the same security group
159+
# # but only for the default security group
160+
# r_group_id = rule.remote_group_id
161+
# if r_group_id == "PARENT" and not r_custom_sg:
162+
# ingress_from_same_sg += 1
163+
# elif direction == "egress" and ethertype == "IPv4":
164+
# egress_rules += 1
165+
# if rule.remote_ip_prefix:
166+
# # this rule does not allow traffic to all external ips
167+
# continue
168+
# if r_custom_sg:
169+
# egress_ipv4_custom_sg += 1
170+
# if r_default_sg:
171+
# egress_ipv4_default_sg += 1
172+
# elif direction == "egress" and ethertype == "IPv6":
173+
# egress_rules += 1
174+
# if rule.remote_ip_prefix:
175+
# # this rule does not allow traffic to all external ips
176+
# continue
177+
# if r_custom_sg:
178+
# egress_ipv6_custom_sg += 1
179+
# if r_default_sg:
180+
# egress_ipv6_default_sg += 1
181+
182+
# test whether there are no other than the allowed ingress rules
183+
# assert ingress_rules == ingress_from_same_sg, (
184+
# f"Expected only ingress rules for default security groups, "
185+
# f"that allow ingress traffic from the same group. "
186+
# f"But there are more - in total {ingress_rules} ingress rules. "
187+
# f"There should be only {ingress_from_same_sg} ingress rules."
188+
# )
189+
# assert (
190+
# egress_rules > 0
191+
# ), f"Expected to have more than {egress_rules} egress rules present."
192+
# var_list = [
193+
# egress_ipv4_default_sg,
194+
# egress_ipv4_custom_sg,
195+
# egress_ipv6_default_sg,
196+
# egress_ipv6_custom_sg,
197+
# ]
198+
# assert all([var > 0 for var in var_list]), (
199+
# "Not all expected egress rules are present. "
200+
# "Expected rules for egress for IPv4 and IPv6 "
201+
# "both for default and custom security groups."
202+
# )
203+
ingress_rules, egress_rules = count_ingress_egress(rules)
106204
result_dict = {"Ingress Rules": ingress_rules, "Egress Rules": egress_rules}
107205
return result_dict
108206

@@ -128,6 +226,8 @@ def delete_security_group(conn, sg_id):
128226
conn.network.find_security_group(name_or_id=sg_id)
129227
except ResourceNotFound:
130228
print(f"Security group {sg_id} was deleted successfully.")
229+
except Exception as e:
230+
print(f"Security group {sg_id} was not deleted successfully" f"Exception: {e}")
131231

132232

133233
def altern_test_rules(cloud_name: str):
@@ -146,48 +246,48 @@ def altern_test_rules(cloud_name: str):
146246
except Exception:
147247
print("Security group was not created successfully.")
148248

149-
# count all overall ingress rules and egress rules.
150-
ingress_rules = 0
151-
egress_rules = 0
152-
egress_ipv4 = 0
153-
egress_ipv6 = 0
154-
if not rules:
155-
print("No default security group rules defined.")
156-
else:
157-
for rule in rules.security_group_rules:
158-
direction = rule["direction"]
159-
ethertype = rule["ethertype"]
160-
if direction == "ingress":
161-
ingress_rules += 1
162-
elif direction == "egress" and ethertype == "IPv4":
163-
egress_rules += 1
164-
egress_ipv4 += 1
165-
elif direction == "egress" and ethertype == "IPv6":
166-
egress_rules += 1
167-
egress_ipv6 += 1
249+
# # count all overall ingress rules and egress rules.
250+
# ingress_rules = 0
251+
# egress_rules = 0
252+
# egress_ipv4 = 0
253+
# egress_ipv6 = 0
254+
# if not rules:
255+
# print("No default security group rules defined.")
256+
# else:
257+
# for rule in rules.security_group_rules:
258+
# direction = rule["direction"]
259+
# ethertype = rule["ethertype"]
260+
# if direction == "ingress":
261+
# ingress_rules += 1
262+
# elif direction == "egress" and ethertype == "IPv4":
263+
# egress_rules += 1
264+
# egress_ipv4 += 1
265+
# elif direction == "egress" and ethertype == "IPv6":
266+
# egress_rules += 1
267+
# egress_ipv6 += 1
268+
269+
# # test whether there are no ingress rules
270+
# assert ingress_rules == 0, (
271+
# f"Expected no default ingress rules for security groups, "
272+
# f"But there are {ingress_rules} ingress rules. "
273+
# f"There should be only none."
274+
# )
275+
# assert (
276+
# egress_rules > 0
277+
# ), f"Expected to have more than {egress_rules} egress rules present."
278+
# var_list = [
279+
# egress_ipv4,
280+
# egress_ipv6,
281+
# ]
282+
# assert all([var > 0 for var in var_list]), (
283+
# "Not all expected egress rules are present. "
284+
# "Expected rules for egress for IPv4 and IPv6 "
285+
# "both for default and custom security groups."
286+
# )
287+
288+
ingress_rules, egress_rules = count_ingress_egress(rules.security_group_rules)
289+
delete_security_group(connection, sg_id)
168290

169-
# test whether there are no ingress rules
170-
assert ingress_rules == 0, (
171-
f"Expected no default ingress rules for security groups, "
172-
f"But there are {ingress_rules} ingress rules. "
173-
f"There should be only none."
174-
)
175-
assert (
176-
egress_rules > 0
177-
), f"Expected to have more than {egress_rules} egress rules present."
178-
var_list = [
179-
egress_ipv4,
180-
egress_ipv6,
181-
]
182-
assert all([var > 0 for var in var_list]), (
183-
"Not all expected egress rules are present. "
184-
"Expected rules for egress for IPv4 and IPv6 "
185-
"both for default and custom security groups."
186-
)
187-
try:
188-
delete_security_group(connection, sg_id)
189-
except Exception:
190-
print(f"Security group {sg_id} was not deleted successfully")
191291
result_dict = {"Ingress Rules": ingress_rules, "Egress Rules": egress_rules}
192292
return result_dict
193293

0 commit comments

Comments
 (0)