File tree Expand file tree Collapse file tree 1 file changed +7
-18
lines changed
services/dask-sidecar/src/simcore_service_dask_sidecar Expand file tree Collapse file tree 1 file changed +7
-18
lines changed Original file line number Diff line number Diff line change @@ -39,25 +39,14 @@ async def _process_messages(self) -> None:
3939 assert self ._message_queue is not None # nosec
4040 assert self ._client is not None # nosec
4141
42- _logger .info ("Starting message processor for RabbitMQ" )
43- try :
42+ with log_context (_logger , logging .INFO , "RabbitMQ message processor" ):
4443 while True :
45- # Get message from queue
46- exchange_name , message_data = await self ._message_queue .get ()
47-
48- try :
49- # Publish to RabbitMQ
50- await self ._client .publish (exchange_name , message_data )
51- except Exception as e :
52- _logger .exception ("Failed to publish message: %s" , str (e ))
53- finally :
54- # Mark task as done
55- self ._message_queue .task_done ()
56- except asyncio .CancelledError :
57- _logger .info ("RabbitMQ message processor shutting down" )
58- raise
59- except Exception :
60- _logger .exception ("Unexpected error in RabbitMQ message processor" )
44+ with log_catch (_logger , reraise = False ):
45+ exchange_name , message_data = await self ._message_queue .get ()
46+ try :
47+ await self ._client .publish (exchange_name , message_data )
48+ finally :
49+ self ._message_queue .task_done ()
6150
6251 def setup (self , worker : distributed .Worker ) -> Awaitable [None ]:
6352 """Called when the plugin is attached to a worker"""
You can’t perform that action at this time.
0 commit comments