Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/8387.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add exponential backoff for Redis consumer reconnection
77 changes: 77 additions & 0 deletions src/ai/backend/common/message_queue/redis_queue/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import asyncio
import hashlib
import logging
import random
import socket
import time
from collections.abc import AsyncGenerator, Iterable
from dataclasses import dataclass
from typing import Optional, Self, override
Expand All @@ -28,6 +30,24 @@
_DEFAULT_READ_COUNT = 64


@dataclass
class _BackoffState:
    """Per-stream bookkeeping for exponential reconnect backoff.

    ``attempt`` counts consecutive failures; ``last_error_time`` holds the
    monotonic timestamp of the most recent failure (0.0 when clean).
    """

    # Number of consecutive failed attempts observed for this stream.
    attempt: int = 0
    # time.monotonic() of the latest error; 0.0 means "no error recorded".
    last_error_time: float = 0.0

    def increment(self) -> None:
        """Record one more failure and stamp when it happened."""
        self.attempt = self.attempt + 1
        self.last_error_time = time.monotonic()

    def reset(self) -> None:
        """Clear all failure bookkeeping once an operation succeeds."""
        self.attempt = 0
        self.last_error_time = 0.0
Comment on lines +38 to +48
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last_error_time is written but (in this diff) never read, which adds state and an extra dependency (time.monotonic()) without affecting behavior. Either remove last_error_time until it's needed, or use it (e.g., for logging, metrics, or to avoid backoff increments within a small time window).

Suggested change
last_error_time: float = 0.0
def increment(self) -> None:
"""Increment attempt counter and record error time."""
self.attempt += 1
self.last_error_time = time.monotonic()
def reset(self) -> None:
"""Reset backoff state after successful operation."""
self.attempt = 0
self.last_error_time = 0.0
def increment(self) -> None:
"""Increment attempt counter after an error."""
self.attempt += 1
def reset(self) -> None:
"""Reset backoff state after successful operation."""
self.attempt = 0

Copilot uses AI. Check for mistakes.


@dataclass
class RedisConsumerArgs:
stream_keys: Iterable[str]
Expand All @@ -36,6 +56,9 @@ class RedisConsumerArgs:
db: int = 0
autoclaim_idle_timeout: int = _DEFAULT_AUTOCLAIM_IDLE_TIMEOUT
autoclaim_start_id: Optional[str] = None
backoff_initial_delay: float = 0.1 # 100ms first retry
backoff_max_delay: float = 30.0 # cap at 30 seconds
backoff_max_attempts: Optional[int] = None # None = infinite retry


class RedisConsumer(AbstractConsumer):
Expand All @@ -55,6 +78,10 @@ class RedisConsumer(AbstractConsumer):
_autoclaim_idle_timeout: int
_closed: bool
_loop_tasks: list[asyncio.Task]
_backoff_initial_delay: float
_backoff_max_delay: float
_backoff_max_attempts: Optional[int]
_backoff_state: dict[str, _BackoffState]

