Skip to content

Commit 2d1b08e

Browse files
author
Andrei Neagu
committed
refactor with proper values
1 parent ff3298c commit 2d1b08e

File tree

7 files changed

+50
-40
lines changed

7 files changed

+50
-40
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ async def _start_tasks(
129129
hardware_info=task.hardware_info,
130130
callback=wake_up_callback,
131131
metadata=comp_run.metadata,
132-
run_id=comp_run.run_id,
132+
resource_tracking_run_id=get_resource_tracking_run_id(
133+
user_id, project_id, node_id, comp_run.iteration
134+
),
133135
)
134136
for node_id, task in scheduled_tasks.items()
135137
),

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
from models_library.projects_nodes_io import NodeID
4949
from models_library.resource_tracker import HardwareInfo
5050
from models_library.users import UserID
51-
from pydantic import PositiveInt, TypeAdapter, ValidationError
51+
from pydantic import TypeAdapter, ValidationError
5252
from pydantic.networks import AnyUrl
5353
from servicelib.logging_utils import log_catch
5454
from settings_library.s3 import S3Settings
@@ -293,7 +293,7 @@ async def send_computation_tasks(
293293
remote_fct: ContainerRemoteFct | None = None,
294294
metadata: RunMetadataDict,
295295
hardware_info: HardwareInfo,
296-
run_id: PositiveInt,
296+
resource_tracking_run_id: str,
297297
) -> list[PublishedComputationTask]:
298298
"""actually sends the function remote_fct to be remotely executed. if None is kept then the default
299299
function that runs container will be started.
@@ -397,7 +397,7 @@ async def send_computation_tasks(
397397
node_id=node_id,
398398
node_image=node_image,
399399
metadata=metadata,
400-
run_id=run_id,
400+
resource_tracking_run_id=resource_tracking_run_id,
401401
)
402402
task_owner = dask_utils.compute_task_owner(
403403
user_id, project_id, node_id, metadata.get("project_metadata", {})

services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from models_library.services_types import RunID
2222
from models_library.users import UserID
2323
from models_library.utils.specs_substitution import SpecsSubstitutionsResolver
24-
from pydantic import BaseModel, PositiveInt
24+
from pydantic import BaseModel
2525
from servicelib.fastapi.app_state import SingletonInAppStateMixin
2626
from servicelib.logging_utils import log_context
2727

@@ -225,7 +225,7 @@ async def resolve_and_substitute_session_variables_in_specs(
225225
product_name: str,
226226
project_id: ProjectID,
227227
node_id: NodeID,
228-
run_id: RunID | PositiveInt,
228+
run_id: RunID | str,
229229
) -> dict[str, Any]:
230230
table = OsparcSessionVariablesTable.get_from_app_state(app)
231231
resolver = SpecsSubstitutionsResolver(specs, upgrade=False)

services/director-v2/src/simcore_service_director_v2/utils/dask.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from models_library.projects_nodes_io import NodeID, NodeIDStr
2828
from models_library.services import ServiceKey, ServiceVersion
2929
from models_library.users import UserID
30-
from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter, ValidationError
30+
from pydantic import AnyUrl, ByteSize, TypeAdapter, ValidationError
3131
from servicelib.logging_utils import log_catch, log_context
3232
from simcore_sdk import node_ports_v2
3333
from simcore_sdk.node_ports_common.exceptions import (
@@ -342,7 +342,7 @@ async def compute_task_envs(
342342
node_id: NodeID,
343343
node_image: Image,
344344
metadata: RunMetadataDict,
345-
run_id: PositiveInt,
345+
resource_tracking_run_id: str,
346346
) -> ContainerEnvsDict:
347347
product_name = metadata.get("product_name", UNDEFINED_DOCKER_LABEL)
348348
task_envs = node_image.envs
@@ -361,7 +361,7 @@ async def compute_task_envs(
361361
product_name=product_name,
362362
project_id=project_id,
363363
node_id=node_id,
364-
run_id=run_id,
364+
run_id=resource_tracking_run_id,
365365
)
366366
# NOTE: see https://github.com/ITISFoundation/osparc-simcore/issues/3638
367367
# we currently do not validate as we are using illegal docker key names with underscores

services/director-v2/tests/unit/conftest.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,24 @@
2222
from models_library.generated_models.docker_rest_api import (
2323
ServiceSpec as DockerServiceSpec,
2424
)
25+
from models_library.projects import ProjectID
26+
from models_library.projects_nodes_io import NodeID
2527
from models_library.service_settings_labels import SimcoreServiceLabels
2628
from models_library.services import RunID, ServiceKey, ServiceKeyVersion, ServiceVersion
2729
from models_library.services_enums import ServiceState
30+
from models_library.users import UserID
2831
from models_library.utils._original_fastapi_encoders import jsonable_encoder
29-
from pydantic import PositiveInt, TypeAdapter
32+
from pydantic import TypeAdapter
3033
from pytest_mock.plugin import MockerFixture
3134
from pytest_simcore.helpers.typing_env import EnvVarsDict
3235
from settings_library.s3 import S3Settings
3336
from simcore_sdk.node_ports_v2 import FileLinkType
3437
from simcore_service_director_v2.constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL
3538
from simcore_service_director_v2.core.settings import AppSettings
3639
from simcore_service_director_v2.models.dynamic_services_scheduler import SchedulerData
40+
from simcore_service_director_v2.modules.comp_scheduler._utils import (
41+
get_resource_tracking_run_id,
42+
)
3743

3844

3945
@pytest.fixture
@@ -341,5 +347,7 @@ async def async_docker_client() -> AsyncIterable[aiodocker.Docker]:
341347

342348

343349
@pytest.fixture
344-
def comp_task_run_id() -> PositiveInt:
345-
return 42
350+
def resource_tracking_run_id(
351+
user_id: UserID, project_id: ProjectID, node_id: NodeID
352+
) -> str:
353+
return get_resource_tracking_run_id(user_id, project_id, node_id, 42)

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
from models_library.projects_nodes_io import NodeID
4747
from models_library.resource_tracker import HardwareInfo
4848
from models_library.users import UserID
49-
from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter
49+
from pydantic import AnyUrl, ByteSize, TypeAdapter
5050
from pytest_mock.plugin import MockerFixture
5151
from pytest_simcore.helpers.typing_env import EnvVarsDict
5252
from settings_library.s3 import S3Settings
@@ -442,7 +442,7 @@ async def test_send_computation_task(
442442
task_labels: ContainerLabelsDict,
443443
empty_hardware_info: HardwareInfo,
444444
faker: Faker,
445-
comp_task_run_id: PositiveInt,
445+
resource_tracking_run_id: str,
446446
):
447447
_DASK_EVENT_NAME = faker.pystr()
448448

@@ -504,7 +504,7 @@ def fake_sidecar_fct(
504504
),
505505
metadata=comp_run_metadata,
506506
hardware_info=empty_hardware_info,
507-
run_id=comp_task_run_id,
507+
resource_tracking_run_id=resource_tracking_run_id,
508508
)
509509
assert node_id_to_job_ids
510510
assert len(node_id_to_job_ids) == 1
@@ -561,7 +561,7 @@ async def test_computation_task_is_persisted_on_dask_scheduler(
561561
mocked_storage_service_api: respx.MockRouter,
562562
comp_run_metadata: RunMetadataDict,
563563
empty_hardware_info: HardwareInfo,
564-
comp_task_run_id: PositiveInt,
564+
resource_tracking_run_id: str,
565565
):
566566
"""rationale:
567567
When a task is submitted to the dask backend, a dask future is returned.
@@ -597,7 +597,7 @@ def fake_sidecar_fct(
597597
remote_fct=fake_sidecar_fct,
598598
metadata=comp_run_metadata,
599599
hardware_info=empty_hardware_info,
600-
run_id=comp_task_run_id,
600+
resource_tracking_run_id=resource_tracking_run_id,
601601
)
602602
assert published_computation_task
603603
assert len(published_computation_task) == 1
@@ -653,7 +653,7 @@ async def test_abort_computation_tasks(
653653
faker: Faker,
654654
comp_run_metadata: RunMetadataDict,
655655
empty_hardware_info: HardwareInfo,
656-
comp_task_run_id: PositiveInt,
656+
resource_tracking_run_id: str,
657657
):
658658
_DASK_EVENT_NAME = faker.pystr()
659659

