diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 663c8e4..5c8e7e8 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -169,6 +169,7 @@ def __init__( approximate: bool = True, idle_timeout: int = 600000, # 10 minutes unacknowledged_batch_size: int = 100, + unacknowledged_lock_timeout: int = 30, xread_count: Optional[int] = 100, additional_streams: Optional[Dict[str, str]] = None, **connection_kwargs: Any, @@ -196,8 +197,10 @@ def __init__( :param xread_count: number of messages to fetch from the stream at once. :param additional_streams: additional streams to read from. Each key is a stream name, value is a consumer id. - :param redeliver_timeout: time in ms to wait before redelivering a message. :param unacknowledged_batch_size: number of unacknowledged messages to fetch. + :param unacknowledged_lock_timeout: time in seconds before auto-releasing + the lock. Useful when the worker crashes or gets killed. + Set to a bigger value if your tasks take a long time to complete. """ super().__init__( url, @@ -217,6 +220,7 @@ def __init__( self.additional_streams = additional_streams or {} self.idle_timeout = idle_timeout self.unacknowledged_batch_size = unacknowledged_batch_size + self.unacknowledged_lock_timeout = unacknowledged_lock_timeout self.count = xread_count async def _declare_consumer_group(self) -> None: @@ -298,6 +302,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: for stream in [self.queue_name, *self.additional_streams.keys()]: lock = redis_conn.lock( f"autoclaim:{self.consumer_group_name}:{stream}", + timeout=self.unacknowledged_lock_timeout, ) if await lock.locked(): continue