Skip to content

Commit 823c7e6

Browse files
authored
🐛🎨Computational backend stability: improvements step 2 (#8341)
1 parent 9ebb830 commit 823c7e6

File tree

4 files changed

+127
-86
lines changed

4 files changed

+127
-86
lines changed
Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,19 +1,27 @@
1+
from dataclasses import dataclass
12
from typing import Literal
23

34
from models_library.projects import ProjectID
45
from models_library.rabbitmq_messages import RabbitMessageBase
56
from models_library.users import UserID
67

78
from ...models.comp_runs import Iteration
9+
from ...models.comp_tasks import CompTaskAtDB
810

911

1012
class SchedulePipelineRabbitMessage(RabbitMessageBase):
11-
channel_name: Literal[
13+
channel_name: Literal["simcore.services.director-v2.scheduling"] = (
1214
"simcore.services.director-v2.scheduling"
13-
] = "simcore.services.director-v2.scheduling"
15+
)
1416
user_id: UserID
1517
project_id: ProjectID
1618
iteration: Iteration
1719

1820
def routing_key(self) -> str | None: # pylint: disable=no-self-use # abstract
1921
return None
22+
23+
24+
@dataclass(frozen=True, slots=True)
25+
class TaskStateTracker:
26+
previous: CompTaskAtDB
27+
current: CompTaskAtDB

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 29 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,7 @@
6363
from ..db.repositories.comp_pipelines import CompPipelinesRepository
6464
from ..db.repositories.comp_runs import CompRunsRepository
6565
from ..db.repositories.comp_tasks import CompTasksRepository
66+
from ._models import TaskStateTracker
6667
from ._publisher import request_pipeline_scheduling
6768
from ._utils import (
6869
COMPLETED_STATES,
@@ -76,9 +77,6 @@
7677
_logger = logging.getLogger(__name__)
7778

7879

79-
_Previous = CompTaskAtDB
80-
_Current = CompTaskAtDB
81-
8280
_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(
8381
seconds=30
8482
)
@@ -117,47 +115,49 @@ async def _async_cb() -> None:
117115
@dataclass(frozen=True, slots=True)
118116
class SortedTasks:
119117
started: list[CompTaskAtDB]
120-
completed: list[CompTaskAtDB]
121-
waiting: list[CompTaskAtDB]
122-
potentially_lost: list[CompTaskAtDB]
118+
completed: list[TaskStateTracker]
119+
waiting: list[TaskStateTracker]
120+
potentially_lost: list[TaskStateTracker]
123121

124122

125123
async def _triage_changed_tasks(
126-
changed_tasks: list[tuple[_Previous, _Current]],
124+
changed_tasks: list[TaskStateTracker],
127125
) -> SortedTasks:
128126
started_tasks = [
129-
current
130-
for previous, current in changed_tasks
131-
if current.state in RUNNING_STATES
127+
tracker.current
128+
for tracker in changed_tasks
129+
if tracker.current.state in RUNNING_STATES
132130
or (
133-
previous.state in WAITING_FOR_START_STATES
134-
and current.state in COMPLETED_STATES
131+
tracker.previous.state in WAITING_FOR_START_STATES
132+
and tracker.current.state in COMPLETED_STATES
135133
)
136134
]
137135

138136
completed_tasks = [
139-
current for _, current in changed_tasks if current.state in COMPLETED_STATES
137+
tracker
138+
for tracker in changed_tasks
139+
if tracker.current.state in COMPLETED_STATES
140140
]
141141

142142
waiting_for_resources_tasks = [
143-
current
144-
for previous, current in changed_tasks
145-
if current.state in WAITING_FOR_START_STATES
143+
tracker
144+
for tracker in changed_tasks
145+
if tracker.current.state in WAITING_FOR_START_STATES
146146
]
147147

148148
lost_tasks = [
149-
current
150-
for previous, current in changed_tasks
151-
if (current.state is RunningState.UNKNOWN)
149+
tracker
150+
for tracker in changed_tasks
151+
if (tracker.current.state is RunningState.UNKNOWN)
152152
and (
153-
(arrow.utcnow().datetime - previous.modified)
153+
(arrow.utcnow().datetime - tracker.previous.modified)
154154
> _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS
155155
)
156156
]
157157
if lost_tasks:
158158
_logger.warning(
159159
"%s are currently in unknown state. TIP: If they are running in an external cluster and it is not yet ready, that might explain it. But inform @sanderegg nevertheless!",
160-
[t.node_id for t in lost_tasks],
160+
[t.current.node_id for t in lost_tasks],
161161
)
162162

163163
return SortedTasks(
@@ -321,6 +321,7 @@ async def _send_running_tasks_heartbeat(
321321
def _need_heartbeat(task: CompTaskAtDB) -> bool:
322322
if task.state not in RUNNING_STATES:
323323
return False
324+
324325
if task.last_heartbeat is None:
325326
assert task.start # nosec
326327
return bool(
@@ -362,14 +363,14 @@ async def _get_changed_tasks_from_backend(
362363
user_id: UserID,
363364
processing_tasks: list[CompTaskAtDB],
364365
comp_run: CompRunsAtDB,
365-
) -> tuple[list[tuple[_Previous, _Current]], list[CompTaskAtDB]]:
366+
) -> tuple[list[TaskStateTracker], list[CompTaskAtDB]]:
366367
tasks_backend_status = await self._get_tasks_status(
367368
user_id, processing_tasks, comp_run
368369
)
369370

370371
return (
371372
[
372-
(
373+
TaskStateTracker(
373374
task,
374375
task.model_copy(update={"state": backend_state}),
375376
)
@@ -502,16 +503,16 @@ async def _process_started_tasks(
502503
)
503504

504505
async def _process_waiting_tasks(
505-
self, tasks: list[CompTaskAtDB], run_id: PositiveInt
506+
self, tasks: list[TaskStateTracker], run_id: PositiveInt
506507
) -> None:
507508
comp_tasks_repo = CompTasksRepository(self.db_engine)
508509
await asyncio.gather(
509510
*(
510511
comp_tasks_repo.update_project_tasks_state(
511-
t.project_id,
512+
t.current.project_id,
512513
run_id,
513-
[t.node_id],
514-
t.state,
514+
[t.current.node_id],
515+
t.current.state,
515516
)
516517
for t in tasks
517518
)
@@ -602,7 +603,7 @@ async def _stop_tasks(
602603
async def _process_completed_tasks(
603604
self,
604605
user_id: UserID,
605-
tasks: list[CompTaskAtDB],
606+
tasks: list[TaskStateTracker],
606607
iteration: Iteration,
607608
comp_run: CompRunsAtDB,
608609
) -> None:

0 commit comments

Comments
 (0)