1
+ import time
1
2
import boto3
2
3
import botocore
3
4
import pytest
@@ -63,6 +64,26 @@ def clean_all_integ_buckets():
63
64
clean_bucket (bucket .name , s3_client )
64
65
65
66
67
+ def _delete_unused_network_interface_by_subnet (ec2_client , subnet_id ):
68
+ """Deletes unused network interface under the provided subnet"""
69
+ paginator = ec2_client .get_paginator ("describe_network_interfaces" )
70
+ response_iterator = paginator .paginate (
71
+ Filters = [
72
+ {"Name" : "subnet-id" , "Values" : [subnet_id ]},
73
+ {"Name" : "status" , "Values" : ["available" ]},
74
+ ]
75
+ )
76
+ network_interface_ids = []
77
+ for page in response_iterator :
78
+ network_interface_ids += [ni ["NetworkInterfaceId" ] for ni in page ["NetworkInterfaces" ]]
79
+
80
+ for ni_id in network_interface_ids :
81
+ ec2_client .delete_network_interface (NetworkInterfaceId = ni_id )
82
+ time .sleep (0.5 )
83
+
84
+ LOG .info ("Deleted %s unused network interfaces under subnet %s" , len (network_interface_ids ), subnet_id )
85
+
86
+
66
87
@pytest .fixture ()
67
88
def setup_companion_stack_once (tmpdir_factory , get_prefix ):
68
89
tests_integ_dir = Path (__file__ ).resolve ().parents [1 ]
@@ -74,6 +95,15 @@ def setup_companion_stack_once(tmpdir_factory, get_prefix):
74
95
companion_stack = Stack (stack_name , companion_stack_tempalte_path , cfn_client , output_dir )
75
96
companion_stack .create_or_update (_stack_exists (stack_name ))
76
97
98
+ ec2_client = ClientProvider ().ec2_client
99
+ precreated_subnet_ids = [
100
+ resource ["PhysicalResourceId" ]
101
+ for resource in companion_stack .stack_resources ["StackResourceSummaries" ]
102
+ if resource ["LogicalResourceId" ].startswith ("PreCreatedSubnet" )
103
+ ]
104
+ for subnet_id in precreated_subnet_ids :
105
+ _delete_unused_network_interface_by_subnet (ec2_client , subnet_id )
106
+
77
107
78
108
@pytest .fixture ()
79
109
def get_serverless_application_repository_app ():
0 commit comments