Skip to content

Commit e43ab9d

Browse files
adding DB field dag to comp_runs
1 parent e267d86 commit e43ab9d

File tree

7 files changed

+59
-5
lines changed

7 files changed

+59
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""add comp_run_snapshot_tasks table 3
2+
3+
Revision ID: 27b5ea128a2a
4+
Revises: 47faea7f09c2
5+
Create Date: 2025-05-27 13:30:27.592568+00:00
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "27b5ea128a2a"
14+
down_revision = "47faea7f09c2"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.add_column(
22+
"comp_runs", sa.Column("dag_adjacency_list", sa.JSON(), nullable=True)
23+
)
24+
# ### end Alembic commands ###
25+
26+
27+
def downgrade():
28+
# ### commands auto generated by Alembic - please adjust! ###
29+
op.drop_column("comp_runs", "dag_adjacency_list")
30+
# ### end Alembic commands ###

packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@
9898
nullable=False,
9999
doc="the run uses on demand clusters",
100100
),
101+
sa.Column(
102+
"dag_adjacency_list", sa.JSON, doc="Adjancey list for the pipeline's graph"
103+
),
101104
sa.UniqueConstraint("project_uuid", "user_id", "iteration"),
102105
sa.Index("ix_comp_runs_user_id", "user_id"),
103106
)

services/director-v2/src/simcore_service_director_v2/models/comp_runs.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class CompRunsAtDB(BaseModel):
5656
use_on_demand_clusters: bool
5757
scheduled: datetime.datetime | None
5858
processed: datetime.datetime | None
59+
dag_adjacency_list: dict[str, list[str]]
5960

6061
@field_validator("result", mode="before")
6162
@classmethod
@@ -102,6 +103,7 @@ def convert_null_to_empty_metadata(cls, v):
102103
"use_on_demand_clusters": False,
103104
"scheduled": None,
104105
"processed": None,
106+
"dag_adjacency_list": {},
105107
},
106108
{
107109
"run_id": 432,
@@ -117,6 +119,7 @@ def convert_null_to_empty_metadata(cls, v):
117119
"use_on_demand_clusters": False,
118120
"scheduled": None,
119121
"processed": None,
122+
"dag_adjacency_list": {},
120123
},
121124
{
122125
"run_id": 43243,
@@ -139,6 +142,7 @@ def convert_null_to_empty_metadata(cls, v):
139142
"use_on_demand_clusters": False,
140143
"scheduled": None,
141144
"processed": None,
145+
"dag_adjacency_list": {},
142146
},
143147
{
144148
"run_id": 43243,
@@ -155,6 +159,7 @@ def convert_null_to_empty_metadata(cls, v):
155159
"use_on_demand_clusters": False,
156160
"scheduled": None,
157161
"processed": None,
162+
"dag_adjacency_list": {},
158163
},
159164
]
160165
},

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
from typing import Final
33

