Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion taskiq_redis/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def __init__(
approximate: bool = True,
idle_timeout: int = 600000, # 10 minutes
unacknowledged_batch_size: int = 100,
unacknowledged_lock_timeout: int = 30,
xread_count: Optional[int] = 100,
additional_streams: Optional[Dict[str, str]] = None,
**connection_kwargs: Any,
Expand Down Expand Up @@ -196,8 +197,10 @@ def __init__(
:param xread_count: number of messages to fetch from the stream at once.
:param additional_streams: additional streams to read from.
Each key is a stream name, value is a consumer id.
:param idle_timeout: time in ms a message may stay pending before it is reclaimed.
:param unacknowledged_batch_size: number of unacknowledged messages to fetch.
:param unacknowledged_lock_timeout: time in seconds before auto-releasing
the lock. Useful when the worker crashes or gets killed.
Set to a bigger value if your tasks take a long time to complete.
"""
super().__init__(
url,
Expand All @@ -217,6 +220,7 @@ def __init__(
self.additional_streams = additional_streams or {}
self.idle_timeout = idle_timeout
self.unacknowledged_batch_size = unacknowledged_batch_size
self.unacknowledged_lock_timeout = unacknowledged_lock_timeout
self.count = xread_count

async def _declare_consumer_group(self) -> None:
Expand Down Expand Up @@ -298,6 +302,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
for stream in [self.queue_name, *self.additional_streams.keys()]:
lock = redis_conn.lock(
f"autoclaim:{self.consumer_group_name}:{stream}",
timeout=self.unacknowledged_lock_timeout,
)
if await lock.locked():
continue
Expand Down