@@ -28,18 +28,24 @@ def connect(cloud_name: str) -> openstack.connection.Connection:
2828
2929
3030def count_ingress_egress (rules , short = False ):
31- # count all overall ingress rules and egress rules.
31+ """
32+ counts all verall ingress rules and egress rules, depending on the requested testing mode
33+ :param object rules
34+ :param bool short
35+ if short is true, the testing mode is set on short for older os versions
36+ :returns:
37+ ingress_rules integer count
38+ egress_rules integer count
39+ """
3240 ingress_rules = 0
3341 egress_rules = 0
3442 if not short :
35- print ("not short" )
3643 ingress_from_same_sg = 0
3744 egress_ipv4_default_sg = 0
3845 egress_ipv4_custom_sg = 0
3946 egress_ipv6_default_sg = 0
4047 egress_ipv6_custom_sg = 0
4148 else :
42- print ("short" )
4349 egress_ipv4 = 0
4450 egress_ipv6 = 0
4551 if not rules :
@@ -83,42 +89,34 @@ def count_ingress_egress(rules, short=False):
8389 egress_ipv6_default_sg += 1
8490 else :
8591 egress_ipv6 += 1
86- if not short :
87- assert ingress_rules == ingress_from_same_sg , (
88- f"Expected only ingress rules for default security groups, "
89- f"that allow ingress traffic from the same group. "
90- f"But there are more - in total { ingress_rules } ingress rules. "
91- f"There should be only { ingress_from_same_sg } ingress rules."
92+ if not egress_rules > 0 :
93+ raise ValueError (
94+ f"Expected to have more than { egress_rules } egress rules present."
9295 )
93- assert (
94- egress_rules > 0
95- ), f"Expected to have more than { egress_rules } egress rules present."
96+ if not short :
97+ if ingress_rules == ingress_from_same_sg :
98+ ingress_rules -= 1
9699 var_list = [
97100 egress_ipv4_default_sg ,
98101 egress_ipv4_custom_sg ,
99102 egress_ipv6_default_sg ,
100103 egress_ipv6_custom_sg ,
101104 ]
102- assert all ([var > 0 for var in var_list ]), (
103- "Not all expected egress rules are present. "
104- "Expected rules for egress for IPv4 and IPv6 "
105- "both for default and custom security groups."
106- )
107105 else :
108- # test whether there are no ingress rules
109- assert ingress_rules == 0 , (
110- f"Expected no default ingress rules for security groups, "
111- f"But there are { ingress_rules } ingress rules. "
112- f"There should be only none."
113- )
114- assert (
115- egress_rules > 0
116- ), f"Expected to have more than { egress_rules } egress rules present."
117106 var_list = [
118107 egress_ipv4 ,
119108 egress_ipv6 ,
120109 ]
121- assert all ([var > 0 for var in var_list ]), (
110+ # test whether there are no unallowed ingress rules
111+ if not ingress_rules == 0 :
112+ raise ValueError (
113+ f"Expected no default ingress rules for security groups, "
114+ f"But there are { ingress_rules } ingress rules. "
115+ f"There should be only none."
116+ )
117+ # test whether all expected egress rules are present
118+ if not all (var > 0 for var in var_list ):
119+ raise ValueError (
122120 "Not all expected egress rules are present. "
123121 "Expected rules for egress for IPv4 and IPv6 "
124122 "both for default and custom security groups."
@@ -137,20 +135,23 @@ def test_rules(cloud_name: str):
137135 f"The default Security Group Rules could not be accessed. "
138136 f"Please check your cloud connection and authorization."
139137 )
140-
138+ if not any (rule for rule in rules ):
139+ raise
141140 ingress_rules , egress_rules = count_ingress_egress (rules )
142- result_dict = {"Ingress Rules" : ingress_rules , "Egress Rules" : egress_rules }
141+ result_dict = {
142+ "Unallowed Ingress Rules" : ingress_rules ,
143+ "Egress Rules" : egress_rules ,
144+ }
143145 return result_dict
144146
145147
146148def create_security_group (conn , sg_name : str = SG_NAME , description : str = DESCRIPTION ):
147149 """Create security group in openstack
148150
149- Args:
150- sec_group_name (str): Name of security group
151- description (str): Description of security group
151+ :param sec_group_name (str): Name of security group
152+ :param description (str): Description of security group
152153
153- Returns :
154+ :returns :
154155 ~openstack.network.v2.security_group.SecurityGroup: The new security group or None
155156 """
156157 sg = conn .network .create_security_group (name = sg_name , description = description )
@@ -187,7 +188,10 @@ def altern_test_rules(cloud_name: str):
187188 ingress_rules , egress_rules = count_ingress_egress (rules .security_group_rules , True )
188189 delete_security_group (connection , sg_id )
189190
190- result_dict = {"Ingress Rules" : ingress_rules , "Egress Rules" : egress_rules }
191+ result_dict = {
192+ "Unallowed Ingress Rules" : ingress_rules ,
193+ "Egress Rules" : egress_rules ,
194+ }
191195 return result_dict
192196
193197
@@ -211,10 +215,11 @@ def main():
211215 cloud = os .environ .get ("OS_CLOUD" , None )
212216 if args .os_cloud :
213217 cloud = args .os_cloud
214- assert cloud , (
218+ if not cloud :
219+ raise ValueError (
215220 "You need to have the OS_CLOUD environment variable set to your cloud "
216221 "name or pass it via --os-cloud"
217- )
222+ )
218223 try :
219224 print (test_rules (cloud ))
220225 except ResourceNotFound as e :
0 commit comments