feat(BA-4127): Add exponential backoff for Redis consumer reconnection #8387
Conversation
Implement exponential backoff strategy in RedisConsumer to prevent CPU spinning during Redis connection failures.

Changes:
- Add `_BackoffState` dataclass to track per-stream backoff state
- Add backoff configuration to `RedisConsumerArgs` (initial_delay, max_delay, max_attempts)
- Implement `_handle_backoff()` with exponential calculation and jitter
- Implement `_reset_backoff()` to reset state on successful operations
- Apply backoff in `_read_messages_loop()` and `_auto_claim_loop()` on `GlideError` and generic `Exception`

Backoff progression: 0.1s → 0.2s → 0.4s → ... → max 30s
Jitter: 50-100% of calculated delay to prevent thundering herd

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
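As described, the delay doubles per attempt from the initial value up to the cap, with 50-100% jitter applied on top. A minimal standalone sketch of that calculation (a free function for illustration only; in the PR the bounds come from `RedisConsumerArgs` and the attempt counter lives in `_BackoffState`):

```python
import random


def backoff_delay(attempt: int, initial_delay: float = 0.1, max_delay: float = 30.0) -> float:
    """Exponential backoff with jitter: 0.1s, 0.2s, 0.4s, ... capped at 30s.

    The returned delay is drawn uniformly from 50-100% of the calculated
    value so that many consumers reconnecting at once do not stampede.
    """
    base = min(initial_delay * (2 ** (attempt - 1)), max_delay)
    return random.uniform(0.5 * base, base)
```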
Pull request overview
Adds an exponential backoff strategy to RedisConsumer to avoid CPU spinning during Redis connection failures.
Changes:
- Introduces per-stream backoff state tracking via `_BackoffState`.
- Adds backoff configuration fields to `RedisConsumerArgs`.
- Applies backoff delays on `GlideError` and generic exceptions in the read and autoclaim loops.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/ai/backend/common/message_queue/redis_queue/consumer.py | Adds backoff configuration/state and applies exponential backoff with jitter on reconnect/error paths. |
| changes/8387.feature.md | Adds a changelog entry for the new backoff behavior. |
```python
        self._reset_backoff(stream_key)
    continue
```
Copilot AI · Jan 27, 2026
Backoff is only reset when claimed is truthy, but a successful autoclaim that returns claimed == False still indicates the operation succeeded. If there were prior errors, the consumer will keep using an inflated backoff even though the connection has recovered. Consider calling _reset_backoff(stream_key) after a successful autoclaim request regardless of whether any messages were claimed (i.e., outside the if claimed: block).
Suggested change:

```diff
-        self._reset_backoff(stream_key)
-    continue
+    self._reset_backoff(stream_key)
```
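For context, a hedged sketch of how the autoclaim loop could apply the unconditional reset. Only `_reset_backoff`, `_handle_backoff`, the `claimed` flag, and `self._closed` come from this review; the loop shape, `_try_auto_claim`, and the idle interval name are illustrative assumptions:

```python
async def _auto_claim_loop(self, stream_key: str) -> None:
    # Sketch only: names other than _reset_backoff / _handle_backoff are
    # illustrative, not the PR's actual code. GlideError comes from the
    # Glide client library used by this consumer.
    while not self._closed:
        try:
            claimed = await self._try_auto_claim(stream_key)
            # The request itself succeeded, so the connection is healthy:
            # reset backoff even when no pending messages were claimed.
            self._reset_backoff(stream_key)
            if claimed:
                continue
            await asyncio.sleep(self._autoclaim_idle_interval)
        except GlideError:
            # Connection-level failure: back off before the next attempt.
            await self._handle_backoff(stream_key)
```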
```python
                stream_key, self._group_name, message.msg_id, message.payload
            )

    async def _handle_backoff(self, stream_key: str) -> None:
```
Copilot AI · Jan 27, 2026
The PR adds backoff_max_attempts to RedisConsumerArgs, but _handle_backoff() never enforces it. This makes the configuration misleading and prevents callers from bounding retries. Consider checking self._backoff_max_attempts after state.increment() and, when exceeded, either raise (to stop the loop), close the consumer, or log and rethrow the last exception so the task fails deterministically.
```python
        if stream_key not in self._backoff_state:
            self._backoff_state[stream_key] = _BackoffState()

        state = self._backoff_state[stream_key]
        state.increment()

        # Calculate delay with exponential backoff
        delay = min(
            self._backoff_initial_delay * (2 ** (state.attempt - 1)),
            self._backoff_max_delay,
        )
```
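A sketch of one way to enforce the cap right after `state.increment()`, per the comment above. The `None`-means-unlimited convention and the exception type are assumptions, not the PR's code:

```python
        state = self._backoff_state[stream_key]
        state.increment()

        # Fail deterministically once the configured bound is exceeded,
        # instead of retrying forever with an inflated delay.
        if (
            self._backoff_max_attempts is not None
            and state.attempt > self._backoff_max_attempts
        ):
            raise RuntimeError(
                f"giving up on stream {stream_key!r} after "
                f"{self._backoff_max_attempts} failed attempts"
            )
```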
```python
        await asyncio.sleep(actual_delay)
```
Copilot AI · Jan 27, 2026
Backoff sleep does not account for shutdown. If self._closed becomes true while sleeping, the task won’t observe it until the sleep completes, which can delay shutdown by up to backoff_max_delay. Consider short-circuiting before sleeping when closed, or sleeping in a way that can be interrupted (e.g., waiting on a shutdown event / task cancellation-aware wait).
Suggested change:

```diff
-        await asyncio.sleep(actual_delay)
+        # Sleep in small chunks so that shutdown (_closed) can be observed promptly.
+        if getattr(self, "_closed", False):
+            return
+        loop = asyncio.get_running_loop()
+        deadline = loop.time() + actual_delay
+        while True:
+            if getattr(self, "_closed", False):
+                return
+            remaining = deadline - loop.time()
+            if remaining <= 0:
+                break
+            # Sleep for at most 1 second at a time to re-check shutdown state.
+            try:
+                await asyncio.sleep(min(1.0, remaining))
+            except asyncio.CancelledError:
+                # Allow task cancellation to propagate for cooperative shutdown.
+                raise
```
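The suggestion above polls once per second; the comment's other idea, waiting on a shutdown event, wakes immediately. A sketch assuming the consumer exposed a hypothetical `asyncio.Event` that is set on close:

```python
import asyncio
import contextlib


async def interruptible_sleep(closed_event: asyncio.Event, delay: float) -> bool:
    """Sleep up to `delay` seconds, waking early if `closed_event` is set.

    Returns True when shutdown was signalled, False when the delay elapsed.
    """
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(closed_event.wait(), timeout=delay)
    return closed_event.is_set()
```

In `_handle_backoff` this would replace the chunked loop with `if await interruptible_sleep(self._closed_event, actual_delay): return`, where `self._closed_event` is the hypothetical event.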
```python
    last_error_time: float = 0.0

    def increment(self) -> None:
        """Increment attempt counter and record error time."""
        self.attempt += 1
        self.last_error_time = time.monotonic()

    def reset(self) -> None:
        """Reset backoff state after successful operation."""
        self.attempt = 0
        self.last_error_time = 0.0
```
Copilot AI · Jan 27, 2026
last_error_time is written but (in this diff) never read, which adds state and an extra dependency (time.monotonic()) without affecting behavior. Either remove last_error_time until it’s needed, or use it (e.g., for logging, metrics, or to avoid backoff increments within a small time window).
Suggested change:

```diff
-    last_error_time: float = 0.0
-
-    def increment(self) -> None:
-        """Increment attempt counter and record error time."""
-        self.attempt += 1
-        self.last_error_time = time.monotonic()
-
-    def reset(self) -> None:
-        """Reset backoff state after successful operation."""
-        self.attempt = 0
-        self.last_error_time = 0.0
+    def increment(self) -> None:
+        """Increment attempt counter after an error."""
+        self.attempt += 1
+
+    def reset(self) -> None:
+        """Reset backoff state after successful operation."""
+        self.attempt = 0
```
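If `last_error_time` were kept instead, one use the comment hints at is resetting the attempt count when errors are spaced far apart in time. A hypothetical sketch; the dataclass shape follows the diff, while the 60-second decay window is an invented example value:

```python
import time
from dataclasses import dataclass


@dataclass
class _BackoffState:
    attempt: int = 0
    last_error_time: float = 0.0

    def increment(self, decay_window: float = 60.0) -> None:
        """Count an error, restarting the series if the previous error
        happened long enough ago to be an unrelated outage."""
        now = time.monotonic()
        if self.last_error_time and now - self.last_error_time > decay_window:
            self.attempt = 0
        self.attempt += 1
        self.last_error_time = now
```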
resolves #NNN (BA-MMM)

Checklist: (if applicable)
- Test case(s) in `ai.backend.test`
- Documentation in the `docs` directory