Skip to content

Commit aaa2e8b

Browse files
authored
Merge branch 'master' into pr-osparc-debug-web-worker
2 parents 264de6f + 2eb46b7 commit aaa2e8b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+5650
-86
lines changed

packages/pytest-simcore/src/pytest_simcore/helpers/docker.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ContainerStatus(str, Enum):
3737

3838
_COLOR_ENCODING_RE = re.compile(r"\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[mGK]")
3939
_MAX_PATH_CHAR_LEN_ALLOWED = 260
40-
_kFILENAME_TOO_LONG = 36
40+
_FILENAME_TOO_LONG = 36
4141
_NORMPATH_COUNT = 0
4242

4343

@@ -94,7 +94,7 @@ def get_service_published_port(
9494
)
9595

9696
for target_port in ports_to_look_for:
97-
target_port = int(target_port)
97+
target_port = int(target_port) # noqa: PLW2901
9898
for p in service_ports:
9999
if p["TargetPort"] == target_port:
100100
published_port = p["PublishedPort"]
@@ -158,7 +158,7 @@ def run_docker_compose_config(
158158
args = [f"{docker_compose_path}", *bash_options]
159159
print(" ".join(args))
160160

161-
process = subprocess.run(
161+
process = subprocess.run( # noqa: S603
162162
args,
163163
cwd=project_dir,
164164
capture_output=True,
@@ -189,7 +189,7 @@ def shorten_path(filename: str) -> Path:
189189
# This helper function tries to normalize the path
190190
# Another possibility would be that the path has some
191191
# problematic characters but so far we did not find any case ...
192-
global _NORMPATH_COUNT # pylint: disable=global-statement
192+
global _NORMPATH_COUNT # pylint: disable=global-statement # noqa: PLW0603
193193

194194
if len(filename) > _MAX_PATH_CHAR_LEN_ALLOWED:
195195
_NORMPATH_COUNT += 1
@@ -215,7 +215,7 @@ def safe_artifact_name(name: str) -> str:
215215
return BANNED_CHARS_FOR_ARTIFACTS.sub("_", name)
216216

217217

218-
def save_docker_infos(destination_dir: Path):
218+
def save_docker_infos(destination_dir: Path): # noqa: C901
219219
client = docker.from_env()
220220

221221
# Includes stop containers, which might be e.g. failing tasks
@@ -228,7 +228,7 @@ def save_docker_infos(destination_dir: Path):
228228
destination_dir.mkdir(parents=True, exist_ok=True)
229229

230230
except OSError as err:
231-
if err.errno == _kFILENAME_TOO_LONG:
231+
if err.errno == _FILENAME_TOO_LONG:
232232
destination_dir = shorten_path(err.filename)
233233
destination_dir.mkdir(parents=True, exist_ok=True)
234234

@@ -245,7 +245,7 @@ def save_docker_infos(destination_dir: Path):
245245
)
246246

247247
except OSError as err:
248-
if err.errno == _kFILENAME_TOO_LONG:
248+
if err.errno == _FILENAME_TOO_LONG:
249249
shorten_path(err.filename).write_text(
250250
_COLOR_ENCODING_RE.sub("", logs)
251251
)
@@ -256,12 +256,12 @@ def save_docker_infos(destination_dir: Path):
256256
json.dumps(container.attrs, indent=2)
257257
)
258258
except OSError as err:
259-
if err.errno == _kFILENAME_TOO_LONG:
259+
if err.errno == _FILENAME_TOO_LONG:
260260
shorten_path(err.filename).write_text(
261261
json.dumps(container.attrs, indent=2)
262262
)
263263

264-
except Exception as err: # pylint: disable=broad-except # noqa: PERF203
264+
except Exception as err: # pylint: disable=broad-except
265265
if container.status != ContainerStatus.created:
266266
print(
267267
f"Error while dumping {container.name=}, {container.status=}.\n\t{err=}"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from collections.abc import AsyncIterator, Callable
2+
from contextlib import AbstractAsyncContextManager, asynccontextmanager
3+
from typing import TYPE_CHECKING, Protocol
4+
5+
from tenacity.asyncio import AsyncRetrying
6+
from tenacity.retry import retry_if_exception_type
7+
from tenacity.stop import stop_after_delay
8+
from tenacity.wait import wait_fixed
9+
10+
if TYPE_CHECKING:
11+
from servicelib.rabbitmq import RabbitMQClient
12+
from servicelib.redis import RedisClientSDK
13+
14+
15+
class _ClientWithPingProtocol(Protocol):
16+
async def ping(self) -> bool: ...
17+
18+
19+
@asynccontextmanager
20+
async def _paused_container(
21+
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
22+
container_name: str,
23+
client: _ClientWithPingProtocol,
24+
) -> AsyncIterator[None]:
25+
async with paused_container(container_name):
26+
yield
27+
28+
async for attempt in AsyncRetrying(
29+
wait=wait_fixed(0.1),
30+
stop=stop_after_delay(10),
31+
reraise=True,
32+
retry=retry_if_exception_type(AssertionError),
33+
):
34+
with attempt:
35+
assert await client.ping() is True
36+
37+
38+
@asynccontextmanager
39+
async def pause_rabbit(
40+
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
41+
rabbit_client: "RabbitMQClient",
42+
) -> AsyncIterator[None]:
43+
"""
44+
Pause RabbitMQ container during the context block,
45+
ensuring it's fully down before and back up after.
46+
"""
47+
async with _paused_container(paused_container, "rabbit", rabbit_client):
48+
yield
49+
50+
51+
@asynccontextmanager
52+
async def pause_redis(
53+
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
54+
redis_client: "RedisClientSDK",
55+
) -> AsyncIterator[None]:
56+
"""
57+
Pause Redis container during the context block,
58+
saving a DB snapshot first for a clean restore point.
59+
Ensures Redis is down before yielding, and back up after.
60+
"""
61+
await redis_client.redis.save()
62+
63+
async with _paused_container(paused_container, "redis", redis_client):
64+
yield

packages/service-library/tests/deferred_tasks/test_deferred_tasks.py

Lines changed: 15 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,24 @@
22
# pylint:disable=unused-argument
33

44
import asyncio
5-
import contextlib
5+
import datetime
66
import itertools
77
import json
88
import random
99
import sys
10-
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
10+
from collections.abc import AsyncIterable, Awaitable, Callable
1111
from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
1212
from pathlib import Path
13-
from typing import Any, Protocol
13+
from typing import Any
1414

1515
import psutil
1616
import pytest
1717
from aiohttp.test_utils import unused_port
1818
from common_library.json_serialization import json_dumps
1919
from common_library.serialization import model_dump_with_secrets
2020
from pydantic import NonNegativeFloat, NonNegativeInt
21+
from pytest_mock import MockerFixture
22+
from pytest_simcore.helpers.paused_container import pause_rabbit, pause_redis
2123
from servicelib.rabbitmq import RabbitMQClient
2224
from servicelib.redis import RedisClientSDK
2325
from servicelib.sequences_utils import partition_gen
@@ -330,57 +332,12 @@ async def rabbit_client(
330332
return create_rabbitmq_client("pinger")
331333

332334

333-
class ClientWithPingProtocol(Protocol):
334-
async def ping(self) -> bool: ...
335-
336-
337-
class ServiceManager:
338-
def __init__(
339-
self,
340-
redis_client: RedisClientSDK,
341-
rabbit_client: RabbitMQClient,
342-
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
343-
) -> None:
344-
self.redis_client = redis_client
345-
self.rabbit_client = rabbit_client
346-
self.paused_container = paused_container
347-
348-
@contextlib.asynccontextmanager
349-
async def _paused_container(
350-
self, container_name: str, client: ClientWithPingProtocol
351-
) -> AsyncIterator[None]:
352-
async with self.paused_container(container_name):
353-
async for attempt in AsyncRetrying(
354-
wait=wait_fixed(0.1),
355-
stop=stop_after_delay(10),
356-
reraise=True,
357-
retry=retry_if_exception_type(AssertionError),
358-
):
359-
with attempt:
360-
assert await client.ping() is False
361-
yield
362-
363-
async for attempt in AsyncRetrying(
364-
wait=wait_fixed(0.1),
365-
stop=stop_after_delay(10),
366-
reraise=True,
367-
retry=retry_if_exception_type(AssertionError),
368-
):
369-
with attempt:
370-
assert await client.ping() is True
371-
372-
@contextlib.asynccontextmanager
373-
async def pause_rabbit(self) -> AsyncIterator[None]:
374-
async with self._paused_container("rabbit", self.rabbit_client):
375-
yield
376-
377-
@contextlib.asynccontextmanager
378-
async def pause_redis(self) -> AsyncIterator[None]:
379-
# save db for clean restore point
380-
await self.redis_client.redis.save()
381-
382-
async with self._paused_container("redis", self.redis_client):
383-
yield
335+
@pytest.fixture
336+
def mock_default_socket_timeout(mocker: MockerFixture) -> None:
337+
mocker.patch(
338+
"servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT",
339+
datetime.timedelta(seconds=0.25),
340+
)
384341

385342

386343
@pytest.mark.parametrize("max_workers", [10])
@@ -397,9 +354,6 @@ async def test_workflow_with_third_party_services_outages(
397354
deferred_tasks_to_start: int,
398355
service: str,
399356
):
400-
service_manager = ServiceManager(
401-
redis_client_sdk_deferred_tasks, rabbit_client, paused_container
402-
)
403357

404358
async with _RemoteProcessLifecycleManager(
405359
await get_remote_process(),
@@ -423,14 +377,16 @@ async def test_workflow_with_third_party_services_outages(
423377
match service:
424378
case "rabbit":
425379
print("[rabbit]: pausing")
426-
async with service_manager.pause_rabbit():
380+
async with pause_rabbit(paused_container, rabbit_client):
427381
print("[rabbit]: paused")
428382
await _sleep_in_interval(0.2, 0.4)
429383
print("[rabbit]: resumed")
430384

431385
case "redis":
432386
print("[redis]: pausing")
433-
async with service_manager.pause_redis():
387+
async with pause_redis(
388+
paused_container, redis_client_sdk_deferred_tasks
389+
):
434390
print("[redis]: paused")
435391
await _sleep_in_interval(0.2, 0.4)
436392
print("[redis]: resumed")

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_ops.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
DynamicServiceGet,
66
)
77

8-
from ...services import scheduler_interface
8+
from ...services import common_interface
99
from ._dependencies import (
1010
get_app,
1111
)
@@ -19,6 +19,6 @@ async def running_services(
1919
) -> list[DynamicServiceGet]:
2020
"""returns all running dynamic services. Used by ops internall to determine
2121
when it is safe to shutdown the platform"""
22-
return await scheduler_interface.list_tracked_dynamic_services(
22+
return await common_interface.list_tracked_dynamic_services(
2323
app, user_id=None, project_id=None
2424
)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
ServiceWasNotFoundError,
2121
)
2222

23-
from ...services import scheduler_interface
23+
from ...services import common_interface
2424

2525
router = RPCRouter()
2626

@@ -29,7 +29,7 @@
2929
async def list_tracked_dynamic_services(
3030
app: FastAPI, *, user_id: UserID | None = None, project_id: ProjectID | None = None
3131
) -> list[DynamicServiceGet]:
32-
return await scheduler_interface.list_tracked_dynamic_services(
32+
return await common_interface.list_tracked_dynamic_services(
3333
app, user_id=user_id, project_id=project_id
3434
)
3535

@@ -38,14 +38,14 @@ async def list_tracked_dynamic_services(
3838
async def get_service_status(
3939
app: FastAPI, *, node_id: NodeID
4040
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
41-
return await scheduler_interface.get_service_status(app, node_id=node_id)
41+
return await common_interface.get_service_status(app, node_id=node_id)
4242

4343

4444
@router.expose()
4545
async def run_dynamic_service(
4646
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
4747
) -> NodeGet | DynamicServiceGet:
48-
return await scheduler_interface.run_dynamic_service(
48+
return await common_interface.run_dynamic_service(
4949
app, dynamic_service_start=dynamic_service_start
5050
)
5151

@@ -59,7 +59,7 @@ async def run_dynamic_service(
5959
async def stop_dynamic_service(
6060
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
6161
) -> None:
62-
return await scheduler_interface.stop_dynamic_service(
62+
return await common_interface.stop_dynamic_service(
6363
app, dynamic_service_stop=dynamic_service_stop
6464
)
6565

@@ -68,25 +68,25 @@ async def stop_dynamic_service(
6868
async def get_project_inactivity(
6969
app: FastAPI, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt
7070
) -> GetProjectInactivityResponse:
71-
return await scheduler_interface.get_project_inactivity(
71+
return await common_interface.get_project_inactivity(
7272
app, project_id=project_id, max_inactivity_seconds=max_inactivity_seconds
7373
)
7474

7575

7676
@router.expose()
7777
async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
78-
await scheduler_interface.restart_user_services(app, node_id=node_id)
78+
await common_interface.restart_user_services(app, node_id=node_id)
7979

8080

8181
@router.expose()
8282
async def retrieve_inputs(
8383
app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
8484
) -> RetrieveDataOutEnveloped:
85-
return await scheduler_interface.retrieve_inputs(
85+
return await common_interface.retrieve_inputs(
8686
app, node_id=node_id, port_keys=port_keys
8787
)
8888

8989

9090
@router.expose()
9191
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
92-
await scheduler_interface.update_projects_networks(app, project_id=project_id)
92+
await common_interface.update_projects_networks(app, project_id=project_id)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from ..services.deferred_manager import deferred_manager_lifespan
2424
from ..services.director_v0 import director_v0_lifespan
2525
from ..services.director_v2 import director_v2_lifespan
26+
from ..services.generic_scheduler import generic_scheduler_lifespan
2627
from ..services.notifier import get_notifier_lifespans
2728
from ..services.rabbitmq import rabbitmq_lifespan
2829
from ..services.redis import redis_lifespan
@@ -79,6 +80,8 @@ def create_app_lifespan(
7980
for lifespan in get_notifier_lifespans():
8081
app_lifespan.add(lifespan)
8182

83+
app_lifespan.add(generic_scheduler_lifespan)
84+
8285
app_lifespan.add(service_tracker_lifespan)
8386
app_lifespan.add(deferred_manager_lifespan)
8487
app_lifespan.add(status_monitor_lifespan)

0 commit comments

Comments
 (0)