Skip to content

Commit 85f338a

Browse files
authored
šŸ›ā™»ļø Is638/fixes docker-compose operations bursts and create containers call in dy-sidecar (ITISFoundation#3187)
1 parent 41f2d33 commit 85f338a

40 files changed

+422
-329
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from typing import AsyncIterable
2+
3+
import pytest
4+
from servicelib.async_utils import cancel_sequential_workers
5+
6+
7+
@pytest.fixture
8+
async def ensure_run_in_sequence_context_is_empty() -> AsyncIterable[None]:
9+
"""
10+
Needed in tests calling functions decorated with 'run_sequentially_in_context'
11+
12+
This is a teardown only fixture
13+
14+
Required when shutting down the application or ending tests
15+
otherwise errors will occur when closing the loop
16+
"""
17+
18+
# nothing on-startup
19+
20+
yield
21+
22+
await cancel_sequential_workers()

ā€Žpackages/service-library/src/servicelib/async_utils.pyā€Ž

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import logging
33
from collections import deque
4+
from contextlib import suppress
45
from dataclasses import dataclass
56
from functools import wraps
67
from typing import TYPE_CHECKING, Any, Callable, Deque, Optional
@@ -32,16 +33,24 @@ class Context:
3233
_sequential_jobs_contexts: dict[str, Context] = {}
3334

3435

35-
async def stop_sequential_workers() -> None:
36-
"""Singlas all workers to close thus avoiding errors on shutdown"""
36+
async def cancel_sequential_workers() -> None:
37+
"""Signals all workers to close thus avoiding errors on shutdown"""
3738
for context in _sequential_jobs_contexts.values():
3839
await context.in_queue.put(None)
3940
if context.task is not None:
40-
await context.task
41+
context.task.cancel()
42+
with suppress(asyncio.CancelledError):
43+
await context.task
44+
4145
_sequential_jobs_contexts.clear()
4246
logger.info("All run_sequentially_in_context pending workers stopped")
4347

4448

49+
# NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator.
50+
# @run_sequentially_in_contextreturn changes the return type of the decorated function to `Any`.
51+
# Instead we should annotate this decorator with ParamSpec and TypeVar generics.
52+
# SEE https://peps.python.org/pep-0612/
53+
#
4554
def run_sequentially_in_context(
4655
target_args: Optional[list[str]] = None,
4756
) -> Callable:

ā€Žpackages/service-library/src/servicelib/fastapi/requests_decorators.pyā€Ž

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,19 @@ def _validate_signature(handler: _HandlerWithRequestArg):
3838
_POLL_INTERVAL_S: float = 0.01
3939

4040

41-
async def disconnect_poller(request: Request, result: Any):
41+
async def _disconnect_poller(request: Request, result: Any):
4242
"""
4343
Poll for a disconnect.
4444
If the request disconnects, stop polling and return.
4545
"""
4646
while not await request.is_disconnected():
4747
await asyncio.sleep(_POLL_INTERVAL_S)
48-
49-
logger.debug(
50-
"client %s disconnected! Cancelling handler for request %s %s",
51-
request.client,
52-
request.method,
53-
request.url,
54-
)
5548
return result
5649

5750

5851
def cancel_on_disconnect(handler: _HandlerWithRequestArg):
5952
"""
60-
After client dicsonnects, handler gets cancelled in ~<3 secs
53+
After client disconnects, handler gets cancelled in ~<3 secs
6154
"""
6255

6356
_validate_signature(handler)
@@ -69,7 +62,7 @@ async def wrapper(request: Request, *args, **kwargs):
6962
# Create two tasks:
7063
# one to poll the request and check if the client disconnected
7164
poller_task = asyncio.create_task(
72-
disconnect_poller(request, sentinel),
65+
_disconnect_poller(request, sentinel),
7366
name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}",
7467
)
7568
# , and another which is the request handler
@@ -85,15 +78,14 @@ async def wrapper(request: Request, *args, **kwargs):
8578
# One has completed, cancel the other
8679
for t in pending:
8780
t.cancel()
81+
8882
try:
8983
await asyncio.wait_for(t, timeout=3)
84+
9085
except asyncio.CancelledError:
91-
logger.debug("%s was cancelled", t)
92-
except Exception as exc: # pylint: disable=broad-except
86+
pass
87+
except Exception: # pylint: disable=broad-except
9388
if t is handler_task:
94-
logger.warning(
95-
"%s raised %s when being cancelled.", t, exc, exc_info=True
96-
)
9789
raise
9890
finally:
9991
assert t.done() # nosec
@@ -103,15 +95,18 @@ async def wrapper(request: Request, *args, **kwargs):
10395
assert poller_task.done() # nosec
10496
return await handler_task
10597

