Skip to content

Commit bbf7a85

Browse files
authored
🐛 Avoids service shutdown due to swarm networking errors (ITISFoundation#3394)
1 parent 5935fd1 commit bbf7a85

File tree

9 files changed

+296
-11
lines changed

9 files changed

+296
-11
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
from datetime import datetime
3+
from typing import Optional
4+
5+
from pydantic import BaseModel, Field, NonNegativeFloat, PrivateAttr
6+
7+
log = logging.getLogger(__name__)
8+
9+
10+
class DelayedExceptionHandler(BaseModel):
11+
"""
12+
Allows to ignore an exception for an established
13+
period of time after which it is raised.
14+
15+
This use case most commonly occurs when dealing with
16+
external systems.
17+
For example, due to poor network performance or
18+
network congestion, an external system which is healthy,
19+
currently is not reachable any longer.
20+
A possible solution:
21+
- ignore exceptions for an interval in which the
22+
system usually is reachable again by not
23+
raising the error
24+
- if the error persist give up and raise it
25+
26+
Example code usage:
27+
28+
delayed_handler_external_service = DelayedExceptionHandler(
29+
delay_for=60
30+
)
31+
try:
32+
function_called_periodically_accessing_external_service()
33+
except TargetException as e:
34+
delayed_handler_external_service.try_to_raise(e)
35+
else:
36+
delayed_handler_external_service.else_reset()
37+
"""
38+
39+
_first_exception_skip: Optional[datetime] = PrivateAttr(None)
40+
_failure_counter: int = PrivateAttr(0)
41+
42+
delay_for: NonNegativeFloat = Field(
43+
description="interval of time during which exceptions are ignored"
44+
)
45+
46+
def try_to_raise(self, exception: BaseException) -> None:
47+
"""raises `exception` after `delay_for` passed from the first call"""
48+
self._failure_counter += 1
49+
50+
# first time the exception was detected
51+
if self._first_exception_skip is None:
52+
self._first_exception_skip = datetime.utcnow()
53+
54+
# raise if subsequent exception is outside of delay window
55+
elif (
56+
datetime.utcnow() - self._first_exception_skip
57+
).total_seconds() > self.delay_for:
58+
raise exception
59+
60+
# ignore if exception inside delay window
61+
log.warning("%s skip(s) of exception: %s", self._failure_counter, exception)
62+
63+
def else_reset(self) -> None:
64+
"""error no longer occurs reset tracking"""
65+
self._first_exception_skip = None
66+
self._failure_counter = 0
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from math import ceil
2+
from time import sleep
3+
from typing import Final
4+
5+
import pytest
6+
from pydantic import PositiveFloat, PositiveInt
7+
from servicelib.exception_utils import DelayedExceptionHandler
8+
9+
TOLERANCE: Final[PositiveFloat] = 0.1
10+
SLEEP_FOR: Final[PositiveFloat] = TOLERANCE * 0.1
11+
ITERATIONS: Final[PositiveInt] = int(ceil(TOLERANCE / SLEEP_FOR)) + 1
12+
13+
14+
class TargetException(Exception):
15+
pass
16+
17+
18+
def workflow(*, stop_raising_after: PositiveInt) -> int:
19+
counter = 0
20+
21+
def function_which_can_raise():
22+
nonlocal counter
23+
counter += 1
24+
25+
if counter < stop_raising_after:
26+
raise TargetException()
27+
28+
delayed_handler_external_service = DelayedExceptionHandler(delay_for=TOLERANCE)
29+
30+
def periodic_event():
31+
try:
32+
function_which_can_raise()
33+
except TargetException as e:
34+
delayed_handler_external_service.try_to_raise(e)
35+
else:
36+
delayed_handler_external_service.else_reset()
37+
38+
for _ in range(ITERATIONS):
39+
periodic_event()
40+
sleep(SLEEP_FOR)
41+
42+
return counter
43+
44+
45+
def test_workflow_passes() -> None:
46+
assert workflow(stop_raising_after=2) == ITERATIONS
47+
48+
49+
def test_workflow_raises() -> None:
50+
with pytest.raises(TargetException):
51+
workflow(stop_raising_after=ITERATIONS + 1)

services/director-v2/src/simcore_service_director_v2/core/settings.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,16 @@ class DynamicSidecarSettings(BaseCustomSettings):
275275
),
276276
)
277277

