11import logging
2+ from collections .abc import Awaitable
23
34import distributed
45from models_library .rabbitmq_messages import RabbitMessageBase
@@ -21,32 +22,42 @@ class RabbitMQPlugin(distributed.WorkerPlugin):
2122 def __init__ (self , settings : RabbitSettings ):
2223 self ._settings = settings
2324
24- async def setup (self , worker : distributed .Worker ) -> None :
25+ def setup (self , worker : distributed .Worker ) -> Awaitable [ None ] :
2526 """Called when the plugin is attached to a worker"""
26- if not self ._settings :
27- _logger .warning ("RabbitMQ client is de-activated (no settings provided)" )
28- return
29-
30- with log_context (
31- _logger ,
32- logging .INFO ,
33- f"RabbitMQ client initialization for worker { worker .address } " ,
34- ):
35- await wait_till_rabbitmq_responsive (self ._settings .dsn )
36- self ._client = RabbitMQClient (
37- client_name = "dask-sidecar" , settings = self ._settings
38- )
3927
40- async def teardown (self , worker : distributed .Worker ) -> None :
28+ async def _ () -> None :
29+ if not self ._settings :
30+ _logger .warning (
31+ "RabbitMQ client is de-activated (no settings provided)"
32+ )
33+ return
34+
35+ with log_context (
36+ _logger ,
37+ logging .INFO ,
38+ f"RabbitMQ client initialization for worker { worker .address } " ,
39+ ):
40+ await wait_till_rabbitmq_responsive (self ._settings .dsn )
41+ self ._client = RabbitMQClient (
42+ client_name = "dask-sidecar" , settings = self ._settings
43+ )
44+
45+ return _ ()
46+
47+ def teardown (self , worker : distributed .Worker ) -> Awaitable [None ]:
4148 """Called when the worker shuts down or the plugin is removed"""
42- with log_context (
43- _logger ,
44- logging .INFO ,
45- f"RabbitMQ client teardown for worker { worker .address } " ,
46- ):
47- if self ._client :
48- await self ._client .close ()
49- self ._client = None
49+
50+ async def _ () -> None :
51+ with log_context (
52+ _logger ,
53+ logging .INFO ,
54+ f"RabbitMQ client teardown for worker { worker .address } " ,
55+ ):
56+ if self ._client :
57+ await self ._client .close ()
58+ self ._client = None
59+
60+ return _ ()
5061
5162 def get_client (self ) -> RabbitMQClient :
5263 """Returns the RabbitMQ client or raises an error if not available"""
0 commit comments