Skip to content

Commit 0890bfe

Browse files
authored
♻️ [1/3] dynamic scheduler structure refactor round (ITISFoundation#3698)
1 parent e158d98 commit 0890bfe

File tree

6 files changed

+182
-170
lines changed

6 files changed

+182
-170
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import asyncio
2+
import logging
3+
from copy import deepcopy
4+
from math import floor
5+
6+
from fastapi import FastAPI
7+
from servicelib.error_codes import create_error_code
8+
9+
from ....core.settings import (
10+
DynamicServicesSchedulerSettings,
11+
DynamicServicesSettings,
12+
DynamicSidecarSettings,
13+
)
14+
from ....models.schemas.dynamic_services import (
15+
DynamicSidecarStatus,
16+
SchedulerData,
17+
ServiceName,
18+
)
19+
from ..docker_api import (
20+
are_sidecar_and_proxy_services_present,
21+
is_dynamic_sidecar_stack_missing,
22+
update_scheduler_data_label,
23+
)
24+
from ..errors import GenericDockerError
25+
from ._utils import attempt_pod_removal_and_data_saving
26+
from .events import REGISTERED_EVENTS
27+
28+
logger = logging.getLogger(__name__)
29+
30+
31+
async def _apply_observation_cycle(
32+
scheduler: "DynamicSidecarsScheduler", scheduler_data: SchedulerData
33+
) -> None:
34+
"""
35+
fetches status for service and then processes all the registered events
36+
and updates the status back
37+
"""
38+
app: FastAPI = scheduler.app
39+
dynamic_services_settings: DynamicServicesSettings = (
40+
app.state.settings.DYNAMIC_SERVICES
41+
)
42+
# TODO: PC-> ANE: custom settings are frozen. in principle, no need to create copies.
43+
initial_status = deepcopy(scheduler_data.dynamic_sidecar.status)
44+
45+
if ( # do not refactor, second part of "and condition" is skipped most times
46+
scheduler_data.dynamic_sidecar.were_containers_created
47+
and not await are_sidecar_and_proxy_services_present(
48+
node_uuid=scheduler_data.node_uuid,
49+
dynamic_sidecar_settings=dynamic_services_settings.DYNAMIC_SIDECAR,
50+
)
51+
):
52+
# NOTE: once marked for removal the observation cycle needs
53+
# to continue in order for the service to be removed
54+
logger.warning(
55+
"Removing service %s from observation", scheduler_data.service_name
56+
)
57+
await scheduler.mark_service_for_removal(
58+
node_uuid=scheduler_data.node_uuid,
59+
can_save=scheduler_data.dynamic_sidecar.were_containers_created,
60+
)
61+
62+
for dynamic_scheduler_event in REGISTERED_EVENTS:
63+
if await dynamic_scheduler_event.will_trigger(
64+
app=app, scheduler_data=scheduler_data
65+
):
66+
# event.action will apply changes to the output_scheduler_data
67+
await dynamic_scheduler_event.action(app, scheduler_data)
68+
69+
# check if the status of the services has changed from OK
70+
if initial_status != scheduler_data.dynamic_sidecar.status:
71+
logger.info(
72+
"Service %s overall status changed to %s",
73+
scheduler_data.service_name,
74+
scheduler_data.dynamic_sidecar.status,
75+
)
76+
77+
78+
def _trigger_every_30_seconds(observation_counter: int, wait_interval: float) -> bool:
79+
# divisor to figure out if 30 seconds have passed based on the cycle count
80+
modulo_divisor = max(1, int(floor(30 / wait_interval)))
81+
return observation_counter % modulo_divisor == 0
82+
83+
84+
async def observing_single_service(
85+
scheduler: "DynamicSidecarsScheduler",
86+
service_name: ServiceName,
87+
scheduler_data: SchedulerData,
88+
dynamic_sidecar_settings: DynamicSidecarSettings,
89+
dynamic_scheduler: DynamicServicesSchedulerSettings,
90+
) -> None:
91+
app: FastAPI = scheduler.app
92+
93+
if scheduler_data.dynamic_sidecar.status.current == DynamicSidecarStatus.FAILING:
94+
# potential use-cases:
95+
# 1. service failed on start -> it can be removed safely
96+
# 2. service must be deleted -> it can be removed safely
97+
# 3. service started and failed while running (either
98+
# dy-sidecar, dy-proxy, or containers) -> it cannot be removed safely
99+
# 4. service started, and failed on closing -> it cannot be removed safely
100+
101+
if scheduler_data.dynamic_sidecar.wait_for_manual_intervention_after_error:
102+
# use-cases: 3, 4
103+
# Since user data is important and must be saved, take no further
104+
# action and wait for manual intervention from support.
105+
106+
# After manual intervention service can now be removed
107+
# from tracking.
108+
109+
if (
110+
# NOTE: do not change below order, reduces pressure on the
111+
# docker swarm engine API.
112+
_trigger_every_30_seconds(
113+
scheduler._observation_counter, # pylint:disable=protected-access
114+
dynamic_scheduler.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS,
115+
)
116+
and await is_dynamic_sidecar_stack_missing(
117+
scheduler_data.node_uuid, dynamic_sidecar_settings
118+
)
119+
):
120+
# if both proxy and sidecar ar missing at this point it
121+
# is safe to assume that user manually removed them from
122+
# Portainer after cleaning up.
123+
124+
# NOTE: saving will fail since there is no dy-sidecar,
125+
# and the save was taken care of by support. Disabling it.
126+
scheduler_data.dynamic_sidecar.service_removal_state.can_save = False
127+
await attempt_pod_removal_and_data_saving(app, scheduler_data)
128+
129+
return
130+
131+
# use-cases: 1, 2
132+
# Cleanup all resources related to the dynamic-sidecar.
133+
await attempt_pod_removal_and_data_saving(app, scheduler_data)
134+
return
135+
136+
scheduler_data_copy: SchedulerData = deepcopy(scheduler_data)
137+
try:
138+
await _apply_observation_cycle(scheduler, scheduler_data)
139+
logger.debug("completed observation cycle of %s", f"{service_name=}")
140+
except asyncio.CancelledError: # pylint: disable=try-except-raise
141+
raise # pragma: no cover
142+
except Exception as e: # pylint: disable=broad-except
143+
service_name = scheduler_data.service_name
144+
145+
# With unhandled errors, let's generate and ID and send it to the end-user
146+
# so that we can trace the logs and debug the issue.
147+
148+
error_code = create_error_code(e)
149+
logger.exception(
150+
"Observation of %s unexpectedly failed [%s]",
151+
f"{service_name=} ",
152+
f"{error_code}",
153+
extra={"error_code": error_code},
154+
)
155+
scheduler_data.dynamic_sidecar.status.update_failing_status(
156+
# This message must be human-friendly
157+
f"Upss! This service ({service_name}) unexpectedly failed",
158+
error_code,
159+
)
160+
finally:
161+
if scheduler_data_copy != scheduler_data:
162+
try:
163+
await update_scheduler_data_label(scheduler_data)
164+
except GenericDockerError as e:
165+
logger.warning("Skipped labels update, please check:\n %s", f"{e}")

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task_utils.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py

Lines changed: 9 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
import functools
1919
import logging
2020
from asyncio import Lock, Queue, Task, sleep
21-
from copy import deepcopy
2221
from dataclasses import dataclass, field
23-
from math import floor
2422
from typing import Optional, Union
2523

2624
from fastapi import FastAPI
@@ -30,7 +28,6 @@
3028
from models_library.service_settings_labels import RestartPolicy
3129
from models_library.users import UserID
3230
from pydantic import AnyHttpUrl
33-
from servicelib.error_codes import create_error_code
3431

3532
from ....core.settings import DynamicServicesSchedulerSettings, DynamicSidecarSettings
3633
from ....models.domains.dynamic_services import RetrieveDataOutEnveloped
@@ -44,7 +41,6 @@
4441
from ..docker_api import (
4542
get_dynamic_sidecar_state,
4643
get_dynamic_sidecars_to_observe,
47-
is_dynamic_sidecar_stack_missing,
4844
remove_pending_volume_removal_services,
4945
update_scheduler_data_label,
5046
)
@@ -53,24 +49,15 @@
5349
DockerServiceNotFoundError,
5450
DynamicSidecarError,
5551
DynamicSidecarNotFoundError,
56-
GenericDockerError,
5752
)
58-
from ._task_utils import apply_observation_cycle
59-
from ._utils import attempt_pod_removal_and_data_saving
53+
from ._core import observing_single_service
6054

