@@ -84,6 +84,61 @@ def simple_security_group(request):
84
84
except :
85
85
pass
86
86
87
+ @pytest .fixture
88
+ def security_group_with_vpc (request , simple_vpc ):
89
+ (_ , vpc_cr ) = simple_vpc
90
+ vpc_id = vpc_cr ["status" ]["vpcID" ]
91
+
92
+ assert vpc_id is not None
93
+
94
+ resource_name = random_suffix_name ("security-group-vpc" , 24 )
95
+ resource_file = "security_group"
96
+
97
+ replacements = REPLACEMENT_VALUES .copy ()
98
+ replacements ["SECURITY_GROUP_NAME" ] = resource_name
99
+ replacements ["VPC_ID" ] = vpc_id
100
+ replacements ["SECURITY_GROUP_DESCRIPTION" ] = "TestSecurityGroup"
101
+
102
+ marker = request .node .get_closest_marker ("resource_data" )
103
+ if marker is not None :
104
+ data = marker .args [0 ]
105
+ if 'resource_file' in data :
106
+ resource_file = data ['resource_file' ]
107
+ replacements .update (data )
108
+ if 'tag_key' in data :
109
+ replacements ["TAG_KEY" ] = data ["tag_key" ]
110
+ if 'tag_value' in data :
111
+ replacements ["TAG_VALUE" ] = data ["tag_value" ]
112
+
113
+ # Load Security Group CR
114
+ resource_data = load_ec2_resource (
115
+ resource_file ,
116
+ additional_replacements = replacements ,
117
+ )
118
+ logging .debug (resource_data )
119
+
120
+ # Create k8s resource
121
+ ref = k8s .CustomResourceReference (
122
+ CRD_GROUP , CRD_VERSION , RESOURCE_PLURAL ,
123
+ resource_name , namespace = "default" ,
124
+ )
125
+
126
+ k8s .create_custom_resource (ref , resource_data )
127
+ time .sleep (CREATE_WAIT_AFTER_SECONDS )
128
+
129
+ cr = k8s .wait_resource_consumed_by_controller (ref )
130
+ assert cr is not None
131
+ assert k8s .get_resource_exists (ref )
132
+
133
+ yield (ref , cr )
134
+
135
+ # Try to delete, if doesn't already exist
136
+ try :
137
+ _ , deleted = k8s .delete_custom_resource (ref , 3 , 10 )
138
+ assert deleted
139
+ except :
140
+ pass
141
+
87
142
@service_marker
88
143
@pytest .mark .canary
89
144
class TestSecurityGroup :
@@ -104,6 +159,75 @@ def test_create_delete(self, ec2_client, simple_security_group):
104
159
# Check Security Group no longer exists in AWS
105
160
ec2_validator .assert_security_group (resource_id , exists = False )
106
161
162
+ @pytest .mark .xfail
163
+ def test_create_with_vpc_egress_dups_default_delete (self , ec2_client , security_group_with_vpc ):
164
+ (ref , cr ) = security_group_with_vpc
165
+ resource_id = cr ["status" ]["id" ]
166
+
167
+ # Check resource is late initialized successfully (sets default egress rule)
168
+ assert k8s .wait_on_condition (ref , "ACK.ResourceSynced" , "True" , wait_periods = 5 )
169
+
170
+ # Check Security Group exists in AWS
171
+ ec2_validator = EC2Validator (ec2_client )
172
+ ec2_validator .assert_security_group (resource_id )
173
+
174
+ # Hook code should update Spec rules using data from ReadOne resp
175
+ assert len (cr ["spec" ]["egressRules" ]) == 1
176
+
177
+ # Check default egress rule present
178
+ # default egress rule will be present iff user has NOT specified their own egress rules
179
+ assert len (cr ["status" ]["rules" ]) == 1
180
+ sg_group = ec2_validator .get_security_group (resource_id )
181
+ egress_rules = sg_group ["IpPermissionsEgress" ]
182
+ assert len (egress_rules ) == 1
183
+ logging .debug (f"Default Egress rule: { str (egress_rules [0 ])} " )
184
+
185
+ # Check default egress rule data
186
+ assert egress_rules [0 ]["IpProtocol" ] == "-1"
187
+ assert egress_rules [0 ]["IpRanges" ][0 ]["CidrIp" ] == "0.0.0.0/0"
188
+
189
+ # Add a new Egress rule that "duplicates" the default via patch
190
+ new_egress_rule = {
191
+ "ipProtocol" : "-1" ,
192
+ "ipRanges" : [{
193
+ "cidrIP" : "0.0.0.0/0" ,
194
+ "description" : "Allow traffic from all IPs - test"
195
+ }]
196
+ }
197
+ patch = {"spec" : {"egressRules" :[new_egress_rule ]}}
198
+ _ = k8s .patch_custom_resource (ref , patch )
199
+
200
+ time .sleep (CREATE_WAIT_AFTER_SECONDS )
201
+
202
+ # Check resource gets into synced state
203
+ assert k8s .wait_on_condition (ref , "ACK.ResourceSynced" , "True" , wait_periods = 5 )
204
+
205
+ # assert patched state
206
+ cr = k8s .get_resource (ref )
207
+ assert len (cr ["status" ]["rules" ]) == 1
208
+
209
+ # Check egress rule exists
210
+ sg_group = ec2_validator .get_security_group (resource_id )
211
+ assert len (sg_group ["IpPermissions" ]) == 0
212
+ assert len (sg_group ["IpPermissionsEgress" ]) == 1
213
+
214
+ # Check egress rule data (i.e. ensure default egress rule removed)
215
+ assert sg_group ["IpPermissionsEgress" ][0 ]["IpProtocol" ] == "-1"
216
+ assert len (sg_group ["IpPermissionsEgress" ][0 ]["IpRanges" ]) == 1
217
+ ip_range = sg_group ["IpPermissionsEgress" ][0 ]["IpRanges" ][0 ]
218
+ assert ip_range ["CidrIp" ] == "0.0.0.0/0"
219
+ assert ip_range ["Description" ] == "Allow traffic from all IPs - test"
220
+
221
+ # Delete k8s resource
222
+ _ , deleted = k8s .delete_custom_resource (ref )
223
+ assert deleted is True
224
+
225
+ time .sleep (DELETE_WAIT_AFTER_SECONDS )
226
+
227
+ # Check Security Group no longer exists in AWS
228
+ # Deleting Security Group will also delete rules
229
+ ec2_validator .assert_security_group (resource_id , exists = False )
230
+
107
231
@pytest .mark .resource_data ({
108
232
'resource_file' : 'security_group_rule' ,
109
233
'IP_PROTOCOL' : 'tcp' ,
0 commit comments