
Commit c0b3261

Merge branch 'master' into bugfix-tracing-exemplars
2 parents: 01100b3 + 403186c

File tree: 9 files changed (+395, -197 lines)


.github/copilot-instructions.md

Lines changed: 2 additions & 0 deletions
@@ -23,6 +23,8 @@ This document provides guidelines and best practices for using GitHub Copilot in
 - ensure we use `fastapi` >0.100 compatible code
 - use f-string formatting
 - Only add comments in function if strictly necessary
+- use relative imports
+- imports should be at top of the file


 ### Json serialization
New file (Alembic migration, revision 278daef7e99d)

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+"""remove whole row in payload
+
+Revision ID: 278daef7e99d
+Revises: 4e7d8719855b
+Create Date: 2025-05-22 21:22:11.084001+00:00
+
+"""
+
+from typing import Final
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "278daef7e99d"
+down_revision = "4e7d8719855b"
+branch_labels = None
+depends_on = None
+
+DB_PROCEDURE_NAME: Final[str] = "notify_comp_tasks_changed"
+DB_TRIGGER_NAME: Final[str] = f"{DB_PROCEDURE_NAME}_event"
+DB_CHANNEL_NAME: Final[str] = "comp_tasks_output_events"
+
+
+def upgrade():
+    drop_trigger = sa.DDL(
+        f"""
+DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
+"""
+    )
+
+    task_output_changed_procedure = sa.DDL(
+        f"""
+CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$
+    DECLARE
+        record RECORD;
+        payload JSON;
+        changes JSONB;
+    BEGIN
+        IF (TG_OP = 'DELETE') THEN
+            record = OLD;
+        ELSE
+            record = NEW;
+        END IF;
+
+        SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes
+        FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
+        WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;
+
+        payload = json_build_object(
+            'table', TG_TABLE_NAME,
+            'changes', changes,
+            'action', TG_OP,
+            'task_id', record.task_id,
+            'project_id', record.project_id,
+            'node_id', record.node_id
+        );
+
+        PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);
+
+        RETURN NULL;
+    END;
+$$ LANGUAGE plpgsql;
+"""
+    )
+
+    task_output_changed_trigger = sa.DDL(
+        f"""
+DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
+CREATE TRIGGER {DB_TRIGGER_NAME}
+AFTER UPDATE OF outputs,state ON comp_tasks
+    FOR EACH ROW
+    WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state))
+    EXECUTE PROCEDURE {DB_PROCEDURE_NAME}();
+"""
+    )
+
+    op.execute(drop_trigger)
+    op.execute(task_output_changed_procedure)
+    op.execute(task_output_changed_trigger)
+
+
+def downgrade():
+    drop_trigger = sa.DDL(
+        f"""
+DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
+"""
+    )
+
+    task_output_changed_procedure = sa.DDL(
+        f"""
+CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$
+    DECLARE
+        record RECORD;
+        payload JSON;
+        changes JSONB;
+    BEGIN
+        IF (TG_OP = 'DELETE') THEN
+            record = OLD;
+        ELSE
+            record = NEW;
+        END IF;
+
+        SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes
+        FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
+        WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;
+
+        payload = json_build_object('table', TG_TABLE_NAME,
+                                    'changes', changes,
+                                    'action', TG_OP,
+                                    'data', row_to_json(record));
+
+        PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);
+
+        RETURN NULL;
+    END;
+$$ LANGUAGE plpgsql;
+"""
+    )
+
+    task_output_changed_trigger = sa.DDL(
+        f"""
+DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
+CREATE TRIGGER {DB_TRIGGER_NAME}
+AFTER UPDATE OF outputs,state ON comp_tasks
+    FOR EACH ROW
+    WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state))
+    EXECUTE PROCEDURE {DB_PROCEDURE_NAME}();
+"""
+    )
+
+    op.execute(drop_trigger)
+    op.execute(task_output_changed_procedure)
+    op.execute(task_output_changed_trigger)
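
How a consumer sees these events: the trigger publishes on the comp_tasks_output_events channel via pg_notify, and a client subscribes with LISTEN and reads the notification queue. Below is a minimal, illustrative sketch using the same aiopg pattern as the tests in this commit; the function name and the dsn argument are placeholders, not part of this change.

import json

import aiopg.sa

DB_CHANNEL_NAME = "comp_tasks_output_events"


async def listen_for_comp_task_events(dsn: str) -> None:
    async with aiopg.sa.create_engine(dsn) as engine:
        async with engine.acquire() as conn:
            # subscribe to the channel the trigger publishes on
            await conn.execute(f"LISTEN {DB_CHANNEL_NAME};")
            assert conn.connection  # the raw aiopg connection exposes the notifies queue
            while True:
                msg = await conn.connection.notifies.get()
                payload = json.loads(msg.payload)
                # after this change the payload carries identifiers only:
                # table, changes, action, task_id, project_id, node_id
                print(payload["action"], payload["changes"], payload["task_id"])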

packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py

Lines changed: 8 additions & 4 deletions
@@ -152,10 +152,14 @@ class NodeClass(enum.Enum):
         FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
         WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;

-        payload = json_build_object('table', TG_TABLE_NAME,
-                                    'changes', changes,
-                                    'action', TG_OP,
-                                    'data', row_to_json(record));
+        payload = json_build_object(
+            'table', TG_TABLE_NAME,
+            'changes', changes,
+            'action', TG_OP,
+            'task_id', record.task_id,
+            'project_id', record.project_id,
+            'node_id', record.node_id
+        );

         PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);
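
With this change the NOTIFY payload contains only the row identifiers plus the list of changed columns; the full row previously shipped under "data" is gone. An illustrative payload (identifier values are made up):

{
    "table": "comp_tasks",
    "changes": ["modified", "outputs", "state"],
    "action": "UPDATE",
    "task_id": 42,
    "project_id": "6b49e7e8-…",
    "node_id": "0f1d2c3b-…"
}

Consumers that previously read the row from payload["data"] now have to fetch it from the database using these identifiers.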

packages/postgres-database/tests/test_comp_tasks.py

Lines changed: 32 additions & 14 deletions
@@ -9,7 +9,6 @@

 import pytest
 from aiopg.sa.engine import Engine, SAConnection
-from aiopg.sa.result import RowProxy
 from simcore_postgres_database.models.comp_pipeline import StateType
 from simcore_postgres_database.models.comp_tasks import (
     DB_CHANNEL_NAME,
@@ -20,7 +19,7 @@


 @pytest.fixture()
-async def db_connection(aiopg_engine: Engine) -> SAConnection:
+async def db_connection(aiopg_engine: Engine) -> AsyncIterator[SAConnection]:
     async with aiopg_engine.acquire() as conn:
         yield conn

@@ -31,6 +30,7 @@ async def db_notification_queue(
 ) -> AsyncIterator[asyncio.Queue]:
     listen_query = f"LISTEN {DB_CHANNEL_NAME};"
     await db_connection.execute(listen_query)
+    assert db_connection.connection
     notifications_queue: asyncio.Queue = db_connection.connection.notifies
     assert notifications_queue.empty()
     yield notifications_queue
@@ -51,7 +51,8 @@ async def task(
         .values(outputs=json.dumps({}), node_class=task_class)
         .returning(literal_column("*"))
     )
-    row: RowProxy = await result.fetchone()
+    row = await result.fetchone()
+    assert row
     task = dict(row)

     assert (
@@ -73,8 +74,15 @@ async def _assert_notification_queue_status(

         assert msg, "notification msg from postgres is empty!"
         task_data = json.loads(msg.payload)
-
-        for k in ["table", "changes", "action", "data"]:
+        expected_keys = [
+            "task_id",
+            "project_id",
+            "node_id",
+            "changes",
+            "action",
+            "table",
+        ]
+        for k in expected_keys:
             assert k in task_data, f"invalid structure, expected [{k}] in {task_data}"

         tasks.append(task_data)
@@ -110,20 +118,27 @@ async def test_listen_query(
     )
     tasks = await _assert_notification_queue_status(db_notification_queue, 1)
     assert tasks[0]["changes"] == ["modified", "outputs", "state"]
+    assert tasks[0]["action"] == "UPDATE"
+    assert tasks[0]["table"] == "comp_tasks"
+    assert tasks[0]["task_id"] == task["task_id"]
+    assert tasks[0]["project_id"] == task["project_id"]
+    assert tasks[0]["node_id"] == task["node_id"]
+
     assert (
-        tasks[0]["data"]["outputs"] == updated_output
-    ), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"
+        "data" not in tasks[0]
+    ), "data is not expected in the notification payload anymore"

     # setting the exact same data twice triggers only ONCE
     updated_output = {"some new stuff": "it is newer"}
     await _update_comp_task_with(db_connection, task, outputs=updated_output)
     await _update_comp_task_with(db_connection, task, outputs=updated_output)
     tasks = await _assert_notification_queue_status(db_notification_queue, 1)
     assert tasks[0]["changes"] == ["modified", "outputs"]
-    assert (
-        tasks[0]["data"]["outputs"] == updated_output
-    ), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"
-
+    assert tasks[0]["action"] == "UPDATE"
+    assert tasks[0]["table"] == "comp_tasks"
+    assert tasks[0]["task_id"] == task["task_id"]
+    assert tasks[0]["project_id"] == task["project_id"]
+    assert tasks[0]["node_id"] == task["node_id"]
     # updating a number of times with different stuff comes out in FIFO order
     NUM_CALLS = 20
     update_outputs = []
@@ -135,7 +150,10 @@ async def test_listen_query(
     tasks = await _assert_notification_queue_status(db_notification_queue, NUM_CALLS)

     for n, output in enumerate(update_outputs):
+        assert output
         assert tasks[n]["changes"] == ["modified", "outputs"]
-        assert (
-            tasks[n]["data"]["outputs"] == output
-        ), f"the data received from the database is {tasks[n]}, expected new output is {output}"
+        assert tasks[n]["action"] == "UPDATE"
+        assert tasks[n]["table"] == "comp_tasks"
+        assert tasks[n]["task_id"] == task["task_id"]
+        assert tasks[n]["project_id"] == task["project_id"]
+        assert tasks[n]["node_id"] == task["node_id"]

packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py

Lines changed: 30 additions & 4 deletions
@@ -10,9 +10,10 @@
 import pytest
 import sqlalchemy as sa
 from faker import Faker
-from models_library.projects import ProjectAtDB
+from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
 from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline
+from simcore_postgres_database.models.comp_tasks import comp_tasks
 from simcore_postgres_database.models.projects import ProjectType, projects
 from simcore_postgres_database.models.users import UserRole, UserStatus, users
 from simcore_postgres_database.utils_projects_nodes import (
@@ -142,9 +143,8 @@ def creator(**pipeline_kwargs) -> dict[str, Any]:
                 .values(**pipeline_config)
                 .returning(sa.literal_column("*"))
             )
-            new_pipeline = result.first()
-            assert new_pipeline
-            new_pipeline = dict(new_pipeline)
+            row = result.one()
+            new_pipeline = row._asdict()
             created_pipeline_ids.append(new_pipeline["project_id"])
             return new_pipeline

@@ -157,3 +157,29 @@ def creator(**pipeline_kwargs) -> dict[str, Any]:
                 comp_pipeline.c.project_id.in_(created_pipeline_ids)
             )
         )
+
+
+@pytest.fixture
+def comp_task(postgres_db: sa.engine.Engine) -> Iterator[Callable[..., dict[str, Any]]]:
+    created_task_ids: list[int] = []
+
+    def creator(project_id: ProjectID, **task_kwargs) -> dict[str, Any]:
+        task_config = {"project_id": f"{project_id}"} | task_kwargs
+        with postgres_db.connect() as conn:
+            result = conn.execute(
+                comp_tasks.insert()
+                .values(**task_config)
+                .returning(sa.literal_column("*"))
+            )
+            row = result.one()
+            new_task = row._asdict()
+            created_task_ids.append(new_task["task_id"])
+            return new_task
+
+    yield creator
+
+    # cleanup
+    with postgres_db.connect() as conn:
+        conn.execute(
+            comp_tasks.delete().where(comp_tasks.c.task_id.in_(created_task_ids))
+        )
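
The new comp_task fixture mirrors the creator-plus-cleanup pattern of the existing pipeline fixture: the returned callable inserts a comp_tasks row and returns it as a plain dict, and teardown deletes every row it created. A hedged sketch of how a test might use it (the test name and node_id value are illustrative, and it assumes an existing fixture that inserts a projects row so the project_id foreign key is satisfied):

from typing import Any, Callable


def test_comp_task_creator(
    project: Callable[..., Any],  # assumed: existing fixture that creates a projects row
    comp_task: Callable[..., dict[str, Any]],
):
    a_project = project()  # signature assumed; must expose the project's uuid
    new_task = comp_task(project_id=a_project.uuid, node_id="some-node-uuid")
    # the creator returns the inserted row as a plain dict
    assert new_task["project_id"] == f"{a_project.uuid}"
    # created rows are removed automatically in the fixture's cleanup phase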
