17
17
from lightkube import Client
18
18
from lightkube .models .meta_v1 import ObjectMeta
19
19
from lightkube .resources .apps_v1 import StatefulSet
20
- from lightkube .resources .core_v1 import PersistentVolume , PersistentVolumeClaim , Pod
20
+ from lightkube .resources .core_v1 import Endpoints , PersistentVolume , PersistentVolumeClaim , Pod
21
21
from pytest_operator .plugin import OpsTest
22
22
from tenacity import RetryError , Retrying , retry , stop_after_attempt , stop_after_delay , wait_fixed
23
23
@@ -122,6 +122,7 @@ async def deploy_and_scale_mysql(
122
122
mysql_application_name : str = MYSQL_DEFAULT_APP_NAME ,
123
123
num_units : int = 3 ,
124
124
model : Optional [Model ] = None ,
125
+ cluster_name : str = CLUSTER_NAME ,
125
126
) -> str :
126
127
"""Deploys and scales the mysql application charm.
127
128
@@ -132,6 +133,7 @@ async def deploy_and_scale_mysql(
132
133
mysql_application_name: The name of the mysql application if it is to be deployed
133
134
num_units: The number of units to deploy
134
135
model: The model to deploy the mysql application to
136
+ cluster_name: The name of the mysql cluster
135
137
"""
136
138
application_name = get_application_name (ops_test , "mysql" )
137
139
if not model :
@@ -150,7 +152,7 @@ async def deploy_and_scale_mysql(
150
152
# Cache the built charm to avoid rebuilding it between tests
151
153
mysql_charm = charm
152
154
153
- config = {"cluster-name" : CLUSTER_NAME , "profile" : "testing" }
155
+ config = {"cluster-name" : cluster_name , "profile" : "testing" }
154
156
resources = {"mysql-image" : METADATA ["resources" ]["mysql-image" ]["upstream-source" ]}
155
157
156
158
async with ops_test .fast_forward ("60s" ):
@@ -177,15 +179,21 @@ async def deploy_and_scale_mysql(
177
179
return mysql_application_name
178
180
179
181
180
- async def deploy_and_scale_application (ops_test : OpsTest ) -> str :
182
+ async def deploy_and_scale_application (
183
+ ops_test : OpsTest ,
184
+ check_for_existing_application : bool = True ,
185
+ test_application_name : str = APPLICATION_DEFAULT_APP_NAME ,
186
+ ) -> str :
181
187
"""Deploys and scales the test application charm.
182
188
183
189
Args:
184
190
ops_test: The ops test framework
191
+ check_for_existing_application: Whether to check for existing test applications
192
+ test_application_name: Name of test application to be deployed
185
193
"""
186
- application_name = get_application_name (ops_test , APPLICATION_DEFAULT_APP_NAME )
194
+ application_name = get_application_name (ops_test , test_application_name )
187
195
188
- if application_name :
196
+ if check_for_existing_application and application_name :
189
197
if len (ops_test .model .applications [application_name ].units ) != 1 :
190
198
async with ops_test .fast_forward ("60s" ):
191
199
await scale_application (ops_test , application_name , 1 )
@@ -195,22 +203,22 @@ async def deploy_and_scale_application(ops_test: OpsTest) -> str:
195
203
async with ops_test .fast_forward ("60s" ):
196
204
await ops_test .model .deploy (
197
205
APPLICATION_DEFAULT_APP_NAME ,
198
- application_name = APPLICATION_DEFAULT_APP_NAME ,
206
+ application_name = test_application_name ,
199
207
num_units = 1 ,
200
208
channel = "latest/edge" ,
201
209
202
210
)
203
211
204
212
await ops_test .model .wait_for_idle (
205
- apps = [APPLICATION_DEFAULT_APP_NAME ],
213
+ apps = [test_application_name ],
206
214
status = "waiting" ,
207
215
raise_on_blocked = True ,
208
216
timeout = TIMEOUT ,
209
217
)
210
218
211
- assert len (ops_test .model .applications [APPLICATION_DEFAULT_APP_NAME ].units ) == 1
219
+ assert len (ops_test .model .applications [test_application_name ].units ) == 1
212
220
213
- return APPLICATION_DEFAULT_APP_NAME
221
+ return test_application_name
214
222
215
223
216
224
async def relate_mysql_and_application (
@@ -223,13 +231,27 @@ async def relate_mysql_and_application(
223
231
mysql_application_name: The mysql charm application name
224
232
application_name: The continuous writes test charm application name
225
233
"""
226
- if is_relation_joined (ops_test , "database" , "database" ):
234
+ if is_relation_joined (
235
+ ops_test ,
236
+ "database" ,
237
+ "database" ,
238
+ application_one = mysql_application_name ,
239
+ application_two = application_name ,
240
+ ):
227
241
return
228
242
229
243
await ops_test .model .relate (
230
244
f"{ application_name } :database" , f"{ mysql_application_name } :database"
231
245
)
232
- await ops_test .model .block_until (lambda : is_relation_joined (ops_test , "database" , "database" ))
246
+ await ops_test .model .block_until (
247
+ lambda : is_relation_joined (
248
+ ops_test ,
249
+ "database" ,
250
+ "database" ,
251
+ application_one = mysql_application_name ,
252
+ application_two = application_name ,
253
+ )
254
+ )
233
255
234
256
await ops_test .model .wait_for_idle (
235
257
apps = [mysql_application_name , application_name ],
@@ -669,3 +691,15 @@ def delete_pod(ops_test: OpsTest, unit: Unit) -> None:
669
691
],
670
692
check = True ,
671
693
)
694
+
695
+
696
+ def get_endpoint_addresses (ops_test : OpsTest , endpoint_name : str ) -> list [str ]:
697
+ """Retrieve the addresses selected by a K8s endpoint."""
698
+ client = lightkube .Client ()
699
+ endpoint = client .get (
700
+ Endpoints ,
701
+ namespace = ops_test .model .info .name ,
702
+ name = endpoint_name ,
703
+ )
704
+
705
+ return [address .ip for subset in endpoint .subsets for address in subset .addresses ]
0 commit comments