@@ -383,48 +383,62 @@ async def _handle_data_stream(
383383 Args:
384384 comp_id: Id of the requested component.
385385 category: The category of the component.
386+
387+ Raises:
388+ Exception: if an error occurs while handling the data stream.
386389 """
387- stream_senders = []
388- if comp_id in self ._req_streaming_metrics :
389- await self ._check_requested_component_and_metrics (
390- comp_id , category , self ._req_streaming_metrics [comp_id ]
390+ try :
391+ stream_senders = []
392+ if comp_id in self ._req_streaming_metrics :
393+ await self ._check_requested_component_and_metrics (
394+ comp_id , category , self ._req_streaming_metrics [comp_id ]
395+ )
396+ stream_senders = self ._get_metric_senders (
397+ category , self ._req_streaming_metrics [comp_id ]
398+ )
399+ api_data_receiver : Receiver [Any ] = self .comp_data_receivers [comp_id ]
400+
401+ senders_done : asyncio .Event = asyncio .Event ()
402+ pending_messages = 0
403+
404+ def process_msg (data : Any ) -> None :
405+ tasks = []
406+ for extractor , senders in stream_senders :
407+ for sender in senders :
408+ tasks .append (
409+ sender .send (
410+ Sample (data .timestamp , Quantity (extractor (data )))
411+ )
412+ )
413+ asyncio .gather (* tasks )
414+ nonlocal pending_messages
415+ pending_messages -= 1
416+ if pending_messages == 0 :
417+ senders_done .set ()
418+
419+ async for data in api_data_receiver :
420+ pending_messages += 1
421+ senders_done .clear ()
422+ process_msg (data )
423+
424+ while pending_messages > 0 :
425+ await senders_done .wait ()
426+
427+ await asyncio .gather (
428+ * [
429+ self ._registry .close_and_remove (r .get_channel_name ())
430+ for requests in self ._req_streaming_metrics [comp_id ].values ()
431+ for r in requests
432+ ]
391433 )
392- stream_senders = self ._get_metric_senders (
393- category , self ._req_streaming_metrics [comp_id ]
434+ except Exception :
435+ _logger .exception (
436+ "Unexpected error while handling data stream for component %d (%s), "
437+ "component data is not being streamed anymore" ,
438+ comp_id ,
439+ category .name ,
394440 )
395- api_data_receiver : Receiver [Any ] = self .comp_data_receivers [comp_id ]
396-
397- senders_done : asyncio .Event = asyncio .Event ()
398- pending_messages = 0
399-
400- def process_msg (data : Any ) -> None :
401- tasks = []
402- for extractor , senders in stream_senders :
403- for sender in senders :
404- tasks .append (
405- sender .send (Sample (data .timestamp , Quantity (extractor (data ))))
406- )
407- asyncio .gather (* tasks )
408- nonlocal pending_messages
409- pending_messages -= 1
410- if pending_messages == 0 :
411- senders_done .set ()
412-
413- async for data in api_data_receiver :
414- pending_messages += 1
415- senders_done .clear ()
416- process_msg (data )
417-
418- while pending_messages > 0 :
419- await senders_done .wait ()
420-
421- await asyncio .gather (
422- * [
423- self ._registry .close_and_remove (r .get_channel_name ())
424- for requests in self ._req_streaming_metrics [comp_id ].values ()
425- for r in requests
426- ]
427- )
441+ raise
428442
429443 async def _update_streams (
430444 self ,
0 commit comments