@@ -169,6 +169,7 @@ def __init__(
169169 approximate : bool = True ,
170170 idle_timeout : int = 600000 , # 10 minutes
171171 unacknowledged_batch_size : int = 100 ,
172+ unacknowledged_lock_timeout : int = 30 ,
172173 xread_count : Optional [int ] = 100 ,
173174 additional_streams : Optional [Dict [str , str ]] = None ,
174175 ** connection_kwargs : Any ,
@@ -196,8 +197,10 @@ def __init__(
196197 :param xread_count: number of messages to fetch from the stream at once.
197198 :param additional_streams: additional streams to read from.
198199 Each key is a stream name, value is a consumer id.
199- :param redeliver_timeout: time in ms to wait before redelivering a message.
200200 :param unacknowledged_batch_size: number of unacknowledged messages to fetch.
201+ :param unacknowledged_lock_timeout: time in seconds before auto-releasing
202+ the lock. Useful when the worker crashes or gets killed.
203+ Set to a bigger value if your tasks take a long time to complete.
201204 """
202205 super ().__init__ (
203206 url ,
@@ -217,6 +220,7 @@ def __init__(
217220 self .additional_streams = additional_streams or {}
218221 self .idle_timeout = idle_timeout
219222 self .unacknowledged_batch_size = unacknowledged_batch_size
223+ self .unacknowledged_lock_timeout = unacknowledged_lock_timeout
220224 self .count = xread_count
221225
222226 async def _declare_consumer_group (self ) -> None :
@@ -298,6 +302,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
298302 for stream in [self .queue_name , * self .additional_streams .keys ()]:
299303 lock = redis_conn .lock (
300304 f"autoclaim:{ self .consumer_group_name } :{ stream } " ,
305+ timeout = self .unacknowledged_lock_timeout ,
301306 )
302307 if await lock .locked ():
303308 continue
0 commit comments