Merged · Changes from all commits
2 changes: 2 additions & 0 deletions .github/copilot-instructions.md
@@ -23,6 +23,8 @@ This document provides guidelines and best practices for using GitHub Copilot in
- ensure we use `fastapi` >0.100 compatible code
- use f-string formatting
- Only add comments in function if strictly necessary
- use relative imports
- imports should be at top of the file
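A minimal illustration of the two added conventions (the file and symbol names below are invented for the example, not taken from the repository):

# hypothetical module inside a package, e.g. simcore_service_example/handlers.py
from ..models.tasks import Task  # relative import, declared at the top of the file


def describe(task: Task) -> str:
    return f"task {task.id} is {task.state}"  # f-string formatting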


### Json serialization
@@ -0,0 +1,134 @@
"""remove whole row in payload

Revision ID: 278daef7e99d
Revises: 4e7d8719855b
Create Date: 2025-05-22 21:22:11.084001+00:00

"""

from typing import Final

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "278daef7e99d"
down_revision = "4e7d8719855b"
branch_labels = None
depends_on = None

DB_PROCEDURE_NAME: Final[str] = "notify_comp_tasks_changed"
DB_TRIGGER_NAME: Final[str] = f"{DB_PROCEDURE_NAME}_event"
DB_CHANNEL_NAME: Final[str] = "comp_tasks_output_events"


def upgrade():
drop_trigger = sa.DDL(
f"""
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
"""
)

task_output_changed_procedure = sa.DDL(
f"""
CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$
DECLARE
record RECORD;
payload JSON;
changes JSONB;
BEGIN
IF (TG_OP = 'DELETE') THEN
record = OLD;
ELSE
record = NEW;
END IF;

SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;

payload = json_build_object(
'table', TG_TABLE_NAME,
'changes', changes,
'action', TG_OP,
'task_id', record.task_id,
'project_id', record.project_id,
'node_id', record.node_id
);

PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);

RETURN NULL;
END;
$$ LANGUAGE plpgsql;
"""
)

task_output_changed_trigger = sa.DDL(
f"""
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
CREATE TRIGGER {DB_TRIGGER_NAME}
AFTER UPDATE OF outputs,state ON comp_tasks
FOR EACH ROW
WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state))
EXECUTE PROCEDURE {DB_PROCEDURE_NAME}();
"""
)

op.execute(drop_trigger)
op.execute(task_output_changed_procedure)
op.execute(task_output_changed_trigger)


def downgrade():
drop_trigger = sa.DDL(
f"""
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
"""
)

task_output_changed_procedure = sa.DDL(
f"""
CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$
DECLARE
record RECORD;
payload JSON;
changes JSONB;
BEGIN
IF (TG_OP = 'DELETE') THEN
record = OLD;
ELSE
record = NEW;
END IF;

SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;

payload = json_build_object('table', TG_TABLE_NAME,
'changes', changes,
'action', TG_OP,
'data', row_to_json(record));

PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);

RETURN NULL;
END;
$$ LANGUAGE plpgsql;
"""
)

task_output_changed_trigger = sa.DDL(
f"""
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
CREATE TRIGGER {DB_TRIGGER_NAME}
AFTER UPDATE OF outputs,state ON comp_tasks
FOR EACH ROW
WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state))
EXECUTE PROCEDURE {DB_PROCEDURE_NAME}();
"""
)

op.execute(drop_trigger)
op.execute(task_output_changed_procedure)
op.execute(task_output_changed_trigger)
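For orientation (not part of this PR): since the payload no longer embeds the whole row under "data", a consumer of this channel that still needs the full task row has to re-fetch it using the identifiers carried in the notification. A minimal sketch with aiopg; the handler name and the re-fetch strategy are assumptions, not code from this change:

import asyncio
import json

from aiopg.sa import create_engine
from simcore_postgres_database.models.comp_tasks import DB_CHANNEL_NAME, comp_tasks


async def _on_task_event(conn, msg) -> None:
    # hypothetical handler: the payload now carries identifiers and the names of
    # the changed columns, not the serialized row
    payload = json.loads(msg.payload)
    if "outputs" in payload["changes"]:
        # the row is no longer embedded under "data", so re-fetch it when needed
        result = await conn.execute(
            comp_tasks.select().where(comp_tasks.c.task_id == payload["task_id"])
        )
        row = await result.fetchone()
        ...  # react to row.outputs / row.state


async def listen_for_task_changes(dsn: str) -> None:
    async with create_engine(dsn) as engine:
        async with engine.acquire() as conn:
            await conn.execute(f"LISTEN {DB_CHANNEL_NAME};")
            queue: asyncio.Queue = conn.connection.notifies
            while True:
                msg = await queue.get()
                await _on_task_event(conn, msg)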
@@ -152,10 +152,14 @@ class NodeClass(enum.Enum):
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;

payload = json_build_object('table', TG_TABLE_NAME,
'changes', changes,
'action', TG_OP,
'data', row_to_json(record));
payload = json_build_object(
'table', TG_TABLE_NAME,
'changes', changes,
'action', TG_OP,
'task_id', record.task_id,
'project_id', record.project_id,
'node_id', record.node_id
);

PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);

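Illustratively, a notification emitted after an update of outputs and state now looks like the following (identifier values are invented); before this change the payload carried the full row under a "data" key instead of the three id fields:

# example shape of the pg_notify payload produced by the new trigger function
{
    "table": "comp_tasks",
    "changes": ["modified", "outputs", "state"],
    "action": "UPDATE",
    "task_id": 42,
    "project_id": "6f3e0f3e-...-invented",
    "node_id": "9a1b2c3d-...-invented",
}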
46 changes: 32 additions & 14 deletions packages/postgres-database/tests/test_comp_tasks.py
@@ -9,7 +9,6 @@

import pytest
from aiopg.sa.engine import Engine, SAConnection
from aiopg.sa.result import RowProxy
from simcore_postgres_database.models.comp_pipeline import StateType
from simcore_postgres_database.models.comp_tasks import (
DB_CHANNEL_NAME,
@@ -20,7 +19,7 @@


@pytest.fixture()
async def db_connection(aiopg_engine: Engine) -> SAConnection:
async def db_connection(aiopg_engine: Engine) -> AsyncIterator[SAConnection]:
async with aiopg_engine.acquire() as conn:
yield conn

Expand All @@ -31,6 +30,7 @@ async def db_notification_queue(
) -> AsyncIterator[asyncio.Queue]:
listen_query = f"LISTEN {DB_CHANNEL_NAME};"
await db_connection.execute(listen_query)
assert db_connection.connection
notifications_queue: asyncio.Queue = db_connection.connection.notifies
assert notifications_queue.empty()
yield notifications_queue
@@ -51,7 +51,8 @@ async def task(
.values(outputs=json.dumps({}), node_class=task_class)
.returning(literal_column("*"))
)
row: RowProxy = await result.fetchone()
row = await result.fetchone()
assert row
task = dict(row)

assert (
@@ -73,8 +74,15 @@ async def _assert_notification_queue_status(

assert msg, "notification msg from postgres is empty!"
task_data = json.loads(msg.payload)

for k in ["table", "changes", "action", "data"]:
expected_keys = [
"task_id",
"project_id",
"node_id",
"changes",
"action",
"table",
]
for k in expected_keys:
assert k in task_data, f"invalid structure, expected [{k}] in {task_data}"

tasks.append(task_data)
@@ -110,20 +118,27 @@ async def test_listen_query(
)
tasks = await _assert_notification_queue_status(db_notification_queue, 1)
assert tasks[0]["changes"] == ["modified", "outputs", "state"]
assert tasks[0]["action"] == "UPDATE"
assert tasks[0]["table"] == "comp_tasks"
assert tasks[0]["task_id"] == task["task_id"]
assert tasks[0]["project_id"] == task["project_id"]
assert tasks[0]["node_id"] == task["node_id"]

assert (
tasks[0]["data"]["outputs"] == updated_output
), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"
"data" not in tasks[0]
), "data is not expected in the notification payload anymore"

# setting the exact same data twice triggers only ONCE
updated_output = {"some new stuff": "it is newer"}
await _update_comp_task_with(db_connection, task, outputs=updated_output)
await _update_comp_task_with(db_connection, task, outputs=updated_output)
tasks = await _assert_notification_queue_status(db_notification_queue, 1)
assert tasks[0]["changes"] == ["modified", "outputs"]
assert (
tasks[0]["data"]["outputs"] == updated_output
), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"

assert tasks[0]["action"] == "UPDATE"
assert tasks[0]["table"] == "comp_tasks"
assert tasks[0]["task_id"] == task["task_id"]
assert tasks[0]["project_id"] == task["project_id"]
assert tasks[0]["node_id"] == task["node_id"]
# updating a number of times with different stuff comes out in FIFO order
NUM_CALLS = 20
update_outputs = []
@@ -135,7 +150,10 @@
tasks = await _assert_notification_queue_status(db_notification_queue, NUM_CALLS)

for n, output in enumerate(update_outputs):
assert output
assert tasks[n]["changes"] == ["modified", "outputs"]
assert (
tasks[n]["data"]["outputs"] == output
), f"the data received from the database is {tasks[n]}, expected new output is {output}"
assert tasks[n]["action"] == "UPDATE"
assert tasks[n]["table"] == "comp_tasks"
assert tasks[n]["task_id"] == task["task_id"]
assert tasks[n]["project_id"] == task["project_id"]
assert tasks[n]["node_id"] == task["node_id"]
34 changes: 30 additions & 4 deletions packages/pytest-simcore/src/pytest_simcore/db_entries_mocks.py
@@ -10,9 +10,10 @@
import pytest
import sqlalchemy as sa
from faker import Faker
from models_library.projects import ProjectAtDB
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID
from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline
from simcore_postgres_database.models.comp_tasks import comp_tasks
from simcore_postgres_database.models.projects import ProjectType, projects
from simcore_postgres_database.models.users import UserRole, UserStatus, users
from simcore_postgres_database.utils_projects_nodes import (
@@ -142,9 +143,8 @@ def creator(**pipeline_kwargs) -> dict[str, Any]:
.values(**pipeline_config)
.returning(sa.literal_column("*"))
)
new_pipeline = result.first()
assert new_pipeline
new_pipeline = dict(new_pipeline)
row = result.one()
new_pipeline = row._asdict()
created_pipeline_ids.append(new_pipeline["project_id"])
return new_pipeline

@@ -157,3 +157,29 @@ def creator(**pipeline_kwargs) -> dict[str, Any]:
comp_pipeline.c.project_id.in_(created_pipeline_ids)
)
)


@pytest.fixture
def comp_task(postgres_db: sa.engine.Engine) -> Iterator[Callable[..., dict[str, Any]]]:
created_task_ids: list[int] = []

def creator(project_id: ProjectID, **task_kwargs) -> dict[str, Any]:
task_config = {"project_id": f"{project_id}"} | task_kwargs
with postgres_db.connect() as conn:
result = conn.execute(
comp_tasks.insert()
.values(**task_config)
.returning(sa.literal_column("*"))
)
row = result.one()
new_task = row._asdict()
created_task_ids.append(new_task["task_id"])
return new_task

yield creator

# cleanup
with postgres_db.connect() as conn:
conn.execute(
comp_tasks.delete().where(comp_tasks.c.task_id.in_(created_task_ids))
)
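A sketch of how the new comp_task fixture could be used in a test. The test function and the way the project id is obtained are hypothetical; in practice the id would come from one of the project fixtures defined in this file, since the task row references an existing project:

from uuid import uuid4

# hypothetical test: "comp_task" yields a creator that inserts one row into
# comp_tasks and returns it as a plain dict; created rows are deleted again
# during the fixture's cleanup phase
def test_comp_task_row_is_created(comp_task):
    project_id = uuid4()  # in practice: use a project created by a project fixture
    task = comp_task(project_id=project_id, node_id=f"{uuid4()}")
    assert task["project_id"] == f"{project_id}"
    assert "task_id" in task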