|
19 | 19 | from common_library.json_serialization import json_dumps |
20 | 20 | from faker import Faker |
21 | 21 | from models_library.projects_nodes import Node |
| 22 | +from models_library.projects_nodes_io import NodeID |
22 | 23 | from models_library.projects_state import RunningState |
23 | 24 | from pydantic import TypeAdapter |
24 | 25 | from pytest_mock import MockerFixture |
@@ -605,13 +606,19 @@ async def test_running_computation_sends_progress_updates_via_socketio( |
605 | 606 | mocker: MockerFixture, |
606 | 607 | ): |
607 | 608 | assert client.app |
608 | | - socket_io_conn, _ = await create_socketio_connection(None, client) |
| 609 | + socket_io_conn, client_id = await create_socketio_connection(None, client) |
609 | 610 | mock_node_updated_handler = mocker.MagicMock() |
610 | 611 | socket_io_conn.on(SOCKET_IO_NODE_UPDATED_EVENT, handler=mock_node_updated_handler) |
611 | 612 |
|
612 | 613 | project_id = user_project["uuid"] |
613 | 614 |
|
614 | | - # NOTE: no need to open the project, since the messages are sent to the user groups |
| 615 | + # NOTE: we need to open the project so that the computation pipeline messages are transmitted and we get all the node updates |
| 616 | + url_open = client.app.router["open_project"].url_for(project_id=project_id) |
| 617 | + assert url_open == URL(f"/{API_VTAG}/projects/{project_id}:open") |
| 618 | + resp = await client.post(f"{url_open}", json=client_id) |
| 619 | + data, error = await assert_status(resp, expected.ok) |
| 620 | + |
| 621 | + # start the computation |
615 | 622 | url_start = client.app.router["start_computation"].url_for(project_id=project_id) |
616 | 623 | assert url_start == URL(f"/{API_VTAG}/computations/{project_id}:start") |
617 | 624 |
|
@@ -677,6 +684,20 @@ async def test_running_computation_sends_progress_updates_via_socketio( |
677 | 684 | f"Received updates for: {received_progress_node_ids}" |
678 | 685 | ) |
679 | 686 |
|
680 | | - print( |
681 | | - f"✓ Successfully received progress updates for all {len(computational_node_ids)} computational nodes: {computational_node_ids}" |
682 | | - ) |
| 687 | + # check that a node update was sent for each computational node at the end that unlocks the node |
| 688 | + node_id_data_map: dict[NodeID, list[Node]] = {} |
| 689 | + for mock_call in mock_node_updated_handler.call_args_list: |
| 690 | + node_id = NodeID(mock_call[0][0]["node_id"]) |
| 691 | + node_data = TypeAdapter(Node).validate_python(mock_call[0][0]["data"]) |
| 692 | + node_id_data_map.setdefault(node_id, []).append(node_data) |
| 693 | + |
| 694 | + for node_id, node_data_list in node_id_data_map.items(): |
| 695 | + # find the last update for this node |
| 696 | + last_node_data = node_data_list[-1] |
| 697 | + assert last_node_data.state |
| 698 | + assert last_node_data.state.current_status == RunningState.SUCCESS |
| 699 | + assert last_node_data.state.lock_state |
| 700 | + assert last_node_data.state.lock_state.locked is False, ( |
| 701 | + f"expected node {node_id} to be unlocked at the end of the pipeline, " |
| 702 | + "but it is still locked." |
| 703 | + ) |
0 commit comments