Commit c2cf892: 🎨 introduce comp_run_snapshot_tasks table (🗃️) (#7758)
1 parent 125e103

24 files changed: +1191 −107 lines
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
"""add comp_run_snapshot_tasks table

Revision ID: e89eae27fb3f
Revises: 278daef7e99d
Create Date: 2025-05-29 16:52:00.435268+00:00

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "e89eae27fb3f"
down_revision = "278daef7e99d"
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "comp_run_snapshot_tasks",
        sa.Column("snapshot_task_id", sa.Integer(), nullable=False),
        sa.Column("run_id", sa.Integer(), nullable=False),
        sa.Column("project_id", sa.String(), nullable=True),
        sa.Column("node_id", sa.String(), nullable=True),
        # sa.Column('node_class', sa.Enum('COMPUTATIONAL', 'INTERACTIVE', 'FRONTEND', name='nodeclass'), nullable=True),
        sa.Column("job_id", sa.String(), nullable=True),
        sa.Column("internal_id", sa.Integer(), nullable=True),
        sa.Column("schema", sa.JSON(), nullable=True),
        sa.Column("inputs", sa.JSON(), nullable=True),
        sa.Column("outputs", sa.JSON(), nullable=True),
        sa.Column("run_hash", sa.String(), nullable=True),
        sa.Column("image", sa.JSON(), nullable=True),
        # sa.Column('state', sa.Enum('NOT_STARTED', 'PUBLISHED', 'PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'ABORTED', 'WAITING_FOR_RESOURCES', 'WAITING_FOR_CLUSTER', name='statetype'), server_default='NOT_STARTED', nullable=False),
        sa.Column("errors", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        sa.Column("progress", sa.Numeric(precision=3, scale=2), nullable=True),
        sa.Column("start", sa.DateTime(timezone=True), nullable=True),
        sa.Column("end", sa.DateTime(timezone=True), nullable=True),
        sa.Column("last_heartbeat", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "created",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "modified",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "pricing_info", postgresql.JSONB(astext_type=sa.Text()), nullable=True
        ),
        sa.Column(
            "hardware_info", postgresql.JSONB(astext_type=sa.Text()), nullable=True
        ),
        sa.ForeignKeyConstraint(
            ["run_id"],
            ["comp_runs.run_id"],
            name="fk_snapshot_tasks_to_comp_runs",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("snapshot_task_id"),
    )
    op.add_column(
        "comp_runs",
        sa.Column(
            "dag_adjacency_list",
            postgresql.JSONB(astext_type=sa.Text()),
            server_default=sa.text("'{}'::jsonb"),
            nullable=False,
        ),
    )
    # ### end Alembic commands ###

    # The enum columns are added with raw SQL, presumably because the
    # nodeclass/statetype types already exist in the database and the
    # autogenerated sa.Enum columns (commented out above) would try to
    # re-create them.
    op.execute("ALTER TABLE comp_run_snapshot_tasks ADD COLUMN node_class nodeclass;")
    op.execute("ALTER TABLE comp_run_snapshot_tasks ADD COLUMN state statetype;")

    op.alter_column(
        "comp_run_snapshot_tasks",
        "state",
        existing_type=postgresql.ENUM(
            "NOT_STARTED",
            "PUBLISHED",
            "PENDING",
            "RUNNING",
            "SUCCESS",
            "FAILED",
            "ABORTED",
            "WAITING_FOR_RESOURCES",
            "WAITING_FOR_CLUSTER",
            name="statetype",
        ),
        nullable=False,
    )


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("comp_runs", "dag_adjacency_list")
    op.drop_table("comp_run_snapshot_tasks")
    # ### end Alembic commands ###
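For reference, the same two columns could also be attached through Alembic's native API by marking the enum types as pre-existing with create_type=False; a minimal sketch of that alternative (not what the commit does), shown for node_class only:

# Hypothetical alternative to the raw ALTER TABLE statements above: inside
# upgrade(), reference the pre-existing PostgreSQL enum type with
# create_type=False so Alembic does not emit CREATE TYPE again.
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql


def _add_node_class_column() -> None:
    op.add_column(
        "comp_run_snapshot_tasks",
        sa.Column(
            "node_class",
            postgresql.ENUM(
                "COMPUTATIONAL",
                "INTERACTIVE",
                "FRONTEND",
                name="nodeclass",
                create_type=False,
            ),
            nullable=True,
        ),
    )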
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
"""Computational Tasks Table"""

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from ._common import (
    RefActions,
    column_created_datetime,
    column_modified_datetime,
)
from .base import metadata
from .comp_pipeline import StateType
from .comp_runs import comp_runs
from .comp_tasks import NodeClass

comp_run_snapshot_tasks = sa.Table(
    "comp_run_snapshot_tasks",
    metadata,
    sa.Column(
        "snapshot_task_id",
        sa.Integer,
        primary_key=True,
    ),
    sa.Column(
        "run_id",
        sa.Integer,
        sa.ForeignKey(
            comp_runs.c.run_id,
            name="fk_snapshot_tasks_to_comp_runs",
            onupdate=RefActions.CASCADE,
            ondelete=RefActions.CASCADE,
        ),
        nullable=False,
    ),
    sa.Column(
        "project_id",
        sa.String,
        doc="Project that contains the node associated to this task",
    ),
    sa.Column("node_id", sa.String, doc="Node associated to this task"),
    sa.Column(
        "node_class",
        sa.Enum(NodeClass, name="nodeclass"),
        doc="Classification of the node associated to this task",
    ),
    sa.Column("job_id", sa.String, doc="Worker job ID for this task"),
    sa.Column("internal_id", sa.Integer, doc="DEV: only for development. From 1 to N"),
    sa.Column("schema", sa.JSON, doc="Schema for inputs and outputs"),
    sa.Column("inputs", sa.JSON, doc="Input values"),
    sa.Column("outputs", sa.JSON, doc="Output values"),
    sa.Column(
        "run_hash",
        sa.String,
        nullable=True,
        doc="Hashes inputs before run. Used to detect changes in inputs.",
    ),
    sa.Column(
        "image", sa.JSON, doc="Metadata about service image associated to this node"
    ),
    sa.Column(
        "state",
        sa.Enum(StateType, name="statetype"),
        nullable=False,
        server_default=StateType.NOT_STARTED.value,
        doc="Current state in the task lifecycle",
    ),
    sa.Column(
        "errors",
        postgresql.JSONB,
        nullable=True,
        doc="List[models_library.errors.ErrorDict] with error information"
        " for a failing state, otherwise set to None",
    ),
    sa.Column(
        "progress",
        sa.Numeric(precision=3, scale=2),  # numbers from 0.00 to 1.00
        nullable=True,
        doc="current progress of the task if available",
    ),
    sa.Column(
        "start", sa.DateTime(timezone=True), doc="UTC timestamp when task started"
    ),
    sa.Column(
        "end", sa.DateTime(timezone=True), doc="UTC timestamp for task completion"
    ),
    sa.Column(
        "last_heartbeat",
        sa.DateTime(timezone=True),
        doc="UTC timestamp for last task running check",
    ),
    column_created_datetime(timezone=True),
    column_modified_datetime(timezone=True),
    sa.Column(
        "pricing_info",
        postgresql.JSONB,
        nullable=True,
        doc="Billing information of this task",
    ),
    sa.Column(
        "hardware_info",
        postgresql.JSONB,
        nullable=True,
        doc="Hardware information of this task",
    ),
)
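A minimal usage sketch of the new table, assuming an initialized AsyncEngine and the import path implied by the file layout in this commit; the IDs and payload are hypothetical:

# Hypothetical sketch: snapshot one task into comp_run_snapshot_tasks and
# return the generated primary key. `engine` is assumed to be a configured
# sqlalchemy.ext.asyncio.AsyncEngine.
from sqlalchemy.ext.asyncio import AsyncEngine

from simcore_postgres_database.models.comp_run_snapshot_tasks import (
    comp_run_snapshot_tasks,
)


async def snapshot_task(engine: AsyncEngine, run_id: int, task: dict) -> int:
    async with engine.begin() as conn:
        result = await conn.execute(
            comp_run_snapshot_tasks.insert()
            .values(
                run_id=run_id,
                project_id=task["project_id"],
                node_id=task["node_id"],
                inputs=task.get("inputs"),
                outputs=task.get("outputs"),
            )
            .returning(comp_run_snapshot_tasks.c.snapshot_task_id)
        )
        return result.scalar_one()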

packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py

Lines changed: 7 additions & 0 deletions
@@ -98,6 +98,13 @@
         nullable=False,
         doc="the run uses on demand clusters",
     ),
+    sa.Column(
+        "dag_adjacency_list",
+        JSONB,
+        doc="Adjacency list for the pipeline's graph",
+        server_default=sa.text("'{}'::jsonb"),
+        nullable=False,
+    ),
     sa.UniqueConstraint("project_uuid", "user_id", "iteration"),
     sa.Index("ix_comp_runs_user_id", "user_id"),
 )
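For context, the new column stores the pipeline graph in adjacency-list form; a hypothetical payload follows (the exact key/value convention is an assumption, since the commit only shows the column definition):

# Hypothetical dag_adjacency_list value: each key is a node ID and the value
# lists the IDs of its direct downstream nodes; terminal nodes map to [].
dag_adjacency_list = {
    "2fb8dbb5-...": ["b0b55b4f-..."],  # producer feeds one consumer
    "b0b55b4f-...": [],                # terminal node
}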
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
from typing import Any

import sqlalchemy as sa
from pydantic import PositiveInt
from sqlalchemy.engine.row import Row
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from .models.comp_run_snapshot_tasks import comp_run_snapshot_tasks
from .utils_repos import pass_or_acquire_connection

COMP_RUN_SNAPSHOT_TASKS_DB_COLS = (
    comp_run_snapshot_tasks.c.snapshot_task_id,
    comp_run_snapshot_tasks.c.run_id,
    comp_run_snapshot_tasks.c.project_id,
    comp_run_snapshot_tasks.c.node_id,
    comp_run_snapshot_tasks.c.node_class,
    comp_run_snapshot_tasks.c.job_id,
    comp_run_snapshot_tasks.c.internal_id,
    comp_run_snapshot_tasks.c.schema,
    comp_run_snapshot_tasks.c.inputs,
    comp_run_snapshot_tasks.c.outputs,
    comp_run_snapshot_tasks.c.run_hash,
    comp_run_snapshot_tasks.c.image,
    comp_run_snapshot_tasks.c.state,
    comp_run_snapshot_tasks.c.errors,
    comp_run_snapshot_tasks.c.progress,
    comp_run_snapshot_tasks.c.start,
    comp_run_snapshot_tasks.c.end,
    comp_run_snapshot_tasks.c.last_heartbeat,
    comp_run_snapshot_tasks.c.created,
    comp_run_snapshot_tasks.c.modified,
    comp_run_snapshot_tasks.c.pricing_info,
    comp_run_snapshot_tasks.c.hardware_info,
)


async def update_for_run_id_and_node_id(
    engine: AsyncEngine,
    conn: AsyncConnection | None = None,
    *,
    run_id: PositiveInt,
    node_id: str,
    data: dict[str, Any],
) -> Row:
    async with pass_or_acquire_connection(engine, connection=conn) as _conn:
        result = await _conn.execute(
            comp_run_snapshot_tasks.update()
            .values(
                **data,
                modified=sa.func.now(),
            )
            .where(
                (comp_run_snapshot_tasks.c.run_id == run_id)
                & (comp_run_snapshot_tasks.c.node_id == node_id)
            )
            .returning(*COMP_RUN_SNAPSHOT_TASKS_DB_COLS)
        )
        row = result.one_or_none()
        if row is None:
            msg = f"update for run_id={run_id} and node_id={node_id} did not return any row"
            raise ValueError(msg)
        return row
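A minimal usage sketch for the function above, assuming an initialized AsyncEngine; the run/node IDs and the payload are hypothetical, and StateType is the enum imported in the model file:

# Hypothetical call site for update_for_run_id_and_node_id (defined above):
# mark one snapshot task as RUNNING and read the updated row back via the
# RETURNING clause.
from sqlalchemy.ext.asyncio import AsyncEngine

from simcore_postgres_database.models.comp_pipeline import StateType


async def mark_running(engine: AsyncEngine) -> None:
    row = await update_for_run_id_and_node_id(
        engine,
        run_id=42,  # hypothetical run
        node_id="b0b55b4f-...",  # hypothetical node UUID
        data={"state": StateType.RUNNING, "progress": 0.25},
    )
    print(row.state, row.progress)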
Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
import logging
from typing import cast

import sqlalchemy as sa
from pydantic import PositiveInt
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from .models.comp_runs import comp_runs
from .utils_repos import pass_or_acquire_connection

_logger = logging.getLogger(__name__)


async def get_latest_run_id_for_project(
    engine: AsyncEngine,
    conn: AsyncConnection | None = None,
    *,
    project_id: str,
) -> PositiveInt | None:
    # Get latest run per (project_uuid, user_id)
    project_and_user_latest_runs = (
        sa.select(
            comp_runs.c.project_uuid,
            comp_runs.c.user_id,
            sa.func.max(comp_runs.c.iteration).label("latest_iteration"),
            sa.func.max(comp_runs.c.created).label("created"),
        )
        .where(comp_runs.c.project_uuid == project_id)
        .group_by(comp_runs.c.project_uuid, comp_runs.c.user_id)
        .subquery("project_and_user_latest_runs")
    )

    # Rank users per project by latest run creation time
    ranked = sa.select(
        project_and_user_latest_runs.c.project_uuid,
        project_and_user_latest_runs.c.user_id,
        project_and_user_latest_runs.c.latest_iteration,
        project_and_user_latest_runs.c.created,
        sa.func.row_number()
        .over(
            partition_by=project_and_user_latest_runs.c.project_uuid,
            order_by=project_and_user_latest_runs.c.created.desc(),
        )
        .label("row_number"),
    ).subquery("ranked")

    # Filter to only the top-ranked (most recent) user per project
    filtered_ranked = (
        sa.select(
            ranked.c.project_uuid,
            ranked.c.user_id,
            ranked.c.latest_iteration,
        )
        .where(ranked.c.row_number == 1)
        .subquery("filtered_ranked")
    )

    # Base select query
    base_select_query = sa.select(comp_runs.c.run_id).select_from(
        filtered_ranked.join(
            comp_runs,
            sa.and_(
                comp_runs.c.project_uuid == filtered_ranked.c.project_uuid,
                comp_runs.c.user_id == filtered_ranked.c.user_id,
                comp_runs.c.iteration == filtered_ranked.c.latest_iteration,
            ),
        )
    )

    async with pass_or_acquire_connection(engine, connection=conn) as _conn:
        result = await _conn.execute(base_select_query)
        row = result.one_or_none()
        if not row:
            msg = f"get_latest_run_id_for_project did not return any row for project_id={project_id} (MD: I think this should not happen, but if it happens contact MD/SAN)"
            _logger.error(msg)
            return None
        return cast(PositiveInt, row.run_id)
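A short sketch combining the helpers above, assuming an initialized AsyncEngine and a hypothetical project UUID; the count query is illustrative only:

# Hypothetical call site: resolve the latest run of a project, then count its
# snapshot tasks, using get_latest_run_id_for_project and the
# comp_run_snapshot_tasks table defined earlier in this commit.
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncEngine

from simcore_postgres_database.models.comp_run_snapshot_tasks import (
    comp_run_snapshot_tasks,
)


async def count_latest_snapshot_tasks(engine: AsyncEngine, project_id: str) -> int:
    run_id = await get_latest_run_id_for_project(engine, project_id=project_id)
    if run_id is None:
        return 0
    async with engine.connect() as conn:
        result = await conn.execute(
            sa.select(sa.func.count())
            .select_from(comp_run_snapshot_tasks)
            .where(comp_run_snapshot_tasks.c.run_id == run_id)
        )
        return result.scalar_one()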
