Skip to content

Commit 9e9ea6d

Browse files
committed
cancellation is now a datetime
1 parent 7d5e19d commit 9e9ea6d

File tree

3 files changed

+30
-11
lines changed

3 files changed

+30
-11
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
)
4848
from ...core.settings import ComputationalBackendSettings
4949
from ...models.comp_pipelines import CompPipelineAtDB
50-
from ...models.comp_runs import CompRunsAtDB, RunMetadataDict
50+
from ...models.comp_runs import RunMetadataDict
5151
from ...models.comp_tasks import CompTaskAtDB
5252
from ...utils.comp_scheduler import (
5353
COMPLETED_STATES,
@@ -131,7 +131,7 @@ async def _triage_changed_tasks(
131131
class ScheduledPipelineParams:
132132
cluster_id: ClusterID
133133
run_metadata: RunMetadataDict
134-
mark_for_cancellation: bool = False
134+
mark_for_cancellation: datetime.datetime | None
135135
use_on_demand_clusters: bool
136136

137137

@@ -169,7 +169,7 @@ async def run_new_pipeline(
169169
return
170170

171171
runs_repo = CompRunsRepository.instance(self.db_engine)
172-
new_run: CompRunsAtDB = await runs_repo.create(
172+
new_run = await runs_repo.create(
173173
user_id=user_id,
174174
project_id=project_id,
175175
cluster_id=cluster_id,
@@ -182,6 +182,7 @@ async def run_new_pipeline(
182182
cluster_id=cluster_id,
183183
run_metadata=new_run.metadata,
184184
use_on_demand_clusters=use_on_demand_clusters,
185+
mark_for_cancellation=None,
185186
)
186187
await publish_project_log(
187188
self.rabbitmq_client,
@@ -212,11 +213,18 @@ async def stop_pipeline(
212213
selected_iteration = iteration
213214

214215
# mark the scheduled pipeline for stopping
215-
self.scheduled_pipelines[
216-
(user_id, project_id, selected_iteration)
217-
].mark_for_cancellation = True
218-
# ensure the scheduler starts right away
219-
self._wake_up_scheduler_now()
216+
updated_comp_run = await CompRunsRepository.instance(
217+
self.db_engine
218+
).mark_for_cancellation(
219+
user_id=user_id, project_id=project_id, iteration=selected_iteration
220+
)
221+
if updated_comp_run:
222+
assert updated_comp_run.cancelled is not None # nosec
223+
self.scheduled_pipelines[
224+
(user_id, project_id, selected_iteration)
225+
].mark_for_cancellation = updated_comp_run.cancelled
226+
# ensure the scheduler starts right away
227+
self._wake_up_scheduler_now()
220228

221229
async def schedule_all_pipelines(self) -> None:
222230
self.wake_up_event.clear()
@@ -343,7 +351,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
343351
if task.last_heartbeat is None:
344352
assert task.start # nosec
345353
return bool(
346-
(utc_now - task.start.replace(tzinfo=datetime.timezone.utc))
354+
(utc_now - task.start.replace(tzinfo=datetime.UTC))
347355
> self.service_runtime_heartbeat_interval
348356
)
349357
return bool(

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler:
4747
r.cluster_id if r.cluster_id is not None else DEFAULT_CLUSTER_ID
4848
),
4949
run_metadata=r.metadata,
50-
mark_for_cancellation=False,
50+
mark_for_cancellation=bool(r.cancelled is not None),
5151
use_on_demand_clusters=r.use_on_demand_clusters,
5252
)
5353
for r in runs

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from collections import deque
44
from typing import Any
55

6+
import arrow
67
import sqlalchemy as sa
78
from aiopg.sa.result import RowProxy
89
from models_library.clusters import DEFAULT_CLUSTER_ID, ClusterID
@@ -146,10 +147,20 @@ async def set_run_result(
146147
) -> CompRunsAtDB | None:
147148
values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]}
148149
if final_state:
149-
values.update({"ended": datetime.datetime.now(tz=datetime.UTC)})
150+
values.update({"ended": arrow.utcnow().datetime})
150151
return await self.update(
151152
user_id,
152153
project_id,
153154
iteration,
154155
**values,
155156
)
157+
158+
async def mark_for_cancellation(
159+
self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt
160+
) -> CompRunsAtDB | None:
161+
return await self.update(
162+
user_id,
163+
project_id,
164+
iteration,
165+
cancelled=arrow.utcnow().datetime,
166+
)

0 commit comments

Comments
 (0)