@@ -10,53 +10,65 @@ class DaskCluster(new_class("DaskCluster", "kubernetes.dask.org/v1")):
1010 scalable_spec = "worker.replicas"
1111
1212 async def worker_groups (self ) -> List [DaskWorkerGroup ]:
13- return await DaskWorkerGroup .list (
14- label_selector = f"dask.org/cluster-name={ self .name } " ,
15- namespace = self .namespace ,
16- )
13+ return [
14+ wg
15+ async for wg in DaskWorkerGroup .list (
16+ label_selector = f"dask.org/cluster-name={ self .name } " ,
17+ namespace = self .namespace ,
18+ )
19+ ]
1720
1821 async def scheduler_pod (self ) -> Pod :
1922 pods = []
2023 while not pods :
21- pods = await Pod .list (
22- label_selector = "," .join (
23- [
24- f"dask.org/cluster-name={ self .name } " ,
25- "dask.org/component=scheduler" ,
26- ]
27- ),
28- namespace = self .namespace ,
29- )
24+ pods = [
25+ pod
26+ async for pod in Pod .list (
27+ label_selector = "," .join (
28+ [
29+ f"dask.org/cluster-name={ self .name } " ,
30+ "dask.org/component=scheduler" ,
31+ ]
32+ ),
33+ namespace = self .namespace ,
34+ )
35+ ]
3036 assert len (pods ) == 1
3137 return pods [0 ]
3238
3339 async def scheduler_deployment (self ) -> Deployment :
3440 deployments = []
3541 while not deployments :
36- deployments = await Deployment .list (
37- label_selector = "," .join (
38- [
39- f"dask.org/cluster-name={ self .name } " ,
40- "dask.org/component=scheduler" ,
41- ]
42- ),
43- namespace = self .namespace ,
44- )
42+ deployments = [
43+ deployment
44+ async for deployment in Deployment .list (
45+ label_selector = "," .join (
46+ [
47+ f"dask.org/cluster-name={ self .name } " ,
48+ "dask.org/component=scheduler" ,
49+ ]
50+ ),
51+ namespace = self .namespace ,
52+ )
53+ ]
4554 assert len (deployments ) == 1
4655 return deployments [0 ]
4756
4857 async def scheduler_service (self ) -> Service :
4958 services = []
5059 while not services :
51- services = await Service .list (
52- label_selector = "," .join (
53- [
54- f"dask.org/cluster-name={ self .name } " ,
55- "dask.org/component=scheduler" ,
56- ]
57- ),
58- namespace = self .namespace ,
59- )
60+ services = [
61+ service
62+ async for service in Service .list (
63+ label_selector = "," .join (
64+ [
65+ f"dask.org/cluster-name={ self .name } " ,
66+ "dask.org/component=scheduler" ,
67+ ]
68+ ),
69+ namespace = self .namespace ,
70+ )
71+ ]
6072 assert len (services ) == 1
6173 return services [0 ]
6274
@@ -74,28 +86,34 @@ class DaskWorkerGroup(new_class("DaskWorkerGroup", "kubernetes.dask.org/v1")):
7486 scalable_spec = "worker.replicas"
7587
7688 async def pods (self ) -> List [Pod ]:
77- return await Pod .list (
78- label_selector = "," .join (
79- [
80- f"dask.org/cluster-name={ self .spec .cluster } " ,
81- "dask.org/component=worker" ,
82- f"dask.org/workergroup-name={ self .name } " ,
83- ]
84- ),
85- namespace = self .namespace ,
86- )
89+ return [
90+ pod
91+ async for pod in Pod .list (
92+ label_selector = "," .join (
93+ [
94+ f"dask.org/cluster-name={ self .spec .cluster } " ,
95+ "dask.org/component=worker" ,
96+ f"dask.org/workergroup-name={ self .name } " ,
97+ ]
98+ ),
99+ namespace = self .namespace ,
100+ )
101+ ]
87102
88103 async def deployments (self ) -> List [Deployment ]:
89- return await Deployment .list (
90- label_selector = "," .join (
91- [
92- f"dask.org/cluster-name={ self .spec .cluster } " ,
93- "dask.org/component=worker" ,
94- f"dask.org/workergroup-name={ self .name } " ,
95- ]
96- ),
97- namespace = self .namespace ,
98- )
104+ return [
105+ deployment
106+ async for deployment in Deployment .list (
107+ label_selector = "," .join (
108+ [
109+ f"dask.org/cluster-name={ self .spec .cluster } " ,
110+ "dask.org/component=worker" ,
111+ f"dask.org/workergroup-name={ self .name } " ,
112+ ]
113+ ),
114+ namespace = self .namespace ,
115+ )
116+ ]
99117
100118 async def cluster (self ) -> DaskCluster :
101119 return await DaskCluster .get (self .spec .cluster , namespace = self .namespace )
@@ -113,14 +131,17 @@ async def cluster(self) -> DaskCluster:
113131 async def pod (self ) -> Pod :
114132 pods = []
115133 while not pods :
116- pods = await Pod .list (
117- label_selector = "," .join (
118- [
119- f"dask.org/cluster-name={ self .name } " ,
120- "dask.org/component=job-runner" ,
121- ]
122- ),
123- namespace = self .namespace ,
124- )
134+ pods = [
135+ pod
136+ async for pod in Pod .list (
137+ label_selector = "," .join (
138+ [
139+ f"dask.org/cluster-name={ self .name } " ,
140+ "dask.org/component=job-runner" ,
141+ ]
142+ ),
143+ namespace = self .namespace ,
144+ )
145+ ]
125146 assert len (pods ) == 1
126147 return pods [0 ]
0 commit comments