1+ from typing import Any , Generator
2+
3+ import pytest
4+
5+ from scaleway .k8s .v1 import K8SV1API , CNI
6+ from scaleway .vpc .v2 import VpcV2API , PrivateNetwork
7+ from scaleway_core .api import ScalewayException
8+ from scaleway_core .client import Client
9+ from vcr_config import scw_vcr
10+
11+ # mypy: ignore-errors
12+
13+ @pytest .fixture (scope = "module" )
14+ @scw_vcr .use_cassette
15+ def k8s_api () -> K8SV1API :
16+ client = Client .from_config_file_and_env ()
17+ k8s_api = K8SV1API (client )
18+ return k8s_api
19+
20+ @pytest .fixture (scope = "module" )
21+ @scw_vcr .use_cassette
22+ def private_network () -> Generator [PrivateNetwork , Any , None ]:
23+ client = Client .from_config_file_and_env ()
24+ vpc_api = VpcV2API (client )
25+ vpc = vpc_api .create_vpc (enable_routing = False )
26+ pn = vpc_api .create_private_network (vpc_id = vpc .id , default_route_propagation_enabled = True )
27+ yield pn
28+ vpc_api .delete_vpc (vpc_id = vpc .id )
29+
30+
31+ def safe_wait_for_cluster (k8s_api : K8SV1API , cluster_id : str ):
32+ try :
33+ return k8s_api .wait_for_cluster (cluster_id = cluster_id )
34+ except ScalewayException as e :
35+ err_type = getattr (e , "type" , None ) or getattr (e , "error" , None )
36+ if err_type == "not_found" or "not found" in str (e ).lower ():
37+ return None
38+ raise
39+
40+ @scw_vcr .use_cassette
41+ def test_k8s_cluster_list (k8s_api : K8SV1API , private_network : PrivateNetwork ) -> None :
42+ cluster = k8s_api .create_cluster (type_ = "kapsule" , version = "1.32.7" , cni = CNI .CILIUM , description = "" , private_network_id = private_network .id )
43+ assert cluster .id is not None
44+
45+ clusters = k8s_api .list_clusters ()
46+ assert clusters .total_count >= 1
47+
48+ k8s_api .delete_cluster (cluster_id = cluster .id , with_additional_resources = True )
49+ cluster = safe_wait_for_cluster (k8s_api = k8s_api , cluster_id = cluster .id )
50+ assert cluster is None
0 commit comments