
Commit b79307c

review @sanderegg
1 parent 469f2db commit b79307c

12 files changed: 58 additions, 86 deletions
Lines changed: 7 additions & 10 deletions
@@ -1,8 +1,8 @@
 """add comp_run_snapshot_tasks table
 
-Revision ID: ae0f63bb3c86
+Revision ID: 0019fbd911b6
 Revises: 278daef7e99d
-Create Date: 2025-05-27 14:12:10.926590+00:00
+Create Date: 2025-05-28 08:51:35.563513+00:00
 
 
 """
@@ -11,7 +11,7 @@
 from sqlalchemy.dialects import postgresql
 
 # revision identifiers, used by Alembic.
-revision = "ae0f63bb3c86"
+revision = "0019fbd911b6"
 down_revision = "278daef7e99d"
 branch_labels = None
 depends_on = None
@@ -82,12 +82,6 @@ def upgrade():
         sa.Column(
             "hardware_info", postgresql.JSONB(astext_type=sa.Text()), nullable=True
         ),
-        sa.Column(
-            "submit",
-            sa.DateTime(timezone=True),
-            server_default=sa.text("'1900-01-01T00:00:00Z'::timestamptz"),
-            nullable=True,
-        ),
         sa.ForeignKeyConstraint(
             ["run_id"],
             ["comp_runs.run_id"],
@@ -98,7 +92,10 @@ def upgrade():
         sa.PrimaryKeyConstraint("snapshot_task_id"),
     )
     op.add_column(
-        "comp_runs", sa.Column("dag_adjacency_list", sa.JSON(), nullable=True)
+        "comp_runs",
+        sa.Column(
+            "dag_adjacency_list", postgresql.JSONB(astext_type=sa.Text()), nullable=True
+        ),
     )
     # ### end Alembic commands ###
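This migration moves comp_runs.dag_adjacency_list from sa.JSON to postgresql.JSONB and drops the deprecated submit column from the new snapshot table. For context, JSONB stores a decomposed binary form that supports server-side key/containment operators and GIN indexing, which plain JSON does not. A minimal sketch of what that enables, assuming a trimmed-down comp_runs table (illustrative, not the service's full definition):

# Minimal sketch, assuming a trimmed-down comp_runs table; illustrative only.
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

metadata = sa.MetaData()
comp_runs = sa.Table(
    "comp_runs",
    metadata,
    sa.Column("run_id", sa.BigInteger, primary_key=True),
    # JSONB (unlike sa.JSON) supports server-side key/containment operators
    sa.Column("dag_adjacency_list", JSONB, nullable=True),
)

# e.g. select runs whose adjacency list has a given node key;
# "some-node-id" is a hypothetical node UUID string
stmt = sa.select(comp_runs.c.run_id).where(
    comp_runs.c.dag_adjacency_list.has_key("some-node-id")
)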

packages/postgres-database/src/simcore_postgres_database/models/comp_run_snapshot_tasks.py

Lines changed: 0 additions & 8 deletions
@@ -118,12 +118,4 @@
         nullable=True,
         doc="Harware information of this task",
     ),
-    # deprecated columns must be kept due to legacy services
-    # utc timestamps for submission/start/end
-    sa.Column(
-        "submit",
-        sa.DateTime(timezone=True),
-        server_default=sa.text("'1900-01-01T00:00:00Z'::timestamptz"),
-        doc="[DEPRECATED unused but kept for legacy services and must be filled with a default value of 1 January 1900]",
-    ),
 )

packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@
         doc="the run uses on demand clusters",
     ),
     sa.Column(
-        "dag_adjacency_list", sa.JSON, doc="Adjancey list for the pipeline's graph"
+        "dag_adjacency_list", JSONB, doc="Adjancey list for the pipeline's graph"
     ),
     sa.UniqueConstraint("project_uuid", "user_id", "iteration"),
     sa.Index("ix_comp_runs_user_id", "user_id"),

packages/postgres-database/src/simcore_postgres_database/utils_comp_run_snapshot_tasks.py

Lines changed: 3 additions & 2 deletions
@@ -1,3 +1,5 @@
+from typing import Any
+
 import sqlalchemy as sa
 from pydantic import PositiveInt
 from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
@@ -28,7 +30,6 @@
     comp_run_snapshot_tasks.c.modified,
     comp_run_snapshot_tasks.c.pricing_info,
     comp_run_snapshot_tasks.c.hardware_info,
-    comp_run_snapshot_tasks.c.submit,
 )
 
 
@@ -38,7 +39,7 @@ async def update_for_run_id_and_node_id(
     *,
     run_id: PositiveInt,
     node_id: str,
-    data: dict,
+    data: dict[str, Any],
 ):
     async with pass_or_acquire_connection(engine, connection=conn) as _conn:
         result = await _conn.stream(
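Tightening data: dict to dict[str, Any] documents that the payload maps column names to values. A hypothetical caller sketch; the leading engine parameter is assumed from the pass_or_acquire_connection(engine, connection=conn) call shown in the hunk above, and the "state" key/value pair is illustrative:

# Hypothetical caller sketch; not part of this commit.
from typing import Any

from simcore_postgres_database.utils_comp_run_snapshot_tasks import (
    update_for_run_id_and_node_id,
)
from sqlalchemy.ext.asyncio import AsyncEngine

async def mark_snapshot_task_aborted(
    engine: AsyncEngine, run_id: int, node_id: str
) -> None:
    # keys are comp_run_snapshot_tasks column names (assumed)
    data: dict[str, Any] = {"state": "ABORTED"}
    await update_for_run_id_and_node_id(
        engine, run_id=run_id, node_id=node_id, data=data
    )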

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 6 additions & 10 deletions
@@ -199,7 +199,6 @@ async def _try_start_pipeline(
     project: ProjectAtDB,
     users_repo: UsersRepository,
     projects_metadata_repo: ProjectsMetadataRepository,
-    tasks_to_run: list[CompTaskAtDB],
 ) -> None:
     if not minimal_dag.nodes():
         # 2 options here: either we have cycles in the graph or it's really done
@@ -242,7 +241,6 @@ async def _try_start_pipeline(
         )
         or {},
         use_on_demand_clusters=computation.use_on_demand_clusters,
-        tasks_to_run=tasks_to_run,
     )
 
 
@@ -342,13 +340,6 @@ async def create_computation(  # noqa: PLR0913 # pylint: disable=too-many-positi
         rabbitmq_rpc_client=rpc_client,
     )
 
-    # filter the tasks by the effective pipeline
-    filtered_tasks = [
-        t
-        for t in comp_tasks
-        if f"{t.node_id}" in set(minimal_computational_dag.nodes())
-    ]
-
     if computation.start_pipeline:
         await _try_start_pipeline(
             request.app,
@@ -359,9 +350,14 @@ async def create_computation(  # noqa: PLR0913 # pylint: disable=too-many-positi
             project=project,
             users_repo=users_repo,
             projects_metadata_repo=projects_metadata_repo,
-            tasks_to_run=filtered_tasks,
         )
 
+    # filter the tasks by the effective pipeline
+    filtered_tasks = [
+        t
+        for t in comp_tasks
+        if f"{t.node_id}" in set(minimal_computational_dag.nodes())
+    ]
     pipeline_state = utils.get_pipeline_state_from_task_states(filtered_tasks)
 
     # get run details if any
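The filtering comprehension, now placed after _try_start_pipeline, compares stringified task node IDs against the DAG's node labels, which networkx stores as strings. A self-contained sketch of the same idea with a hypothetical Task stand-in; note that hoisting set(...) out of the comprehension avoids rebuilding the set on every iteration:

# Self-contained sketch of the effective-pipeline filter; Task is a
# hypothetical stand-in for CompTaskAtDB.
from dataclasses import dataclass
from uuid import UUID, uuid4

import networkx as nx

@dataclass
class Task:
    node_id: UUID

dag = nx.DiGraph()
kept = uuid4()
dag.add_node(f"{kept}")  # DAG nodes are stored as strings

tasks = [Task(node_id=kept), Task(node_id=uuid4())]
dag_nodes = set(dag.nodes())  # build the set once instead of per task
filtered = [t for t in tasks if f"{t.node_id}" in dag_nodes]
assert [t.node_id for t in filtered] == [kept]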

services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py

Lines changed: 3 additions & 4 deletions
@@ -150,10 +150,6 @@ class BaseCompTaskAtDB(BaseModel):
     pricing_info: dict | None
     hardware_info: HardwareInfo
 
-    submit: dt.datetime | None = Field(
-        default=None, deprecated=True, description="Required for legacy services"
-    )
-
     @field_validator("state", mode="before")
     @classmethod
     def _convert_state_from_state_type_enum_if_needed(cls, v):
@@ -183,6 +179,9 @@ def _backward_compatible_null_value(cls, v: HardwareInfo | None) -> HardwareInfo
 
 class CompTaskAtDB(BaseCompTaskAtDB):
     task_id: PositiveInt | None = None
+    submit: dt.datetime | None = Field(
+        default=None, deprecated=True, description="Required for legacy services"
+    )
 
     def to_db_model(self, **exclusion_rules) -> dict[str, Any]:
         # mode json is used to ensure the UUIDs are converted to strings
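Moving the deprecated submit field from BaseCompTaskAtDB down to CompTaskAtDB keeps it only on the model that maps to the legacy comp_tasks rows. For reference, a minimal sketch of how Field(deprecated=True) behaves, assuming Pydantic v2.7+ and a stand-in model:

# Minimal sketch with a stand-in model; assumes Pydantic >= 2.7, where
# accessing a Field(deprecated=True) attribute emits a DeprecationWarning.
import datetime as dt
import warnings

from pydantic import BaseModel, Field

class LegacyTask(BaseModel):  # hypothetical stand-in for CompTaskAtDB
    submit: dt.datetime | None = Field(
        default=None, deprecated=True, description="Required for legacy services"
    )

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    _ = LegacyTask().submit
assert any(issubclass(w.category, DeprecationWarning) for w in caught)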

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 16 additions & 3 deletions
@@ -1,6 +1,7 @@
 import logging
 from typing import Final
 
+import networkx as nx
 from fastapi import FastAPI
 from models_library.projects import ProjectID
 from models_library.users import UserID
@@ -22,6 +23,7 @@
 from ..db import get_db_engine
 from ..db.repositories.comp_pipelines import CompPipelinesRepository
 from ..db.repositories.comp_runs import CompRunsRepository
+from ..db.repositories.comp_tasks import CompTasksRepository
 from ..rabbitmq import get_rabbitmq_client
 from ._constants import (
     MAX_CONCURRENT_PIPELINE_SCHEDULING,
@@ -41,13 +43,13 @@ async def run_new_pipeline(
     project_id: ProjectID,
     run_metadata: RunMetadataDict,
     use_on_demand_clusters: bool,
-    tasks_to_run: list[CompTaskAtDB],
 ) -> None:
     """Sets a new pipeline to be scheduled on the computational resources."""
     # ensure the pipeline exists and is populated with something
     db_engine = get_db_engine(app)
     comp_pipeline_at_db = await _get_pipeline_at_db(project_id, db_engine)
     dag = comp_pipeline_at_db.get_graph()
+
     if not dag:
         _logger.warning(
             "project %s has no computational dag defined. not scheduled for a run.",
@@ -63,11 +65,11 @@ async def run_new_pipeline(
         dag_adjacency_list=comp_pipeline_at_db.dag_adjacency_list,
     )
 
+    tasks_to_run = await _get_pipeline_tasks_at_db(db_engine, project_id, dag)
     db_create_snaphot_tasks = [
         {
-            **task.to_db_model(exclude={"created", "modified", "submit"}),
+            **task.to_db_model(exclude={"created", "modified"}),
             "run_id": new_run.run_id,
-            # "submit": datetime.fromisoformat(task.submit)
         }
         for task in tasks_to_run
     ]
@@ -130,6 +132,17 @@ async def _get_pipeline_at_db(
     return pipeline_at_db
 
 
+async def _get_pipeline_tasks_at_db(
+    db_engine: AsyncEngine, project_id: ProjectID, pipeline_dag: nx.DiGraph
+) -> list[CompTaskAtDB]:
+    comp_tasks_repo = CompTasksRepository.instance(db_engine)
+    return [
+        t
+        for t in await comp_tasks_repo.list_computational_tasks(project_id)
+        if (f"{t.node_id}" in list(pipeline_dag.nodes()))
+    ]
+
+
 _LOST_TASKS_FACTOR: Final[int] = 10
 
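With this change, run_new_pipeline resolves the tasks itself via the new _get_pipeline_tasks_at_db helper and snapshots them with to_db_model(exclude={"created", "modified"}), instead of receiving tasks_to_run from callers. A sketch of that exclude-and-augment serialization pattern with a minimal Pydantic stand-in (MiniTask and its fields are illustrative, not the service's model):

# Sketch of the exclude-and-augment pattern used for the snapshot rows;
# MiniTask is a hypothetical stand-in for CompTaskAtDB.
from datetime import UTC, datetime
from typing import Any

from pydantic import BaseModel

class MiniTask(BaseModel):
    node_id: str
    state: str
    created: datetime
    modified: datetime

    def to_db_model(self, **exclusion_rules: Any) -> dict[str, Any]:
        # mode="json" stringifies datetimes/UUIDs for the DB layer
        return self.model_dump(mode="json", **exclusion_rules)

task = MiniTask(
    node_id="node-1",
    state="PUBLISHED",
    created=datetime.now(UTC),
    modified=datetime.now(UTC),
)
row = {**task.to_db_model(exclude={"created", "modified"}), "run_id": 1}
assert "created" not in row and row["run_id"] == 1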
services/director-v2/tests/unit/_helpers.py

Lines changed: 0 additions & 1 deletion
@@ -21,7 +21,6 @@ class PublishedProject:
     project: ProjectAtDB
     pipeline: CompPipelineAtDB
     tasks: list[CompTaskAtDB]
-    tasks_to_run: list[CompTaskAtDB]
 
 
 @dataclass(kw_only=True)

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 0 additions & 4 deletions
@@ -156,7 +156,6 @@ async def test_schedule_all_pipelines(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=published_project.tasks_to_run,
     )
     # this directly schedule a new pipeline
     scheduler_rabbit_client_parser.assert_called_once_with(
@@ -258,7 +257,6 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=published_project.tasks_to_run,
     )
     # this directly schedule a new pipeline
     scheduler_rabbit_client_parser.assert_called_once_with(
@@ -342,7 +340,6 @@ async def test_empty_pipeline_is_not_scheduled(
         project_id=empty_project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=[],
     )
     await assert_comp_runs_empty(sqlalchemy_async_engine)
     scheduler_rabbit_client_parser.assert_not_called()
@@ -358,7 +355,6 @@ async def test_empty_pipeline_is_not_scheduled(
         project_id=empty_project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=[],
     )
     assert len(caplog.records) == 1
     assert "no computational dag defined" in caplog.records[0].message

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 0 additions & 5 deletions
@@ -169,7 +169,6 @@ async def _assert_start_pipeline(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=published_project.tasks_to_run,
     )
 
     # check the database is correctly updated, the run is published
@@ -1125,7 +1124,6 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
         project_id=sleepers_project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=[],
     )
     with_disabled_scheduler_publisher.assert_called_once()
     # we shall have a a new comp_runs row with the new pipeline job
@@ -1253,7 +1251,6 @@ async def test_handling_of_disconnected_scheduler_dask(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
-        tasks_to_run=published_project.tasks_to_run,
     )
 
     # since there is no cluster, there is no dask-scheduler,
@@ -1769,7 +1766,6 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=True,
-        tasks_to_run=published_project.tasks_to_run,
     )
 
     # we ask to use an on-demand cluster, therefore the tasks are published first
@@ -1874,7 +1870,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=True,
-        tasks_to_run=published_project.tasks_to_run,
     )
 
     # we ask to use an on-demand cluster, therefore the tasks are published first
