Skip to content

Commit 49134e7

Browse files
committed
clean unused stuff
1 parent 9149e41 commit 49134e7

File tree

2 files changed

+3
-157
lines changed

2 files changed

+3
-157
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
import logging
1313
import traceback
1414
from collections.abc import Callable, Iterable
15-
from copy import deepcopy
1615
from dataclasses import dataclass
1716
from http.client import HTTPException
18-
from typing import Any, Final, cast
17+
from typing import Final, cast
1918

2019
import distributed
2120
from aiohttp import ClientResponseError
@@ -49,17 +48,16 @@
4948
create_ec2_resource_constraint_key,
5049
)
5150
from fastapi import FastAPI
52-
from models_library.api_schemas_directorv2.clusters import ClusterDetails, Scheduler
5351
from models_library.clusters import ClusterAuthentication, ClusterTypeInModel
5452
from models_library.projects import ProjectID
5553
from models_library.projects_nodes_io import NodeID
5654
from models_library.projects_state import RunningState
5755
from models_library.resource_tracker import HardwareInfo
5856
from models_library.services import ServiceRunID
5957
from models_library.users import UserID
60-
from pydantic import TypeAdapter, ValidationError
58+
from pydantic import ValidationError
6159
from pydantic.networks import AnyUrl
62-
from servicelib.logging_utils import log_catch, log_context
60+
from servicelib.logging_utils import log_context
6361
from settings_library.s3 import S3Settings
6462
from simcore_sdk.node_ports_common.exceptions import NodeportsException
6563
from simcore_sdk.node_ports_v2 import FileLinkType
@@ -538,50 +536,3 @@ async def release_task_result(self, job_id: str) -> None:
538536

