@@ -28,11 +28,11 @@ def check_default_rules(rules, short=False):
2828 if short is True, the testing mode is set on short for older OpenStack versions
2929 """
3030 ingress_rules = egress_rules = 0
31- egress_vars = {' IPv4' : {}, ' IPv6' : {}}
31+ egress_vars = {" IPv4" : {}, " IPv6" : {}}
3232 for key , value in egress_vars .items ():
33- value [' default' ] = 0
33+ value [" default" ] = 0
3434 if not short :
35- value [' custom' ] = 0
35+ value [" custom" ] = 0
3636 if not rules :
3737 logger .info ("No default security group rules defined." )
3838 for rule in rules :
@@ -42,36 +42,48 @@ def check_default_rules(rules, short=False):
4242 if not short :
4343 # we allow ingress from the same security group
4444 # but only for the default security group
45- if rule .remote_group_id == "PARENT" and not rule ["used_in_non_default_sg" ]:
45+ if (
46+ rule .remote_group_id == "PARENT"
47+ and not rule ["used_in_non_default_sg" ]
48+ ):
4649 continue
4750 ingress_rules += 1
4851 elif direction == "egress" and ethertype in egress_vars :
4952 egress_rules += 1
5053 if short :
51- egress_vars [ethertype ][' default' ] += 1
54+ egress_vars [ethertype ][" default" ] += 1
5255 continue
5356 if rule .remote_ip_prefix :
5457 # this rule does not allow traffic to all external ips
5558 continue
5659 # note: these two are not mutually exclusive
5760 if rule ["used_in_default_sg" ]:
58- egress_vars [ethertype ][' default' ] += 1
61+ egress_vars [ethertype ][" default" ] += 1
5962 if rule ["used_in_non_default_sg" ]:
60- egress_vars [ethertype ][' custom' ] += 1
63+ egress_vars [ethertype ][" custom" ] += 1
6164 # test whether there are no unallowed ingress rules
6265 if ingress_rules :
6366 logger .error (f"Expected no default ingress rules, found { ingress_rules } ." )
6467 # test whether all expected egress rules are present
65- missing = [(key , key2 ) for key , val in egress_vars .items () for key2 , val2 in val .items () if not val2 ]
68+ missing = [
69+ (key , key2 )
70+ for key , val in egress_vars .items ()
71+ for key2 , val2 in val .items ()
72+ if not val2
73+ ]
6674 if missing :
6775 logger .error (
6876 "Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. "
6977 f"Missing rule types: { ', ' .join (str (x ) for x in missing )} "
7078 )
71- logger .info (str ({
72- "Unallowed Ingress Rules" : ingress_rules ,
73- "Egress Rules" : egress_rules ,
74- }))
79+ logger .info (
80+ str (
81+ {
82+ "Unallowed Ingress Rules" : ingress_rules ,
83+ "Egress Rules" : egress_rules ,
84+ }
85+ )
86+ )
7587
7688
7789def create_security_group (conn , sg_name : str = SG_NAME , description : str = DESCRIPTION ):
@@ -139,7 +151,9 @@ def main():
139151 "to the OS_CLOUD environment variable" ,
140152 )
141153 parser .add_argument (
142- "--debug" , action = "store_true" , help = "Enable debug logging" ,
154+ "--debug" ,
155+ action = "store_true" ,
156+ help = "Enable debug logging" ,
143157 )
144158 args = parser .parse_args ()
145159 openstack .enable_logging (debug = args .debug )
@@ -164,10 +178,17 @@ def main():
164178 test_rules (conn )
165179
166180 c = counting_handler .bylevel
167- logger .debug (f"Total critical / error / warning: { c [logging .CRITICAL ]} / { c [logging .ERROR ]} / { c [logging .WARNING ]} " )
181+ logger .debug (
182+ f"Total critical / error / warning: { c [logging .CRITICAL ]} / { c [logging .ERROR ]} / { c [logging .WARNING ]} "
183+ )
168184 if not c [logging .CRITICAL ]:
169- print ("security-groups-default-rules-check: " + ('PASS' , 'FAIL' )[min (1 , c [logging .ERROR ])])
170- return min (127 , c [logging .CRITICAL ] + c [logging .ERROR ]) # cap at 127 due to OS restrictions
185+ print (
186+ "security-groups-default-rules-check: "
187+ + ("PASS" , "FAIL" )[min (1 , c [logging .ERROR ])]
188+ )
189+ return min (
190+ 127 , c [logging .CRITICAL ] + c [logging .ERROR ]
191+ ) # cap at 127 due to OS restrictions
171192
172193
173194if __name__ == "__main__" :
0 commit comments