Skip to content

Commit 486fb33

Browse files
author
Andrei Neagu
committed
refactor to use new RunID
1 parent dfdacbb commit 486fb33

File tree

19 files changed

+99
-76
lines changed

19 files changed

+99
-76
lines changed

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .projects_state import RunningState
1616
from .services import ServiceKey, ServiceType, ServiceVersion
1717
from .services_resources import ServiceResourcesDict
18+
from .services_types import RunID
1819
from .users import UserID
1920
from .utils.enums import StrAutoEnum
2021
from .wallets import WalletID
@@ -178,7 +179,7 @@ class RabbitResourceTrackingMessageType(StrAutoEnum):
178179
class RabbitResourceTrackingBaseMessage(RabbitMessageBase):
179180
channel_name: Literal["io.simcore.service.tracking"] = "io.simcore.service.tracking"
180181

181-
service_run_id: str = Field(
182+
service_run_id: RunID = Field(
182183
..., description="uniquely identitifies the service run"
183184
)
184185
created_at: datetime.datetime = Field(

packages/models-library/src/models_library/services_types.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
1-
from typing import Annotated, Any, TypeAlias
1+
from typing import TYPE_CHECKING, Annotated, Any, Self, TypeAlias
22
from uuid import uuid4
33

44
import arrow
5-
from pydantic import GetCoreSchemaHandler, StringConstraints, ValidationInfo
5+
from pydantic import (
6+
GetCoreSchemaHandler,
7+
PositiveInt,
8+
StringConstraints,
9+
ValidationInfo,
10+
)
611
from pydantic_core import CoreSchema, core_schema
712

813
from .basic_regex import PROPERTY_KEY_RE, SIMPLE_VERSION_RE
14+
from .projects_nodes_io import NodeID
915
from .services_regex import (
1016
COMPUTATIONAL_SERVICE_KEY_RE,
1117
DYNAMIC_SERVICE_KEY_RE,
1218
FILENAME_RE,
1319
SERVICE_ENCODED_KEY_RE,
1420
SERVICE_KEY_RE,
1521
)
22+
from .users import UserID
23+
24+
if TYPE_CHECKING:
25+
from .projects import ProjectID
1626

1727
ServicePortKey: TypeAlias = Annotated[str, StringConstraints(pattern=PROPERTY_KEY_RE)]
1828

@@ -44,12 +54,15 @@ class RunID(str):
4454
and old volumes for different runs.
4555
Avoids overwriting data that left dropped on the node (due to an error)
4656
and gives the osparc-agent an opportunity to back it up.
57+
The resource-usage-tracker tracker uses these RunIDs to keep track of
58+
resource usage from comp and dynamic services.
4759
"""
4860

4961
__slots__ = ()
5062

5163
@classmethod
52-
def create(cls) -> "RunID":
64+
def create_for_dynamic_sidecar(cls) -> Self:
65+
"""used for dynamic services"""
5366
# NOTE: there was a legacy version of this RunID
5467
# legacy version:
5568
# '0ac3ed64-665b-42d2-95f7-e59e0db34242'
@@ -59,6 +72,17 @@ def create(cls) -> "RunID":
5972
run_id_format = f"{utc_int_timestamp}_{uuid4()}"
6073
return cls(run_id_format)
6174

75+
@classmethod
76+
def get_resource_tracking_run_id(
77+
cls,
78+
user_id: UserID,
79+
project_id: "ProjectID",
80+
node_id: NodeID,
81+
iteration: PositiveInt,
82+
) -> Self:
83+
"""used by computational services"""
84+
return cls(f"comp_{user_id}_{project_id}_{node_id}_{iteration}")
85+
6286
@classmethod
6387
def __get_pydantic_core_schema__(
6488
cls,
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import pytest
2+
from models_library.projects import ProjectID
3+
from models_library.projects_nodes import NodeID
4+
from models_library.services_types import RunID
5+
from models_library.users import UserID
6+
from pydantic import PositiveInt
7+
8+
9+
@pytest.mark.parametrize(
10+
"user_id, project_id, node_id, iteration, expected_result",
11+
[
12+
(
13+
2,
14+
ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"),
15+
NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"),
16+
5,
17+
"comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5",
18+
)
19+
],
20+
)
21+
def test_run_id_get_resource_tracking_run_id(
22+
user_id: UserID,
23+
project_id: ProjectID,
24+
node_id: NodeID,
25+
iteration: PositiveInt,
26+
expected_result: str,
27+
):
28+
resource_tracking_run_id = RunID.get_resource_tracking_run_id(
29+
user_id, project_id, node_id, iteration
30+
)
31+
assert isinstance(resource_tracking_run_id, RunID)
32+
assert resource_tracking_run_id == expected_result
33+
34+
35+
def test_run_id_create_for_dynamic_sidecar():
36+
assert isinstance(RunID.create_for_dynamic_sidecar(), RunID)

services/agent/tests/unit/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def test_client(initialized_app: FastAPI) -> TestClient:
5757

5858
@pytest.fixture
5959
def run_id() -> RunID:
60-
return RunID.create()
60+
return RunID.create_for_dynamic_sidecar()
6161

6262

6363
@pytest.fixture

services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ class SchedulerData(CommonServiceDetails, DynamicSidecarServiceLabels):
377377
description="Name of the current dynamic-sidecar being observed",
378378
)
379379
run_id: RunID = Field(
380-
default_factory=RunID.create,
380+
default_factory=RunID.create_for_dynamic_sidecar,
381381
description=(
382382
"Uniquely identify the dynamic sidecar session (a.k.a. 2 "
383383
"subsequent exact same services will have a different run_id)"

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from models_library.projects_nodes_io import NodeID, NodeIDStr
2727
from models_library.projects_state import RunningState
2828
from models_library.services import ServiceType
29+
from models_library.services_types import RunID
2930
from models_library.users import UserID
3031
from networkx.classes.reportviews import InDegreeView
3132
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
@@ -66,7 +67,6 @@
6667
TASK_TO_START_STATES,
6768
WAITING_FOR_START_STATES,
6869
create_service_resources_from_task,
69-
get_resource_tracking_run_id,
7070
)
7171

7272
_logger = logging.getLogger(__name__)
@@ -295,7 +295,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
295295
*(
296296
publish_service_resource_tracking_heartbeat(
297297
self.rabbitmq_client,
298-
get_resource_tracking_run_id(
298+
RunID.get_resource_tracking_run_id(
299299
user_id, t.project_id, t.node_id, iteration
300300
),
301301
)
@@ -348,7 +348,7 @@ async def _process_started_tasks(
348348
*(
349349
publish_service_resource_tracking_started(
350350
self.rabbitmq_client,
351-
service_run_id=get_resource_tracking_run_id(
351+
service_run_id=RunID.get_resource_tracking_run_id(
352352
user_id, t.project_id, t.node_id, iteration
353353
),
354354
wallet_id=run_metadata.get("wallet_id"),

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from models_library.projects_nodes_io import NodeID
1919
from models_library.projects_state import RunningState
2020
from models_library.rabbitmq_messages import SimcorePlatformStatus
21+
from models_library.services_types import RunID
2122
from models_library.users import UserID
2223
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
2324
from servicelib.logging_utils import log_catch
@@ -48,7 +49,6 @@
4849
from ..db.repositories.comp_runs import CompRunsRepository
4950
from ..db.repositories.comp_tasks import CompTasksRepository
5051
from ._scheduler_base import BaseCompScheduler
51-
from ._utils import get_resource_tracking_run_id
5252

5353
_logger = logging.getLogger(__name__)
5454

@@ -129,7 +129,7 @@ async def _start_tasks(
129129
hardware_info=task.hardware_info,
130130
callback=wake_up_callback,
131131
metadata=comp_run.metadata,
132-
resource_tracking_run_id=get_resource_tracking_run_id(
132+
resource_tracking_run_id=RunID.get_resource_tracking_run_id(
133133
user_id, project_id, node_id, comp_run.iteration
134134
),
135135
)
@@ -322,7 +322,9 @@ async def _process_task_result(
322322
# resource tracking
323323
await publish_service_resource_tracking_stopped(
324324
self.rabbitmq_client,
325-
get_resource_tracking_run_id(user_id, project_id, node_id, iteration),
325+
RunID.get_resource_tracking_run_id(
326+
user_id, project_id, node_id, iteration
327+
),
326328
simcore_platform_status=simcore_platform_status,
327329
)
328330
# instrumentation

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_utils.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,15 @@
22

33
from fastapi import FastAPI
44
from models_library.docker import DockerGenericTag
5-
from models_library.projects import ProjectID
6-
from models_library.projects_nodes_io import NodeID
75
from models_library.projects_state import RunningState
86
from models_library.services_resources import (
97
ResourceValue,
108
ServiceResourcesDict,
119
ServiceResourcesDictHelpers,
1210
)
13-
from models_library.users import UserID
1411
from servicelib.redis import RedisClientSDK
1512
from settings_library.redis import RedisDatabase
1613

17-
from ...models.comp_runs import Iteration
1814
from ...models.comp_tasks import CompTaskAtDB
1915
from ..redis import get_redis_client_manager
2016

@@ -55,12 +51,6 @@
5551
}
5652

5753

58-
def get_resource_tracking_run_id(
59-
user_id: UserID, project_id: ProjectID, node_id: NodeID, iteration: Iteration
60-
) -> str:
61-
return f"comp_{user_id}_{project_id}_{node_id}_{iteration}"
62-
63-
6454
def create_service_resources_from_task(task: CompTaskAtDB) -> ServiceResourcesDict:
6555
assert task.image.node_requirements # nosec
6656
return ServiceResourcesDictHelpers.create_from_single_service(

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
237237

238238
# Each time a new dynamic-sidecar service is created
239239
# generate a new `run_id` to avoid resource collisions
240-
scheduler_data.run_id = RunID.create()
240+
scheduler_data.run_id = RunID.create_for_dynamic_sidecar()
241241

242242
rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client
243243

services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from models_library.services import ServiceKey, ServiceType, ServiceVersion
1717
from models_library.services_resources import ServiceResourcesDict
18+
from models_library.services_types import RunID
1819
from models_library.users import UserID
1920
from models_library.wallets import WalletID
2021
from pydantic import NonNegativeFloat
@@ -70,7 +71,7 @@ async def publish_service_stopped_metrics(
7071

7172
async def publish_service_resource_tracking_started( # pylint: disable=too-many-arguments # noqa: PLR0913
7273
rabbitmq_client: RabbitMQClient,
73-
service_run_id: str,
74+
service_run_id: RunID,
7475
*,
7576
wallet_id: WalletID | None,
7677
wallet_name: str | None,
@@ -127,7 +128,7 @@ async def publish_service_resource_tracking_started( # pylint: disable=too-many
127128

128129
async def publish_service_resource_tracking_stopped(
129130
rabbitmq_client: RabbitMQClient,
130-
service_run_id: str,
131+
service_run_id: RunID,
131132
*,
132133
simcore_platform_status: SimcorePlatformStatus,
133134
) -> None:
@@ -138,7 +139,7 @@ async def publish_service_resource_tracking_stopped(
138139

139140

140141
async def publish_service_resource_tracking_heartbeat(
141-
rabbitmq_client: RabbitMQClient, service_run_id: str
142+
rabbitmq_client: RabbitMQClient, service_run_id: RunID
142143
) -> None:
143144
message = RabbitResourceTrackingHeartbeatMessage(service_run_id=service_run_id)
144145
await rabbitmq_client.publish(message.channel_name, message)

0 commit comments

Comments
 (0)