278+
DYNAMIC_SIDECAR_NETWORK_ISSUES_TOLERANCE_S: PositiveFloat = Field(
279+
1 * MINS,
280+
description=(
281+
"Connectivity between director-v2 and a dy-sidecar can be "
282+
"temporarily disrupted if network between swarm nodes has "
283+
"issues. To avoid the sidecar being marked as failed, "
284+
"allow for some time to pass before declaring it failed."
285+
),
286+
)
287+
278288
#
279289
# DEVELOPMENT ONLY config
280290
#

services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from models_library.services_resources import ServiceResourcesDict
1616
from pydantic import AnyHttpUrl, BaseModel, Extra, Field, constr, parse_obj_as
1717
from servicelib.error_codes import ErrorCodeStr
18+
from servicelib.exception_utils import DelayedExceptionHandler
1819

1920
from ..constants import (
2021
DYNAMIC_PROXY_SERVICE_PREFIX,
@@ -239,6 +240,15 @@ def compose_spec_submitted(self) -> bool:
239240
),
240241
)
241242

243+
inspect_error_handler: DelayedExceptionHandler = Field(
244+
DelayedExceptionHandler(delay_for=0),
245+
description=(
246+
"Set when the dy-sidecar can no longer be reached by the "
247+
"director-v2. If it will be possible to reach the dy-sidecar again, "
248+
"this value will be set to None."
249+
),
250+
)
251+
242252
class Config:
243253
validate_assignment = True
244254

@@ -295,6 +305,10 @@ def make(cls, node_uuid: UUID) -> "DynamicSidecarNamesHelper":
295305

296306

297307
class SchedulerData(CommonServiceDetails, DynamicSidecarServiceLabels):
308+
# TODO: ANE this object is just the context of the dy-sidecar. Should
309+
# be called like so and subcontexts for different handlers should
310+
# also be added. It will make keeping track of env vars more easily
311+
298312
service_name: ServiceName = Field(
299313
...,
300314
description="Name of the current dynamic-sidecar being observed",

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,10 @@ def get_dynamic_sidecar_client(app: FastAPI) -> DynamicSidecarClient:
400400

401401
async def get_dynamic_sidecar_service_health(
402402
app: FastAPI, scheduler_data: SchedulerData
403-
) -> None:
403+
) -> bool:
404404
api_client = get_dynamic_sidecar_client(app)
405405
service_endpoint = scheduler_data.endpoint
406406

407407
# update service health
408408
is_healthy = await api_client.is_healthy(service_endpoint)
409-
scheduler_data.dynamic_sidecar.is_available = is_healthy
409+
return is_healthy

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task_utils.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ async def apply_observation_cycle(
4141
node_uuid=scheduler_data.node_uuid,
4242
can_save=scheduler_data.dynamic_sidecar.service_removal_state.can_save,
4343
)
44-
await get_dynamic_sidecar_service_health(app, scheduler_data)
44+
45+
# TODO: ANE this can be moved to a handler in the future scheduled
46+
# to all the correct cases
47+
scheduler_data.dynamic_sidecar.is_available = (
48+
await get_dynamic_sidecar_service_health(app, scheduler_data)
49+
)
4550

4651
for dynamic_scheduler_event in REGISTERED_EVENTS:
4752
if await dynamic_scheduler_event.will_trigger(

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
195195
docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id,
196196
)
197197

