2424from ..utils import computational_scaling as utils
2525from ..utils import utils_docker , utils_ec2
2626from . import dask
27- from .auto_scaling_mode_base import BaseAutoscaling
2827from .docker import get_docker_client
2928
3029_logger = logging .getLogger (__name__ )
@@ -42,27 +41,23 @@ def _scheduler_auth(app: FastAPI) -> ClusterAuthentication:
4241 return app_settings .AUTOSCALING_DASK .DASK_SCHEDULER_AUTH
4342
4443
45- class ComputationalAutoscaling (BaseAutoscaling ):
46- @staticmethod
47- async def get_monitored_nodes (app : FastAPI ) -> list [Node ]:
44+ class ComputationalAutoscaling :
45+ async def get_monitored_nodes (self , app : FastAPI ) -> list [Node ]:
4846 return await utils_docker .get_worker_nodes (get_docker_client (app ))
4947
50- @staticmethod
51- def get_ec2_tags (app : FastAPI ) -> EC2Tags :
48+ def get_ec2_tags (self , app : FastAPI ) -> EC2Tags :
5249 app_settings = get_application_settings (app )
5350 return utils_ec2 .get_ec2_tags_computational (app_settings )
5451
55- @staticmethod
5652 def get_new_node_docker_tags (
57- app : FastAPI , ec2_instance_data : EC2InstanceData
53+ self , app : FastAPI , ec2_instance_data : EC2InstanceData
5854 ) -> dict [DockerLabelKey , str ]:
5955 assert app # nosec
6056 return {
6157 DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY : ec2_instance_data .type
6258 }
6359
64- @staticmethod
65- async def list_unrunnable_tasks (app : FastAPI ) -> list [DaskTask ]:
60+ async def list_unrunnable_tasks (self , app : FastAPI ) -> list [DaskTask ]:
6661 try :
6762 unrunnable_tasks = await dask .list_unrunnable_tasks (
6863 _scheduler_url (app ), _scheduler_auth (app )
@@ -87,18 +82,17 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]:
8782 )
8883 return []
8984
90- @staticmethod
91- def get_task_required_resources (task ) -> Resources :
85+ def get_task_required_resources (self , task ) -> Resources :
9286 return utils .resources_from_dask_task (task )
9387
94- @staticmethod
95- async def get_task_defined_instance (app : FastAPI , task ) -> InstanceTypeType | None :
88+ async def get_task_defined_instance (
89+ self , app : FastAPI , task
90+ ) -> InstanceTypeType | None :
9691 assert app # nosec
9792 return cast (InstanceTypeType | None , utils .get_task_instance_restriction (task ))
9893
99- @staticmethod
10094 async def compute_node_used_resources (
101- app : FastAPI , instance : AssociatedInstance
95+ self , app : FastAPI , instance : AssociatedInstance
10296 ) -> Resources :
10397 try :
10498 resource = await dask .get_worker_used_resources (
@@ -127,24 +121,19 @@ async def compute_node_used_resources(
127121 _logger .debug ("no resource found for %s" , f"{ instance .ec2_instance .id } " )
128122 return Resources .create_as_empty ()
129123
130- @staticmethod
131124 async def compute_cluster_used_resources (
132- app : FastAPI , instances : list [AssociatedInstance ]
125+ self , app : FastAPI , instances : list [AssociatedInstance ]
133126 ) -> Resources :
134127 list_of_used_resources : list [Resources ] = await logged_gather (
135- * (
136- ComputationalAutoscaling .compute_node_used_resources (app , i )
137- for i in instances
138- )
128+ * (self .compute_node_used_resources (app , i ) for i in instances )
139129 )
140130 counter = collections .Counter ({k : 0 for k in Resources .model_fields })
141131 for result in list_of_used_resources :
142132 counter .update (result .model_dump ())
143133 return Resources .model_validate (dict (counter ))
144134
145- @staticmethod
146135 async def compute_cluster_total_resources (
147- app : FastAPI , instances : list [AssociatedInstance ]
136+ self , app : FastAPI , instances : list [AssociatedInstance ]
148137 ) -> Resources :
149138 try :
150139 return await dask .compute_cluster_total_resources (
@@ -153,8 +142,9 @@ async def compute_cluster_total_resources(
153142 except DaskNoWorkersError :
154143 return Resources .create_as_empty ()
155144
156- @staticmethod
157- async def is_instance_active (app : FastAPI , instance : AssociatedInstance ) -> bool :
145+ async def is_instance_active (
146+ self , app : FastAPI , instance : AssociatedInstance
147+ ) -> bool :
158148 if not utils_docker .is_node_osparc_ready (instance .node ):
159149 return False
160150
@@ -163,14 +153,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
163153 _scheduler_url (app ), _scheduler_auth (app ), instance .ec2_instance
164154 )
165155
166- @staticmethod
167- async def is_instance_retired (app : FastAPI , instance : AssociatedInstance ) -> bool :
156+ async def is_instance_retired (
157+ self , app : FastAPI , instance : AssociatedInstance
158+ ) -> bool :
168159 if not utils_docker .is_node_osparc_ready (instance .node ):
169160 return False
170161 return await dask .is_worker_retired (
171162 _scheduler_url (app ), _scheduler_auth (app ), instance .ec2_instance
172163 )
173164
174- @staticmethod
175- async def try_retire_nodes (app : FastAPI ) -> None :
165+ async def try_retire_nodes (self , app : FastAPI ) -> None :
176166 await dask .try_retire_nodes (_scheduler_url (app ), _scheduler_auth (app ))
0 commit comments