|
2 | 2 | import logging |
3 | 3 | from asyncio import AbstractEventLoop |
4 | 4 | from collections.abc import Awaitable |
| 5 | +from typing import Final |
5 | 6 |
|
6 | 7 | import distributed |
7 | 8 | from servicelib.logging_utils import log_catch, log_context |
|
12 | 13 |
|
13 | 14 | _logger = logging.getLogger(__name__) |
14 | 15 |
|
| 16 | +_RABBITMQ_CONFIGURATION_ERROR: Final[str] = ( |
| 17 | + "RabbitMQ client is not available. Please check the configuration." |
| 18 | +) |
| 19 | + |
15 | 20 |
|
16 | 21 | class RabbitMQPlugin(distributed.WorkerPlugin): |
17 | 22 | """Dask Worker Plugin for RabbitMQ integration""" |
@@ -73,21 +78,15 @@ async def _() -> None: |
73 | 78 | def get_client(self) -> RabbitMQClient: |
74 | 79 | """Returns the RabbitMQ client or raises an error if not available""" |
75 | 80 | if not self._client: |
76 | | - raise ConfigurationError( |
77 | | - msg="RabbitMQ client is not available. Please check the configuration." |
78 | | - ) |
| 81 | + raise ConfigurationError(msg=_RABBITMQ_CONFIGURATION_ERROR) |
79 | 82 | return self._client |
80 | 83 |
|
81 | 84 |
|
82 | 85 | def get_rabbitmq_client(worker: distributed.Worker) -> RabbitMQClient: |
83 | 86 | """Returns the RabbitMQ client or raises an error if not available""" |
84 | 87 | if not worker.plugins: |
85 | | - raise ConfigurationError( |
86 | | - msg="RabbitMQ client is not available. Please check the configuration." |
87 | | - ) |
| 88 | + raise ConfigurationError(msg=_RABBITMQ_CONFIGURATION_ERROR) |
88 | 89 | rabbitmq_plugin = worker.plugins.get(RabbitMQPlugin.name) |
89 | 90 | if not isinstance(rabbitmq_plugin, RabbitMQPlugin): |
90 | | - raise ConfigurationError( |
91 | | - msg="RabbitMQ client is not available. Please check the configuration." |
92 | | - ) |
| 91 | + raise ConfigurationError(msg=_RABBITMQ_CONFIGURATION_ERROR) |
93 | 92 | return rabbitmq_plugin.get_client() |
0 commit comments