def __init__(
self,
Expand Down Expand Up @@ -83,6 +110,12 @@ def __init__(
self._autoclaim_idle_timeout = args.autoclaim_idle_timeout
self._closed = False

# Backoff configuration
self._backoff_initial_delay = args.backoff_initial_delay
self._backoff_max_delay = args.backoff_max_delay
self._backoff_max_attempts = args.backoff_max_attempts
self._backoff_state = {}

start_id = args.autoclaim_start_id or "0-0"
self._loop_tasks = []

Expand Down Expand Up @@ -221,6 +254,7 @@ async def _read_messages_loop(self, stream_key: str) -> None:
while not self._closed:
try:
await self._read_messages(client, stream_key)
self._reset_backoff(stream_key)
except glide.ClosingError:
log.info(
"Client connection closed, stopping read messages loop for stream {}",
Expand All @@ -229,8 +263,10 @@ async def _read_messages_loop(self, stream_key: str) -> None:
break
except glide.GlideError as e:
await self._failover_consumer(stream_key, e)
await self._handle_backoff(stream_key)
except Exception as e:
log.error("Error while reading messages from stream {}: {}", stream_key, e)
await self._handle_backoff(stream_key)
finally:
await client.close()

Expand Down Expand Up @@ -277,6 +313,7 @@ async def _auto_claim_loop(
)
if claimed:
autoclaim_start_id = next_start_id
self._reset_backoff(stream_key)
continue
Comment on lines +316 to 317
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backoff is only reset when claimed is truthy, but a successful autoclaim that returns claimed == False still indicates the operation succeeded. If there were prior errors, the consumer will keep using an inflated backoff even though the connection has recovered. Consider calling _reset_backoff(stream_key) after a successful autoclaim request regardless of whether any messages were claimed (i.e., outside the if claimed: block).

Suggested change
self._reset_backoff(stream_key)
continue
self._reset_backoff(stream_key)

Copilot uses AI. Check for mistakes.
except glide.TimeoutError:
# If the auto claim times out, we just continue to the next iteration
Expand All @@ -288,10 +325,14 @@ async def _auto_claim_loop(
break
except glide.GlideError as e:
await self._failover_consumer(stream_key, e)
await self._handle_backoff(stream_key)
continue
except Exception as e:
log.exception(
"Error while auto claiming messages from stream {}: {}", stream_key, e
)
await self._handle_backoff(stream_key)
continue

await asyncio.sleep(_DEFAULT_AUTOCLAIM_INTERVAL / 1000)

Expand Down Expand Up @@ -342,6 +383,42 @@ async def _retry_message(self, stream_key: str, message: MQMessage) -> None:
stream_key, self._group_name, message.msg_id, message.payload
)

async def _handle_backoff(self, stream_key: str) -> None:
    """
    Apply exponential backoff after an error on a stream.

    Increments the per-stream attempt counter, enforces the optional
    ``backoff_max_attempts`` limit, then sleeps for
    ``backoff_initial_delay * 2**(attempt - 1)`` capped at
    ``backoff_max_delay``, with 50-100% jitter applied to avoid
    thundering-herd reconnects.

    Args:
        stream_key: The Redis stream key experiencing connection issues

    Raises:
        RuntimeError: When ``backoff_max_attempts`` is set and the
            consecutive-failure count for this stream exceeds it, so the
            surrounding loop task fails deterministically instead of
            retrying forever.
    """
    state = self._backoff_state.setdefault(stream_key, _BackoffState())
    state.increment()

    # Enforce the configured retry budget (None means retry indefinitely).
    if (
        self._backoff_max_attempts is not None
        and state.attempt > self._backoff_max_attempts
    ):
        raise RuntimeError(
            f"Exceeded maximum backoff attempts ({self._backoff_max_attempts}) "
            f"for stream {stream_key}"
        )

    # Calculate delay with exponential backoff
    delay = min(
        self._backoff_initial_delay * (2 ** (state.attempt - 1)),
        self._backoff_max_delay,
    )

    # Add jitter (50-100% of calculated delay)
    actual_delay = delay * (0.5 + random.random() * 0.5)

    # Sleep in short slices so a concurrent close (self._closed flipping to
    # True) is observed promptly instead of blocking shutdown for up to
    # backoff_max_delay.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + actual_delay
    while not self._closed:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        await asyncio.sleep(min(0.5, remaining))

Comment on lines +410 to +411
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backoff sleep does not account for shutdown. If self._closed becomes true while sleeping, the task won't observe it until the sleep completes, which can delay shutdown by up to backoff_max_delay. Consider short-circuiting before sleeping when closed, or sleeping in a way that can be interrupted (e.g., waiting on a shutdown event / task cancellation-aware wait).

Suggested change
await asyncio.sleep(actual_delay)
# Sleep in small chunks so that shutdown (_closed) can be observed promptly.
if getattr(self, "_closed", False):
return
loop = asyncio.get_running_loop()
deadline = loop.time() + actual_delay
while True:
if getattr(self, "_closed", False):
return
remaining = deadline - loop.time()
if remaining <= 0:
break
# Sleep for at most 1 second at a time to re-check shutdown state.
try:
await asyncio.sleep(min(1.0, remaining))
except asyncio.CancelledError:
# Allow task cancellation to propagate for cooperative shutdown.
raise

Copilot uses AI. Check for mistakes.
def _reset_backoff(self, stream_key: str) -> None:
"""
Reset backoff state for a stream after successful operation.

Args:
stream_key: The Redis stream key that successfully completed operation
"""
if stream_key in self._backoff_state:
self._backoff_state[stream_key].reset()

async def _failover_consumer(self, stream_key: str, e: Exception) -> None:
"""
Handle consumer failover scenarios.
Expand Down
Loading