198-
# update service_port and assing it to the status
198+
# update service_port and assign it to the status
199199
# needed by CreateUserServices action
200200
scheduler_data.service_port = extract_service_port_from_compose_start_spec(
201201
dynamic_sidecar_service_final_spec
@@ -230,23 +230,32 @@ async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool
230230
async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
231231
dynamic_sidecar_client = get_dynamic_sidecar_client(app)
232232
dynamic_sidecar_endpoint = scheduler_data.endpoint
233+
dynamic_sidecar_settings: DynamicSidecarSettings = (
234+
app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR
235+
)
236+
scheduler_data.dynamic_sidecar.inspect_error_handler.delay_for = (
237+
dynamic_sidecar_settings.DYNAMIC_SIDECAR_NETWORK_ISSUES_TOLERANCE_S
238+
)
233239

234240
try:
235241
containers_inspect: dict[
236242
str, Any
237243
] = await dynamic_sidecar_client.containers_inspect(
238244
dynamic_sidecar_endpoint
239245
)
240-
except BaseClientHTTPError:
246+
except BaseClientHTTPError as e:
241247
were_service_containers_detected_before = (
242248
len(scheduler_data.dynamic_sidecar.containers_inspect) > 0
243249
)
244250
if were_service_containers_detected_before:
245251
# Containers disappeared after they were started.
246252
# for now just mark as error and remove the sidecar
247-
# NOTE: this is the correct place where to try and
248-
# restart them (future use case).
249-
raise
253+
254+
# NOTE: Network performance can degrade and the sidecar might
255+
# be temporarily unreachable.
256+
# Adding a delay between when the error is first seen and when the
257+
# error is raised to avoid random shutdowns of dynamic-sidecar services.
258+
scheduler_data.dynamic_sidecar.inspect_error_handler.try_to_raise(e)
250259

251260
# After the service creation it takes a bit of time for the container to start
252261
# If the same message appears in the log multiple times in a row (for the same
@@ -256,12 +265,19 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
256265
scheduler_data.service_name,
257266
)
258267
return
268+
else:
269+
scheduler_data.dynamic_sidecar.inspect_error_handler.else_reset()
259270

260271
# parse and store data from container
261272
scheduler_data.dynamic_sidecar.containers_inspect = parse_containers_inspect(
262273
containers_inspect
263274
)
264275

276+
# TODO: ANE using `were_service_containers_detected_before` together with
277+
# how many containers to expect, it can be detected if containers
278+
# died and these can be restarted. Best way to go about it is
279+
# to have a different handler trigger in this case registered for idling!
280+
265281

