Commit 8f9322c

removed DaskClientTaskState and upgraded the tests to use RunningState
1 parent a7fdc0a commit 8f9322c

File tree

4 files changed: +27 -105 lines


services/director-v2/src/simcore_service_director_v2/models/dask_subsystem.py

Lines changed: 0 additions & 13 deletions
This file was deleted.
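The deleted module's contents are collapsed in this view. Judging from the members referenced in the removed client code and in the old tests (PENDING_OR_STARTED, SUCCESS, ERRED, ABORTED, LOST), the dropped enum presumably looked roughly like the sketch below; this is an inference, not the verbatim file.

from enum import Enum, auto, unique


@unique
class DaskClientTaskState(Enum):
    # sketch only: member list inferred from the call sites removed in this commit
    PENDING_OR_STARTED = auto()
    SUCCESS = auto()
    ERRED = auto()
    ABORTED = auto()
    LOST = auto()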

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ async def _get_tasks_status(
             use_on_demand_clusters=comp_run.use_on_demand_clusters,
             run_metadata=comp_run.metadata,
         ) as client:
-            return await client.get_tasks_status2([f"{t.job_id}" for t in tasks])
+            return await client.get_tasks_status([f"{t.job_id}" for t in tasks])
             # tasks_statuses = await client.get_tasks_status(
             #     [f"{t.job_id}" for t in tasks]
             # )
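With the rename in place, the scheduler now receives RunningState values directly from the client, one per job id and in the same order as the job ids it passed in (the new implementation gathers the per-job lookups in order). A minimal usage sketch, assuming a hypothetical helper and task objects exposing job_id; the surrounding scheduler code is not part of this hunk.

from models_library.projects_state import RunningState


async def _states_by_job_id(client, tasks) -> dict[str, RunningState]:
    # hypothetical helper: get_tasks_status returns one RunningState per job id,
    # in the same order as the job ids passed in, so zip the two back together
    states = await client.get_tasks_status([f"{t.job_id}" for t in tasks])
    return {f"{t.job_id}": state for t, state in zip(tasks, states, strict=True)}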

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 2 additions & 70 deletions
@@ -17,7 +17,6 @@
 from http.client import HTTPException
 from typing import Any, Final, cast

-import dask.typing
 import distributed
 from aiohttp import ClientResponseError
 from common_library.json_serialization import json_dumps
@@ -427,7 +426,7 @@ async def send_computation_tasks(

         return list_of_node_id_to_job_id

-    async def get_tasks_status2(self, job_ids: Iterable[str]) -> list[RunningState]:
+    async def get_tasks_status(self, job_ids: Iterable[str]) -> list[RunningState]:
         dask_utils.check_scheduler_is_still_the_same(
             self.backend.scheduler_id, self.backend.client
         )
@@ -436,6 +435,7 @@ async def get_tasks_status2(self, job_ids: Iterable[str]) -> list[RunningState]:

         async def _get_job_id_status(job_id: str) -> RunningState:
             # TODO: maybe we should define an event just for that, instead of multiple calls
+            # but the max length by default is 1000. We should test it
             dask_events: tuple[tuple[UnixTimestamp, str], ...] = (
                 await self.backend.client.get_events(
                     TASK_LIFE_CYCLE_EVENT.format(key=job_id)
@@ -478,74 +478,6 @@ async def _get_job_id_status(job_id: str) -> RunningState:

         return await asyncio.gather(*(_get_job_id_status(job_id) for job_id in job_ids))

-    async def get_tasks_status(self, job_ids: list[str]) -> list[DaskClientTaskState]:
-        dask_utils.check_scheduler_is_still_the_same(
-            self.backend.scheduler_id, self.backend.client
-        )
-        dask_utils.check_communication_with_scheduler_is_open(self.backend.client)
-        dask_utils.check_scheduler_status(self.backend.client)
-
-        # try to get the task from the scheduler
-        def _get_pipeline_statuses(
-            dask_scheduler: distributed.Scheduler,
-        ) -> dict[dask.typing.Key, DaskSchedulerTaskState | None]:
-            statuses: dict[dask.typing.Key, DaskSchedulerTaskState | None] = (
-                dask_scheduler.get_task_status(keys=job_ids)
-            )
-            return statuses
-
-        task_statuses: dict[dask.typing.Key, DaskSchedulerTaskState | None] = (
-            await self.backend.client.run_on_scheduler(_get_pipeline_statuses)
-        )
-        assert isinstance(task_statuses, dict)  # nosec
-
-        _logger.debug("found dask task statuses: %s", f"{task_statuses=}")
-
-        running_states: list[DaskClientTaskState] = []
-        for job_id in job_ids:
-            dask_status = cast(
-                DaskSchedulerTaskState | None, task_statuses.get(job_id, "lost")
-            )
-            if dask_status == "erred":
-                try:
-                    # find out if this was a cancellation
-                    var = distributed.Variable(job_id, client=self.backend.client)
-                    future: distributed.Future = await var.get(
-                        timeout=_DASK_DEFAULT_TIMEOUT_S
-                    )
-                    exception = await future.exception(timeout=_DASK_DEFAULT_TIMEOUT_S)
-                    assert isinstance(exception, Exception)  # nosec
-
-                    if isinstance(exception, TaskCancelledError):
-                        running_states.append(DaskClientTaskState.ABORTED)
-                    else:
-                        assert exception  # nosec
-                        _logger.warning(
-                            "Task %s completed in error:\n%s\nTrace:\n%s",
-                            job_id,
-                            exception,
-                            "".join(traceback.format_exception(exception)),
-                        )
-                        running_states.append(DaskClientTaskState.ERRED)
-                except TimeoutError:
-                    _logger.warning(
-                        "Task %s could not be retrieved from dask-scheduler, it is lost\n"
-                        "TIP:If the task was unpublished this can happen, or if the dask-scheduler was restarted.",
-                        job_id,
-                    )
-                    running_states.append(DaskClientTaskState.LOST)
-
-            elif dask_status is None:
-                running_states.append(DaskClientTaskState.LOST)
-            else:
-                running_states.append(
-                    _DASK_TASK_STATUS_DASK_CLIENT_TASK_STATE_MAP.get(
-                        dask_status, DaskClientTaskState.LOST
-                    )
-                )
-
-        return running_states
-
     async def abort_computation_task(self, job_id: str) -> None:
         # Dask future may be cancelled, but only a future that was not already taken by
         # a sidecar can be cancelled that way.
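The replacement implementation is only partially visible in the hunk above: it reads the task's life-cycle events back from the scheduler via distributed's events API instead of querying dask's internal task states on the scheduler. A standalone sketch of that idea follows; the topic constant's format and the event payload layout (a JSON string carrying the reported state) are assumptions made for illustration, only the awaited get_events call itself is taken from the diff.

import json

import distributed

from models_library.projects_state import RunningState

# assumed topic format; the real TASK_LIFE_CYCLE_EVENT constant lives in the repo
TASK_LIFE_CYCLE_EVENT = "task-life-cycle-{key}"


async def get_job_state(client: distributed.Client, job_id: str) -> RunningState:
    # all (unix-timestamp, message) events published on this job's topic
    events = await client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=job_id))
    if not events:
        # nothing recorded, e.g. the dask-scheduler was restarted: treat as unknown
        return RunningState.UNKNOWN
    # keep the most recent event; the payload below is an assumed JSON string
    # such as '{"state": "SUCCESS"}'
    _timestamp, raw_message = max(events, key=lambda event: event[0])
    return RunningState(json.loads(raw_message)["state"])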

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 24 additions & 21 deletions
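All of the substitutions in the diff below follow the same one-to-one mapping from the removed DaskClientTaskState members to RunningState. As a quick reference, the mapping can be written out as the following sketch; the dictionary name is made up, only the pairs themselves are taken from the changed assertions.

from models_library.projects_state import RunningState

# old DaskClientTaskState member -> RunningState now asserted in the tests
_OLD_TO_NEW_STATE: dict[str, RunningState] = {
    "PENDING_OR_STARTED": RunningState.STARTED,
    "SUCCESS": RunningState.SUCCESS,
    "ERRED": RunningState.FAILED,
    "ABORTED": RunningState.ABORTED,
    "LOST": RunningState.UNKNOWN,
}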
@@ -43,6 +43,7 @@
 from models_library.docker import to_simcore_runtime_docker_label_key
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
+from models_library.projects_state import RunningState
 from models_library.resource_tracker import HardwareInfo
 from models_library.services_types import ServiceRunID
 from models_library.users import UserID
@@ -60,7 +61,6 @@
 )
 from simcore_service_director_v2.models.comp_runs import RunMetadataDict
 from simcore_service_director_v2.models.comp_tasks import Image
-from simcore_service_director_v2.models.dask_subsystem import DaskClientTaskState
 from simcore_service_director_v2.modules.dask_client import DaskClient, TaskHandlers
 from tenacity.asyncio import AsyncRetrying
 from tenacity.retry import retry_if_exception_type
@@ -90,7 +90,7 @@ async def _assert_wait_for_cb_call(mocked_fct, timeout: int | None = None):
 async def _assert_wait_for_task_status(
     job_id: str,
     dask_client: DaskClient,
-    expected_status: DaskClientTaskState,
+    expected_status: RunningState,
     timeout: int | None = None,  # noqa: ASYNC109
 ):
     async for attempt in AsyncRetrying(
@@ -105,11 +105,11 @@ async def _assert_wait_for_task_status(
                 f"Attempt={attempt.retry_state.attempt_number}"
             )
             got = (await dask_client.get_tasks_status([job_id]))[0]
-            assert isinstance(got, DaskClientTaskState)
+            assert isinstance(got, RunningState)
             print(f"{got=} vs {expected_status=}")
-            if got is DaskClientTaskState.ERRED and expected_status not in [
-                DaskClientTaskState.ERRED,
-                DaskClientTaskState.LOST,
+            if got is RunningState.FAILED and expected_status not in [
+                RunningState.FAILED,
+                RunningState.UNKNOWN,
             ]:
                 try:
                     # we can fail fast here
@@ -516,7 +516,7 @@ def fake_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task.job_id,
         dask_client,
-        expected_status=DaskClientTaskState.PENDING_OR_STARTED,
+        expected_status=RunningState.STARTED,
     )

     # using the event we let the remote fct continue
@@ -530,7 +530,7 @@ def fake_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task.job_id,
         dask_client,
-        expected_status=DaskClientTaskState.SUCCESS,
+        expected_status=RunningState.SUCCESS,
     )

     # check the results
@@ -544,7 +544,7 @@ def fake_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task.job_id,
         dask_client,
-        expected_status=DaskClientTaskState.LOST,
+        expected_status=RunningState.UNKNOWN,
         timeout=60,
     )

@@ -609,7 +609,7 @@ def fake_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        expected_status=DaskClientTaskState.SUCCESS,
+        expected_status=RunningState.SUCCESS,
     )
     assert published_computation_task[0].node_id in image_params.fake_tasks
     # creating a new future shows that it is not done????
@@ -704,7 +704,7 @@ def fake_remote_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        DaskClientTaskState.PENDING_OR_STARTED,
+        RunningState.STARTED,
     )

     # we wait to be sure the remote fct is started
@@ -721,7 +721,7 @@ def fake_remote_fct(

     await _assert_wait_for_cb_call(mocked_user_completed_cb)
     await _assert_wait_for_task_status(
-        published_computation_task[0].job_id, dask_client, DaskClientTaskState.ABORTED
+        published_computation_task[0].job_id, dask_client, RunningState.ABORTED
     )

     # getting the results should throw the cancellation error
@@ -732,9 +732,12 @@ def fake_remote_fct(
         await dask_client.release_task_result(published_computation_task[0].job_id)
     # NOTE: this change of status takes a very long time to happen and is not relied upon so we skip it since it
     # makes the test fail a lot for no gain (it's kept here in case it ever becomes an issue)
-    # await _assert_wait_for_task_status(
-    #     job_id, dask_client, RunningState.UNKNOWN, timeout=120
-    # )
+    await _assert_wait_for_task_status(
+        published_computation_task[0].job_id,
+        dask_client,
+        RunningState.UNKNOWN,
+        timeout=120,
+    )


 async def test_failed_task_returns_exceptions(
@@ -784,7 +787,7 @@ def fake_failing_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        expected_status=DaskClientTaskState.ERRED,
+        expected_status=RunningState.FAILED,
     )
     with pytest.raises(
         ValueError,
@@ -1047,7 +1050,7 @@ def fake_remote_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        DaskClientTaskState.PENDING_OR_STARTED,
+        RunningState.STARTED,
     )

     # let the remote fct run through now
@@ -1057,7 +1060,7 @@ def fake_remote_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        DaskClientTaskState.ERRED if fail_remote_fct else DaskClientTaskState.SUCCESS,
+        RunningState.FAILED if fail_remote_fct else RunningState.SUCCESS,
     )
     # release the task results
     await dask_client.release_task_result(published_computation_task[0].job_id)
@@ -1069,7 +1072,7 @@ def fake_remote_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        DaskClientTaskState.LOST,
+        RunningState.UNKNOWN,
         timeout=60,
     )

@@ -1209,7 +1212,7 @@ def fake_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        expected_status=DaskClientTaskState.PENDING_OR_STARTED,
+        expected_status=RunningState.STARTED,
     )

     # check we have one worker using the resources
@@ -1240,7 +1243,7 @@ def fake_sidecar_fct(
     await _assert_wait_for_task_status(
         published_computation_task[0].job_id,
         dask_client,
-        expected_status=DaskClientTaskState.SUCCESS,
+        expected_status=RunningState.SUCCESS,
     )

     # check the resources are released