Hi,
I'm trying to integrate and evaluate taskiq-redis in an application where all configuration is stored in a configuration object that only becomes available inside a setup method that is called asynchronously. Therefore I had a problem initializing the broker, since it requires the URL at initialization.
It would be great if the URL could instead be set later through a configuration method.
As a proof of concept I had an AI generate such a method, and it seems to work. I'll attach the POC rather than open a merge request, since it is not well tested and might need further improvements.
Attaching the POC:
class BaseRedisBroker(AsyncBroker):
    """Base broker that works with Redis."""

    def __init__(
        self,
        url: Optional[str] = None,
        task_id_generator: Optional[Callable[[], str]] = None,
        result_backend: Optional[AsyncResultBackend[_T]] = None,
        queue_name: str = "taskiq",
        max_connection_pool_size: Optional[int] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Constructs a new broker.

        :param url: url to redis (optional, can be set later).
        :param task_id_generator: custom task_id generator.
        :param result_backend: custom result backend.
        :param queue_name: name for a list in redis.
        :param max_connection_pool_size: maximum number of connections in pool.
            Each worker opens its own connection. Therefore this value has to be
            at least number of workers + 1.
        :param connection_kwargs: additional arguments for redis BlockingConnectionPool.
        """
        super().__init__(
            result_backend=result_backend,
            task_id_generator=task_id_generator,
        )
        self._redis_url = url
        self._max_connection_pool_size = max_connection_pool_size
        self._connection_kwargs = connection_kwargs
        self.queue_name = queue_name
        self._connection_pool: Optional[_BlockingConnectionPool] = None
        if url:
            self._create_connection_pool()

    def _create_connection_pool(self) -> None:
        """Create connection pool with current settings."""
        if not self._redis_url:
            raise ValueError("Redis URL must be set before creating connection pool")
        self._connection_pool = BlockingConnectionPool.from_url(
            url=self._redis_url,
            max_connections=self._max_connection_pool_size,
            **self._connection_kwargs,
        )

    @property
    def connection_pool(self) -> _BlockingConnectionPool:
        """Get connection pool, creating it if necessary."""
        if self._connection_pool is None:
            self._create_connection_pool()
        return self._connection_pool

    def set_redis_url(self, url: str) -> None:
        """
        Set Redis URL after initialization.

        :param url: Redis connection URL
        """
        if self._connection_pool is not None:
            raise RuntimeError("Cannot change Redis URL after connection pool has been created")
        self._redis_url = url

    async def shutdown(self) -> None:
        """Closes redis connection pool."""
        await super().shutdown()
        if self._connection_pool:
            await self._connection_pool.disconnect()