266282
class PrepareServicesEnvironment(DynamicSchedulerEvent):
267283
"""
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# pylint:disable=redefined-outer-name
2+
# pylint:disable=unused-argument
3+
4+
import asyncio
5+
from typing import Final
6+
7+
import pytest
8+
from _pytest.logging import LogCaptureFixture
9+
from _pytest.monkeypatch import MonkeyPatch
10+
from fastapi import FastAPI
11+
from pydantic import PositiveFloat, PositiveInt
12+
from pytest_mock import MockerFixture
13+
from pytest_simcore.helpers.typing_env import EnvVarsDict
14+
from pytest_simcore.helpers.utils_envs import setenvs_from_dict
15+
from simcore_service_director_v2.models.schemas.dynamic_services import SchedulerData
16+
from simcore_service_director_v2.models.schemas.dynamic_services.scheduler import (
17+
DockerContainerInspect,
18+
DockerStatus,
19+
)
20+
from simcore_service_director_v2.modules.dynamic_sidecar.api_client import (
21+
BaseClientHTTPError,
22+
)
23+
from simcore_service_director_v2.modules.dynamic_sidecar.scheduler import events
24+
25+
NETWORK_TOLERANCE_S: Final[PositiveFloat] = 0.1
26+
STEPS: Final[PositiveFloat] = 10
27+
SLEEP_BETWEEN_CALLS: Final[PositiveFloat] = NETWORK_TOLERANCE_S / STEPS
28+
REPEAT_COUNT: Final[PositiveInt] = STEPS + 1
29+
30+
31+
@pytest.fixture
32+
def mock_env(
33+
mock_env: EnvVarsDict,
34+
monkeypatch: MonkeyPatch,
35+
) -> None:
36+
setenvs_from_dict(
37+
monkeypatch,
38+
{
39+
"S3_ENDPOINT": "",
40+
"S3_ACCESS_KEY": "",
41+
"S3_SECRET_KEY": "",
42+
"S3_BUCKET_NAME": "",
43+
"S3_SECURE": "false",
44+
"POSTGRES_HOST": "",
45+
"POSTGRES_USER": "",
46+
"POSTGRES_PASSWORD": "",
47+
"POSTGRES_DB": "",
48+
"DYNAMIC_SIDECAR_NETWORK_ISSUES_TOLERANCE_S": f"{NETWORK_TOLERANCE_S}",
49+
},
50+
)
51+
52+
53+
@pytest.fixture
54+
def mock_dynamic_sidecar_client_always_fail(mocker: MockerFixture) -> None:
55+
class MockedObj:
56+
@classmethod
57+
async def containers_inspect(cls, *args, **kwargs) -> None:
58+
raise BaseClientHTTPError("will always fail")
59+
60+
mocker.patch.object(events, "get_dynamic_sidecar_client", return_value=MockedObj())
61+
62+
63+
@pytest.fixture
64+
def mock_dynamic_sidecar_client_stops_failing(mocker: MockerFixture) -> None:
65+
class MockedObj:
66+
def __init__(self) -> None:
67+
self.counter = 0
68+
69+
async def containers_inspect(self, *args, **kwargs) -> None:
70+
self.counter += 1
71+
if self.counter < STEPS / 2:
72+
raise BaseClientHTTPError("will always fail")
73+
74+
mocker.patch.object(events, "get_dynamic_sidecar_client", return_value=MockedObj())
75+
76+
77+
@pytest.fixture
78+
def docker_container_inspect() -> DockerContainerInspect:
79+
return DockerContainerInspect(status=DockerStatus.DEAD, name="", id="")
80+
81+
82+
@pytest.fixture
83+
def scheduler_data(
84+
scheduler_data: SchedulerData, docker_container_inspect: DockerContainerInspect
85+
) -> SchedulerData:
86+
scheduler_data.dynamic_sidecar.containers_inspect = [docker_container_inspect]
87+
return scheduler_data
88+
89+
90+
_CHECK_LOG_EXCEPTION_IS_SKIPPED = "skip(s) of exception"
91+
92+
93+
async def test_event_get_status_network_connectivity(
94+
mock_dynamic_sidecar_client_always_fail: None,
95+
minimal_app: FastAPI,
96+
scheduler_data: SchedulerData,
97+
caplog_info_level: LogCaptureFixture,
98+
):
99+
with pytest.raises(BaseClientHTTPError):
100+
for _ in range(REPEAT_COUNT):
101+
await events.GetStatus.action(minimal_app, scheduler_data)
102+
await asyncio.sleep(SLEEP_BETWEEN_CALLS)
103+
104+
assert caplog_info_level.text.count(_CHECK_LOG_EXCEPTION_IS_SKIPPED) > 1
105+
106+
107+
async def test_event_get_status_recovers_after_error(
108+
mock_dynamic_sidecar_client_stops_failing: None,
109+
minimal_app: FastAPI,
110+
scheduler_data: SchedulerData,
111+
caplog_info_level: LogCaptureFixture,
112+
):
113+
for _ in range(REPEAT_COUNT):
114+
await events.GetStatus.action(minimal_app, scheduler_data)
115+
await asyncio.sleep(SLEEP_BETWEEN_CALLS)
116+
assert caplog_info_level.text.count(_CHECK_LOG_EXCEPTION_IS_SKIPPED) >= 1

0 commit comments

Comments
 (0)