6155
logger = logging.getLogger(__name__)
6256

63-
ServiceName = str
6457

6558
_DISABLED_MARK = object()
6659

6760

68-
def _trigger_every_30_seconds(observation_counter: int, wait_interval: float) -> bool:
69-
# divisor to figure out if 30 seconds have passed based on the cycle count
70-
modulo_divisor = max(1, int(floor(30 / wait_interval)))
71-
return observation_counter % modulo_divisor == 0
72-
73-
7461
@dataclass
7562
class DynamicSidecarsScheduler: # pylint: disable=too-many-instance-attributes
7663
app: FastAPI
@@ -382,94 +369,6 @@ async def _run_trigger_observation_queue_task(self) -> None:
382369
self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER
383370
)
384371

385-
async def _observing_single_service(service_name: str) -> None:
386-
scheduler_data: SchedulerData = self._to_observe[service_name]
387-
388-
if (
389-
scheduler_data.dynamic_sidecar.status.current
390-
== DynamicSidecarStatus.FAILING
391-
):
392-
# potential use-cases:
393-
# 1. service failed on start -> it can be removed safely
394-
# 2. service must be deleted -> it can be removed safely
395-
# 3. service started and failed while running (either
396-
# dy-sidecar, dy-proxy, or containers) -> it cannot be removed safely
397-
# 4. service started, and failed on closing -> it cannot be removed safely
398-
399-
if (
400-
scheduler_data.dynamic_sidecar.wait_for_manual_intervention_after_error
401-
):
402-
# use-cases: 3, 4
403-
# Since user data is important and must be saved, take no further
404-
# action and wait for manual intervention from support.
405-
406-
# After manual intervention service can now be removed
407-
# from tracking.
408-
409-
if (
410-
# NOTE: do not change below order, reduces pressure on the
411-
# docker swarm engine API.
412-
_trigger_every_30_seconds(
413-
self._observation_counter,
414-
dynamic_scheduler.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS,
415-
)
416-
and await is_dynamic_sidecar_stack_missing(
417-
scheduler_data.node_uuid, dynamic_sidecar_settings
418-
)
419-
):
420-
# if both proxy and sidecar ar missing at this point it
421-
# is safe to assume that user manually removed them from
422-
# Portainer after cleaning up.
423-
424-
# NOTE: saving will fail since there is no dy-sidecar,
425-
# and the save was taken care of by support. Disabling it.
426-
scheduler_data.dynamic_sidecar.service_removal_state.can_save = (
427-
False
428-
)
429-
await attempt_pod_removal_and_data_saving(
430-
self.app, scheduler_data
431-
)
432-
433-
return
434-
435-
# use-cases: 1, 2
436-
# Cleanup all resources related to the dynamic-sidecar.
437-
await attempt_pod_removal_and_data_saving(self.app, scheduler_data)
438-
return
439-
440-
scheduler_data_copy: SchedulerData = deepcopy(scheduler_data)
441-
try:
442-
await apply_observation_cycle(self.app, self, scheduler_data)
443-
logger.debug("completed observation cycle of %s", f"{service_name=}")
444-
except asyncio.CancelledError: # pylint: disable=try-except-raise
445-
raise # pragma: no cover
446-
except Exception as e: # pylint: disable=broad-except
447-
service_name = scheduler_data.service_name
448-
449-
# With unhandled errors, let's generate and ID and send it to the end-user
450-
# so that we can trace the logs and debug the issue.
451-
452-
error_code = create_error_code(e)
453-
logger.exception(
454-
"Observation of %s unexpectedly failed [%s]",
455-
f"{service_name=} ",
456-
f"{error_code}",
457-
extra={"error_code": error_code},
458-
)
459-
scheduler_data.dynamic_sidecar.status.update_failing_status(
460-
# This message must be human-friendly
461-
f"Upss! This service ({service_name}) unexpectedly failed",
462-
error_code,
463-
)
464-
finally:
465-
if scheduler_data_copy != scheduler_data:
466-
try:
467-
await update_scheduler_data_label(scheduler_data)
468-
except GenericDockerError as e:
469-
logger.warning(
470-
"Skipped labels update, please check:\n %s", f"{e}"
471-
)
472-
473372
service_name: str
474373
while service_name := await self._trigger_observation_queue.get():
475374
logger.info("Handling observation for %s", service_name)
@@ -481,10 +380,17 @@ async def _observing_single_service(service_name: str) -> None:
481380
continue
482381

483382
if self._service_observation_task.get(service_name) is None:
383+
scheduler_data: SchedulerData = self._to_observe[service_name]
484384
self._service_observation_task[
485385
service_name
486386
] = observation_task = asyncio.create_task(
487-
_observing_single_service(service_name),
387+
observing_single_service(
388+
scheduler=self,
389+
service_name=service_name,
390+
scheduler_data=scheduler_data,
391+
dynamic_sidecar_settings=dynamic_sidecar_settings,
392+
dynamic_scheduler=dynamic_scheduler,
393+
),
488394
name=f"observe_{service_name}",
489395
)
490396
observation_task.add_done_callback(

services/director-v2/tests/unit/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ def mock_docker_api(mocker: MockerFixture) -> None:
416416
return_value=[],
417417
)
418418
mocker.patch(
419-
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._task_utils.are_sidecar_and_proxy_services_present",
419+
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core.are_sidecar_and_proxy_services_present",
420420
autospec=True,
421421
return_value=True,
422422
)

0 commit comments

Comments
 (0)