Skip to content

Commit 06ed666

Browse files
GitHK, Andrei Neagu, pcrespov
authored
♻️ observation cycle is skipped when error detected (ITISFoundation#3195)
* observation cycle is skipped when error detected

Co-authored-by: Andrei Neagu <[email protected]>
Co-authored-by: Pedro Crespo-Valero <[email protected]>
1 parent 3185694 commit 06ed666

File tree

4 files changed

+180
-20
lines changed

4 files changed

+180
-20
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/task.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,21 @@ def _enqueue_observation_from_service_name(self, service_name: str) -> None:
353353
async def _run_trigger_observation_queue_task(self) -> None:
354354
"""generates events at regular time interval"""
355355

356-
async def observing_single_service(service_name: str) -> None:
356+
async def _observing_single_service(service_name: str) -> None:
357357
scheduler_data: SchedulerData = self._to_observe[service_name]
358+
359+
# NOTE: ANE: not very readable; should be refactored
360+
if (
361+
scheduler_data.dynamic_sidecar.status.current
362+
== DynamicSidecarStatus.FAILING
363+
):
364+
# Nothing will be done if there is an error while interacting
365+
# with the sidecar.
366+
# It makes no sense to continuously occupy resources or create
367+
# issues due to a high request rate on components like the `docker daemon`
368+
# and the `storage service`.
369+
return
370+
358371
scheduler_data_copy: SchedulerData = deepcopy(scheduler_data)
359372
try:
360373
await _apply_observation_cycle(self.app, self, scheduler_data)
@@ -402,7 +415,7 @@ async def observing_single_service(service_name: str) -> None:
402415
self._service_observation_task[
403416
service_name
404417
] = observation_task = asyncio.create_task(
405-
observing_single_service(service_name),
418+
_observing_single_service(service_name),
406419
name=f"observe_{service_name}",
407420
)
408421
observation_task.add_done_callback(

services/director-v2/tests/unit/conftest.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from models_library.services import ServiceKeyVersion
3333
from pydantic.types import NonNegativeInt
3434
from pytest import MonkeyPatch
35+
from pytest_mock.plugin import MockerFixture
3536
from pytest_simcore.helpers.typing_env import EnvVarsDict
3637
from settings_library.s3 import S3Settings
3738
from simcore_sdk.node_ports_v2 import FileLinkType
@@ -45,6 +46,7 @@
4546
from simcore_service_director_v2.models.schemas.dynamic_services import (
4647
SchedulerData,
4748
ServiceDetails,
49+
ServiceState,
4850
)
4951
from yarl import URL
5052

@@ -379,3 +381,21 @@ def mocked_catalog_service_api(
379381
def caplog_info_level(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture]:
380382
with caplog.at_level(logging.INFO):
381383
yield caplog
384+
385+
386+
@pytest.fixture
387+
def mock_docker_api(mocker: MockerFixture) -> None:
388+
mocker.patch(
389+
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task.get_dynamic_sidecars_to_observe",
390+
autospec=True,
391+
return_value=[],
392+
)
393+
mocker.patch(
394+
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task.are_all_services_present",
395+
autospec=True,
396+
return_value=True,
397+
)
398+
mocker.patch(
399+
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task.get_dynamic_sidecar_state",
400+
return_value=(ServiceState.PENDING, ""),
401+
)

services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -203,24 +203,6 @@ def mocked_api_client(scheduler_data: SchedulerData) -> Iterator[MockRouter]:
203203
yield mock
204204

205205

206-
@pytest.fixture
207-
def mock_docker_api(mocker: MockerFixture) -> None:
208-
mocker.patch(
209-
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task.get_dynamic_sidecars_to_observe",
210-
autospec=True,
211-
return_value=[],
212-
)
213-
mocker.patch(
214-
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task.are_all_services_present",
215-
autospec=True,
216-
return_value=True,
217-
)
218-
mocker.patch(
219-
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task.get_dynamic_sidecar_state",
220-
return_value=(ServiceState.PENDING, ""),
221-
)
222-
223-
224206
@pytest.fixture
225207
def mock_service_running(mock_docker_api, mocker: MockerFixture) -> Iterator[AsyncMock]:
226208
mock = mocker.patch(
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# pylint: disable=redefined-outer-name
2+
# pylint: disable=unused-argument
3+
4+
import asyncio
5+
import re
6+
from typing import Final, Iterator
7+
8+
import httpx
9+
import pytest
10+
import respx
11+
from _pytest.monkeypatch import MonkeyPatch
12+
from fastapi import FastAPI
13+
from pytest_simcore.helpers.typing_env import EnvVarsDict
14+
from pytest_simcore.helpers.utils_envs import setenvs_from_dict
15+
from respx.router import MockRouter
16+
from simcore_service_director_v2.models.schemas.dynamic_services.scheduler import (
17+
SchedulerData,
18+
)
19+
from simcore_service_director_v2.modules.dynamic_sidecar.scheduler.events import (
20+
REGISTERED_EVENTS,
21+
DynamicSchedulerEvent,
22+
)
23+
from simcore_service_director_v2.modules.dynamic_sidecar.scheduler.task import (
24+
DynamicSidecarsScheduler,
25+
)
26+
27+
SCHEDULER_INTERVAL_SECONDS: Final[float] = 0.1
28+
29+
# FIXTURES
30+
31+
32+
@pytest.fixture
33+
def mock_env(
34+
mock_env: EnvVarsDict,
35+
monkeypatch: MonkeyPatch,
36+
simcore_services_network_name: str,
37+
docker_swarm: None,
38+
mock_docker_api: None,
39+
) -> None:
40+
disabled_services_envs = {
41+
"S3_ENDPOINT": "",
42+
"S3_ACCESS_KEY": "",
43+
"S3_SECRET_KEY": "",
44+
"S3_BUCKET_NAME": "",
45+
"POSTGRES_HOST": "",
46+
"POSTGRES_USER": "",
47+
"POSTGRES_PASSWORD": "",
48+
"POSTGRES_DB": "",
49+
}
50+
setenvs_from_dict(monkeypatch, disabled_services_envs)
51+
52+
monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "true")
53+
monkeypatch.setenv(
54+
"DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS",
55+
f"{SCHEDULER_INTERVAL_SECONDS}",
56+
)
57+
58+
59+
@pytest.fixture
60+
def scheduler_data(scheduler_data_from_http_request: SchedulerData) -> SchedulerData:
61+
return scheduler_data_from_http_request
62+
63+
64+
@pytest.fixture
65+
def mock_containers_docker_status(
66+
scheduler_data: SchedulerData,
67+
) -> Iterator[MockRouter]:
68+
service_endpoint = scheduler_data.dynamic_sidecar.endpoint
69+
with respx.mock as mock:
70+
mock.get(
71+
re.compile(
72+
rf"^http://{scheduler_data.service_name}:{scheduler_data.dynamic_sidecar.port}/v1/containers\?only_status=true"
73+
),
74+
name="containers_docker_status",
75+
).mock(httpx.Response(200, json={}))
76+
mock.get(f"{service_endpoint}/health", name="is_healthy").respond(
77+
json=dict(is_healthy=True)
78+
)
79+
80+
yield mock
81+
82+
83+
@pytest.fixture
84+
def scheduler(
85+
mock_containers_docker_status: MockRouter, minimal_app: FastAPI
86+
) -> DynamicSidecarsScheduler:
87+
return minimal_app.state.dynamic_sidecar_scheduler
88+
89+
90+
class ACounter:
91+
def __init__(self, start: int = 0) -> None:
92+
self.start = start
93+
self.count = start
94+
95+
def increment(self) -> None:
96+
self.count += 1
97+
98+
99+
@pytest.fixture
100+
def mocked_dynamic_scheduler_events() -> ACounter:
101+
counter = ACounter()
102+
103+
class AlwaysTriggersDynamicSchedulerEvent(DynamicSchedulerEvent):
104+
@classmethod
105+
async def will_trigger(
106+
cls, app: FastAPI, scheduler_data: SchedulerData
107+
) -> bool:
108+
return True
109+
110+
@classmethod
111+
async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
112+
counter.increment()
113+
raise RuntimeError("Failed as planned")
114+
115+
test_defined_scheduler_events: list[type[DynamicSchedulerEvent]] = [
116+
AlwaysTriggersDynamicSchedulerEvent
117+
]
118+
119+
# replace REGISTERED EVENTS
120+
REGISTERED_EVENTS.clear()
121+
for event in test_defined_scheduler_events:
122+
REGISTERED_EVENTS.append(event)
123+
124+
return counter
125+
126+
127+
# TESTS
128+
129+
130+
async def test_skip_observation_cycle_after_error(
131+
minimal_app: FastAPI,
132+
scheduler: DynamicSidecarsScheduler,
133+
scheduler_data: SchedulerData,
134+
mocked_dynamic_scheduler_events: ACounter,
135+
):
136+
# add a task, emulate an error, and make sure no observation cycle is
137+
# being triggered again
138+
assert mocked_dynamic_scheduler_events.count == 0
139+
await scheduler.add_service(scheduler_data)
140+
141+
# wait long enough for the observation cycle to be triggered many times
142+
await asyncio.sleep(SCHEDULER_INTERVAL_SECONDS * 10)
143+
# only expect the event to be triggered once, when it raised
144+
# an error, and not to be triggered again afterwards
145+
assert mocked_dynamic_scheduler_events.count == 1

0 commit comments

Comments
 (0)