
Commit 6085d15

work
1 parent a0d9481 commit 6085d15

File tree

14 files changed: +633 -30 lines changed
Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@

"""add comp_run_snapshot_tasks table

Revision ID: dfa87810f245
Revises: 278daef7e99d
Create Date: 2025-05-26 14:17:04.227338+00:00

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "dfa87810f245"
down_revision = "278daef7e99d"
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "comp_run_snapshot_tasks",
        sa.Column("snapshot_task_id", sa.Integer(), nullable=False),
        sa.Column("run_id", sa.Integer(), nullable=False),
        sa.Column("project_id", sa.String(), nullable=True),
        sa.Column("node_id", sa.String(), nullable=True),
        sa.Column(
            "node_class",
            postgresql.ENUM(
                "COMPUTATIONAL",
                "INTERACTIVE",
                "FRONTEND",
                name="nodeclass",
                create_type=False,
            ),
            nullable=True,
        ),
        sa.Column("job_id", sa.String(), nullable=True),
        sa.Column("internal_id", sa.Integer(), nullable=True),
        sa.Column("schema", sa.JSON(), nullable=True),
        sa.Column("inputs", sa.JSON(), nullable=True),
        sa.Column("outputs", sa.JSON(), nullable=True),
        sa.Column("run_hash", sa.String(), nullable=True),
        sa.Column("image", sa.JSON(), nullable=True),
        sa.Column(
            "state",
            postgresql.ENUM(
                "NOT_STARTED",
                "PUBLISHED",
                "PENDING",
                "RUNNING",
                "SUCCESS",
                "FAILED",
                "ABORTED",
                name="statetype",
                create_type=False,
            ),
            server_default="NOT_STARTED",
            nullable=False,
        ),
        sa.Column("errors", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        sa.Column("progress", sa.Numeric(precision=3, scale=2), nullable=True),
        sa.Column("start", sa.DateTime(timezone=True), nullable=True),
        sa.Column("end", sa.DateTime(timezone=True), nullable=True),
        sa.Column("last_heartbeat", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "created",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "modified",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "pricing_info", postgresql.JSONB(astext_type=sa.Text()), nullable=True
        ),
        sa.Column(
            "hardware_info", postgresql.JSONB(astext_type=sa.Text()), nullable=True
        ),
        sa.Column(
            "submit",
            sa.DateTime(timezone=True),
            server_default=sa.text("'1900-01-01T00:00:00Z'::timestamptz"),
            nullable=True,
        ),
        sa.ForeignKeyConstraint(
            ["project_id"],
            ["comp_pipeline.project_id"],
        ),
        sa.ForeignKeyConstraint(
            ["run_id"],
            ["comp_runs.run_id"],
            name="fk_snapshot_tasks_to_comp_runs",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("snapshot_task_id"),
    )
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("comp_run_snapshot_tasks")
    # ### end Alembic commands ###
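
For reference, a migration like this is applied or rolled back with the alembic CLI or its Python command API. Below is a minimal sketch using the command API; the config file location and database URL are illustrative assumptions, not part of this commit.

# Sketch: apply/roll back this revision programmatically (path and URL are hypothetical).
from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")  # assumed location of the migration config
alembic_cfg.set_main_option(
    "sqlalchemy.url", "postgresql://user:pass@localhost/simcoredb"  # placeholder DSN
)

command.upgrade(alembic_cfg, "dfa87810f245")    # creates comp_run_snapshot_tasks
command.downgrade(alembic_cfg, "278daef7e99d")  # drops it again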
Lines changed: 130 additions & 0 deletions

@@ -0,0 +1,130 @@

"""Computational Tasks Table"""

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import ENUM

from ._common import (
    RefActions,
    column_created_datetime,
    column_modified_datetime,
)
from .base import metadata
from .comp_pipeline import StateType
from .comp_runs import comp_runs

comp_run_snapshot_tasks = sa.Table(
    "comp_run_snapshot_tasks",
    metadata,
    sa.Column(
        "snapshot_task_id",
        sa.Integer,
        primary_key=True,
    ),
    sa.Column(
        "run_id",
        sa.Integer,
        sa.ForeignKey(
            comp_runs.c.run_id,
            name="fk_snapshot_tasks_to_comp_runs",
            onupdate=RefActions.CASCADE,
            ondelete=RefActions.CASCADE,
        ),
        nullable=False,
    ),
    sa.Column(
        "project_id",
        sa.String,
        sa.ForeignKey("comp_pipeline.project_id"),
        doc="Project that contains the node associated to this task",
    ),
    sa.Column("node_id", sa.String, doc="Node associated to this task"),
    sa.Column(
        "node_class",
        ENUM(
            "COMPUTATIONAL",
            "INTERACTIVE",
            "FRONTEND",
            name="nodeclass",
            create_type=False,  # necessary to avoid alembic "nodeclass already exists" error
        ),
        doc="Classification of the node associated to this task",
    ),
    sa.Column("job_id", sa.String, doc="Worker job ID for this task"),
    sa.Column("internal_id", sa.Integer, doc="DEV: only for development. From 1 to N"),
    sa.Column("schema", sa.JSON, doc="Schema for inputs and outputs"),
    sa.Column("inputs", sa.JSON, doc="Input values"),
    sa.Column("outputs", sa.JSON, doc="Output values"),
    sa.Column(
        "run_hash",
        sa.String,
        nullable=True,
        doc="Hashes inputs before run. Used to detect changes in inputs.",
    ),
    sa.Column(
        "image", sa.JSON, doc="Metadata about service image associated to this node"
    ),
    sa.Column(
        "state",
        ENUM(
            "NOT_STARTED",
            "PUBLISHED",
            "PENDING",
            "RUNNING",
            "SUCCESS",
            "FAILED",
            "ABORTED",
            name="statetype",
            create_type=False,  # necessary to avoid alembic "statetype already exists" error
        ),
        nullable=False,
        server_default=StateType.NOT_STARTED.value,
        doc="Current state in the task lifecycle",
    ),
    sa.Column(
        "errors",
        postgresql.JSONB,
        nullable=True,
        doc="List[models_library.errors.ErrorDict] with error information"
        " for a failing state, otherwise set to None",
    ),
    sa.Column(
        "progress",
        sa.Numeric(precision=3, scale=2),  # numbers from 0.00 to 1.00
        nullable=True,
        doc="Current progress of the task, if available",
    ),
    sa.Column(
        "start", sa.DateTime(timezone=True), doc="UTC timestamp when task started"
    ),
    sa.Column(
        "end", sa.DateTime(timezone=True), doc="UTC timestamp for task completion"
    ),
    sa.Column(
        "last_heartbeat",
        sa.DateTime(timezone=True),
        doc="UTC timestamp of the last task-running check",
    ),
    column_created_datetime(timezone=True),
    column_modified_datetime(timezone=True),
    sa.Column(
        "pricing_info",
        postgresql.JSONB,
        nullable=True,
        doc="Billing information of this task",
    ),
    sa.Column(
        "hardware_info",
        postgresql.JSONB,
        nullable=True,
        doc="Hardware information of this task",
    ),
    # deprecated columns must be kept due to legacy services
    # utc timestamps for submission/start/end
    sa.Column(
        "submit",
        sa.DateTime(timezone=True),
        server_default=sa.text("'1900-01-01T00:00:00Z'::timestamptz"),
        doc="[DEPRECATED] Unused but kept for legacy services; must be filled with a default value of 1 January 1900",
    ),
)
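
The table above mirrors comp_tasks but keys rows by snapshot_task_id and ties each row to a run via run_id. A quick way to see how a row behaves (server defaults, shared enums) is a plain SQLAlchemy Core round trip. A minimal sketch, assuming the schema is already migrated, that the model lives at the module path below (inferred from the sibling imports in this commit), and a placeholder connection URL:

# Sketch: insert one snapshot-task row and read it back (URL and values are illustrative).
import sqlalchemy as sa

from simcore_postgres_database.models.comp_run_snapshot_tasks import (
    comp_run_snapshot_tasks,  # assumed module path
)

engine = sa.create_engine("postgresql://user:pass@localhost/simcoredb")  # placeholder
with engine.begin() as conn:
    # NOTE: run_id is a FK to comp_runs.run_id, so a matching comp_runs row must exist.
    result = conn.execute(
        sa.insert(comp_run_snapshot_tasks)
        .values(run_id=1, node_id="some-node-uuid")
        .returning(comp_run_snapshot_tasks.c.snapshot_task_id)
    )
    snapshot_task_id = result.scalar_one()
    row = conn.execute(
        sa.select(comp_run_snapshot_tasks).where(
            comp_run_snapshot_tasks.c.snapshot_task_id == snapshot_task_id
        )
    ).one()
    print(row.state)  # "NOT_STARTED", filled in by the server_default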

packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py

Lines changed: 32 additions & 3 deletions
@@ -2,14 +2,20 @@
 
 import sqlalchemy as sa
 from common_library.json_serialization import json_dumps, json_loads
-
 from models_library.projects import ProjectID
+from models_library.projects_nodes_io import NodeID
 from models_library.users import UserID
 from pydantic import TypeAdapter
 from servicelib.db_asyncpg_utils import create_async_engine_and_database_ready
 from settings_library.node_ports import NodePortsSettings
 from simcore_postgres_database.models.comp_tasks import comp_tasks
 from simcore_postgres_database.models.projects import projects
+from simcore_service_director_v2.modules.db.repositories.comp_runs import (
+    CompRunsRepository,
+)
+from simcore_service_director_v2.modules.db.repositories.comp_runs_snapshot_tasks import (
+    CompRunsSnapshotTasksRepository,
+)
 from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
 
 from .exceptions import NodeNotFound, ProjectNotFoundError

@@ -78,7 +84,10 @@ def __init__(self, db_engine: AsyncEngine | None = None):
         self._db_engine = db_engine
 
     async def write_ports_configuration(
-        self, json_configuration: str, project_id: str, node_uuid: str
+        self,
+        json_configuration: str,
+        project_id: str,
+        node_uuid: str,
     ):
         message = (
             f"Writing port configuration to database for "

@@ -91,7 +100,7 @@ async def write_ports_configuration(
             DBContextManager(self._db_engine) as engine,
             engine.begin() as connection,
         ):
-            # update the necessary parts
+            # 1. Update comp_tasks table
             await connection.execute(
                 comp_tasks.update()
                 .where(

@@ -105,6 +114,26 @@ async def write_ports_configuration(
                     run_hash=node_configuration.get("run_hash"),
                 )
             )
+            # 2. Get latest run id for the project
+            _latest_run_id = await CompRunsRepository.instance(
+                engine
+            ).get_latest_run_id_for_project(
+                connection, project_id=ProjectID(project_id)
+            )
+            # 3. Update comp_run_snapshot_tasks table
+            await CompRunsSnapshotTasksRepository.instance(
+                engine
+            ).update_for_run_id_and_node_id(
+                connection,
+                run_id=_latest_run_id,
+                node_id=NodeID(node_uuid),
+                data={
+                    "schema": node_configuration["schema"],
+                    "inputs": node_configuration["inputs"],
+                    "outputs": node_configuration["outputs"],
+                    "run_hash": node_configuration.get("run_hash"),
+                },
+            )
 
     async def get_ports_configuration_from_node_uuid(
         self, project_id: str, node_uuid: str
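
The two repository calls added here (`get_latest_run_id_for_project` and `update_for_run_id_and_node_id`) are defined elsewhere in this commit and are not shown in this excerpt. Conceptually, the second one reduces to a single UPDATE scoped to one (run_id, node_id) pair; the sketch below shows only that shape, and its signature and body are assumptions rather than the actual implementation:

# Hypothetical sketch of update_for_run_id_and_node_id; not the code from this commit.
from typing import Any

import sqlalchemy as sa
from models_library.projects_nodes_io import NodeID
from simcore_postgres_database.models.comp_run_snapshot_tasks import (
    comp_run_snapshot_tasks,  # assumed module path
)
from sqlalchemy.ext.asyncio import AsyncConnection


async def update_for_run_id_and_node_id(
    connection: AsyncConnection,
    *,
    run_id: int,
    node_id: NodeID,
    data: dict[str, Any],
) -> None:
    # Touch only the snapshot row belonging to this run AND this node.
    await connection.execute(
        sa.update(comp_run_snapshot_tasks)
        .where(
            (comp_run_snapshot_tasks.c.run_id == run_id)
            & (comp_run_snapshot_tasks.c.node_id == f"{node_id}")
        )
        .values(**data)
    )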

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 10 additions & 6 deletions
@@ -199,6 +199,7 @@ async def _try_start_pipeline(
     project: ProjectAtDB,
     users_repo: UsersRepository,
     projects_metadata_repo: ProjectsMetadataRepository,
+    filtered_comp_tasks_in_db: list[CompTaskAtDB],
 ) -> None:
     if not minimal_dag.nodes():
         # 2 options here: either we have cycles in the graph or it's really done

@@ -241,6 +242,7 @@
         )
         or {},
         use_on_demand_clusters=computation.use_on_demand_clusters,
+        filtered_comp_tasks_in_db=filtered_comp_tasks_in_db,
     )
 
 

@@ -340,6 +342,13 @@ async def create_computation(  # noqa: PLR0913 # pylint: disable=too-many-positional-arguments
         rabbitmq_rpc_client=rpc_client,
     )
 
+    # filter the tasks by the effective pipeline
+    filtered_tasks = [
+        t
+        for t in comp_tasks
+        if f"{t.node_id}" in set(minimal_computational_dag.nodes())
+    ]
+
     if computation.start_pipeline:
         await _try_start_pipeline(
             request.app,

@@ -350,14 +359,9 @@ async def create_computation(  # noqa: PLR0913 # pylint: disable=too-many-positional-arguments
             project=project,
             users_repo=users_repo,
             projects_metadata_repo=projects_metadata_repo,
+            filtered_comp_tasks_in_db=filtered_tasks,
         )
 
-    # filter the tasks by the effective pipeline
-    filtered_tasks = [
-        t
-        for t in comp_tasks
-        if f"{t.node_id}" in set(minimal_computational_dag.nodes())
-    ]
     pipeline_state = utils.get_pipeline_state_from_task_states(filtered_tasks)
 
     # get run details if any
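
Moving the filter above `_try_start_pipeline` lets the same `filtered_tasks` list serve both the pipeline start and the later `pipeline_state` computation. The filter itself hinges on stringifying each task's node id, since the DAG stores node names as strings while tasks carry `NodeID` (UUID) objects. A toy illustration of that idiom (names and values are made up):

# Toy example of the f"{t.node_id}" membership test used in the filter above.
from dataclasses import dataclass
from uuid import UUID


@dataclass
class Task:
    node_id: UUID  # tasks carry UUIDs...


dag_nodes = {"6ba7b810-9dad-11d1-80b4-00c04fd430c8"}  # ...the DAG stores strings

tasks = [
    Task(UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")),  # in the DAG
    Task(UUID(int=42)),  # not in the DAG
]

# Stringifying the UUID makes it comparable to the DAG's node names.
filtered = [t for t in tasks if f"{t.node_id}" in dag_nodes]
assert len(filtered) == 1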

0 commit comments

Comments
 (0)