@@ -692,7 +692,7 @@ def fake_remote_fct(
692692
remote_fct=fake_remote_fct,
693693
metadata=comp_run_metadata,
694694
hardware_info=empty_hardware_info,
695-
run_id=comp_task_run_id,
695+
resource_tracking_run_id=resource_tracking_run_id,
696696
)
697697
assert published_computation_task
698698
assert len(published_computation_task) == 1
@@ -744,7 +744,7 @@ async def test_failed_task_returns_exceptions(
744744
mocked_storage_service_api: respx.MockRouter,
745745
comp_run_metadata: RunMetadataDict,
746746
empty_hardware_info: HardwareInfo,
747-
comp_task_run_id: PositiveInt,
747+
resource_tracking_run_id: str,
748748
):
749749
# NOTE: this must be inlined so that the test works,
750750
# the dask-worker must be able to import the function
@@ -765,7 +765,7 @@ def fake_failing_sidecar_fct(
765765
remote_fct=fake_failing_sidecar_fct,
766766
metadata=comp_run_metadata,
767767
hardware_info=empty_hardware_info,
768-
run_id=comp_task_run_id,
768+
resource_tracking_run_id=resource_tracking_run_id,
769769
)
770770
assert published_computation_task
771771
assert len(published_computation_task) == 1
@@ -808,7 +808,7 @@ async def test_send_computation_task_with_missing_resources_raises(
808808
mocked_storage_service_api: respx.MockRouter,
809809
comp_run_metadata: RunMetadataDict,
810810
empty_hardware_info: HardwareInfo,
811-
comp_task_run_id: PositiveInt,
811+
resource_tracking_run_id: str,
812812
):
813813
# remove the workers that can handle gpu
814814
scheduler_info = dask_client.backend.client.scheduler_info()
@@ -835,7 +835,7 @@ async def test_send_computation_task_with_missing_resources_raises(
835835
remote_fct=None,
836836
metadata=comp_run_metadata,
837837
hardware_info=empty_hardware_info,
838-
run_id=comp_task_run_id,
838+
resource_tracking_run_id=resource_tracking_run_id,
839839
)
840840
mocked_user_completed_cb.assert_not_called()
841841