4-
import networkx as nx
54
from fastapi import FastAPI
65
from models_library.projects import ProjectID
76
from models_library.users import UserID
@@ -11,6 +10,7 @@
1110
from servicelib.logging_utils import log_context
1211
from servicelib.redis import CouldNotAcquireLockError, exclusive
1312
from servicelib.utils import limited_gather
13+
from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
1414
from simcore_service_director_v2.modules.db.repositories.comp_runs_snapshot_tasks import (
1515
CompRunsSnapshotTasksRepository,
1616
)
@@ -46,7 +46,8 @@ async def run_new_pipeline(
4646
"""Sets a new pipeline to be scheduled on the computational resources."""
4747
# ensure the pipeline exists and is populated with something
4848
db_engine = get_db_engine(app)
49-
dag = await _get_pipeline_dag(project_id, db_engine)
49+
comp_pipeline_at_db = await _get_pipeline_at_db(project_id, db_engine)
50+
dag = comp_pipeline_at_db.get_graph()
5051
if not dag:
5152
_logger.warning(
5253
"project %s has no computational dag defined. not scheduled for a run.",
@@ -59,6 +60,7 @@ async def run_new_pipeline(
5960
project_id=project_id,
6061
metadata=run_metadata,
6162
use_on_demand_clusters=use_on_demand_clusters,
63+
dag_adjacency_list=comp_pipeline_at_db.dag_adjacency_list,
6264
)
6365

6466
db_create_snaphot_tasks = [
@@ -120,12 +122,12 @@ async def stop_pipeline(
120122
)
121123

122124

123-
async def _get_pipeline_dag(
125+
async def _get_pipeline_at_db(
124126
project_id: ProjectID, db_engine: AsyncEngine
125-
) -> nx.DiGraph:
127+
) -> CompPipelineAtDB:
126128
comp_pipeline_repo = CompPipelinesRepository.instance(db_engine)
127129
pipeline_at_db = await comp_pipeline_repo.get_pipeline(project_id)
128-
return pipeline_at_db.get_graph()
130+
return pipeline_at_db
129131

130132

131133
_LOST_TASKS_FACTOR: Final[int] = 10

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ async def create(
358358
iteration: PositiveInt | None = None,
359359
metadata: RunMetadataDict,
360360
use_on_demand_clusters: bool,
361+
dag_adjacency_list: dict[str, list[str]],
361362
) -> CompRunsAtDB:
362363
try:
363364
async with transaction_context(self.db_engine) as conn:
@@ -373,6 +374,7 @@ async def create(
373374
result=RUNNING_STATE_TO_DB[RunningState.PUBLISHED],
374375
metadata=jsonable_encoder(metadata),
375376
use_on_demand_clusters=use_on_demand_clusters,
377+
dag_adjacency_list=dag_adjacency_list,
376378
)
377379
.returning(literal_column("*"))
378380
)

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ async def test_list(
9393
iteration=None,
9494
metadata=run_metadata,
9595
use_on_demand_clusters=faker.pybool(),
96+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
9697
)
9798
assert await CompRunsRepository(sqlalchemy_async_engine).list_() == [created]
9899

@@ -104,6 +105,7 @@ async def test_list(
104105
iteration=created.iteration + n + 1,
105106
metadata=run_metadata,
106107
use_on_demand_clusters=faker.pybool(),
108+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
107109
)
108110
for n in range(50)
109111
)
@@ -268,6 +270,7 @@ async def test_create(
268270
iteration=None,
269271
metadata=run_metadata,
270272
use_on_demand_clusters=faker.pybool(),
273+
dag_adjacency_list={},
271274
)
272275
published_project = await publish_project()
273276
with pytest.raises(UserNotFoundError):
@@ -277,6 +280,7 @@ async def test_create(
277280
iteration=None,
278281
metadata=run_metadata,
279282
use_on_demand_clusters=faker.pybool(),
283+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
280284
)
281285

282286
created = await CompRunsRepository(sqlalchemy_async_engine).create(
@@ -285,6 +289,7 @@ async def test_create(
285289
iteration=None,
286290
metadata=run_metadata,
287291
use_on_demand_clusters=faker.pybool(),
292+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
288293
)
289294
got = await CompRunsRepository(sqlalchemy_async_engine).get(
290295
user_id=published_project.user["id"],
@@ -299,6 +304,7 @@ async def test_create(
299304
iteration=None,
300305
metadata=run_metadata,
301306
use_on_demand_clusters=faker.pybool(),
307+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
302308
)
303309
assert created != got
304310
assert created.iteration == got.iteration + 1
@@ -332,6 +338,7 @@ async def test_update(
332338
iteration=None,
333339
metadata=run_metadata,
334340
use_on_demand_clusters=faker.pybool(),
341+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
335342
)
336343

337344
got = await CompRunsRepository(sqlalchemy_async_engine).get(
@@ -365,6 +372,7 @@ async def test_set_run_result(
365372
iteration=None,
366373
metadata=run_metadata,
367374
use_on_demand_clusters=faker.pybool(),
375+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
368376
)
369377
got = await CompRunsRepository(sqlalchemy_async_engine).get(
370378
user_id=published_project.user["id"],
@@ -412,6 +420,7 @@ async def test_mark_for_cancellation(
412420
iteration=None,
413421
metadata=run_metadata,
414422
use_on_demand_clusters=faker.pybool(),
423+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
415424
)
416425
got = await CompRunsRepository(sqlalchemy_async_engine).get(
417426
user_id=published_project.user["id"],
@@ -443,6 +452,7 @@ async def test_mark_for_scheduling(
443452
iteration=None,
444453
metadata=run_metadata,
445454
use_on_demand_clusters=faker.pybool(),
455+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
446456
)
447457
got = await CompRunsRepository(sqlalchemy_async_engine).get(
448458
user_id=published_project.user["id"],
@@ -476,6 +486,7 @@ async def test_mark_scheduling_done(
476486
iteration=None,
477487
metadata=run_metadata,
478488
use_on_demand_clusters=faker.pybool(),
489+
dag_adjacency_list=published_project.pipeline.dag_adjacency_list,
479490
)
480491
got = await CompRunsRepository(sqlalchemy_async_engine).get(
481492
user_id=published_project.user["id"],

services/director-v2/tests/unit/with_dbs/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ async def _(
196196
"result": StateType.NOT_STARTED,
197197
"metadata": jsonable_encoder(run_metadata),
198198
"use_on_demand_clusters": False,
199+
"dag_adjacency_list": {},
199200
}
200201
run_config.update(**run_kwargs)
201202
async with sqlalchemy_async_engine.begin() as conn:

0 commit comments

Comments
 (0)