539537
except KeyError:
540538
_logger.warning("Unknown task cannot be unpublished: %s", f"{job_id=}")
541-
542-
async def get_cluster_details(self) -> ClusterDetails:
543-
dask_utils.check_scheduler_is_still_the_same(
544-
self.backend.scheduler_id, self.backend.client
545-
)
546-
dask_utils.check_communication_with_scheduler_is_open(self.backend.client)
547-
dask_utils.check_scheduler_status(self.backend.client)
548-
scheduler_info = self.backend.client.scheduler_info()
549-
scheduler_status = self.backend.client.status
550-
dashboard_link = self.backend.client.dashboard_link
551-
552-
def _get_worker_used_resources(
553-
dask_scheduler: distributed.Scheduler,
554-
) -> dict[str, dict]:
555-
used_resources = {}
556-
for worker_name, worker_state in dask_scheduler.workers.items():
557-
used_resources[worker_name] = worker_state.used_resources
558-
return used_resources
559-
560-
with log_catch(_logger, reraise=False):
561-
# NOTE: this runs directly on the dask-scheduler and may raise exceptions
562-
used_resources_per_worker: dict[str, dict[str, Any]] = (
563-
await dask_utils.wrap_client_async_routine(
564-
self.backend.client.run_on_scheduler(_get_worker_used_resources)
565-
)
566-
)
567-
568-
# let's update the scheduler info, with default to 0s since sometimes
569-
# workers are destroyed/created without us knowing right away
570-
for worker_name, worker_info in scheduler_info.get("workers", {}).items():
571-
used_resources: dict[str, float] = deepcopy(
572-
worker_info.get("resources", {})
573-
)
574-
# reset default values
575-
for res_name in used_resources:
576-
used_resources[res_name] = 0
577-
# if the scheduler has info, let's override them
578-
used_resources = used_resources_per_worker.get(
579-
worker_name, used_resources
580-
)
581-
worker_info.update(used_resources=used_resources)
582-
583-
assert dashboard_link # nosec
584-
return ClusterDetails(
585-
scheduler=Scheduler(status=scheduler_status, **scheduler_info),
586-
dashboard_link=TypeAdapter(AnyUrl).validate_python(dashboard_link),
587-
)

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 0 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,108 +1162,3 @@ def fake_remote_fct(
11621162
(mock.ANY, "my name is progress")
11631163
)
11641164
await _assert_wait_for_cb_call(mocked_user_completed_cb)
1165-
1166-
1167-
async def test_get_cluster_details(
1168-
dask_client: DaskClient,
1169-
user_id: UserID,
1170-
project_id: ProjectID,
1171-
image_params: ImageParams,
1172-
_mocked_node_ports: None,
1173-
mocked_user_completed_cb: mock.AsyncMock,
1174-
mocked_storage_service_api: respx.MockRouter,
1175-
comp_run_metadata: RunMetadataDict,
1176-
empty_hardware_info: HardwareInfo,
1177-
faker: Faker,
1178-
resource_tracking_run_id: ServiceRunID,
1179-
):
1180-
cluster_details = await dask_client.get_cluster_details()
1181-
assert cluster_details
1182-
1183-
_DASK_EVENT_NAME = faker.pystr()
1184-
1185-
# send a fct that uses resources
1186-
def fake_sidecar_fct(
1187-
task_parameters: ContainerTaskParameters,
1188-
docker_auth: DockerBasicAuth,
1189-
log_file_url: LogFileUploadURL,
1190-
s3_settings: S3Settings | None,
1191-
expected_annotations,
1192-
) -> TaskOutputData:
1193-
# get the task data
1194-
worker = get_worker()
1195-
task = worker.state.tasks.get(worker.get_current_task())
1196-
assert task is not None
1197-
assert task.annotations == expected_annotations
1198-
assert task_parameters.command == ["run"]
1199-
event = distributed.Event(_DASK_EVENT_NAME)
1200-
event.wait(timeout=25)
1201-
1202-
return TaskOutputData.model_validate({"some_output_key": 123})
1203-
1204-
# NOTE: We pass another fct so it can run in our locally created dask cluster
1205-
published_computation_task = await dask_client.send_computation_tasks(
1206-
user_id=user_id,
1207-
project_id=project_id,
1208-
tasks=image_params.fake_tasks,
1209-
callback=mocked_user_completed_cb,
1210-
remote_fct=functools.partial(
1211-
fake_sidecar_fct, expected_annotations=image_params.expected_annotations
1212-
),
1213-
metadata=comp_run_metadata,
1214-
hardware_info=empty_hardware_info,
1215-
resource_tracking_run_id=resource_tracking_run_id,
1216-
)
1217-
assert published_computation_task
1218-
assert len(published_computation_task) == 1
1219-
1220-
assert published_computation_task[0].node_id in image_params.fake_tasks
1221-
1222-
# check status goes to PENDING/STARTED
1223-
await _assert_wait_for_task_status(
1224-
published_computation_task[0].job_id,
1225-
dask_client,
1226-
expected_status=RunningState.STARTED,
1227-
)
1228-
1229-
# check we have one worker using the resources
1230-
# one of the workers should now get the job and use the resources
1231-
worker_with_the_task: AnyUrl | None = None
1232-
async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(10)):
1233-
with attempt:
1234-
cluster_details = await dask_client.get_cluster_details()
1235-
assert cluster_details
1236-
assert (
1237-
cluster_details.scheduler.workers
1238-
), f"there are no workers in {cluster_details.scheduler=!r}"
1239-
for worker_url, worker_data in cluster_details.scheduler.workers.items():
1240-
if all(
1241-
worker_data.used_resources.get(res_name) == res_value
1242-
for res_name, res_value in image_params.expected_used_resources.items()
1243-
):
1244-
worker_with_the_task = worker_url
1245-
assert (
1246-
worker_with_the_task is not None
1247-
), f"there is no worker in {cluster_details.scheduler.workers.keys()=} consuming {image_params.expected_annotations=!r}"
1248-
1249-
# using the event we let the remote fct continue
1250-
event = distributed.Event(_DASK_EVENT_NAME, client=dask_client.backend.client)
1251-
await event.set() # type: ignore
1252-
1253-
# wait for the task to complete
1254-
await _assert_wait_for_task_status(
1255-
published_computation_task[0].job_id,
1256-
dask_client,
1257-
expected_status=RunningState.SUCCESS,
1258-
)
1259-
1260-
# check the resources are released
1261-
cluster_details = await dask_client.get_cluster_details()
1262-
assert cluster_details
1263-
assert cluster_details.scheduler.workers
1264-
assert worker_with_the_task
1265-
currently_used_resources = cluster_details.scheduler.workers[
1266-
worker_with_the_task
1267-
].used_resources
1268-
1269-
assert all(res == 0.0 for res in currently_used_resources.values())

0 commit comments

Comments
 (0)