@@ -854,7 +854,7 @@ async def test_send_computation_task_with_hardware_info_raises(
854854
mocked_storage_service_api: respx.MockRouter,
855855
comp_run_metadata: RunMetadataDict,
856856
hardware_info: HardwareInfo,
857-
comp_task_run_id: PositiveInt,
857+
resource_tracking_run_id: str,
858858
):
859859
# NOTE: running on the default cluster will raise missing resources
860860
with pytest.raises(MissingComputationalResourcesError):
@@ -866,7 +866,7 @@ async def test_send_computation_task_with_hardware_info_raises(
866866
remote_fct=None,
867867
metadata=comp_run_metadata,
868868
hardware_info=hardware_info,
869-
run_id=comp_task_run_id,
869+
resource_tracking_run_id=resource_tracking_run_id,
870870
)
871871
mocked_user_completed_cb.assert_not_called()
872872

@@ -884,7 +884,7 @@ async def test_too_many_resources_send_computation_task(
884884
mocked_storage_service_api: respx.MockRouter,
885885
comp_run_metadata: RunMetadataDict,
886886
empty_hardware_info: HardwareInfo,
887-
comp_task_run_id: PositiveInt,
887+
resource_tracking_run_id: str,
888888
):
889889
# create an image that needs a huge amount of CPU
890890
image = Image(
@@ -908,7 +908,7 @@ async def test_too_many_resources_send_computation_task(
908908
remote_fct=None,
909909
metadata=comp_run_metadata,
910910
hardware_info=empty_hardware_info,
911-
run_id=comp_task_run_id,
911+
resource_tracking_run_id=resource_tracking_run_id,
912912
)
913913

914914
mocked_user_completed_cb.assert_not_called()
@@ -925,7 +925,7 @@ async def test_disconnected_backend_raises_exception(
925925
mocked_storage_service_api: respx.MockRouter,
926926
comp_run_metadata: RunMetadataDict,
927927
empty_hardware_info: HardwareInfo,
928-
comp_task_run_id: PositiveInt,
928+
resource_tracking_run_id: str,
929929
):
930930
# DISCONNECT THE CLUSTER
931931
await dask_spec_local_cluster.close() # type: ignore
@@ -938,7 +938,7 @@ async def test_disconnected_backend_raises_exception(
938938
remote_fct=None,
939939
metadata=comp_run_metadata,
940940
hardware_info=empty_hardware_info,
941-
run_id=comp_task_run_id,
941+
resource_tracking_run_id=resource_tracking_run_id,
942942
)
943943
mocked_user_completed_cb.assert_not_called()
944944

@@ -958,7 +958,7 @@ async def test_changed_scheduler_raises_exception(
958958
unused_tcp_port_factory: Callable,
959959
comp_run_metadata: RunMetadataDict,
960960
empty_hardware_info: HardwareInfo,
961-
comp_task_run_id: PositiveInt,
961+
resource_tracking_run_id: str,
962962
):
963963
# change the scheduler (stop the current one and start another at the same address)
964964
scheduler_address = URL(dask_spec_local_cluster.scheduler_address)
@@ -988,7 +988,7 @@ async def test_changed_scheduler_raises_exception(
988988
remote_fct=None,
989989
metadata=comp_run_metadata,
990990
hardware_info=empty_hardware_info,
991-
run_id=comp_task_run_id,
991+
resource_tracking_run_id=resource_tracking_run_id,
992992
)
993993
mocked_user_completed_cb.assert_not_called()
994994

@@ -1006,7 +1006,7 @@ async def test_get_tasks_status(
10061006
fail_remote_fct: bool,
10071007
comp_run_metadata: RunMetadataDict,
10081008
empty_hardware_info: HardwareInfo,
1009-
comp_task_run_id: PositiveInt,
1009+
resource_tracking_run_id: str,
10101010
):
10111011
# NOTE: this must be inlined so that the test works,
10121012
# the dask-worker must be able to import the function
@@ -1034,7 +1034,7 @@ def fake_remote_fct(
10341034
remote_fct=fake_remote_fct,
10351035
metadata=comp_run_metadata,
10361036
hardware_info=empty_hardware_info,
1037-
run_id=comp_task_run_id,
1037+
resource_tracking_run_id=resource_tracking_run_id,
10381038
)
10391039
assert published_computation_task
10401040
assert len(published_computation_task) == 1
@@ -1089,7 +1089,7 @@ async def test_dask_sub_handlers(
10891089
fake_task_handlers: TaskHandlers,
10901090
comp_run_metadata: RunMetadataDict,
10911091
empty_hardware_info: HardwareInfo,
1092-
comp_task_run_id: PositiveInt,
1092+
resource_tracking_run_id: str,
10931093
):
10941094
dask_client.register_handlers(fake_task_handlers)
10951095
_DASK_START_EVENT = "start"
@@ -1119,7 +1119,7 @@ def fake_remote_fct(
11191119
remote_fct=fake_remote_fct,
11201120
metadata=comp_run_metadata,
11211121
hardware_info=empty_hardware_info,
1122-
run_id=comp_task_run_id,
1122+
resource_tracking_run_id=resource_tracking_run_id,
11231123
)
11241124
assert published_computation_task
11251125
assert len(published_computation_task) == 1
@@ -1164,7 +1164,7 @@ async def test_get_cluster_details(
11641164
comp_run_metadata: RunMetadataDict,
11651165
empty_hardware_info: HardwareInfo,
11661166
faker: Faker,
1167-
comp_task_run_id: PositiveInt,
1167+
resource_tracking_run_id: str,
11681168
):
11691169
cluster_details = await dask_client.get_cluster_details()
11701170
assert cluster_details
@@ -1201,7 +1201,7 @@ def fake_sidecar_fct(
12011201
),
12021202
metadata=comp_run_metadata,
12031203
hardware_info=empty_hardware_info,
1204-
run_id=comp_task_run_id,
1204+
resource_tracking_run_id=resource_tracking_run_id,
12051205
)
12061206
assert published_computation_task
12071207
assert len(published_computation_task) == 1

