|
1 | | -import collections |
2 | | -import logging |
3 | | -from typing import cast |
4 | | - |
5 | | -from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources |
6 | | -from fastapi import FastAPI |
7 | | -from models_library.clusters import ClusterAuthentication |
8 | | -from models_library.docker import ( |
9 | | - DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, |
10 | | - DockerLabelKey, |
11 | | -) |
12 | | -from models_library.generated_models.docker_rest_api import Node |
13 | | -from pydantic import AnyUrl, ByteSize |
14 | | -from servicelib.utils import logged_gather |
15 | | -from types_aiobotocore_ec2.literals import InstanceTypeType |
16 | | - |
17 | | -from ..core.errors import ( |
18 | | - DaskNoWorkersError, |
19 | | - DaskSchedulerNotFoundError, |
20 | | - DaskWorkerNotFoundError, |
21 | | -) |
22 | | -from ..core.settings import get_application_settings |
23 | | -from ..models import AssociatedInstance, DaskTask |
24 | | -from ..utils import computational_scaling as utils |
25 | | -from ..utils import utils_docker, utils_ec2 |
26 | | -from . import dask |
27 | | -from .docker import get_docker_client |
28 | | - |
29 | | -_logger = logging.getLogger(__name__) |
30 | | - |
31 | | - |
def _scheduler_url(app: FastAPI) -> AnyUrl:
    """Return the dask scheduler monitoring URL from the application settings."""
    dask_settings = get_application_settings(app).AUTOSCALING_DASK
    assert dask_settings  # nosec
    return dask_settings.DASK_MONITORING_URL
36 | | - |
37 | | - |
def _scheduler_auth(app: FastAPI) -> ClusterAuthentication:
    """Return the credentials used to talk to the dask scheduler."""
    dask_settings = get_application_settings(app).AUTOSCALING_DASK
    assert dask_settings  # nosec
    return dask_settings.DASK_SCHEDULER_AUTH
42 | | - |
43 | | - |
class ComputationalAutoscaling:
    """Autoscaling provider for computational (dask-based) clusters.

    Bridges the generic autoscaling logic with a dask scheduler: pending work
    is derived from dask task queues, and node activity/usage is queried from
    the dask workers running on the EC2 instances.
    """

    async def get_monitored_nodes(self, app: FastAPI) -> list[Node]:
        """Return the docker swarm worker nodes that autoscaling monitors."""
        docker_client = get_docker_client(app)
        return await utils_docker.get_worker_nodes(docker_client)

    def get_ec2_tags(self, app: FastAPI) -> EC2Tags:
        """Return the EC2 tags to attach to computational instances."""
        return utils_ec2.get_ec2_tags_computational(get_application_settings(app))

    def get_new_node_docker_tags(
        self, app: FastAPI, ec2_instance_data: EC2InstanceData
    ) -> dict[DockerLabelKey, str]:
        """Return docker node labels pinning tasks to this instance type."""
        assert app  # nosec
        labels: dict[DockerLabelKey, str] = {
            DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: ec2_instance_data.type
        }
        return labels

    async def list_unrunnable_tasks(self, app: FastAPI) -> list[DaskTask]:
        """Return tasks that currently cannot run (pending + likely queued).

        Returns an empty list when the scheduler is unreachable, which is
        expected while a machine is still starting up.
        """
        try:
            unrunnable_tasks = await dask.list_unrunnable_tasks(
                _scheduler_url(app), _scheduler_auth(app)
            )
            # NOTE: any worker "processing" more than 1 task means that the other tasks are queued!
            # NOTE: that is not necessarily true, in cases where 1 worker takes multiple tasks?? (osparc.io)
            processing_by_worker = await dask.list_processing_tasks_per_worker(
                _scheduler_url(app), _scheduler_auth(app)
            )
            queued_tasks = [
                task
                for worker_tasks in processing_by_worker.values()
                for task in worker_tasks[1:]
            ]
            _logger.debug(
                "found %s pending tasks and %s potentially queued tasks",
                len(unrunnable_tasks),
                len(queued_tasks),
            )
            return unrunnable_tasks + queued_tasks
        except DaskSchedulerNotFoundError:
            _logger.warning(
                "No dask scheduler found. TIP: Normal during machine startup."
            )
            return []

    def get_task_required_resources(self, task) -> Resources:
        """Return the resources (cpus/ram) requested by a dask task."""
        return utils.resources_from_dask_task(task)

    async def get_task_defined_instance(
        self, app: FastAPI, task
    ) -> InstanceTypeType | None:
        """Return the EC2 instance type the task is pinned to, if any."""
        assert app  # nosec
        return cast(InstanceTypeType | None, utils.get_task_instance_restriction(task))

    async def compute_node_used_resources(
        self, app: FastAPI, instance: AssociatedInstance
    ) -> Resources:
        """Return the resources currently used by the dask worker on *instance*.

        A worker holding results in memory is reported as using a token amount
        of RAM so it is not considered idle. Missing workers count as empty.
        """
        try:
            resource = await dask.get_worker_used_resources(
                _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
            )
            if resource == Resources.create_as_empty():
                num_results_in_memory = (
                    await dask.get_worker_still_has_results_in_memory(
                        _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
                    )
                )
                if num_results_in_memory > 0:
                    _logger.debug(
                        "found %s for %s",
                        f"{num_results_in_memory=}",
                        f"{instance.ec2_instance.id}",
                    )
                    # NOTE: this is a trick to consider the node still useful
                    return Resources(cpus=0, ram=ByteSize(1024 * 1024 * 1024))
            _logger.debug(
                "found %s for %s", f"{resource=}", f"{instance.ec2_instance.id}"
            )
            return resource
        except (DaskWorkerNotFoundError, DaskNoWorkersError):
            _logger.debug("no resource found for %s", f"{instance.ec2_instance.id}")
            return Resources.create_as_empty()

    async def compute_cluster_used_resources(
        self, app: FastAPI, instances: list[AssociatedInstance]
    ) -> Resources:
        """Sum the per-node used resources over all *instances*."""
        per_node_usage: list[Resources] = await logged_gather(
            *(self.compute_node_used_resources(app, node) for node in instances)
        )
        # start from zeroed fields so the sum always covers every Resources field
        totals = collections.Counter(dict.fromkeys(Resources.model_fields, 0))
        for used in per_node_usage:
            totals.update(used.model_dump())
        return Resources.model_validate(dict(totals))

    async def compute_cluster_total_resources(
        self, app: FastAPI, instances: list[AssociatedInstance]
    ) -> Resources:
        """Return the total resources of the dask cluster (empty if no workers)."""
        try:
            return await dask.compute_cluster_total_resources(
                _scheduler_url(app), _scheduler_auth(app), instances
            )
        except DaskNoWorkersError:
            return Resources.create_as_empty()

    async def is_instance_active(
        self, app: FastAPI, instance: AssociatedInstance
    ) -> bool:
        """True when the node is osparc-ready AND its dask worker is connected."""
        if not utils_docker.is_node_osparc_ready(instance.node):
            return False
        # now check if dask-scheduler/dask-worker is available and running
        return await dask.is_worker_connected(
            _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
        )

    async def is_instance_retired(
        self, app: FastAPI, instance: AssociatedInstance
    ) -> bool:
        """True when the node is osparc-ready but its dask worker has retired."""
        if not utils_docker.is_node_osparc_ready(instance.node):
            return False
        return await dask.is_worker_retired(
            _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
        )

    async def try_retire_nodes(self, app: FastAPI) -> None:
        """Ask the dask scheduler to gracefully retire idle workers."""
        await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app))
0 commit comments