Skip to content

Commit 5c72a18

Browse files
authored
Merge branch 'master' into enh/ux-create-workspace
2 parents 6e9ef18 + e32787b commit 5c72a18

File tree

9 files changed

+240
-27
lines changed

9 files changed

+240
-27
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""add cancellation mark
2+
3+
Revision ID: 8bfe65a5e294
4+
Revises: 5ad02358751a
5+
Create Date: 2024-11-08 14:40:59.266181+00:00
6+
7+
"""
8+
import sqlalchemy as sa
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "8bfe65a5e294"
13+
down_revision = "5ad02358751a"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade():
    """Add the nullable ``cancelled`` timestamp column to ``comp_runs``.

    When filled, it records the moment a cancellation was requested
    for the computational run.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "comp_runs",
        sa.Column("cancelled", sa.DateTime(timezone=True), nullable=True),
    )
    # ### end Alembic commands ###
24+
25+
26+
def downgrade():
    """Revert :func:`upgrade`: drop the ``cancelled`` column from ``comp_runs``."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("comp_runs", "cancelled")
    # ### end Alembic commands ###

packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@
9999
nullable=True,
100100
doc="When the run was finished",
101101
),
102+
sa.Column(
103+
"cancelled",
104+
sa.DateTime(timezone=True),
105+
nullable=True,
106+
doc="If filled, when cancellation was requested",
107+
),
102108
sa.Column("metadata", JSONB, nullable=True, doc="the run optional metadata"),
103109
sa.Column(
104110
"use_on_demand_clusters",

services/director-v2/src/simcore_service_director_v2/models/comp_runs.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class CompRunsAtDB(BaseModel):
4646
modified: datetime.datetime
4747
started: datetime.datetime | None
4848
ended: datetime.datetime | None
49+
cancelled: datetime.datetime | None
4950
metadata: RunMetadataDict = RunMetadataDict()
5051
use_on_demand_clusters: bool
5152

@@ -72,7 +73,7 @@ def convert_null_to_default_cluster_id(cls, v):
7273
@classmethod
7374
def ensure_utc(cls, v: datetime.datetime | None) -> datetime.datetime | None:
7475
if v is not None and v.tzinfo is None:
75-
v = v.replace(tzinfo=datetime.timezone.utc)
76+
v = v.replace(tzinfo=datetime.UTC)
7677
return v
7778

7879
@validator("metadata", pre=True)
@@ -93,9 +94,22 @@ class Config:
9394
"user_id": 132,
9495
"cluster_id": 0,
9596
"iteration": 42,
97+
"result": "UNKNOWN",
98+
"created": "2021-03-01 13:07:34.19161",
99+
"modified": "2021-03-01 13:07:34.19161",
100+
"cancelled": None,
101+
"use_on_demand_clusters": False,
102+
},
103+
{
104+
"run_id": 432,
105+
"project_uuid": "65fee9d2-e030-452c-a29c-45d288577ca5",
106+
"user_id": 132,
107+
"cluster_id": None, # this default to DEFAULT_CLUSTER_ID
108+
"iteration": 42,
96109
"result": "NOT_STARTED",
97110
"created": "2021-03-01 13:07:34.19161",
98111
"modified": "2021-03-01 13:07:34.19161",
112+
"cancelled": None,
99113
"use_on_demand_clusters": False,
100114
},
101115
{
@@ -109,6 +123,7 @@ class Config:
109123
"modified": "2021-03-01 13:07:34.19161",
110124
"started": "2021-03-01 8:07:34.19161",
111125
"ended": "2021-03-01 13:07:34.10",
126+
"cancelled": None,
112127
"metadata": {
113128
"node_id_names_map": {},
114129
"product_name": "osparc",
@@ -118,5 +133,20 @@ class Config:
118133
},
119134
"use_on_demand_clusters": False,
120135
},
136+
{
137+
"run_id": 43243,
138+
"project_uuid": "65fee9d2-e030-452c-a29c-45d288577ca5",
139+
"user_id": 132,
140+
"cluster_id": 123,
141+
"iteration": 12,
142+
"result": "SUCCESS",
143+
"created": "2021-03-01 13:07:34.19161",
144+
"modified": "2021-03-01 13:07:34.19161",
145+
"started": "2021-03-01 8:07:34.19161",
146+
"ended": "2021-03-01 13:07:34.10",
147+
"cancelled": None,
148+
"metadata": None,
149+
"use_on_demand_clusters": False,
150+
},
121151
]
122152
}

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
)
4848
from ...core.settings import ComputationalBackendSettings
4949
from ...models.comp_pipelines import CompPipelineAtDB
50-
from ...models.comp_runs import CompRunsAtDB, RunMetadataDict
50+
from ...models.comp_runs import RunMetadataDict
5151
from ...models.comp_tasks import CompTaskAtDB
5252
from ...utils.comp_scheduler import (
5353
COMPLETED_STATES,
@@ -131,7 +131,7 @@ async def _triage_changed_tasks(
131131
class ScheduledPipelineParams:
132132
cluster_id: ClusterID
133133
run_metadata: RunMetadataDict
134-
mark_for_cancellation: bool = False
134+
mark_for_cancellation: datetime.datetime | None
135135
use_on_demand_clusters: bool
136136

137137

@@ -169,7 +169,7 @@ async def run_new_pipeline(
169169
return
170170

171171
runs_repo = CompRunsRepository.instance(self.db_engine)
172-
new_run: CompRunsAtDB = await runs_repo.create(
172+
new_run = await runs_repo.create(
173173
user_id=user_id,
174174
project_id=project_id,
175175
cluster_id=cluster_id,
@@ -182,6 +182,7 @@ async def run_new_pipeline(
182182
cluster_id=cluster_id,
183183
run_metadata=new_run.metadata,
184184
use_on_demand_clusters=use_on_demand_clusters,
185+
mark_for_cancellation=None,
185186
)
186187
await publish_project_log(
187188
self.rabbitmq_client,
@@ -212,11 +213,18 @@ async def stop_pipeline(
212213
selected_iteration = iteration
213214

214215
# mark the scheduled pipeline for stopping
215-
self.scheduled_pipelines[
216-
(user_id, project_id, selected_iteration)
217-
].mark_for_cancellation = True
218-
# ensure the scheduler starts right away
219-
self._wake_up_scheduler_now()
216+
updated_comp_run = await CompRunsRepository.instance(
217+
self.db_engine
218+
).mark_for_cancellation(
219+
user_id=user_id, project_id=project_id, iteration=selected_iteration
220+
)
221+
if updated_comp_run:
222+
assert updated_comp_run.cancelled is not None # nosec
223+
self.scheduled_pipelines[
224+
(user_id, project_id, selected_iteration)
225+
].mark_for_cancellation = updated_comp_run.cancelled
226+
# ensure the scheduler starts right away
227+
self._wake_up_scheduler_now()
220228

221229
async def schedule_all_pipelines(self) -> None:
222230
self.wake_up_event.clear()
@@ -343,7 +351,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
343351
if task.last_heartbeat is None:
344352
assert task.start # nosec
345353
return bool(
346-
(utc_now - task.start.replace(tzinfo=datetime.timezone.utc))
354+
(utc_now - task.start.replace(tzinfo=datetime.UTC))
347355
> self.service_runtime_heartbeat_interval
348356
)
349357
return bool(

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler:
4747
r.cluster_id if r.cluster_id is not None else DEFAULT_CLUSTER_ID
4848
),
4949
run_metadata=r.metadata,
50-
mark_for_cancellation=False,
50+
mark_for_cancellation=r.cancelled,
5151
use_on_demand_clusters=r.use_on_demand_clusters,
5252
)
5353
for r in runs

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from collections import deque
44
from typing import Any
55

6+
import arrow
67
import sqlalchemy as sa
78
from aiopg.sa.result import RowProxy
89
from models_library.clusters import DEFAULT_CLUSTER_ID, ClusterID
@@ -146,10 +147,20 @@ async def set_run_result(
146147
) -> CompRunsAtDB | None:
147148
values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]}
148149
if final_state:
149-
values.update({"ended": datetime.datetime.now(tz=datetime.UTC)})
150+
values.update({"ended": arrow.utcnow().datetime})
150151
return await self.update(
151152
user_id,
152153
project_id,
153154
iteration,
154155
**values,
155156
)
157+
158+
async def mark_for_cancellation(
    self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt
) -> CompRunsAtDB | None:
    """Stamp the run's ``cancelled`` column with the current UTC time.

    Returns the updated run row, or ``None`` when no matching run exists.
    """
    cancellation_time = arrow.utcnow().datetime
    return await self.update(
        user_id,
        project_id,
        iteration,
        cancelled=cancellation_time,
    )

services/director-v2/tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ async def initialized_app(mock_env: EnvVarsDict) -> AsyncIterable[FastAPI]:
218218
@pytest.fixture()
219219
async def async_client(initialized_app: FastAPI) -> AsyncIterable[httpx.AsyncClient]:
220220
async with httpx.AsyncClient(
221-
app=initialized_app,
221+
transport=httpx.ASGITransport(app=initialized_app),
222222
base_url="http://director-v2.testserver.io",
223223
headers={"Content-Type": "application/json"},
224224
) as client:

services/director-v2/tests/unit/with_dbs/conftest.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Any, cast
1212
from uuid import uuid4
1313

14+
import arrow
1415
import pytest
1516
import sqlalchemy as sa
1617
from _helpers import PublishedProject, RunningProject
@@ -318,6 +319,7 @@ async def running_project(
318319
) -> RunningProject:
319320
user = registered_user()
320321
created_project = await project(user, workbench=fake_workbench_without_outputs)
322+
now_time = arrow.utcnow().datetime
321323
return RunningProject(
322324
project=created_project,
323325
pipeline=pipeline(
@@ -329,9 +331,50 @@ async def running_project(
329331
project=created_project,
330332
state=StateType.RUNNING,
331333
progress=0.0,
332-
start=datetime.datetime.now(tz=datetime.UTC),
334+
start=now_time,
335+
),
336+
runs=runs(
337+
user=user,
338+
project=created_project,
339+
started=now_time,
340+
result=StateType.RUNNING,
341+
),
342+
)
343+
344+
345+
@pytest.fixture
async def running_project_mark_for_cancellation(
    registered_user: Callable[..., dict[str, Any]],
    project: Callable[..., Awaitable[ProjectAtDB]],
    pipeline: Callable[..., CompPipelineAtDB],
    tasks: Callable[..., list[CompTaskAtDB]],
    runs: Callable[..., CompRunsAtDB],
    fake_workbench_without_outputs: dict[str, Any],
    fake_workbench_adjacency: dict[str, Any],
) -> RunningProject:
    """A running project whose comp_run row is already marked for cancellation.

    The ``cancelled`` timestamp is set a few seconds after ``started`` to mimic
    a user requesting a stop while the pipeline is executing.
    """
    user = registered_user()
    created_project = await project(user, workbench=fake_workbench_without_outputs)
    utc_now = arrow.utcnow().datetime
    project_pipeline = pipeline(
        project_id=f"{created_project.uuid}",
        dag_adjacency_list=fake_workbench_adjacency,
    )
    project_tasks = tasks(
        user=user,
        project=created_project,
        state=StateType.RUNNING,
        progress=0.0,
        start=utc_now,
    )
    project_runs = runs(
        user=user,
        project=created_project,
        result=StateType.RUNNING,
        started=utc_now,
        cancelled=utc_now + datetime.timedelta(seconds=5),
    )
    return RunningProject(
        project=created_project,
        pipeline=project_pipeline,
        tasks=project_tasks,
        runs=project_runs,
    )
336379

337380

0 commit comments

Comments
 (0)