services/director-v2/tests/unit/with_dbs/test_utils_dask.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from models_library.projects import ProjectID
3636
from models_library.projects_nodes_io import NodeID, SimCoreFileLink, SimcoreS3FileID
3737
from models_library.users import UserID
38-
from pydantic import ByteSize, PositiveInt, TypeAdapter
38+
from pydantic import ByteSize, TypeAdapter
3939
from pydantic.networks import AnyUrl
4040
from pytest_mock.plugin import MockerFixture
4141
from pytest_simcore.helpers.typing_env import EnvVarsDict
@@ -647,7 +647,7 @@ async def test_compute_task_envs(
647647
run_metadata: RunMetadataDict,
648648
input_task_envs: ContainerEnvsDict,
649649
expected_computed_task_envs: ContainerEnvsDict,
650-
comp_task_run_id: PositiveInt,
650+
resource_tracking_run_id: str,
651651
):
652652
sleeper_task: CompTaskAtDB = published_project.tasks[1]
653653
sleeper_task.image.envs = input_task_envs
@@ -659,6 +659,6 @@ async def test_compute_task_envs(
659659
node_id=sleeper_task.node_id,
660660
node_image=sleeper_task.image,
661661
metadata=run_metadata,
662-
run_id=comp_task_run_id,
662+
resource_tracking_run_id=resource_tracking_run_id,
663663
)
664664
assert task_envs == expected_computed_task_envs

0 commit comments

Comments
 (0)