2222from servicelib .utils import logged_gather
2323
2424from ..projects import _projects_service
25- from ..projects .exceptions import ProjectNotFoundError
2625from ..rabbitmq import get_rabbitmq_client
2726from ..socketio .messages import (
2827 SOCKET_IO_EVENT ,
2928 SOCKET_IO_LOG_EVENT ,
30- SOCKET_IO_NODE_UPDATED_EVENT ,
3129 SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT ,
3230 send_message_to_standard_group ,
3331 send_message_to_user ,
4341APP_WALLET_SUBSCRIPTION_LOCK_KEY : Final [str ] = "wallet_subscription_lock"
4442
4543
46- async def _convert_to_node_update_event (
44+ async def _notify_comp_node_progress (
4745 app : web .Application , message : ProgressRabbitMessageNode
48- ) -> SocketMessageDict | None :
49- try :
46+ ) -> None :
47+ with log_catch ( _logger , reraise = False ) :
5048 project = await _projects_service .get_project_for_user (
51- app , f"{ message .project_id } " , message .user_id
49+ app , f"{ message .project_id } " , message .user_id , include_state = True
50+ )
51+ await _projects_service .notify_project_node_update (
52+ app , project , message .node_id , None
5253 )
53- if f"{ message .node_id } " in project ["workbench" ]:
54- # update the project node progress with the latest value
55- project ["workbench" ][f"{ message .node_id } " ].update (
56- {"progress" : round (message .report .percent_value * 100.0 )}
57- )
58- return SocketMessageDict (
59- event_type = SOCKET_IO_NODE_UPDATED_EVENT ,
60- data = {
61- "project_id" : message .project_id ,
62- "node_id" : message .node_id ,
63- "data" : project ["workbench" ][f"{ message .node_id } " ],
64- },
65- )
66- _logger .warning ("node not found: '%s'" , message .model_dump ())
67- except ProjectNotFoundError :
68- _logger .warning ("project not found: '%s'" , message .model_dump ())
69- return None
7054
7155
7256async def _progress_message_parser (app : web .Application , data : bytes ) -> bool :
@@ -80,10 +64,9 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
8064 message = WebSocketProjectProgress .from_rabbit_message (
8165 rabbit_message
8266 ).to_socket_dict ()
83-
8467 elif rabbit_message .progress_type is ProgressType .COMPUTATION_RUNNING :
85- message = await _convert_to_node_update_event (app , rabbit_message )
86-
68+ await _notify_comp_node_progress (app , rabbit_message )
69+ return True
8770 else :
8871 message = WebSocketNodeProgress .from_rabbit_message (
8972 rabbit_message
0 commit comments