Commit 4bb653b

streaming: Add resilience primitives
- RetryConfig: Configurable retry behavior with backoff
- ExponentialBackoff: Calculate delays with optional jitter
- BackPressureConfig: Adaptive rate limiting configuration
- AdaptiveRateLimiter: Auto-adjust delays based on responses
- ErrorClassifier: Identify transient vs permanent errors
1 parent b174c20 commit 4bb653b

3 files changed: +879 -0 lines changed

src/amp/streaming/resilience.py

Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
"""
Resilience primitives for production-grade streaming.

Provides retry logic, circuit breaker pattern, and adaptive back pressure
to handle transient failures, rate limiting, and service outages gracefully.
"""

import logging
import random
import threading
import time
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class RetryConfig:
    """Configuration for retry behavior with exponential backoff."""

    enabled: bool = True
    max_retries: int = 5  # More generous default for production durability
    initial_backoff_ms: int = 2000  # Start with 2s delay
    max_backoff_ms: int = 120000  # Cap at 2 minutes
    backoff_multiplier: float = 2.0
    jitter: bool = True  # Add randomness to prevent thundering herd


@dataclass
class BackPressureConfig:
    """Configuration for adaptive back pressure / rate limiting."""

    enabled: bool = True
    initial_delay_ms: int = 0
    max_delay_ms: int = 5000
    adapt_on_429: bool = True  # Slow down on rate limit responses
    adapt_on_timeout: bool = True  # Slow down on timeouts
    recovery_factor: float = 0.9  # How fast to speed up after success (10% speedup)


class ErrorClassifier:
    """Classify errors as transient (retryable) or permanent (fatal)."""

    TRANSIENT_PATTERNS = [
        'timeout',
        '429',
        '503',
        '504',
        'connection reset',
        'temporary failure',
        'service unavailable',
        'too many requests',
        'rate limit',
        'throttle',
        'connection error',
        'broken pipe',
        'connection refused',
        'timed out',
    ]

    @staticmethod
    def is_transient(error: str) -> bool:
        """
        Determine if an error is transient and worth retrying.

        Args:
            error: Error message or exception string

        Returns:
            True if error appears transient, False if permanent
        """
        if not error:
            return False

        error_lower = error.lower()
        return any(pattern in error_lower for pattern in ErrorClassifier.TRANSIENT_PATTERNS)


class ExponentialBackoff:
    """
    Calculate exponential backoff delays with optional jitter.

    Jitter helps prevent thundering herd when many clients retry simultaneously.
    """

    def __init__(self, config: RetryConfig):
        self.config = config
        self.attempt = 0

    def next_delay(self) -> Optional[float]:
        """
        Calculate next backoff delay in seconds.

        Returns:
            Delay in seconds, or None if max retries exceeded
        """
        if self.attempt >= self.config.max_retries:
            return None

        # Exponential backoff: initial * (multiplier ^ attempt)
        delay_ms = min(
            self.config.initial_backoff_ms * (self.config.backoff_multiplier**self.attempt),
            self.config.max_backoff_ms,
        )

        # Add jitter: randomize to 50-150% of calculated delay
        if self.config.jitter:
            delay_ms *= 0.5 + random.random()

        self.attempt += 1
        return delay_ms / 1000.0

    def reset(self):
        """Reset backoff state for new operation."""
        self.attempt = 0


class AdaptiveRateLimiter:
    """
    Adaptive rate limiting that adjusts delay based on error responses.

    Slows down when seeing rate limits (429) or timeouts.
    Speeds up gradually when operations succeed.
    """

    def __init__(self, config: BackPressureConfig):
        self.config = config
        self.current_delay_ms = config.initial_delay_ms
        self._lock = threading.Lock()

    def wait(self):
        """Wait before next request (applies current delay)."""
        if not self.config.enabled:
            return

        delay_ms = self.current_delay_ms
        if delay_ms > 0:
            time.sleep(delay_ms / 1000.0)

    def record_success(self):
        """Speed up gradually after a successful operation."""
        if not self.config.enabled:
            return

        with self._lock:
            # Speed up by recovery_factor (e.g., 10% faster per success)
            # Can decrease all the way to zero - only delay when actually needed
            self.current_delay_ms = max(0, self.current_delay_ms * self.config.recovery_factor)

    def record_rate_limit(self):
        """Slow down significantly after rate limit response (429)."""
        if not self.config.enabled or not self.config.adapt_on_429:
            return

        with self._lock:
            # Double the delay + 1 second penalty
            self.current_delay_ms = min(self.current_delay_ms * 2 + 1000, self.config.max_delay_ms)

        logger.warning(
            f'Rate limit detected (429). Adaptive back pressure increased delay to {self.current_delay_ms}ms.'
        )

    def record_timeout(self):
        """Slow down moderately after timeout."""
        if not self.config.enabled or not self.config.adapt_on_timeout:
            return

        with self._lock:
            # 1.5x the delay + 500ms penalty
            self.current_delay_ms = min(self.current_delay_ms * 1.5 + 500, self.config.max_delay_ms)

        logger.info(f'Timeout detected. Adaptive back pressure increased delay to {self.current_delay_ms}ms.')

    def get_current_delay(self) -> int:
        """Get current delay in milliseconds (for monitoring)."""
        return int(self.current_delay_ms)
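
Below is a minimal usage sketch, not part of this commit, showing how these primitives might be combined in a streaming send loop. The import path (amp.streaming.resilience), the send_batch callable, and the string-based error inspection are assumptions for illustration only.

# Hypothetical usage sketch (not part of this commit). `send_batch` stands in
# for whatever transport callable the streaming client actually uses, and the
# import path assumes the package is importable as `amp.streaming.resilience`.
import time

from amp.streaming.resilience import (
    AdaptiveRateLimiter,
    BackPressureConfig,
    ErrorClassifier,
    ExponentialBackoff,
    RetryConfig,
)


def send_with_resilience(batches, send_batch):
    """Send each batch, retrying transient failures and adapting to back pressure."""
    limiter = AdaptiveRateLimiter(BackPressureConfig())

    for batch in batches:
        backoff = ExponentialBackoff(RetryConfig())
        while True:
            limiter.wait()  # apply the current adaptive delay before sending
            try:
                send_batch(batch)
                limiter.record_success()  # gradually speed back up
                break
            except Exception as exc:  # real code would catch transport-specific errors
                message = str(exc)
                lowered = message.lower()

                # Feed the limiter so the steady-state send rate adapts.
                if '429' in lowered or 'rate limit' in lowered:
                    limiter.record_rate_limit()
                elif 'timeout' in lowered or 'timed out' in lowered:
                    limiter.record_timeout()

                # Permanent errors surface immediately; transient ones are retried.
                if not ErrorClassifier.is_transient(message):
                    raise
                delay = backoff.next_delay()
                if delay is None:
                    raise  # retries exhausted
                time.sleep(delay)

The retry backoff and the adaptive limiter address different concerns: the backoff governs how long to wait before re-sending a failed batch, while the limiter throttles the steady-state send rate based on recent 429s and timeouts.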
