Skip to content

Commit a295c9a

Browse files
committed
change procedure
1 parent 89f3caf commit a295c9a

File tree

2 files changed

+142
-4
lines changed

2 files changed

+142
-4
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
"""remove whole row in payload
2+
3+
Revision ID: 278daef7e99d
4+
Revises: 4e7d8719855b
5+
Create Date: 2025-05-22 21:22:11.084001+00:00
6+
7+
"""
8+
9+
from typing import Final
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "278daef7e99d"
16+
down_revision = "4e7d8719855b"
17+
branch_labels = None
18+
depends_on = None
19+
20+
DB_PROCEDURE_NAME: Final[str] = "notify_comp_tasks_changed"
21+
DB_TRIGGER_NAME: Final[str] = f"{DB_PROCEDURE_NAME}_event"
22+
DB_CHANNEL_NAME: Final[str] = "comp_tasks_output_events"
23+
24+
25+
def upgrade():
26+
drop_trigger = sa.DDL(
27+
f"""
28+
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
29+
"""
30+
)
31+
32+
task_output_changed_procedure = sa.DDL(
33+
f"""
34+
CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$
35+
DECLARE
36+
record RECORD;
37+
payload JSON;
38+
changes JSONB;
39+
BEGIN
40+
IF (TG_OP = 'DELETE') THEN
41+
record = OLD;
42+
ELSE
43+
record = NEW;
44+
END IF;
45+
46+
SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes
47+
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
48+
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;
49+
50+
payload = json_build_object(
51+
'table', TG_TABLE_NAME,
52+
'changes', changes,
53+
'action', TG_OP,
54+
'task_id', record.task_id,
55+
'project_id', record.project_id,
56+
'node_id', record.node_id
57+
);
58+
59+
PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);
60+
61+
RETURN NULL;
62+
END;
63+
$$ LANGUAGE plpgsql;
64+
"""
65+
)
66+
67+
task_output_changed_trigger = sa.DDL(
68+
f"""
69+
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
70+
CREATE TRIGGER {DB_TRIGGER_NAME}
71+
AFTER UPDATE OF outputs,state ON comp_tasks
72+
FOR EACH ROW
73+
WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state))
74+
EXECUTE PROCEDURE {DB_PROCEDURE_NAME}();
75+
"""
76+
)
77+
78+
op.execute(drop_trigger)
79+
op.execute(task_output_changed_procedure)
80+
op.execute(task_output_changed_trigger)
81+
82+
83+
def downgrade():
84+
drop_trigger = sa.DDL(
85+
f"""
86+
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
87+
"""
88+
)
89+
90+
task_output_changed_procedure = sa.DDL(
91+
f"""
92+
CREATE OR REPLACE FUNCTION {DB_PROCEDURE_NAME}() RETURNS TRIGGER AS $$
93+
DECLARE
94+
record RECORD;
95+
payload JSON;
96+
changes JSONB;
97+
BEGIN
98+
IF (TG_OP = 'DELETE') THEN
99+
record = OLD;
100+
ELSE
101+
record = NEW;
102+
END IF;
103+
104+
SELECT jsonb_agg(pre.key ORDER BY pre.key) INTO changes
105+
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
106+
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;
107+
108+
payload = json_build_object('table', TG_TABLE_NAME,
109+
'changes', changes,
110+
'action', TG_OP,
111+
'data', row_to_json(record));
112+
113+
PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);
114+
115+
RETURN NULL;
116+
END;
117+
$$ LANGUAGE plpgsql;
118+
"""
119+
)
120+
121+
task_output_changed_trigger = sa.DDL(
122+
f"""
123+
DROP TRIGGER IF EXISTS {DB_TRIGGER_NAME} on comp_tasks;
124+
CREATE TRIGGER {DB_TRIGGER_NAME}
125+
AFTER UPDATE OF outputs,state ON comp_tasks
126+
FOR EACH ROW
127+
WHEN ((OLD.outputs::jsonb IS DISTINCT FROM NEW.outputs::jsonb OR OLD.state IS DISTINCT FROM NEW.state))
128+
EXECUTE PROCEDURE {DB_PROCEDURE_NAME}();
129+
"""
130+
)
131+
132+
op.execute(drop_trigger)
133+
op.execute(task_output_changed_procedure)
134+
op.execute(task_output_changed_trigger)

packages/postgres-database/src/simcore_postgres_database/models/comp_tasks.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,14 @@ class NodeClass(enum.Enum):
152152
FROM jsonb_each(to_jsonb(OLD)) AS pre, jsonb_each(to_jsonb(NEW)) AS post
153153
WHERE pre.key = post.key AND pre.value IS DISTINCT FROM post.value;
154154
155-
payload = json_build_object('table', TG_TABLE_NAME,
156-
'changes', changes,
157-
'action', TG_OP,
158-
'data', row_to_json(record));
155+
payload = json_build_object(
156+
'table', TG_TABLE_NAME,
157+
'changes', changes,
158+
'action', TG_OP,
159+
'task_id', record.task_id,
160+
'project_id', record.project_id,
161+
'node_id', record.node_id
162+
);
159163
160164
PERFORM pg_notify('{DB_CHANNEL_NAME}', payload::text);
161165

0 commit comments

Comments
 (0)