|
8 | 8 |
|
9 | 9 | """ |
10 | 10 |
|
| 11 | +import asyncio |
11 | 12 | import logging |
12 | 13 | import traceback |
13 | | -from collections.abc import Callable |
| 14 | +from collections.abc import Callable, Iterable |
14 | 15 | from copy import deepcopy |
15 | 16 | from dataclasses import dataclass |
16 | 17 | from http.client import HTTPException |
|
38 | 39 | TaskOwner, |
39 | 40 | ) |
40 | 41 | from dask_task_models_library.container_tasks.utils import generate_dask_job_id |
41 | | -from dask_task_models_library.models import DaskJobID, DaskResources |
| 42 | +from dask_task_models_library.models import ( |
| 43 | + TASK_LIFE_CYCLE_EVENT, |
| 44 | + DaskJobID, |
| 45 | + DaskResources, |
| 46 | + TaskLifeCycleState, |
| 47 | +) |
42 | 48 | from dask_task_models_library.resource_constraints import ( |
43 | 49 | create_ec2_resource_constraint_key, |
44 | 50 | ) |
|
48 | 54 | from models_library.clusters import ClusterAuthentication, ClusterTypeInModel |
49 | 55 | from models_library.projects import ProjectID |
50 | 56 | from models_library.projects_nodes_io import NodeID |
| 57 | +from models_library.projects_state import RunningState |
51 | 58 | from models_library.resource_tracker import HardwareInfo |
52 | 59 | from models_library.services import ServiceRunID |
53 | 60 | from models_library.users import UserID |
|
77 | 84 | from ..utils.dask_client_utils import ( |
78 | 85 | DaskSubSystem, |
79 | 86 | TaskHandlers, |
| 87 | + UnixTimestamp, |
80 | 88 | connect_to_dask_scheduler, |
81 | 89 | ) |
82 | 90 | from .db import get_db_engine |
@@ -419,6 +427,57 @@ async def send_computation_tasks( |
419 | 427 |
|
420 | 428 | return list_of_node_id_to_job_id |
421 | 429 |
|
| 430 | + async def get_tasks_status2(self, job_ids: Iterable[str]) -> list[RunningState]: |
| 431 | + dask_utils.check_scheduler_is_still_the_same( |
| 432 | + self.backend.scheduler_id, self.backend.client |
| 433 | + ) |
| 434 | + dask_utils.check_communication_with_scheduler_is_open(self.backend.client) |
| 435 | + dask_utils.check_scheduler_status(self.backend.client) |
| 436 | + |
| 437 | + async def _get_job_id_status(job_id: str) -> RunningState: |
| 438 | + # TODO: maybe we should define an event just for that, instead of multiple calls |
| 439 | + dask_events: tuple[tuple[UnixTimestamp, str], ...] = ( |
| 440 | + await self.backend.client.get_events( |
| 441 | + TASK_LIFE_CYCLE_EVENT.format(key=job_id) |
| 442 | + ) |
| 443 | + ) |
| 444 | + if not dask_events: |
| 445 | + return RunningState.UNKNOWN |
| 446 | + # we are interested in the last event |
| 447 | + parsed_event = TaskLifeCycleState.model_validate(dask_events[-1][1]) |
| 448 | + |
| 449 | + if parsed_event.state == RunningState.FAILED: |
| 450 | + try: |
| 451 | + # find out if this was a cancellation |
| 452 | + var = distributed.Variable(job_id, client=self.backend.client) |
| 453 | + future: distributed.Future = await var.get( |
| 454 | + timeout=_DASK_DEFAULT_TIMEOUT_S |
| 455 | + ) |
| 456 | + exception = await future.exception(timeout=_DASK_DEFAULT_TIMEOUT_S) |
| 457 | + assert isinstance(exception, Exception) # nosec |
| 458 | + |
| 459 | + if isinstance(exception, TaskCancelledError): |
| 460 | + return RunningState.ABORTED |
| 461 | + assert exception # nosec |
| 462 | + _logger.warning( |
| 463 | + "Task %s completed in error:\n%s\nTrace:\n%s", |
| 464 | + job_id, |
| 465 | + exception, |
| 466 | + "".join(traceback.format_exception(exception)), |
| 467 | + ) |
| 468 | + return RunningState.FAILED |
| 469 | + except TimeoutError: |
| 470 | + _logger.warning( |
| 471 | + "Task %s could not be retrieved from dask-scheduler, it is lost\n" |
| 472 | + "TIP:If the task was unpublished this can happen, or if the dask-scheduler was restarted.", |
| 473 | + job_id, |
| 474 | + ) |
| 475 | + return RunningState.UNKNOWN |
| 476 | + |
| 477 | + return parsed_event.state |
| 478 | + |
| 479 | + return await asyncio.gather(*(_get_job_id_status(job_id) for job_id in job_ids)) |
| 480 | + |
422 | 481 | async def get_tasks_status(self, job_ids: list[str]) -> list[DaskClientTaskState]: |
423 | 482 | dask_utils.check_scheduler_is_still_the_same( |
424 | 483 | self.backend.scheduler_id, self.backend.client |
|
0 commit comments