106-
# Otherwise, raise an exception. This is not exactly needed, but it will prevent
107-
# validation errors if your request handler is supposed to return something.
108-
logger.debug(
109-
"Request %s %s cancelled:\n - %s\n - %s",
98+
# Otherwise, raise an exception. This is not exactly needed,
99+
# but it will prevent validation errors if your request handler
100+
# is supposed to return something.
101+
logger.warning(
102+
"Request %s %s cancelled since client %s disconnected:\n - %s\n - %s",
110103
request.method,
111104
request.url,
105+
request.client,
112106
f"{poller_task=}",
113107
f"{handler_task=}",
114108
)
109+
115110
assert poller_task.done() # nosec
116111
assert handler_task.done() # nosec
117112

@@ -122,3 +117,6 @@ async def wrapper(request: Request, *args, **kwargs):
122117
)
123118

124119
return wrapper
120+
121+
122+
__all__: tuple[str, ...] = ("cancel_on_disconnect",)

ā€Žpackages/service-library/tests/conftest.pyā€Ž

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
import sys
66
from copy import deepcopy
77
from pathlib import Path
8-
from typing import Any, Dict
8+
from typing import Any
99

1010
import pytest
1111
import servicelib
1212
from faker import Faker
1313

1414
pytest_plugins = [
15-
"pytest_simcore.repository_paths",
1615
"pytest_simcore.pytest_global_environs",
16+
"pytest_simcore.repository_paths",
17+
"pytest_simcore.simcore_service_library_fixtures",
1718
]
1819

1920

@@ -40,7 +41,7 @@ def osparc_simcore_root_dir(here) -> Path:
4041

4142

4243
@pytest.fixture
43-
def fake_data_dict(faker: Faker) -> Dict[str, Any]:
44+
def fake_data_dict(faker: Faker) -> dict[str, Any]:
4445
data = {
4546
"uuid_as_UUID": faker.uuid4(cast_to=None),
4647
"uuid_as_str": faker.uuid4(),

ā€Žpackages/service-library/tests/test_async_utils.pyā€Ž

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,19 @@
88
from collections import deque
99
from dataclasses import dataclass
1010
from time import time
11-
from typing import Any, AsyncIterable, Optional
11+
from typing import Any, Optional
1212

1313
import pytest
1414
from faker import Faker
1515
from servicelib.async_utils import (
1616
_sequential_jobs_contexts,
1717
run_sequentially_in_context,
18-
stop_sequential_workers,
1918
)
2019

2120
RETRIES = 10
2221
DIFFERENT_CONTEXTS_COUNT = 10
2322

2423

25-
@pytest.fixture
26-
async def ensure_run_in_sequence_context_is_empty() -> AsyncIterable[None]:
27-
yield
28-
# NOTE
29-
# required when shutting down the application or ending tests
30-
# otherwise errors will occur when closing the loop
31-
await stop_sequential_workers()
32-
33-
3424
@pytest.fixture
3525
def payload(faker: Faker) -> str:
3626
return faker.text()

ā€Žservices/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.pyā€Ž

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import json
32
import logging
43
from typing import Coroutine, Optional, Union, cast
54
from uuid import UUID
@@ -12,6 +11,7 @@
1211
from models_library.service_settings_labels import SimcoreServiceLabels
1312
from models_library.services import ServiceKeyVersion
1413
from models_library.users import UserID
14+
from servicelib.json_serialization import json_dumps
1515
from starlette import status
1616
from starlette.datastructures import URL
1717
from tenacity import RetryCallState, TryAgain
@@ -88,6 +88,8 @@ async def list_running_dynamic_services(
8888
dynamic_services_settings.DYNAMIC_SIDECAR, user_id, project_id
8989
)
9090
]
91+
92+
# NOTE: Review error handling https://github.com/ITISFoundation/osparc-simcore/issues/3194
9193
dynamic_sidecar_running_services: list[DynamicServiceOut] = cast(
9294
list[DynamicServiceOut], await asyncio.gather(*get_stack_statuse_tasks)
9395
)
@@ -214,7 +216,7 @@ def _log_error(retry_state: RetryCallState):
214216
logger.error(
215217
"Service with %s could not be untracked after %s",
216218
f"{node_uuid=}",
217-
f"{json.dumps(retry_state.retry_object.statistics)}",
219+
f"{json_dumps(retry_state.retry_object.statistics)}",
218220
)
219221

