@@ -3,7 +3,9 @@
 import asyncio
 import hashlib
 import logging
+import random
 import socket
+import time
 from collections.abc import AsyncGenerator, Iterable
 from dataclasses import dataclass
 from typing import Optional, Self, override
@@ -28,6 +30,24 @@
 _DEFAULT_READ_COUNT = 64
 
 
+@dataclass
+class _BackoffState:
+    """Tracks exponential backoff state for a stream."""
+
+    attempt: int = 0
+    last_error_time: float = 0.0
+
+    def increment(self) -> None:
+        """Increment attempt counter and record error time."""
+        self.attempt += 1
+        self.last_error_time = time.monotonic()
+
+    def reset(self) -> None:
+        """Reset backoff state after successful operation."""
+        self.attempt = 0
+        self.last_error_time = 0.0
+
+
 @dataclass
 class RedisConsumerArgs:
     stream_keys: Iterable[str]
@@ -36,6 +56,9 @@ class RedisConsumerArgs:
     db: int = 0
     autoclaim_idle_timeout: int = _DEFAULT_AUTOCLAIM_IDLE_TIMEOUT
    autoclaim_start_id: Optional[str] = None
+    backoff_initial_delay: float = 0.1  # 100 ms first retry
+    backoff_max_delay: float = 30.0  # cap at 30 seconds
+    backoff_max_attempts: Optional[int] = None  # None = infinite retry
 
 
 class RedisConsumer(AbstractConsumer):
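With the defaults above, the retry delay doubles from 100 ms and saturates at the 30 s cap around the tenth consecutive failure. A standalone sketch of the pre-jitter schedule that the new _handle_backoff (further down) computes; the backoff_delay helper here is illustrative, not part of this change:

    def backoff_delay(attempt: int, initial: float = 0.1, cap: float = 30.0) -> float:
        """Pre-jitter delay for a 1-based attempt count, mirroring _handle_backoff."""
        return min(initial * (2 ** (attempt - 1)), cap)

    # Attempts 1..10 -> 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8, 25.6, 30.0 (capped)
    print([backoff_delay(n) for n in range(1, 11)])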
@@ -55,6 +78,10 @@ class RedisConsumer(AbstractConsumer):
     _autoclaim_idle_timeout: int
     _closed: bool
     _loop_tasks: list[asyncio.Task]
+    _backoff_initial_delay: float
+    _backoff_max_delay: float
+    _backoff_max_attempts: Optional[int]
+    _backoff_state: dict[str, _BackoffState]
 
     def __init__(
         self,
@@ -83,6 +110,12 @@ def __init__(
         self._autoclaim_idle_timeout = args.autoclaim_idle_timeout
         self._closed = False
 
+        # Backoff configuration
+        self._backoff_initial_delay = args.backoff_initial_delay
+        self._backoff_max_delay = args.backoff_max_delay
+        self._backoff_max_attempts = args.backoff_max_attempts
+        self._backoff_state = {}
+
         start_id = args.autoclaim_start_id or "0-0"
         self._loop_tasks = []
 
@@ -221,6 +254,7 @@ async def _read_messages_loop(self, stream_key: str) -> None:
             while not self._closed:
                 try:
                     await self._read_messages(client, stream_key)
+                    self._reset_backoff(stream_key)
                 except glide.ClosingError:
                     log.info(
                         "Client connection closed, stopping read messages loop for stream {}",
@@ -229,8 +263,10 @@ async def _read_messages_loop(self, stream_key: str) -> None:
                     break
                 except glide.GlideError as e:
                     await self._failover_consumer(stream_key, e)
+                    await self._handle_backoff(stream_key)
                 except Exception as e:
                     log.error("Error while reading messages from stream {}: {}", stream_key, e)
+                    await self._handle_backoff(stream_key)
         finally:
             await client.close()
 
@@ -277,6 +313,7 @@ async def _auto_claim_loop(
                 )
                 if claimed:
                     autoclaim_start_id = next_start_id
+                    self._reset_backoff(stream_key)
                     continue
             except glide.TimeoutError:
                 # If the auto claim times out, we just continue to the next iteration
@@ -288,10 +325,14 @@
                 break
             except glide.GlideError as e:
                 await self._failover_consumer(stream_key, e)
+                await self._handle_backoff(stream_key)
+                continue
             except Exception as e:
                 log.exception(
                     "Error while auto claiming messages from stream {}: {}", stream_key, e
                 )
+                await self._handle_backoff(stream_key)
+                continue
 
             await asyncio.sleep(_DEFAULT_AUTOCLAIM_INTERVAL / 1000)
 
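The two added continue statements change the loop's cadence: without them, a failed iteration would sleep the backoff delay and then fall through to the fixed _DEFAULT_AUTOCLAIM_INTERVAL sleep as well. A self-contained sketch of the resulting control flow, with random failures and a generic ConnectionError standing in for the real glide client calls:

    import asyncio
    import random

    async def claim_loop(interval: float = 1.0, iterations: int = 20) -> None:
        """Toy model of the auto-claim loop's error handling after this change."""
        attempt = 0
        for _ in range(iterations):
            try:
                if random.random() < 0.3:  # stand-in for a transient glide error
                    raise ConnectionError("transient failure")
                attempt = 0  # success: reset the backoff counter
            except ConnectionError:
                attempt += 1
                delay = min(0.1 * 2 ** (attempt - 1), 30.0)
                await asyncio.sleep(delay * (0.5 + random.random() * 0.5))
                continue  # retry immediately, skipping the fixed interval sleep below
            await asyncio.sleep(interval)  # normal poll cadence

    asyncio.run(claim_loop(interval=0.01))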
@@ -342,6 +383,42 @@ async def _retry_message(self, stream_key: str, message: MQMessage) -> None:
             stream_key, self._group_name, message.msg_id, message.payload
         )
 
+    async def _handle_backoff(self, stream_key: str) -> None:
+        """
+        Handle exponential backoff for a stream.
+
+        Increments the attempt counter, calculates the delay with jitter, and sleeps.
+
+        Args:
+            stream_key: The Redis stream key experiencing connection issues
+        """
+        if stream_key not in self._backoff_state:
+            self._backoff_state[stream_key] = _BackoffState()
+
+        state = self._backoff_state[stream_key]
+        state.increment()
+
+        # Calculate delay with exponential backoff
+        delay = min(
+            self._backoff_initial_delay * (2 ** (state.attempt - 1)),
+            self._backoff_max_delay,
+        )
+
+        # Add jitter (50-100% of calculated delay)
+        actual_delay = delay * (0.5 + random.random() * 0.5)
+
+        await asyncio.sleep(actual_delay)
+
+    def _reset_backoff(self, stream_key: str) -> None:
+        """
+        Reset backoff state for a stream after a successful operation.
+
+        Args:
+            stream_key: The Redis stream key that successfully completed an operation
+        """
+        if stream_key in self._backoff_state:
+            self._backoff_state[stream_key].reset()
+
     async def _failover_consumer(self, stream_key: str, e: Exception) -> None:
         """
         Handle consumer failover scenarios.
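One note on the jitter in _handle_backoff above: multiplying the computed delay by 0.5 + random.random() * 0.5 keeps every sleep between 50% and 100% of that delay, so a fleet of consumers hit by the same outage does not reconnect in lockstep. A small illustrative check of the range (not part of the change):

    import random

    def jittered(delay: float) -> float:
        """Mirror of the jitter factor applied in _handle_backoff."""
        return delay * (0.5 + random.random() * 0.5)

    # At the 30 s cap, actual sleeps land uniformly in [15.0, 30.0).
    samples = [jittered(30.0) for _ in range(1000)]
    assert all(15.0 <= s < 30.0 for s in samples)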