Skip to content

Commit 755bdd5

Browse files
GitHKAndrei Neagu
authored andcommitted
dynamic-scheduler pushes service state to the frontend (⚠️ devops) (ITISFoundation#5892)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 3fb9076 commit 755bdd5

File tree

52 files changed

+2010
-98
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2010
-98
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from typing import Final
2+
3+
SOCKET_IO_SERVICE_STATUS_EVENT: Final[str] = "serviceStatus"

packages/models-library/src/models_library/api_schemas_webserver/projects_nodes.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,36 @@ class NodeGet(OutputSchema):
9393

9494
class Config:
9595
schema_extra: ClassVar[dict[str, Any]] = {
96-
"example": {
97-
"published_port": 30000,
98-
"entrypoint": "/the/entry/point/is/here",
99-
"service_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
100-
"service_key": "simcore/services/comp/itis/sleeper",
101-
"service_version": "1.2.3",
102-
"service_host": "jupyter_E1O2E-LAH",
103-
"service_port": 8081,
104-
"service_basepath": "/x/E1O2E-LAH",
105-
"service_state": "pending",
106-
"service_message": "no suitable node (insufficient resources on 1 node)",
107-
"user_id": 123,
108-
}
96+
"examples": [
97+
# computational
98+
{
99+
"published_port": 30000,
100+
"entrypoint": "/the/entry/point/is/here",
101+
"service_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
102+
"service_key": "simcore/services/comp/itis/sleeper",
103+
"service_version": "1.2.3",
104+
"service_host": "jupyter_E1O2E-LAH",
105+
"service_port": 8081,
106+
"service_basepath": "/x/E1O2E-LAH",
107+
"service_state": "pending",
108+
"service_message": "no suitable node (insufficient resources on 1 node)",
109+
"user_id": 123,
110+
},
111+
# dynamic
112+
{
113+
"published_port": 30000,
114+
"entrypoint": "/the/entry/point/is/here",
115+
"service_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
116+
"service_key": "simcore/services/dynamic/some-dynamic-service",
117+
"service_version": "1.2.3",
118+
"service_host": "jupyter_E1O2E-LAH",
119+
"service_port": 8081,
120+
"service_basepath": "/x/E1O2E-LAH",
121+
"service_state": "pending",
122+
"service_message": "no suitable node (insufficient resources on 1 node)",
123+
"user_id": 123,
124+
},
125+
]
109126
}
110127

111128

packages/models-library/src/models_library/services_enums.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@ class ServiceBootType(str, Enum):
1111
@functools.total_ordering
1212
@unique
1313
class ServiceState(Enum):
14+
FAILED = "failed"
15+
1416
PENDING = "pending"
1517
PULLING = "pulling"
1618
STARTING = "starting"
1719
RUNNING = "running"
18-
COMPLETE = "complete"
19-
FAILED = "failed"
20+
2021
STOPPING = "stopping"
2122

23+
COMPLETE = "complete"
24+
IDLE = "idle"
25+
2226
def __lt__(self, other):
2327
if self.__class__ is other.__class__:
2428
comparison_order = ServiceState.comparison_order()
@@ -39,6 +43,7 @@ def comparison_order() -> dict["ServiceState", int]:
3943
ServiceState.RUNNING: 4,
4044
ServiceState.STOPPING: 5,
4145
ServiceState.COMPLETE: 6,
46+
ServiceState.IDLE: 7,
4247
}
4348

4449

packages/models-library/tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import pytest
1010

