|
 import datetime
 import logging
 from abc import ABC, abstractmethod
 from collections.abc import Callable
 from dataclasses import dataclass
 from typing import Final
|
 _Previous = CompTaskAtDB
 _Current = CompTaskAtDB
-_MAX_WAITING_FOR_CLUSTER_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(
-    minutes=10
-)
+
 _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(
     seconds=30
 )
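The hard-coded 10-minute _MAX_WAITING_FOR_CLUSTER_TIMEOUT constant is dropped in favor of a configurable setting, referenced further down as self.settings.COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT. A minimal sketch of how such a field might be declared, assuming a pydantic settings class (only the field name comes from this commit; the class name, default, and description are illustrative):

# Hypothetical sketch: everything except the field name is assumed.
import datetime

from pydantic import Field
from pydantic_settings import BaseSettings


class ComputationalBackendSettings(BaseSettings):
    COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT: datetime.timedelta = Field(
        default=datetime.timedelta(minutes=10),  # mirrors the removed constant
        description="How long a run may wait for a cluster before its tasks fail",
    )

With a BaseSettings class, the value can also be overridden through an environment variable of the same name, which is presumably the point of making it configurable.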
@@ -667,7 +666,7 @@ async def apply(
         )
         # 4. time out if the run has been waiting for a cluster for more than X minutes
         comp_tasks = await self._timeout_if_waiting_for_cluster_too_long(
-            user_id, project_id, comp_run.run_id, comp_tasks
+            user_id, project_id, comp_run, comp_tasks
         )
         # 5. send a heartbeat
         await self._send_running_tasks_heartbeat(
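Passing the whole comp_run record instead of the bare run_id gives _timeout_if_waiting_for_cluster_too_long access to the run-level state, which the rewritten guard below checks. A rough sketch of the two fields the method now relies on, assuming CompRunsAtDB is a pydantic model (only run_id and result appear in this diff; the rest of the model is elided):

# Hypothetical reduction of CompRunsAtDB to the fields used in this diff.
import enum

from pydantic import BaseModel, PositiveInt


class RunningState(str, enum.Enum):
    # minimal subset of the project's state enum, for illustration only
    WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"
    FAILED = "FAILED"


class CompRunsAtDB(BaseModel):
    run_id: PositiveInt  # what used to be passed on its own
    result: RunningState  # run-level state checked by the timeout guard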
@@ -902,31 +901,34 @@ async def _timeout_if_waiting_for_cluster_too_long(
         self,
         user_id: UserID,
         project_id: ProjectID,
-        run_id: PositiveInt,
+        comp_run: CompRunsAtDB,
         comp_tasks: dict[NodeIDStr, CompTaskAtDB],
     ) -> dict[NodeIDStr, CompTaskAtDB]:
-        if all(
-            c.state is RunningState.WAITING_FOR_CLUSTER for c in comp_tasks.values()
-        ):
+        if comp_run.result is RunningState.WAITING_FOR_CLUSTER:
+            tasks_waiting_for_cluster = [
+                t
+                for t in comp_tasks.values()
+                if t.state is RunningState.WAITING_FOR_CLUSTER
+            ]
             # get the most recently modified of the waiting tasks
             latest_modified_of_all_tasks = max(
-                comp_tasks.values(), key=lambda task: task.modified
+                tasks_waiting_for_cluster, key=lambda task: task.modified
             ).modified

             if (
                 arrow.utcnow().datetime - latest_modified_of_all_tasks
-            ) > _MAX_WAITING_FOR_CLUSTER_TIMEOUT:
+            ) > self.settings.COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT:
                 await CompTasksRepository.instance(
                     self.db_engine
                 ).update_project_tasks_state(
                     project_id,
-                    run_id,
-                    [NodeID(idstr) for idstr in comp_tasks],
+                    comp_run.run_id,
+                    [task.node_id for task in tasks_waiting_for_cluster],
                     RunningState.FAILED,
                     optional_progress=1.0,
                     optional_stopped=arrow.utcnow().datetime,
                 )
-                for task in comp_tasks.values():
+                for task in tasks_waiting_for_cluster:
                     task.state = RunningState.FAILED
                 msg = "Timed out waiting for computational cluster! Please try again and/or contact Osparc support."
                 _logger.error(msg)
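The net behavioral change: the old code acted only when every task in the project was in WAITING_FOR_CLUSTER and then failed all of them; the new code triggers on the run-level state and fails only the tasks that are actually waiting. A self-contained sketch of the new selection rule (the Task dataclass and tasks_to_fail helper are illustrative, not part of the codebase):

# Standalone illustration of the new timeout rule; names are hypothetical.
import datetime
import enum
from dataclasses import dataclass


class RunningState(str, enum.Enum):
    WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"
    STARTED = "STARTED"
    FAILED = "FAILED"


@dataclass
class Task:
    node_id: str
    state: RunningState
    modified: datetime.datetime


def tasks_to_fail(
    run_state: RunningState,
    tasks: list[Task],
    timeout: datetime.timedelta,
    now: datetime.datetime,
) -> list[Task]:
    """Return the tasks the scheduler would mark FAILED."""
    if run_state is not RunningState.WAITING_FOR_CLUSTER:
        return []
    waiting = [t for t in tasks if t.state is RunningState.WAITING_FOR_CLUSTER]
    if not waiting:
        return []  # guard: max() on an empty sequence raises ValueError
    latest_modified = max(waiting, key=lambda t: t.modified).modified
    if now - latest_modified > timeout:
        return waiting  # only the waiting tasks, not every task in the run
    return []

Note that the sketch adds an explicit empty-list guard before calling max(); the diff itself relies on the run state implying that at least one task is waiting.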
|