Merged
Changes from 149 commits
180 commits
727ec1f
intent needs to be saved before scheduling
Sep 3, 2025
7f4999c
renamed
Sep 3, 2025
cc3714e
renamed
Sep 3, 2025
dd0c3f6
added missing
Sep 3, 2025
2e42aa4
removed
Sep 3, 2025
d22bb55
rename
Sep 3, 2025
3b15a9a
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 4, 2025
d8b520b
added base scheduler store
Sep 4, 2025
2c57d5b
refactored core modules
Sep 5, 2025
00ef65e
extended store
Sep 5, 2025
8d2268b
updated operationregistry
Sep 5, 2025
285839e
added deferred_task_uid to store
Sep 5, 2025
700426b
corrected key
Sep 5, 2025
bd93ef7
added required fields
Sep 5, 2025
9188e07
added deferred runner module
Sep 5, 2025
ff9ae07
removed unused
Sep 5, 2025
f9c6dd7
added defaults
Sep 5, 2025
ed822b1
added events manager
Sep 5, 2025
b5fba40
added lifespan
Sep 5, 2025
43d2353
added module with tests
Sep 8, 2025
ddab666
fixed tests
Sep 8, 2025
e982d8f
expanded store
Sep 8, 2025
f2108b6
refact store
Sep 8, 2025
d0bf3af
refactor correct value
Sep 8, 2025
79b51f7
fixed
Sep 8, 2025
ac7715b
refactor
Sep 8, 2025
3244a9c
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 9, 2025
4707361
expanded store
Sep 9, 2025
a8c33c2
removed unused
Sep 9, 2025
5cf9ffa
extended store with operation error
Sep 9, 2025
6993cc2
enhanced store
Sep 9, 2025
e701580
fixed tests
Sep 9, 2025
f974b10
added event scheduler
Sep 9, 2025
33e1797
renamed to revert from destroy
Sep 9, 2025
483e2da
first version
Sep 10, 2025
49c2dd1
initial draft
Sep 12, 2025
36d49c4
initial validation test
Sep 12, 2025
cf96cf3
added require fixture
Sep 12, 2025
7d9939c
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 12, 2025
760bcbe
first working version
Sep 12, 2025
b53ef64
refactor
Sep 12, 2025
1741d2b
refactor tests
Sep 12, 2025
1fe71ce
added notes
Sep 12, 2025
cdcdbab
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 12, 2025
82beb8b
added OperationContextProxy
Sep 16, 2025
df49539
extended operation
Sep 16, 2025
3273a0c
refactored
Sep 16, 2025
6c13089
refactor
Sep 16, 2025
267f1c5
refactored
Sep 16, 2025
8d60dbf
added keys removal proxy
Sep 16, 2025
d8a8ccc
refactor
Sep 16, 2025
49448d2
added core base usage
Sep 17, 2025
762f929
removed debug
Sep 17, 2025
58225ca
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 17, 2025
5725269
mypy
Sep 17, 2025
62051ed
fixed error message
Sep 17, 2025
9faca0a
added revert tests
Sep 17, 2025
76d5b75
refactor tests
Sep 17, 2025
24c5903
refactor
Sep 17, 2025
034542a
rename
Sep 17, 2025
c205c49
simplified tests
Sep 17, 2025
2bf3a7a
extended tests for better error reporting
Sep 17, 2025
a863a2f
updated tests
Sep 17, 2025
a0bff16
added cancellation tests
Sep 17, 2025
e45e3fc
renamed
Sep 17, 2025
fc772aa
refactor removal
Sep 17, 2025
ed70751
added repeating steps tests
Sep 17, 2025
0b3b9b2
do not allow manual intervention for steps that are repeatable
Sep 17, 2025
e9a8f0e
added tests for manual intervention
Sep 17, 2025
68812a4
added context integrity check tests
Sep 17, 2025
01dedd9
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 17, 2025
8d305cf
fixed import and refactor if
Sep 18, 2025
a322f68
optimized if
Sep 18, 2025
742d7b9
ignore context manager
Sep 18, 2025
d37b40d
fixed test
Sep 18, 2025
72655ce
added tests for context issues
Sep 18, 2025
0f11c1f
added more tests
Sep 18, 2025
e2bb020
added public interface
Sep 18, 2025
86f65d3
refactor public interfaces
Sep 18, 2025
11f486f
changed sort order
Sep 18, 2025
4af3945
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 18, 2025
0aa217f
pylint
Sep 18, 2025
424e4ea
removed todo
Sep 18, 2025
41f9a9d
extracted ServiceManager
Sep 18, 2025
9c98595
refactored tests
Sep 18, 2025
5e6518a
added tests
Sep 18, 2025
7e1353e
removed TODO
Sep 18, 2025
741d7b0
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 18, 2025
99328df
moved tests
Sep 18, 2025
200cc11
renamed
Sep 18, 2025
1f328bd
removed unused
Sep 18, 2025
6ca1dcd
fixed imports
Sep 18, 2025
fb93630
removed todo
Sep 18, 2025
1d9f892
fixed tests
Sep 18, 2025
a76828c
pylint
Sep 18, 2025
767caf0
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 18, 2025
988903e
refactor
Sep 18, 2025
7330395
extracted repeating steps function
Sep 18, 2025
c4f485a
refactor
Sep 18, 2025
c234b9f
extracted function
Sep 18, 2025
e68622c
removed todo
Sep 18, 2025
cb95fa7
see if typechecking helps
Sep 19, 2025
29455ab
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 19, 2025
5b9f926
rename
Sep 19, 2025
298b387
sonar
Sep 19, 2025
1f70574
sonar
Sep 19, 2025
055eb5b
removed unused
Sep 19, 2025
00d010e
expose more utility methods
Sep 19, 2025
3797194
exposed more
Sep 19, 2025
f03af79
added notes
Sep 19, 2025
4e05dbf
exposed all proxies
Sep 19, 2025
243667a
rename interface
Sep 19, 2025
8d2ff4c
added exceptions
Sep 19, 2025
5d1675f
extended store interface
Sep 19, 2025
2a5e2dc
added step retry
Sep 19, 2025
4690969
removed unused
Sep 19, 2025
5fadd47
added test
Sep 19, 2025
0f32e1d
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 19, 2025
47f804c
added tests for restart_revert_operation_step_in_error
Sep 19, 2025
a1a906e
refactor interface
Sep 19, 2025
93be131
refactor tests
Sep 19, 2025
a78438b
removed possible flakiness
Sep 19, 2025
287fe89
added tests for errors
Sep 19, 2025
8009882
refactor
Sep 19, 2025
a538da6
fixed flaky
Sep 19, 2025
b94d089
refactor
Sep 19, 2025
2f1f698
moved
Sep 19, 2025
da536cc
dropped comment
Sep 19, 2025
453a72a
renamed
Sep 19, 2025
24e5ba9
rename
Sep 19, 2025
c1a9633
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 22, 2025
e7f94e9
rename
Sep 22, 2025
9bfbe04
added comments
Sep 22, 2025
7edc5ee
working
Sep 22, 2025
9e9a500
refactor
Sep 22, 2025
bfa4ca1
added comments
Sep 22, 2025
34fdb1f
refactor
Sep 22, 2025
0d9bf7a
refactor
Sep 22, 2025
3500865
refactor
Sep 22, 2025
966fd4c
refactor
Sep 22, 2025
e1c2f85
renaming
Sep 22, 2025
b6c9aee
update comments
Sep 22, 2025
6b0c396
added some more comments
Sep 22, 2025
95be4ab
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 22, 2025
4bc004e
refactor
Sep 23, 2025
719a530
refactor
Sep 23, 2025
6020209
docstring + renaming
Sep 23, 2025
fb04b13
refactor
Sep 23, 2025
56b89d6
typos
Sep 23, 2025
3161b66
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 29, 2025
e6bdbcf
using redis instead of rabbit and removed ping that makes no sense
Sep 29, 2025
5abff06
refactor
Sep 29, 2025
68bf6c7
docstring
Sep 29, 2025
ddb7bea
renamed and docstrings
Sep 29, 2025
99b4d5b
better form
Sep 29, 2025
c0c6b36
refactor
Sep 29, 2025
fef354c
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Sep 29, 2025
9f0f5e2
moved
Sep 29, 2025
3eaf586
typos
Sep 29, 2025
ae6eabf
renamed module
Sep 29, 2025
39107c6
typo
Sep 29, 2025
5109933
renamed error
Sep 29, 2025
c79ad21
fixed name
Sep 29, 2025
654a8da
removed unused
Sep 29, 2025
1735ace
refactor
Sep 29, 2025
e20e8d7
replaced get_store and get_core
Sep 29, 2025
7c4b665
merged lifespans
Sep 29, 2025
99a620c
fixed tests
Sep 29, 2025
d6db1fa
refactor
Sep 29, 2025
ca02c79
typo
Sep 29, 2025
1718990
renamed types
Sep 29, 2025
80b64ed
aligned named in _store module
Sep 29, 2025
382d04c
removed indirection
Sep 29, 2025
a27aeec
renamed store proxy interfaces
Sep 29, 2025
97f22a0
renamed tests
Sep 29, 2025
ae1f05a
typos
Sep 29, 2025
1ed132b
renamed revert to undo
Sep 29, 2025
8ca0bf3
fixed tests
Sep 29, 2025
017e796
fixed mock
Oct 1, 2025
6d78813
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Oct 1, 2025
66 changes: 64 additions & 2 deletions packages/pytest-simcore/src/pytest_simcore/helpers/docker.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import contextlib
import json
import logging
import os
import re
import subprocess
from collections.abc import AsyncIterator, Callable
from contextlib import AbstractAsyncContextManager
from enum import Enum
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any, Protocol

import docker
import yaml
from tenacity import retry
from tenacity.after import after_log
from tenacity.stop import stop_after_attempt
from tenacity.asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt, stop_after_delay
from tenacity.wait import wait_fixed

if TYPE_CHECKING:
    from servicelib.rabbitmq import RabbitMQClient
    from servicelib.redis import RedisClientSDK


# NOTE: CANNOT use models_library.generated_models.docker_rest_api.Status2 because some of the
# packages tests installations do not include this library!!
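
The `TYPE_CHECKING` guard in this hunk keeps `servicelib` out of the runtime imports of the helper module, which fits the NOTE about package test installations that lack some libraries. A minimal sketch of the same pattern follows, using the stdlib `fractions` module as a stand-in for the optional dependency; `describe` and `_Half` are hypothetical names used only for illustration.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers; this branch never runs at runtime.
    from fractions import Fraction


def describe(value: "Fraction") -> str:
    # The quoted annotation is stored as a plain string and is not evaluated
    # at runtime, so the guarded import is not needed to call this function.
    return f"{value.numerator}/{value.denominator}"


class _Half:
    # Duck-typed stand-in exposing the two attributes describe() reads.
    numerator = 1
    denominator = 2


result = describe(_Half())
```

This is why `ServiceManager.__init__` can annotate its parameters as `"RedisClientSDK"` and `"RabbitMQClient"` in quotes without importing them unconditionally.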
@@ -272,3 +281,56 @@ def save_docker_infos(destination_dir: Path):
             f"wrote docker log and json files for {len(all_containers)} containers in ",
             destination_dir,
         )
+
+
+class _ClientWithPingProtocol(Protocol):
+    async def ping(self) -> bool: ...
+
+
+class ServiceManager:
+    def __init__(
+        self,
+        redis_client: "RedisClientSDK",
+        rabbit_client: "RabbitMQClient",
+        paused_container: Callable[[str], AbstractAsyncContextManager[None]],
+    ) -> None:
+        self.redis_client = redis_client
+        self.rabbit_client = rabbit_client
+        self.paused_container = paused_container
+
+    @contextlib.asynccontextmanager
+    async def _paused_container(
+        self, container_name: str, client: _ClientWithPingProtocol
+    ) -> AsyncIterator[None]:
+        async with self.paused_container(container_name):
+            async for attempt in AsyncRetrying(
+                wait=wait_fixed(0.1),
+                stop=stop_after_delay(10),
+                reraise=True,
+                retry=retry_if_exception_type(AssertionError),
+            ):
+                with attempt:
+                    assert await client.ping() is False
+            yield
+
+        async for attempt in AsyncRetrying(
+            wait=wait_fixed(0.1),
+            stop=stop_after_delay(10),
+            reraise=True,
+            retry=retry_if_exception_type(AssertionError),
+        ):
+            with attempt:
+                assert await client.ping() is True
+
+    @contextlib.asynccontextmanager
+    async def pause_rabbit(self) -> AsyncIterator[None]:
+        async with self._paused_container("rabbit", self.rabbit_client):
+            yield
+
+    @contextlib.asynccontextmanager
+    async def pause_redis(self) -> AsyncIterator[None]:
+        # save db for clean restore point
+        await self.redis_client.redis.save()
+
+        async with self._paused_container("redis", self.redis_client):
+            yield
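
`ServiceManager._paused_container` pauses a container, polls `ping()` until the client reports the service as unreachable, yields to the test body, and after unpausing polls again until `ping()` succeeds. The sketch below reproduces that control flow with the standard library only: a plain polling loop stands in for tenacity's `AsyncRetrying`, and `FakeClient`, `pause_container`, `wait_for_ping` and `paused_and_verified` are hypothetical names, not part of `pytest_simcore`.

```python
import asyncio
import contextlib
import time
from collections.abc import AsyncIterator
from typing import Protocol


class ClientWithPing(Protocol):
    async def ping(self) -> bool: ...


class FakeClient:
    """Hypothetical client whose ping() reflects a toggled flag."""

    def __init__(self) -> None:
        self.reachable = True

    async def ping(self) -> bool:
        return self.reachable


async def wait_for_ping(
    client: ClientWithPing, *, expected: bool, timeout: float = 10.0, interval: float = 0.01
) -> None:
    # Poll until ping() returns the expected value or the timeout elapses.
    deadline = time.monotonic() + timeout
    while await client.ping() is not expected:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"ping() never returned {expected}")
        await asyncio.sleep(interval)


@contextlib.asynccontextmanager
async def pause_container(client: FakeClient) -> AsyncIterator[None]:
    client.reachable = False  # stands in for pausing the docker container
    try:
        yield
    finally:
        client.reachable = True  # stands in for unpausing it


@contextlib.asynccontextmanager
async def paused_and_verified(client: FakeClient) -> AsyncIterator[None]:
    async with pause_container(client):
        await wait_for_ping(client, expected=False)  # confirm the outage is visible
        yield
    await wait_for_ping(client, expected=True)  # confirm recovery before returning


async def main() -> list[bool]:
    client = FakeClient()
    observed = [await client.ping()]
    async with paused_and_verified(client):
        observed.append(await client.ping())
    observed.append(await client.ping())
    return observed


observed = asyncio.run(main())
```

Waiting on both sides of the `yield` means a test never starts its assertions before the outage is observable and never leaves the block while the service is still recovering.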
Original file line number Diff line number Diff line change
@@ -2,16 +2,15 @@
 # pylint:disable=unused-argument

 import asyncio
-import contextlib
 import datetime
 import itertools
 import json
 import random
 import sys
-from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
+from collections.abc import AsyncIterable, Awaitable, Callable
 from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
 from pathlib import Path
-from typing import Any, Protocol
+from typing import Any

 import psutil
 import pytest
@@ -20,6 +19,7 @@
 from common_library.serialization import model_dump_with_secrets
 from pydantic import NonNegativeFloat, NonNegativeInt
 from pytest_mock import MockerFixture
+from pytest_simcore.helpers.docker import ServiceManager
 from servicelib.rabbitmq import RabbitMQClient
 from servicelib.redis import RedisClientSDK
 from servicelib.sequences_utils import partition_gen
@@ -332,59 +332,6 @@ async def rabbit_client(
     return create_rabbitmq_client("pinger")


-class ClientWithPingProtocol(Protocol):
-    async def ping(self) -> bool: ...
-
-
-class ServiceManager:
-    def __init__(
-        self,
-        redis_client: RedisClientSDK,
-        rabbit_client: RabbitMQClient,
-        paused_container: Callable[[str], AbstractAsyncContextManager[None]],
-    ) -> None:
-        self.redis_client = redis_client
-        self.rabbit_client = rabbit_client
-        self.paused_container = paused_container
-
-    @contextlib.asynccontextmanager
-    async def _paused_container(
-        self, container_name: str, client: ClientWithPingProtocol
-    ) -> AsyncIterator[None]:
-        async with self.paused_container(container_name):
-            async for attempt in AsyncRetrying(
-                wait=wait_fixed(0.1),
-                stop=stop_after_delay(10),
-                reraise=True,
-                retry=retry_if_exception_type(AssertionError),
-            ):
-                with attempt:
-                    assert await client.ping() is False
-            yield
-
-        async for attempt in AsyncRetrying(
-            wait=wait_fixed(0.1),
-            stop=stop_after_delay(10),
-            reraise=True,
-            retry=retry_if_exception_type(AssertionError),
-        ):
-            with attempt:
-                assert await client.ping() is True
-
-    @contextlib.asynccontextmanager
-    async def pause_rabbit(self) -> AsyncIterator[None]:
-        async with self._paused_container("rabbit", self.rabbit_client):
-            yield
-
-    @contextlib.asynccontextmanager
-    async def pause_redis(self) -> AsyncIterator[None]:
-        # save db for clean restore point
-        await self.redis_client.redis.save()
-
-        async with self._paused_container("redis", self.redis_client):
-            yield
-
-
 @pytest.fixture
 def mock_default_socket_timeout(mocker: MockerFixture) -> None:
     mocker.patch(
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
     DynamicServiceGet,
 )

-from ...services import scheduler_interface
+from ...services import common_interface
 from ._dependencies import (
     get_app,
 )
@@ -19,6 +19,6 @@ async def running_services(
 ) -> list[DynamicServiceGet]:
     """returns all running dynamic services. Used by ops internally to determine
     when it is safe to shutdown the platform"""
-    return await scheduler_interface.list_tracked_dynamic_services(
+    return await common_interface.list_tracked_dynamic_services(
         app, user_id=None, project_id=None
     )
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
     ServiceWasNotFoundError,
 )

-from ...services import scheduler_interface
+from ...services import common_interface

 router = RPCRouter()

@@ -29,7 +29,7 @@
 async def list_tracked_dynamic_services(
     app: FastAPI, *, user_id: UserID | None = None, project_id: ProjectID | None = None
 ) -> list[DynamicServiceGet]:
-    return await scheduler_interface.list_tracked_dynamic_services(
+    return await common_interface.list_tracked_dynamic_services(
         app, user_id=user_id, project_id=project_id
     )

@@ -38,14 +38,14 @@ async def list_tracked_dynamic_services(
 async def get_service_status(
     app: FastAPI, *, node_id: NodeID
 ) -> NodeGet | DynamicServiceGet | NodeGetIdle:
-    return await scheduler_interface.get_service_status(app, node_id=node_id)
+    return await common_interface.get_service_status(app, node_id=node_id)


 @router.expose()
 async def run_dynamic_service(
     app: FastAPI, *, dynamic_service_start: DynamicServiceStart
 ) -> NodeGet | DynamicServiceGet:
-    return await scheduler_interface.run_dynamic_service(
+    return await common_interface.run_dynamic_service(
         app, dynamic_service_start=dynamic_service_start
     )

@@ -59,7 +59,7 @@ async def run_dynamic_service(
 async def stop_dynamic_service(
     app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
 ) -> None:
-    return await scheduler_interface.stop_dynamic_service(
+    return await common_interface.stop_dynamic_service(
         app, dynamic_service_stop=dynamic_service_stop
     )

@@ -68,25 +68,25 @@ async def stop_dynamic_service(
 async def get_project_inactivity(
     app: FastAPI, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt
 ) -> GetProjectInactivityResponse:
-    return await scheduler_interface.get_project_inactivity(
+    return await common_interface.get_project_inactivity(
         app, project_id=project_id, max_inactivity_seconds=max_inactivity_seconds
     )


 @router.expose()
 async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
-    await scheduler_interface.restart_user_services(app, node_id=node_id)
+    await common_interface.restart_user_services(app, node_id=node_id)


 @router.expose()
 async def retrieve_inputs(
     app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
 ) -> RetrieveDataOutEnveloped:
-    return await scheduler_interface.retrieve_inputs(
+    return await common_interface.retrieve_inputs(
         app, node_id=node_id, port_keys=port_keys
     )


 @router.expose()
 async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
-    await scheduler_interface.update_projects_networks(app, project_id=project_id)
+    await common_interface.update_projects_networks(app, project_id=project_id)
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
 from ..services.deferred_manager import deferred_manager_lifespan
 from ..services.director_v0 import director_v0_lifespan
 from ..services.director_v2 import director_v2_lifespan
+from ..services.generic_scheduler import get_generic_scheduler_lifespans
 from ..services.notifier import get_notifier_lifespans
 from ..services.rabbitmq import rabbitmq_lifespan
 from ..services.redis import redis_lifespan
@@ -79,6 +80,9 @@ def create_app_lifespan(
     for lifespan in get_notifier_lifespans():
         app_lifespan.add(lifespan)

+    for lifespan in get_generic_scheduler_lifespans():
+        app_lifespan.add(lifespan)
+
     app_lifespan.add(service_tracker_lifespan)
     app_lifespan.add(deferred_manager_lifespan)
     app_lifespan.add(status_monitor_lifespan)
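
The hunk above registers every lifespan returned by `get_generic_scheduler_lifespans()` on `app_lifespan`, in the same way as the notifier lifespans. How `app_lifespan` composes them is not shown in this diff; the sketch below only illustrates the general idea of entering several lifespans in registration order and leaving them in reverse, using the stdlib `AsyncExitStack`. `make_lifespan`, `get_demo_lifespans` and `run_app` are hypothetical names.

```python
import asyncio
import contextlib
from collections.abc import AsyncIterator, Callable, Iterable
from contextlib import AbstractAsyncContextManager

# A lifespan here is any callable returning an async context manager.
Lifespan = Callable[[list[str]], AbstractAsyncContextManager[None]]


def make_lifespan(name: str) -> Lifespan:
    @contextlib.asynccontextmanager
    async def _lifespan(events: list[str]) -> AsyncIterator[None]:
        events.append(f"start {name}")
        try:
            yield
        finally:
            events.append(f"stop {name}")

    return _lifespan


def get_demo_lifespans() -> Iterable[Lifespan]:
    # Hypothetical counterpart of a get_*_lifespans() helper.
    yield make_lifespan("store")
    yield make_lifespan("scheduler")


async def run_app(lifespans: Iterable[Lifespan]) -> list[str]:
    events: list[str] = []
    async with contextlib.AsyncExitStack() as stack:
        for lifespan in lifespans:
            # Entered in registration order, exited in reverse order.
            await stack.enter_async_context(lifespan(events))
        events.append("serving")
    return events


events = asyncio.run(run_app(get_demo_lifespans()))
```

Under this model, the position of the new loop in `create_app_lifespan` determines which resources are already available when the generic scheduler starts.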
Original file line number Diff line number Diff line change
@@ -50,6 +50,8 @@ async def get_service_status(
 async def run_dynamic_service(
     app: FastAPI, *, dynamic_service_start: DynamicServiceStart
 ) -> NodeGet | DynamicServiceGet:
+    await set_request_as_running(app, dynamic_service_start)
+
     settings: ApplicationSettings = app.state.settings
     if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
         raise NotImplementedError
@@ -59,13 +61,14 @@ async def run_dynamic_service(
         await director_v2_client.run_dynamic_service(dynamic_service_start)
     )

-    await set_request_as_running(app, dynamic_service_start)
     return response


 async def stop_dynamic_service(
     app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
 ) -> None:
+    await set_request_as_stopped(app, dynamic_service_stop)
+
     settings: ApplicationSettings = app.state.settings
     if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
         raise NotImplementedError
@@ -78,8 +81,6 @@ async def stop_dynamic_service(
         timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
     )

-    await set_request_as_stopped(app, dynamic_service_stop)
-

 async def get_project_inactivity(
     app: FastAPI, *, project_id: ProjectID, max_inactivity_seconds: NonNegativeInt
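
These hunks move `set_request_as_running` and `set_request_as_stopped` from the end of their functions to the first statement, matching the commit message "intent needs to be saved before scheduling". The sketch below shows why the order matters: when the intent is recorded first, it survives a failure of the call that follows. `requested_state`, `record_running`, `failing_backend_start` and `run_service` are hypothetical stand-ins, not the service's real helpers.

```python
import asyncio

# Hypothetical in-memory stand-in for wherever the requested state is tracked.
requested_state: dict[str, str] = {}


async def record_running(node_id: str) -> None:
    requested_state[node_id] = "running"


async def failing_backend_start(node_id: str) -> None:
    # Simulates the downstream call raising after the intent was saved.
    raise RuntimeError("backend unavailable")


async def run_service(node_id: str) -> None:
    # Record the intent first so it is not lost if the next call fails.
    await record_running(node_id)
    await failing_backend_start(node_id)


async def main() -> str:
    try:
        await run_service("node-1")
    except RuntimeError:
        pass
    return requested_state["node-1"]


recorded = asyncio.run(main())
```

With the previous ordering, the same failure would have left no record that the service had been requested at all.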
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
+from ._api import (
+    cancel_operation,
+    restart_operation_stuck_in_error_during_revert,
+    restart_operation_stuck_in_manual_intervention_during_create,
+    start_operation,
+)
+from ._deferred_runner import (
+    get_operation_context_proxy,
+    get_step_group_proxy,
+    get_step_store_proxy,
+)
+from ._lifespan import get_generic_scheduler_lifespans
+from ._models import (
+    OperationName,
+    ProvidedOperationContext,
+    RequiredOperationContext,
+    ScheduleId,
+)
+from ._operation import (
+    BaseStep,
+    Operation,
+    OperationRegistry,
+    ParallelStepGroup,
+    SingleStepGroup,
+)
+from ._store import OperationContextProxy, StepGroupProxy, StepStoreProxy
+
+__all__: tuple[str, ...] = (
+    "BaseStep",
+    "cancel_operation",
+    "get_generic_scheduler_lifespans",
+    "get_operation_context_proxy",
+    "get_step_group_proxy",
+    "get_step_store_proxy",
+    "Operation",
+    "OperationContextProxy",
+    "OperationName",
+    "OperationRegistry",
+    "ParallelStepGroup",
+    "ProvidedOperationContext",
+    "RequiredOperationContext",
+    "restart_operation_stuck_in_error_during_revert",
+    "restart_operation_stuck_in_manual_intervention_during_create",
+    "ScheduleId",
+    "SingleStepGroup",
+    "start_operation",
+    "StepGroupProxy",
+    "StepStoreProxy",
+)
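
The exported names (`BaseStep`, `SingleStepGroup`, `ParallelStepGroup`, and the restart helpers that mention revert) suggest an operation built from ordered step groups, with completed work rolled back when a later step fails. Their signatures are not part of the hunks shown here, so the following is only a conceptual sketch of that idea; `ToyStep` and `run_operation` are hypothetical and do not mirror the real `generic_scheduler` API.

```python
import asyncio


class ToyStep:
    """Hypothetical step with a forward action and a compensating undo."""

    def __init__(self, name: str, *, fail: bool = False) -> None:
        self.name = name
        self.fail = fail

    async def execute(self, log: list[str]) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        log.append(f"execute {self.name}")

    async def undo(self, log: list[str]) -> None:
        log.append(f"undo {self.name}")


async def run_operation(groups: list[list[ToyStep]]) -> list[str]:
    log: list[str] = []
    completed: list[list[ToyStep]] = []
    try:
        for group in groups:
            # Steps inside a group run concurrently; groups run in sequence.
            await asyncio.gather(*(step.execute(log) for step in group))
            completed.append(group)
    except RuntimeError:
        # Undo only fully completed groups, most recent first. Partial
        # failures inside a parallel group are not handled in this sketch.
        for group in reversed(completed):
            await asyncio.gather(*(step.undo(log) for step in group))
    return log


log = asyncio.run(
    run_operation(
        [
            [ToyStep("create_network")],
            [ToyStep("start_sidecar"), ToyStep("start_proxy")],
            [ToyStep("start_user_services", fail=True)],
        ]
    )
)
```

A real scheduler would also persist progress between steps so that an interrupted operation can resume, which this in-memory sketch leaves out.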