99
1010import pytest
1111from aiopg .sa .engine import Engine , SAConnection
12- from aiopg .sa .result import RowProxy
1312from simcore_postgres_database .models .comp_pipeline import StateType
1413from simcore_postgres_database .models .comp_tasks import (
1514 DB_CHANNEL_NAME ,
2019
2120
2221@pytest .fixture ()
23- async def db_connection (aiopg_engine : Engine ) -> SAConnection :
22+ async def db_connection (aiopg_engine : Engine ) -> AsyncIterator [ SAConnection ] :
2423 async with aiopg_engine .acquire () as conn :
2524 yield conn
2625
@@ -31,6 +30,7 @@ async def db_notification_queue(
3130) -> AsyncIterator [asyncio .Queue ]:
3231 listen_query = f"LISTEN { DB_CHANNEL_NAME } ;"
3332 await db_connection .execute (listen_query )
33+ assert db_connection .connection
3434 notifications_queue : asyncio .Queue = db_connection .connection .notifies
3535 assert notifications_queue .empty ()
3636 yield notifications_queue
@@ -51,7 +51,8 @@ async def task(
5151 .values (outputs = json .dumps ({}), node_class = task_class )
5252 .returning (literal_column ("*" ))
5353 )
54- row : RowProxy = await result .fetchone ()
54+ row = await result .fetchone ()
55+ assert row
5556 task = dict (row )
5657
5758 assert (
@@ -73,8 +74,15 @@ async def _assert_notification_queue_status(
7374
7475 assert msg , "notification msg from postgres is empty!"
7576 task_data = json .loads (msg .payload )
76-
77- for k in ["table" , "changes" , "action" , "data" ]:
77+ expected_keys = [
78+ "task_id" ,
79+ "project_id" ,
80+ "node_id" ,
81+ "changes" ,
82+ "action" ,
83+ "table" ,
84+ ]
85+ for k in expected_keys :
7886 assert k in task_data , f"invalid structure, expected [{ k } ] in { task_data } "
7987
8088 tasks .append (task_data )
@@ -110,20 +118,27 @@ async def test_listen_query(
110118 )
111119 tasks = await _assert_notification_queue_status (db_notification_queue , 1 )
112120 assert tasks [0 ]["changes" ] == ["modified" , "outputs" , "state" ]
121+ assert tasks [0 ]["action" ] == "UPDATE"
122+ assert tasks [0 ]["table" ] == "comp_tasks"
123+ assert tasks [0 ]["task_id" ] == task ["task_id" ]
124+ assert tasks [0 ]["project_id" ] == task ["project_id" ]
125+ assert tasks [0 ]["node_id" ] == task ["node_id" ]
126+
113127 assert (
114- tasks [ 0 ][ "data" ][ "outputs" ] == updated_output
115- ), f"the data received from the database is { tasks [ 0 ] } , expected new output is { updated_output } "
128+ "data" not in tasks [ 0 ]
129+ ), " data is not expected in the notification payload anymore "
116130
117131 # setting the exact same data twice triggers only ONCE
118132 updated_output = {"some new stuff" : "it is newer" }
119133 await _update_comp_task_with (db_connection , task , outputs = updated_output )
120134 await _update_comp_task_with (db_connection , task , outputs = updated_output )
121135 tasks = await _assert_notification_queue_status (db_notification_queue , 1 )
122136 assert tasks [0 ]["changes" ] == ["modified" , "outputs" ]
123- assert (
124- tasks [0 ]["data" ]["outputs" ] == updated_output
125- ), f"the data received from the database is { tasks [0 ]} , expected new output is { updated_output } "
126-
137+ assert tasks [0 ]["action" ] == "UPDATE"
138+ assert tasks [0 ]["table" ] == "comp_tasks"
139+ assert tasks [0 ]["task_id" ] == task ["task_id" ]
140+ assert tasks [0 ]["project_id" ] == task ["project_id" ]
141+ assert tasks [0 ]["node_id" ] == task ["node_id" ]
127142 # updating a number of times with different stuff comes out in FIFO order
128143 NUM_CALLS = 20
129144 update_outputs = []
@@ -135,7 +150,10 @@ async def test_listen_query(
135150 tasks = await _assert_notification_queue_status (db_notification_queue , NUM_CALLS )
136151
137152 for n , output in enumerate (update_outputs ):
153+ assert output
138154 assert tasks [n ]["changes" ] == ["modified" , "outputs" ]
139- assert (
140- tasks [n ]["data" ]["outputs" ] == output
141- ), f"the data received from the database is { tasks [n ]} , expected new output is { output } "
155+ assert tasks [0 ]["action" ] == "UPDATE"
156+ assert tasks [0 ]["table" ] == "comp_tasks"
157+ assert tasks [0 ]["task_id" ] == task ["task_id" ]
158+ assert tasks [0 ]["project_id" ] == task ["project_id" ]
159+ assert tasks [0 ]["node_id" ] == task ["node_id" ]
0 commit comments