220222
async for attempt in AsyncRetrying(

ā€Žservices/director-v2/src/simcore_service_director_v2/core/events.pyā€Ž

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from servicelib.async_utils import stop_sequential_workers
1+
from servicelib.async_utils import cancel_sequential_workers
22

33
from ..meta import PROJECT_NAME, __version__
44

@@ -11,7 +11,7 @@
1111
| | | |_ _ __ ___ ___| |_ ___ _ __
1212
| | | | | '__/ _ \/ __| __/ _ \| '__|
1313
| |/ /| | | | __/ (__| || (_) | |
14-
|___/ |_|_| \___|\___|\__\___/|_| {0}
14+
|___/ |_|_| \___|\___|\__\___/|_| {}
1515
1616
""".format(
1717
f"v{__version__}"
@@ -23,6 +23,6 @@ async def on_startup() -> None:
2323

2424

2525
async def on_shutdown() -> None:
26-
await stop_sequential_workers()
26+
await cancel_sequential_workers()
2727
msg = PROJECT_NAME + f" v{__version__} SHUT DOWN"
2828
print(f"{msg:=^100}", flush=True)

ā€Žservices/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api.pyā€Ž

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
import time
88
from contextlib import asynccontextmanager
9-
from typing import Any, AsyncIterator, Mapping, Optional
9+
from typing import Any, AsyncIterator, Mapping, Optional, Union
1010

1111
import aiodocker
1212
from aiodocker.utils import clean_filters, clean_map
@@ -16,6 +16,7 @@
1616
from models_library.projects import ProjectID
1717
from models_library.projects_nodes_io import NodeID
1818
from models_library.users import UserID
19+
from servicelib.json_serialization import json_dumps
1920
from servicelib.utils import logged_gather
2021
from tenacity._asyncio import AsyncRetrying
2122
from tenacity.retry import retry_if_exception_type
@@ -106,10 +107,21 @@ async def create_network(network_config: dict[str, Any]) -> str:
106107
)
107108

108109

109-
async def create_service_and_get_id(create_service_data: AioDockerServiceSpec) -> str:
110+
async def create_service_and_get_id(
111+
create_service_data: Union[AioDockerServiceSpec, dict[str, Any]]
112+
) -> str:
113+
# NOTE: ideally the argument should always be AioDockerServiceSpec
114+
# but for that we need get_dynamic_proxy_spec to return that type
110115
async with docker_client() as client:
111-
service_start_result = await client.services.create(
112-
**jsonable_encoder(create_service_data, by_alias=True, exclude_unset=True)
116+
kwargs = jsonable_encoder(
117+
create_service_data, by_alias=True, exclude_unset=True
118+
)
119+
service_start_result = await client.services.create(**kwargs)
120+
121+
log.debug(
122+
"Started service %s with\n%s",
123+
service_start_result,
124+
json.dumps(kwargs, indent=1),
113125
)
114126

115127
if "ID" not in service_start_result:
@@ -489,7 +501,7 @@ async def _update_service_spec(
489501
await client._query_json( # pylint: disable=protected-access
490502
f"services/{service_id}/update",
491503
method="POST",
492-
data=json.dumps(clean_map(updated_spec)),
504+
data=json_dumps(clean_map(updated_spec)),
493505
params={"version": service_version},
494506
)
495507
except aiodocker.exceptions.DockerError as e:

ā€Žservices/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.pyā€Ž

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import json
21
import logging
32
from copy import deepcopy
4-
from typing import Dict, List, Optional, Union
3+
from typing import Optional, Union
54

65
from fastapi.applications import FastAPI
76
from models_library.service_settings_labels import ComposeSpecLabel, PathMappingsLabel
@@ -12,10 +11,11 @@
1211
ServiceResourcesDict,
1312
)
1413
from servicelib.docker_compose import replace_env_vars_in_compose_spec
14+
from servicelib.json_serialization import json_dumps
1515
from settings_library.docker_registry import RegistrySettings
1616

17-
EnvKeyEqValueList = List[str]
18-
EnvVarsMap = Dict[str, Optional[str]]
17+
EnvKeyEqValueList = list[str]
18+
EnvVarsMap = dict[str, Optional[str]]
1919

2020

2121
logger = logging.getLogger(__name__)
@@ -97,7 +97,7 @@ def _update_paths_mappings(
9797
env_vars["DY_SIDECAR_PATH_OUTPUTS"] = f"{path_mappings.outputs_path}"
9898
env_vars[
9999
"DY_SIDECAR_STATE_PATHS"
100-
] = f"{json.dumps([f'{p}' for p in path_mappings.state_paths])}"
100+
] = f"{json_dumps( { f'{p}' for p in path_mappings.state_paths } )}"
101101

102102
service_content["environment"] = _environment_section.export_as_list(env_vars)
103103

0 commit comments

Comments
Ā (0)