1111
pytest_plugins = [
12+
"pytest_simcore.faker_projects_data",
1213
"pytest_simcore.pydantic_models",
1314
"pytest_simcore.pytest_global_environs",
1415
"pytest_simcore.repository_paths",

packages/models-library/tests/test_utils_nodes.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@
1616
from models_library.utils.nodes import compute_node_hash
1717
from pydantic import AnyUrl, parse_obj_as
1818

19-
20-
@pytest.fixture()
21-
def node_id() -> NodeID:
22-
return uuid4()
23-
24-
2519
ANOTHER_NODE_ID = uuid4()
2620
ANOTHER_NODE_OUTPUT_KEY = "the_output_link"
2721
ANOTHER_NODE_PAYLOAD = {"outputs": {ANOTHER_NODE_OUTPUT_KEY: 36}}

packages/service-library/src/servicelib/deferred_tasks/_redis_task_tracker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pickle
12
from typing import Final
23
from uuid import uuid4
34

@@ -33,13 +34,15 @@ async def get_new_unique_identifier(self) -> TaskUID:
3334

3435
async def _get_raw(self, redis_key: str) -> TaskScheduleModel | None:
3536
found_data = await self.redis_client_sdk.redis.get(redis_key)
36-
return None if found_data is None else TaskScheduleModel.parse_raw(found_data)
37+
return None if found_data is None else pickle.loads(found_data) # noqa: S301
3738

3839
async def get(self, task_uid: TaskUID) -> TaskScheduleModel | None:
3940
return await self._get_raw(_get_key(task_uid))
4041

4142
async def save(self, task_uid: TaskUID, task_schedule: TaskScheduleModel) -> None:
42-
await self.redis_client_sdk.redis.set(_get_key(task_uid), task_schedule.json())
43+
await self.redis_client_sdk.redis.set(
44+
_get_key(task_uid), pickle.dumps(task_schedule)
45+
)
4346

4447
async def remove(self, task_uid: TaskUID) -> None:
4548
await self.redis_client_sdk.redis.delete(_get_key(task_uid))

packages/service-library/src/servicelib/services_utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import urllib.parse
22

3+
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
4+
from models_library.api_schemas_webserver.projects_nodes import (
5+
NodeGet,
6+
NodeGetIdle,
7+
NodeGetUnknown,
8+
)
39
from models_library.services import ServiceType
410

511

@@ -9,3 +15,14 @@ def get_service_from_key(service_key: str) -> ServiceType:
915
if encoded_service_type == "comp":
1016
encoded_service_type = "computational"
1117
return ServiceType(encoded_service_type)
18+
19+
20+
def get_status_as_dict(
21+
status: NodeGetIdle | NodeGetUnknown | DynamicServiceGet | NodeGet,
22+
) -> dict:
23+
"""shared between different backend services to guarantee same result to frontend"""
24+
return (
25+
status.dict(by_alias=True)
26+
if isinstance(status, DynamicServiceGet)
27+
else status.dict()
28+
)

packages/service-library/tests/conftest.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,11 @@ async def get_redis_client_sdk(
7676
Callable[[RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]]
7777
]:
7878
@asynccontextmanager
79-
async def _(database: RedisDatabase) -> AsyncIterator[RedisClientSDK]:
79+
async def _(
80+
database: RedisDatabase, decode_response: bool = True # noqa: FBT002
81+
) -> AsyncIterator[RedisClientSDK]:
8082
redis_resources_dns = redis_service.build_redis_dsn(database)
81-
client = RedisClientSDK(redis_resources_dns)
83+
client = RedisClientSDK(redis_resources_dns, decode_responses=decode_response)
8284
assert client
8385
assert client.redis_dsn == redis_resources_dns
8486
await client.setup()

packages/service-library/tests/deferred_tasks/conftest.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
@pytest.fixture
1010
async def redis_client_sdk_deferred_tasks(
1111
get_redis_client_sdk: Callable[
12-
[RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]
12+
[RedisDatabase, bool], AbstractAsyncContextManager[RedisClientSDK]
1313
]
1414
) -> AsyncIterator[RedisClientSDK]:
15-
async with get_redis_client_sdk(RedisDatabase.DEFERRED_TASKS) as client:
15+
async with get_redis_client_sdk(
16+
RedisDatabase.DEFERRED_TASKS, decode_response=False
17+
) as client:
1618
yield client

packages/service-library/tests/deferred_tasks/example_app.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from uuid import uuid4
99

1010
from pydantic import NonNegativeInt
11+
from redis.asyncio import Redis
1112
from servicelib.deferred_tasks import (
1213
BaseDeferredHandler,
1314
DeferredContext,
@@ -54,21 +55,22 @@ async def on_result(cls, result: str, context: DeferredContext) -> None:
5455

5556
class InMemoryLists:
5657
def __init__(self, redis_settings: RedisSettings, port: int) -> None:
57-
self.redis_client_sdk = RedisClientSDK(
58-
redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS)
59-
)
58+
# NOTE: RedisClientSDK is not required here but it's used to easily construct
59+
# a redis connection
60+
self.redis: Redis = RedisClientSDK(
61+
redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS),
62+
decode_responses=True,
63+
).redis
6064
self.port = port
6165

6266
def _get_queue_name(self, queue_name: str) -> str:
6367
return f"in_memory_lists::{queue_name}.{self.port}"
6468

6569
async def append_to(self, queue_name: str, value: Any) -> None:
66-
await self.redis_client_sdk.redis.rpush(self._get_queue_name(queue_name), value) # type: ignore
70+
await self.redis.rpush(self._get_queue_name(queue_name), value) # type: ignore
6771

6872
async def get_all_from(self, queue_name: str) -> list:
69-
return await self.redis_client_sdk.redis.lrange(
70-
self._get_queue_name(queue_name), 0, -1
71-
) # type: ignore
73+
return await self.redis.lrange(self._get_queue_name(queue_name), 0, -1) # type: ignore
7274

7375

7476
class ExampleApp:
@@ -79,18 +81,19 @@ def __init__(
7981
in_memory_lists: InMemoryLists,
8082
max_workers: NonNegativeInt,
8183
) -> None:
82-
self._redis_client_sdk = RedisClientSDK(
83-
redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS)
84+
self._redis_client = RedisClientSDK(
85+
redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS),
86+
decode_responses=False,
8487
)
8588
self._manager = DeferredManager(
8689
rabbit_settings,
87-
self._redis_client_sdk,
90+
self._redis_client,
8891
globals_context={"in_memory_lists": in_memory_lists},
8992
max_workers=max_workers,
9093
)
9194

9295
async def setup(self) -> None:
93-
await self._redis_client_sdk.setup()
96+
await self._redis_client.setup()
9497
await self._manager.setup()
9598

9699

0 commit comments

Comments
 (0)