Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""add cancellation mark

Revision ID: 8bfe65a5e294
Revises: 5ad02358751a
Create Date: 2024-11-08 14:40:59.266181+00:00

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8bfe65a5e294"
down_revision = "5ad02358751a"
branch_labels = None
depends_on = None


def upgrade():
    """Apply the migration: add a nullable ``cancelled`` timestamp to ``comp_runs``."""
    # Timezone-aware so the cancellation request time is unambiguous across nodes.
    cancelled_column = sa.Column(
        "cancelled", sa.DateTime(timezone=True), nullable=True
    )
    op.add_column("comp_runs", cancelled_column)


def downgrade():
    """Revert the migration: drop the ``cancelled`` column from ``comp_runs``."""
    op.drop_column("comp_runs", "cancelled")
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@
nullable=True,
doc="When the run was finished",
),
sa.Column(
"cancelled",
sa.DateTime(timezone=True),
nullable=True,
doc="If filled, when cancellation was requested",
),
sa.Column("metadata", JSONB, nullable=True, doc="the run optional metadata"),
sa.Column(
"use_on_demand_clusters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class CompRunsAtDB(BaseModel):
modified: datetime.datetime
started: datetime.datetime | None
ended: datetime.datetime | None
cancelled: datetime.datetime | None
metadata: RunMetadataDict = RunMetadataDict()
use_on_demand_clusters: bool

Expand All @@ -72,7 +73,7 @@ def convert_null_to_default_cluster_id(cls, v):
@classmethod
def ensure_utc(cls, v: datetime.datetime | None) -> datetime.datetime | None:
    """Attach UTC tzinfo to naive datetimes; pass aware values and None through.

    The diff paste carried both the old (``datetime.timezone.utc``) and new
    (``datetime.UTC``) assignment lines back to back; only the modern alias
    is kept — the duplicate was redundant.

    NOTE(review): assumes naive timestamps coming from the DB are already
    expressed in UTC — confirm against the column definitions.
    """
    if v is not None and v.tzinfo is None:
        v = v.replace(tzinfo=datetime.UTC)
    return v

@validator("metadata", pre=True)
Expand All @@ -93,9 +94,22 @@ class Config:
"user_id": 132,
"cluster_id": 0,
"iteration": 42,
"result": "UNKNOWN",
"created": "2021-03-01 13:07:34.19161",
"modified": "2021-03-01 13:07:34.19161",
"cancelled": None,
"use_on_demand_clusters": False,
},
{
"run_id": 432,
"project_uuid": "65fee9d2-e030-452c-a29c-45d288577ca5",
"user_id": 132,
"cluster_id": None, # this default to DEFAULT_CLUSTER_ID
"iteration": 42,
"result": "NOT_STARTED",
"created": "2021-03-01 13:07:34.19161",
"modified": "2021-03-01 13:07:34.19161",
"cancelled": None,
"use_on_demand_clusters": False,
},
{
Expand All @@ -109,6 +123,7 @@ class Config:
"modified": "2021-03-01 13:07:34.19161",
"started": "2021-03-01 8:07:34.19161",
"ended": "2021-03-01 13:07:34.10",
"cancelled": None,
"metadata": {
"node_id_names_map": {},
"product_name": "osparc",
Expand All @@ -118,5 +133,20 @@ class Config:
},
"use_on_demand_clusters": False,
},
{
"run_id": 43243,
"project_uuid": "65fee9d2-e030-452c-a29c-45d288577ca5",
"user_id": 132,
"cluster_id": 123,
"iteration": 12,
"result": "SUCCESS",
"created": "2021-03-01 13:07:34.19161",
"modified": "2021-03-01 13:07:34.19161",
"started": "2021-03-01 8:07:34.19161",
"ended": "2021-03-01 13:07:34.10",
"cancelled": None,
"metadata": None,
"use_on_demand_clusters": False,
},
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)
from ...core.settings import ComputationalBackendSettings
from ...models.comp_pipelines import CompPipelineAtDB
from ...models.comp_runs import CompRunsAtDB, RunMetadataDict
from ...models.comp_runs import RunMetadataDict
from ...models.comp_tasks import CompTaskAtDB
from ...utils.comp_scheduler import (
COMPLETED_STATES,
Expand Down Expand Up @@ -131,7 +131,7 @@ async def _triage_changed_tasks(
class ScheduledPipelineParams:
cluster_id: ClusterID
run_metadata: RunMetadataDict
mark_for_cancellation: bool = False
mark_for_cancellation: datetime.datetime | None
use_on_demand_clusters: bool


Expand Down Expand Up @@ -169,7 +169,7 @@ async def run_new_pipeline(
return

runs_repo = CompRunsRepository.instance(self.db_engine)
new_run: CompRunsAtDB = await runs_repo.create(
new_run = await runs_repo.create(
user_id=user_id,
project_id=project_id,
cluster_id=cluster_id,
Expand All @@ -182,6 +182,7 @@ async def run_new_pipeline(
cluster_id=cluster_id,
run_metadata=new_run.metadata,
use_on_demand_clusters=use_on_demand_clusters,
mark_for_cancellation=None,
)
await publish_project_log(
self.rabbitmq_client,
Expand Down Expand Up @@ -212,11 +213,18 @@ async def stop_pipeline(
selected_iteration = iteration

# mark the scheduled pipeline for stopping
self.scheduled_pipelines[
(user_id, project_id, selected_iteration)
].mark_for_cancellation = True
# ensure the scheduler starts right away
self._wake_up_scheduler_now()
updated_comp_run = await CompRunsRepository.instance(
self.db_engine
).mark_for_cancellation(
user_id=user_id, project_id=project_id, iteration=selected_iteration
)
if updated_comp_run:
assert updated_comp_run.cancelled is not None # nosec
self.scheduled_pipelines[
(user_id, project_id, selected_iteration)
].mark_for_cancellation = updated_comp_run.cancelled
# ensure the scheduler starts right away
self._wake_up_scheduler_now()

async def schedule_all_pipelines(self) -> None:
self.wake_up_event.clear()
Expand Down Expand Up @@ -343,7 +351,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
if task.last_heartbeat is None:
assert task.start # nosec
return bool(
(utc_now - task.start.replace(tzinfo=datetime.timezone.utc))
(utc_now - task.start.replace(tzinfo=datetime.UTC))
> self.service_runtime_heartbeat_interval
)
return bool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler:
r.cluster_id if r.cluster_id is not None else DEFAULT_CLUSTER_ID
),
run_metadata=r.metadata,
mark_for_cancellation=False,
mark_for_cancellation=r.cancelled,
use_on_demand_clusters=r.use_on_demand_clusters,
)
for r in runs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import deque
from typing import Any

import arrow
import sqlalchemy as sa
from aiopg.sa.result import RowProxy
from models_library.clusters import DEFAULT_CLUSTER_ID, ClusterID
Expand Down Expand Up @@ -146,10 +147,20 @@ async def set_run_result(
) -> CompRunsAtDB | None:
values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]}
if final_state:
values.update({"ended": datetime.datetime.now(tz=datetime.UTC)})
values.update({"ended": arrow.utcnow().datetime})
return await self.update(
user_id,
project_id,
iteration,
**values,
)

async def mark_for_cancellation(
    self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt
) -> CompRunsAtDB | None:
    """Stamp the run row's ``cancelled`` column with the current UTC time.

    Returns the updated row, or None when no matching run exists.
    """
    cancellation_time = arrow.utcnow().datetime
    return await self.update(
        user_id,
        project_id,
        iteration,
        cancelled=cancellation_time,
    )
2 changes: 1 addition & 1 deletion services/director-v2/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async def initialized_app(mock_env: EnvVarsDict) -> AsyncIterable[FastAPI]:
@pytest.fixture()
async def async_client(initialized_app: FastAPI) -> AsyncIterable[httpx.AsyncClient]:
async with httpx.AsyncClient(
app=initialized_app,
transport=httpx.ASGITransport(app=initialized_app),
base_url="http://director-v2.testserver.io",
headers={"Content-Type": "application/json"},
) as client:
Expand Down
47 changes: 45 additions & 2 deletions services/director-v2/tests/unit/with_dbs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, cast
from uuid import uuid4

import arrow
import pytest
import sqlalchemy as sa
from _helpers import PublishedProject, RunningProject
Expand Down Expand Up @@ -318,6 +319,7 @@ async def running_project(
) -> RunningProject:
user = registered_user()
created_project = await project(user, workbench=fake_workbench_without_outputs)
now_time = arrow.utcnow().datetime
return RunningProject(
project=created_project,
pipeline=pipeline(
Expand All @@ -329,9 +331,50 @@ async def running_project(
project=created_project,
state=StateType.RUNNING,
progress=0.0,
start=datetime.datetime.now(tz=datetime.UTC),
start=now_time,
),
runs=runs(
user=user,
project=created_project,
started=now_time,
result=StateType.RUNNING,
),
)


@pytest.fixture
async def running_project_mark_for_cancellation(
    registered_user: Callable[..., dict[str, Any]],
    project: Callable[..., Awaitable[ProjectAtDB]],
    pipeline: Callable[..., CompPipelineAtDB],
    tasks: Callable[..., list[CompTaskAtDB]],
    runs: Callable[..., CompRunsAtDB],
    fake_workbench_without_outputs: dict[str, Any],
    fake_workbench_adjacency: dict[str, Any],
) -> RunningProject:
    """A RunningProject whose comp_run row is already marked for cancellation.

    The ``cancelled`` timestamp is set 5 seconds after the run started,
    mimicking a user-requested stop on a running pipeline.

    Fix: the original call passed the ``runs=`` keyword argument twice
    (a leftover line), which is a SyntaxError; the duplicate was removed.
    """
    user = registered_user()
    created_project = await project(user, workbench=fake_workbench_without_outputs)
    now_time = arrow.utcnow().datetime
    return RunningProject(
        project=created_project,
        pipeline=pipeline(
            project_id=f"{created_project.uuid}",
            dag_adjacency_list=fake_workbench_adjacency,
        ),
        tasks=tasks(
            user=user,
            project=created_project,
            state=StateType.RUNNING,
            progress=0.0,
            start=now_time,
        ),
        runs=runs(
            user=user,
            project=created_project,
            result=StateType.RUNNING,
            started=now_time,
            cancelled=now_time + datetime.timedelta(seconds=5),
        ),
    )


Expand Down
Loading
Loading