@@ -265,20 +265,24 @@ def retrieve_node_progress_from_decoded_message(
265265
@dataclass
class SocketIOProjectClosedWaiter:
    """Websocket-message predicate that detects that the project got closed.

    Instances are callables taking the raw websocket frame. They return
    ``True`` only for a socket.io event named ``PROJECT_STATE_UPDATED`` whose
    ``data.shareState`` reports ``status == "CLOSED"`` and ``locked is False``;
    every other frame yields ``False``.
    """

    def __call__(self, message: str) -> bool:
        with log_context(logging.DEBUG, msg=f"handling websocket {message=}") as ctx:
            # socket.io prefixes its frames with numeric codes, see
            # https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
            if message.startswith(SOCKETIO_MESSAGE_PREFIX):
                event = decode_socketio_42_message(message)
                if event.name == _OSparcMessages.PROJECT_STATE_UPDATED.value:
                    # only inspect the payload once the event name matched
                    share_state = event.obj["data"]["shareState"]
                    if (
                        share_state["status"] == "CLOSED"
                        and share_state["locked"] is False
                    ):
                        ctx.logger.info("project successfully closed")
                        return True

        return False
284288
@@ -304,42 +308,27 @@ def __call__(self, message: str) -> bool:
304308
@dataclass
class SocketIOWaitNodeForOutputs:
    """Websocket-message predicate waiting for a node to expose its outputs.

    Instances are callables taking the raw websocket frame. For a socket.io
    ``NODE_UPDATED`` event that targets ``node_id`` the call returns whether
    the number of entries in ``data.outputs`` equals
    ``expected_number_of_outputs``; any other frame yields ``False``.
    """

    # number of entries ``data.outputs`` must contain for the wait to succeed
    expected_number_of_outputs: int
    # value compared against the event's ``node_id`` entry
    node_id: str

    def __call__(self, message: str) -> bool:
        with log_context(logging.DEBUG, msg=f"handling websocket {message=}"):
            if message.startswith(SOCKETIO_MESSAGE_PREFIX):
                decoded_message = decode_socketio_42_message(message)
                # NOTE: compare against the enum *value*, as the other waiters
                # in this module do; comparing against the member itself only
                # matches when the enum mixes in ``str``.
                if decoded_message.name == _OSparcMessages.NODE_UPDATED.value:
                    assert "data" in decoded_message.obj
                    assert "node_id" in decoded_message.obj
                    if decoded_message.obj["node_id"] == self.node_id:
                        assert "outputs" in decoded_message.obj["data"]

                        return (
                            len(decoded_message.obj["data"]["outputs"])
                            == self.expected_number_of_outputs
                        )

        return False
326330
327331
328- @dataclass
329- class SocketIOOsparcMessagePrinter :
330- include_logger_messages : bool = False
331-
332- def __call__ (self , message : str ) -> None :
333- osparc_messages = [_ .value for _ in _OSparcMessages ]
334- if not self .include_logger_messages :
335- osparc_messages .pop (osparc_messages .index (_OSparcMessages .LOGGER .value ))
336-
337- if message .startswith (SOCKETIO_MESSAGE_PREFIX ):
338- decoded_message : SocketIOEvent = decode_socketio_42_message (message )
339- if decoded_message .name in osparc_messages :
340- print ("WS Message:" , decoded_message .name , decoded_message .obj )
341-
342-
343332_FAIL_FAST_DYNAMIC_SERVICE_STATES : Final [tuple [str , ...]] = ("idle" , "failed" )
344333_SERVICE_ROOT_POINT_STATUS_TIMEOUT : Final [timedelta ] = timedelta (seconds = 30 )
345334
@@ -416,7 +405,6 @@ def _check_service_endpoint(
416405@dataclass
417406class SocketIONodeProgressCompleteWaiter :
418407 node_id : str
419- logger : logging .Logger
420408 max_idle_timeout : timedelta = _SOCKET_IO_NODE_PROGRESS_WAITER_MAX_IDLE_TIMEOUT
421409 _current_progress : dict [NodeProgressType , float ] = field (
422410 default_factory = defaultdict
@@ -426,71 +414,75 @@ class SocketIONodeProgressCompleteWaiter:
426414 _result : bool = False
427415
428416 def __call__ (self , message : str ) -> bool :
429- # socket.io encodes messages like so
430- # https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
431- if message .startswith (SOCKETIO_MESSAGE_PREFIX ):
432- decoded_message = decode_socketio_42_message (message )
433- self ._received_messages .append (decoded_message )
434- if (
435- (decoded_message .name == _OSparcMessages .SERVICE_STATUS .value )
436- and (decoded_message .obj ["service_uuid" ] == self .node_id )
437- and (
438- decoded_message .obj ["service_state" ]
439- in _FAIL_FAST_DYNAMIC_SERVICE_STATES
440- )
441- ):
442- # NOTE: this is a fail fast for dynamic services that fail to start
443- self .logger .error (
444- "❌ node %s failed with state %s, failing fast ❌" ,
417+ with log_context (logging .DEBUG , msg = f"handling websocket { message = } " ) as ctx :
418+ # socket.io encodes messages like so
419+ # https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
420+ if message .startswith (SOCKETIO_MESSAGE_PREFIX ):
421+ decoded_message = decode_socketio_42_message (message )
422+ self ._received_messages .append (decoded_message )
423+ if (
424+ (decoded_message .name == _OSparcMessages .SERVICE_STATUS .value )
425+ and (decoded_message .obj ["service_uuid" ] == self .node_id )
426+ and (
427+ decoded_message .obj ["service_state" ]
428+ in _FAIL_FAST_DYNAMIC_SERVICE_STATES
429+ )
430+ ):
431+ # NOTE: this is a fail fast for dynamic services that fail to start
432+ ctx .logger .error (
433+ "❌ node %s failed with state %s, failing fast ❌" ,
434+ self .node_id ,
435+ decoded_message .obj ["service_state" ],
436+ )
437+ self ._result = False
438+ return True
439+ if decoded_message .name == _OSparcMessages .NODE_PROGRESS .value :
440+ node_progress_event = retrieve_node_progress_from_decoded_message (
441+ decoded_message
442+ )
443+ if node_progress_event .node_id == self .node_id :
444+ new_progress = (
445+ node_progress_event .current_progress
446+ / node_progress_event .total_progress
447+ )
448+ self ._last_progress_time = datetime .now (UTC )
449+ if (
450+ node_progress_event .progress_type
451+ not in self ._current_progress
452+ ) or (
453+ new_progress
454+ != self ._current_progress [node_progress_event .progress_type ]
455+ ):
456+ self ._current_progress [
457+ node_progress_event .progress_type
458+ ] = new_progress
459+
460+ ctx .logger .info (
461+ "Current startup progress [expected %d types]: %s" ,
462+ len (
463+ NodeProgressType .required_types_for_started_service ()
464+ ),
465+ f"{ json .dumps ({k : round (v , 2 ) for k , v in self ._current_progress .items ()})} " ,
466+ )
467+
468+ done = self ._completed_successfully ()
469+ if done :
470+ self ._result = True # NOTE: might have failed but it is not sure. so we set the result to True
471+ ctx .logger .info ("✅ Service start completed successfully!! ✅" )
472+ return done
473+
474+ time_since_last_progress = datetime .now (UTC ) - self ._last_progress_time
475+ if time_since_last_progress > self .max_idle_timeout :
476+ ctx .logger .warning (
477+ "⚠️ %s passed since the last received progress message. "
478+ "The service %s might be stuck, or we missed some messages ⚠️" ,
479+ time_since_last_progress ,
445480 self .node_id ,
446- decoded_message .obj ["service_state" ],
447481 )
448- self ._result = False
482+ self ._result = True
449483 return True
450- if decoded_message .name == _OSparcMessages .NODE_PROGRESS .value :
451- node_progress_event = retrieve_node_progress_from_decoded_message (
452- decoded_message
453- )
454- if node_progress_event .node_id == self .node_id :
455- new_progress = (
456- node_progress_event .current_progress
457- / node_progress_event .total_progress
458- )
459- self ._last_progress_time = datetime .now (UTC )
460- if (
461- node_progress_event .progress_type not in self ._current_progress
462- ) or (
463- new_progress
464- != self ._current_progress [node_progress_event .progress_type ]
465- ):
466- self ._current_progress [node_progress_event .progress_type ] = (
467- new_progress
468- )
469484
470- self .logger .info (
471- "Current startup progress [expected %d types]: %s" ,
472- len (NodeProgressType .required_types_for_started_service ()),
473- f"{ json .dumps ({k : round (v , 2 ) for k , v in self ._current_progress .items ()})} " ,
474- )
475-
476- done = self ._completed_successfully ()
477- if done :
478- self ._result = True # NOTE: might have failed but it is not sure. so we set the result to True
479- self .logger .info ("✅ Service start completed successfully!! ✅" )
480- return done
481-
482- time_since_last_progress = datetime .now (UTC ) - self ._last_progress_time
483- if time_since_last_progress > self .max_idle_timeout :
484- self .logger .warning (
485- "⚠️ %s passed since the last received progress message. "
486- "The service %s might be stuck, or we missed some messages ⚠️" ,
487- time_since_last_progress ,
488- self .node_id ,
489- )
490- self ._result = True
491- return True
492-
493- return False
485+ return False
494486
495487 def _completed_successfully (self ) -> bool :
496488 return all (
@@ -631,7 +623,6 @@ def expected_service_running(
631623 else :
632624 waiter = SocketIONodeProgressCompleteWaiter (
633625 node_id = node_id ,
634- logger = ctx .logger ,
635626 max_idle_timeout = min (
636627 _SOCKET_IO_NODE_PROGRESS_WAITER_MAX_IDLE_TIMEOUT ,
637628 timedelta (seconds = timeout / 1000 - 10 ),
@@ -693,7 +684,6 @@ def wait_for_service_running(
693684 else :
694685 waiter = SocketIONodeProgressCompleteWaiter (
695686 node_id = node_id ,
696- logger = ctx .logger ,
697687 max_idle_timeout = min (
698688 _SOCKET_IO_NODE_PROGRESS_WAITER_MAX_IDLE_TIMEOUT ,
699689 timedelta (seconds = timeout / 1000 - 10 ),
0 commit comments