Skip to content

Commit baeaeb3

Browse files
committed
refactor
1 parent b0ed5bd commit baeaeb3

File tree

1 file changed

+51
-117
lines changed
  • services/autoscaling/src/simcore_service_autoscaling/utils

1 file changed

+51
-117
lines changed

services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py

Lines changed: 51 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import logging
3-
from typing import cast
43

54
from aws_library.ec2 import Resources
65
from dask_task_models_library.container_tasks.utils import parse_dask_job_id
@@ -26,6 +25,16 @@
2625
_logger = logging.getLogger(__name__)
2726

2827

28+
def _get_task_ids(task: DockerTask | DaskTask) -> tuple[UserID, ProjectID, NodeID]:
29+
if isinstance(task, DockerTask):
30+
labels = StandardSimcoreDockerLabels.from_docker_task(task)
31+
return labels.user_id, labels.project_id, labels.node_id
32+
_service_key, _service_version, user_id, project_id, node_id = parse_dask_job_id(
33+
task.task_id
34+
)
35+
return user_id, project_id, node_id
36+
37+
2938
async def post_tasks_log_message(
3039
app: FastAPI,
3140
tasks: list[DockerTask] | list[DaskTask],
@@ -35,23 +44,21 @@ async def post_tasks_log_message(
3544
) -> None:
3645
if not tasks:
3746
return
38-
if isinstance(tasks[0], DockerTask):
47+
48+
with log_catch(_logger, reraise=False):
3949
await asyncio.gather(
4050
*(
41-
_post_task_log_message_from_docker_task(
42-
app, cast(DockerTask, task), message, level
51+
_post_task_log_message(
52+
app,
53+
user_id=user_id,
54+
project_id=project_id,
55+
node_id=node_id,
56+
log=message,
57+
level=level,
4358
)
44-
for task in tasks
45-
),
46-
return_exceptions=True,
47-
)
48-
else:
49-
await asyncio.gather(
50-
*(
51-
_post_task_log_message_from_dask_task(
52-
app, cast(DaskTask, task), message, level
59+
for user_id, project_id, node_id in (
60+
_get_task_ids(task) for task in tasks
5361
)
54-
for task in tasks
5562
),
5663
return_exceptions=True,
5764
)
@@ -66,70 +73,26 @@ async def post_tasks_progress_message(
6673
) -> None:
6774
if not tasks:
6875
return
69-
if isinstance(tasks[0], DockerTask):
76+
77+
with log_catch(_logger, reraise=False):
7078
await asyncio.gather(
7179
*(
72-
_post_task_progress_message_from_docker_task(
73-
app, cast(DockerTask, task), progress, progress_type
80+
_post_task_progress_message(
81+
app,
82+
user_id=user_id,
83+
project_id=project_id,
84+
node_id=node_id,
85+
progress=progress,
86+
progress_type=progress_type,
7487
)
75-
for task in tasks
76-
),
77-
return_exceptions=True,
78-
)
79-
else:
80-
await asyncio.gather(
81-
*(
82-
_post_task_progress_message_from_dask_task(
83-
app, cast(DaskTask, task), progress, progress_type
88+
for user_id, project_id, node_id in (
89+
_get_task_ids(task) for task in tasks
8490
)
85-
for task in tasks
8691
),
8792
return_exceptions=True,
8893
)
8994

9095

91-
async def _post_task_progress_message_from_docker_task(
92-
app: FastAPI,
93-
task: DockerTask,
94-
progress: float,
95-
progress_type: ProgressType,
96-
) -> None:
97-
with log_catch(_logger, reraise=False):
98-
simcore_label_keys = StandardSimcoreDockerLabels.from_docker_task(task)
99-
await _post_task_progress_message(
100-
app,
101-
user_id=simcore_label_keys.user_id,
102-
project_id=simcore_label_keys.project_id,
103-
node_id=simcore_label_keys.node_id,
104-
progress=progress,
105-
progress_type=progress_type,
106-
)
107-
108-
109-
async def _post_task_progress_message_from_dask_task(
110-
app: FastAPI,
111-
task: DaskTask,
112-
progress: float,
113-
progress_type: ProgressType,
114-
) -> None:
115-
with log_catch(_logger, reraise=False):
116-
(
117-
_service_key,
118-
_service_version,
119-
user_id,
120-
project_id,
121-
node_id,
122-
) = parse_dask_job_id(task.task_id)
123-
await _post_task_progress_message(
124-
app,
125-
user_id=user_id,
126-
project_id=project_id,
127-
node_id=node_id,
128-
progress=progress,
129-
progress_type=progress_type,
130-
)
131-
132-
13396
async def _post_task_progress_message(
13497
app: FastAPI,
13598
*,
@@ -149,42 +112,6 @@ async def _post_task_progress_message(
149112
await post_message(app, message)
150113

151114

152-
async def _post_task_log_message_from_docker_task(
153-
app: FastAPI, task: DockerTask, log: str, level: int
154-
) -> None:
155-
with log_catch(_logger, reraise=False):
156-
simcore_label_keys = StandardSimcoreDockerLabels.from_docker_task(task)
157-
await _post_task_log_message(
158-
app,
159-
user_id=simcore_label_keys.user_id,
160-
project_id=simcore_label_keys.project_id,
161-
node_id=simcore_label_keys.node_id,
162-
log=log,
163-
level=level,
164-
)
165-
166-
167-
async def _post_task_log_message_from_dask_task(
168-
app: FastAPI, task: DaskTask, log: str, level: int
169-
) -> None:
170-
with log_catch(_logger, reraise=False):
171-
(
172-
_service_key,
173-
_service_version,
174-
user_id,
175-
project_id,
176-
node_id,
177-
) = parse_dask_job_id(task.task_id)
178-
await _post_task_log_message(
179-
app,
180-
user_id=user_id,
181-
project_id=project_id,
182-
node_id=node_id,
183-
log=log,
184-
level=level,
185-
)
186-
187-
188115
async def _post_task_log_message(
189116
app: FastAPI,
190117
*,
@@ -194,14 +121,16 @@ async def _post_task_log_message(
194121
log: str,
195122
level: int,
196123
) -> None:
124+
cluster_log = f"[cluster] {log}"
125+
_logger.log(level, cluster_log)
126+
197127
message = LoggerRabbitMessage.model_construct(
198128
user_id=user_id,
199129
project_id=project_id,
200130
node_id=node_id,
201-
messages=[f"[cluster] {log}"],
131+
messages=[cluster_log],
202132
log_level=level,
203133
)
204-
_logger.log(level, message)
205134
await post_message(app, message)
206135

207136

@@ -213,24 +142,29 @@ async def _create_autoscaling_status_message(
213142
) -> RabbitAutoscalingStatusMessage:
214143
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
215144

216-
origin = "unknown"
217145
if app_settings.AUTOSCALING_NODES_MONITORING:
218-
origin = f"dynamic:node_labels={app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS}"
146+
origin = f"dynamic:node_labels={app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS!s}"
219147
elif app_settings.AUTOSCALING_DASK:
220-
origin = f"computational:scheduler_url={app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL}"
148+
origin = f"computational:scheduler_url={app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL!s}"
149+
else:
150+
origin = "unknown"
151+
152+
total_nodes = (
153+
len(cluster.active_nodes)
154+
+ len(cluster.drained_nodes)
155+
+ len(cluster.buffer_drained_nodes)
156+
)
157+
drained_nodes = len(cluster.drained_nodes) + len(cluster.buffer_drained_nodes)
158+
221159
return RabbitAutoscalingStatusMessage.model_construct(
222160
origin=origin,
223-
nodes_total=len(cluster.active_nodes)
224-
+ len(cluster.drained_nodes)
225-
+ len(cluster.buffer_drained_nodes),
161+
nodes_total=total_nodes,
226162
nodes_active=len(cluster.active_nodes),
227-
nodes_drained=len(cluster.drained_nodes) + len(cluster.buffer_drained_nodes),
163+
nodes_drained=drained_nodes,
228164
cluster_total_resources=cluster_total_resources.model_dump(),
229165
cluster_used_resources=cluster_used_resources.model_dump(),
230166
instances_pending=len(cluster.pending_ec2s),
231-
instances_running=len(cluster.active_nodes)
232-
+ len(cluster.drained_nodes)
233-
+ len(cluster.buffer_drained_nodes),
167+
instances_running=total_nodes,
234168
)
235169

236170

0 commit comments

Comments
 (0)