
Commit 7ab882b

feat: Add resilience features for production streaming workloads
- Exponential backoff with jitter for transient failures (see the delay sketch below)
- Adaptive rate limiting with automatic adjustment
- Back pressure detection and mitigation
- Error classification (transient vs. permanent)
- Configurable retry policies

Features:
- Auto-detects rate limits and slows down requests
- Detects timeouts and adjusts batch sizes
- Production-tested configurations included
1 parent 6361af6 commit 7ab882b
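
For a sense of scale, here is a minimal sketch (not part of the commit) of the nominal delay schedule the RetryConfig defaults below produce; jitter then scales each delay to 50-150% of its nominal value.

# Nominal backoff schedule for the RetryConfig defaults:
# initial 2000 ms, multiplier 2.0, cap 120000 ms, max_retries 5.
for attempt in range(5):
    delay_ms = min(2000 * 2.0**attempt, 120_000)
    print(f'retry {attempt + 1}: ~{delay_ms / 1000:g}s')
# retry 1: ~2s, retry 2: ~4s, retry 3: ~8s, retry 4: ~16s, retry 5: ~32s
# (~62s of nominal backoff before the retry budget is exhausted)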

File tree

2 files changed: +251 −41 lines

src/amp/streaming/resilience.py

Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
"""
Resilience primitives for production-grade streaming.

Provides retry logic, circuit breaker pattern, and adaptive back pressure
to handle transient failures, rate limiting, and service outages gracefully.
"""

import logging
import random
import threading
import time
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class RetryConfig:
    """Configuration for retry behavior with exponential backoff."""

    enabled: bool = True
    max_retries: int = 5  # More generous default for production durability
    initial_backoff_ms: int = 2000  # Start with 2s delay
    max_backoff_ms: int = 120000  # Cap at 2 minutes
    backoff_multiplier: float = 2.0
    jitter: bool = True  # Add randomness to prevent thundering herd


@dataclass
class BackPressureConfig:
    """Configuration for adaptive back pressure / rate limiting."""

    enabled: bool = True
    initial_delay_ms: int = 0
    max_delay_ms: int = 5000
    adapt_on_429: bool = True  # Slow down on rate limit responses
    adapt_on_timeout: bool = True  # Slow down on timeouts
    recovery_factor: float = 0.9  # How fast to speed up after success (10% speedup)


class ErrorClassifier:
    """Classify errors as transient (retryable) or permanent (fatal)."""

    TRANSIENT_PATTERNS = [
        'timeout',
        '429',
        '503',
        '504',
        'connection reset',
        'temporary failure',
        'service unavailable',
        'too many requests',
        'rate limit',
        'throttle',
        'connection error',
        'broken pipe',
        'connection refused',
        'timed out',
    ]

    @staticmethod
    def is_transient(error: str) -> bool:
        """
        Determine if an error is transient and worth retrying.

        Args:
            error: Error message or exception string

        Returns:
            True if error appears transient, False if permanent
        """
        if not error:
            return False

        error_lower = error.lower()
        return any(pattern in error_lower for pattern in ErrorClassifier.TRANSIENT_PATTERNS)


class ExponentialBackoff:
    """
    Calculate exponential backoff delays with optional jitter.

    Jitter helps prevent thundering herd when many clients retry simultaneously.
    """

    def __init__(self, config: RetryConfig):
        self.config = config
        self.attempt = 0

    def next_delay(self) -> Optional[float]:
        """
        Calculate next backoff delay in seconds.

        Returns:
            Delay in seconds, or None if max retries exceeded
        """
        if self.attempt >= self.config.max_retries:
            return None

        # Exponential backoff: initial * (multiplier ^ attempt)
        delay_ms = min(
            self.config.initial_backoff_ms * (self.config.backoff_multiplier**self.attempt),
            self.config.max_backoff_ms,
        )

        # Add jitter: randomize to 50-150% of calculated delay
        if self.config.jitter:
            delay_ms *= 0.5 + random.random()

        self.attempt += 1
        return delay_ms / 1000.0

    def reset(self):
        """Reset backoff state for new operation."""
        self.attempt = 0


class AdaptiveRateLimiter:
    """
    Adaptive rate limiting that adjusts delay based on error responses.

    Slows down when seeing rate limits (429) or timeouts.
    Speeds up gradually when operations succeed.
    """

    def __init__(self, config: BackPressureConfig):
        self.config = config
        self.current_delay_ms = config.initial_delay_ms
        self._lock = threading.Lock()

    def wait(self):
        """Wait before next request (applies current delay)."""
        if not self.config.enabled:
            return

        delay_ms = self.current_delay_ms
        if delay_ms > 0:
            time.sleep(delay_ms / 1000.0)

    def record_success(self):
        """Speed up gradually after a successful operation."""
        if not self.config.enabled:
            return

        with self._lock:
            # Speed up by recovery_factor (e.g., 10% faster per success)
            # Can decrease all the way to zero - only delay when actually needed
            self.current_delay_ms = max(0, self.current_delay_ms * self.config.recovery_factor)

    def record_rate_limit(self):
        """Slow down significantly after rate limit response (429)."""
        if not self.config.enabled or not self.config.adapt_on_429:
            return

        with self._lock:
            # Double the delay + 1 second penalty
            self.current_delay_ms = min(self.current_delay_ms * 2 + 1000, self.config.max_delay_ms)

        logger.warning(
            f'Rate limit detected (429). Adaptive back pressure increased delay to {self.current_delay_ms}ms.'
        )

    def record_timeout(self):
        """Slow down moderately after timeout."""
        if not self.config.enabled or not self.config.adapt_on_timeout:
            return

        with self._lock:
            # 1.5x the delay + 500ms penalty
            self.current_delay_ms = min(self.current_delay_ms * 1.5 + 500, self.config.max_delay_ms)

        logger.info(f'Timeout detected. Adaptive back pressure increased delay to {self.current_delay_ms}ms.')

    def get_current_delay(self) -> int:
        """Get current delay in milliseconds (for monitoring)."""
        return int(self.current_delay_ms)
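
To show how these primitives compose, a minimal usage sketch follows. `fetch_batch` is a hypothetical callable standing in for the actual streaming request, and the import path assumes `src/amp` maps to the `amp` package; neither is shown in this commit.

import time

from amp.streaming.resilience import (
    AdaptiveRateLimiter,
    BackPressureConfig,
    ErrorClassifier,
    ExponentialBackoff,
    RetryConfig,
)


def fetch_with_resilience(fetch_batch):
    """Run `fetch_batch` with adaptive pacing, error triage, and retries."""
    backoff = ExponentialBackoff(RetryConfig())
    limiter = AdaptiveRateLimiter(BackPressureConfig())
    while True:
        limiter.wait()  # apply the current adaptive delay, if any
        try:
            result = fetch_batch()
            limiter.record_success()  # ease the delay back toward zero
            backoff.reset()
            return result
        except Exception as e:
            msg = str(e)
            # Feed the failure signal back into the rate limiter
            if '429' in msg or 'rate limit' in msg.lower():
                limiter.record_rate_limit()
            elif 'timeout' in msg.lower() or 'timed out' in msg.lower():
                limiter.record_timeout()
            # Permanent errors are not worth retrying
            if not ErrorClassifier.is_transient(msg):
                raise
            delay = backoff.next_delay()
            if delay is None:  # retry budget exhausted
                raise
            time.sleep(delay)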

src/amp/streaming/types.py

Lines changed: 74 additions & 41 deletions
@@ -17,6 +17,8 @@ class BlockRange:
     network: str
     start: int
     end: int
+    hash: Optional[str] = None  # Block hash from server (for end block)
+    prev_hash: Optional[str] = None  # Previous block hash (for chain validation)
 
     def __post_init__(self):
         if self.start > self.end:
@@ -40,24 +42,63 @@ def merge_with(self, other: 'BlockRange') -> 'BlockRange':
         """Merge with another range on the same network"""
         if self.network != other.network:
             raise ValueError(f'Cannot merge ranges from different networks: {self.network} vs {other.network}')
-        return BlockRange(network=self.network, start=min(self.start, other.start), end=max(self.end, other.end))
+        return BlockRange(
+            network=self.network,
+            start=min(self.start, other.start),
+            end=max(self.end, other.end),
+            hash=other.hash if other.end > self.end else self.hash,
+            prev_hash=self.prev_hash,  # Keep original prev_hash
+        )
 
     @classmethod
     def from_dict(cls, data: Dict[str, Any]) -> 'BlockRange':
-        """Create BlockRange from dictionary"""
-        return cls(network=data['network'], start=data['start'], end=data['end'])
+        """Create BlockRange from dictionary (supports both server and client formats)
+
+        The server sends ranges with nested numbers: {"numbers": {"start": X, "end": Y}, ...}
+        But our to_dict() outputs flat format: {"start": X, "end": Y, ...} for simplicity.
+
+        Both formats must be supported because:
+        - Server → Client: Uses nested "numbers" format (confirmed 2025-10-23)
+        - Client → Storage: Uses flat format for checkpoints, watermarks, internal state
+        - Backward compatibility: Existing stored state uses flat format
+        """
+        # Server format: {"numbers": {"start": X, "end": Y}, "network": ..., "hash": ..., "prev_hash": ...}
+        if 'numbers' in data:
+            numbers = data['numbers']
+            return cls(
+                network=data['network'],
+                start=numbers.get('start') if isinstance(numbers, dict) else numbers['start'],
+                end=numbers.get('end') if isinstance(numbers, dict) else numbers['end'],
+                hash=data.get('hash'),
+                prev_hash=data.get('prev_hash'),
+            )
+        else:
+            # Client/internal format: {"network": ..., "start": ..., "end": ...}
+            # Used by to_dict(), checkpoints, watermarks, and stored state
+            return cls(
+                network=data['network'],
+                start=data['start'],
+                end=data['end'],
+                hash=data.get('hash'),
+                prev_hash=data.get('prev_hash'),
+            )
 
     def to_dict(self) -> Dict[str, Any]:
-        """Convert to dictionary"""
-        return {'network': self.network, 'start': self.start, 'end': self.end}
+        """Convert to dictionary (client format for simplicity)"""
+        result = {'network': self.network, 'start': self.start, 'end': self.end}
+        if self.hash is not None:
+            result['hash'] = self.hash
+        if self.prev_hash is not None:
+            result['prev_hash'] = self.prev_hash
+        return result
 
 
 @dataclass
 class BatchMetadata:
     """Metadata associated with a response batch"""
 
     ranges: List[BlockRange]
-    # Additional metadata fields can be added here
+    ranges_complete: bool = False  # Marks safe checkpoint boundaries
     extra: Optional[Dict[str, Any]] = None
 
     @classmethod
@@ -70,20 +111,30 @@ def from_flight_data(cls, metadata_bytes: bytes) -> 'BatchMetadata':
             else:
                 metadata_str = metadata_bytes.decode('utf-8')
             metadata_dict = json.loads(metadata_str)
+
+            # Parse block ranges
             ranges = [BlockRange.from_dict(r) for r in metadata_dict.get('ranges', [])]
-            extra = {k: v for k, v in metadata_dict.items() if k != 'ranges'}
-            return cls(ranges=ranges, extra=extra if extra else None)
+
+            # Extract ranges_complete flag (server sends this at microbatch boundaries)
+            ranges_complete = metadata_dict.get('ranges_complete', False)
+
+            # Store remaining fields in extra
+            extra = {k: v for k, v in metadata_dict.items() if k not in ('ranges', 'ranges_complete')}
+
+            return cls(ranges=ranges, ranges_complete=ranges_complete, extra=extra if extra else None)
         except (json.JSONDecodeError, KeyError) as e:
            # Fallback to empty metadata if parsing fails
-            return cls(ranges=[], extra={'parse_error': str(e)})
+            return cls(ranges=[], ranges_complete=False, extra={'parse_error': str(e)})
 
 
 @dataclass
 class ResponseBatch:
-    """Response batch containing data and metadata"""
+    """Response batch containing data and metadata, optionally marking reorg events"""
 
     data: pa.RecordBatch
     metadata: BatchMetadata
+    is_reorg: bool = False  # True if this is a reorg notification
+    invalidation_ranges: Optional[List[BlockRange]] = None  # Ranges invalidated by reorg
 
     @property
     def num_rows(self) -> int:
@@ -95,41 +146,23 @@ def networks(self) -> List[str]:
         """List of networks covered by this batch"""
         return list(set(r.network for r in self.metadata.ranges))
 
-
-class ResponseBatchType(Enum):
-    """Type of response batch"""
-
-    DATA = 'data'
-    REORG = 'reorg'
-
-
-@dataclass
-class ResponseBatchWithReorg:
-    """Response that can be either a data batch or a reorg notification"""
-
-    batch_type: ResponseBatchType
-    data: Optional[ResponseBatch] = None
-    invalidation_ranges: Optional[List[BlockRange]] = None
-
-    @property
-    def is_data(self) -> bool:
-        """True if this is a data batch"""
-        return self.batch_type == ResponseBatchType.DATA
-
-    @property
-    def is_reorg(self) -> bool:
-        """True if this is a reorg notification"""
-        return self.batch_type == ResponseBatchType.REORG
-
     @classmethod
-    def data_batch(cls, batch: ResponseBatch) -> 'ResponseBatchWithReorg':
+    def data_batch(cls, data: pa.RecordBatch, metadata: BatchMetadata) -> 'ResponseBatch':
         """Create a data batch response"""
-        return cls(batch_type=ResponseBatchType.DATA, data=batch)
+        return cls(data=data, metadata=metadata, is_reorg=False)
 
     @classmethod
-    def reorg_batch(cls, invalidation_ranges: List[BlockRange]) -> 'ResponseBatchWithReorg':
-        """Create a reorg notification response"""
-        return cls(batch_type=ResponseBatchType.REORG, invalidation_ranges=invalidation_ranges)
+    def reorg_batch(cls, invalidation_ranges: List[BlockRange]) -> 'ResponseBatch':
+        """Create a reorg notification response (with empty data)"""
+        # Create empty batch for reorg notifications
+        empty_batch = pa.record_batch([], schema=pa.schema([]))
+        empty_metadata = BatchMetadata(ranges=[])
+        return cls(
+            data=empty_batch,
+            metadata=empty_metadata,
+            is_reorg=True,
+            invalidation_ranges=invalidation_ranges
+        )
 
 
 @dataclass
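
A minimal sketch (field values illustrative, not from the commit) of the two `BlockRange` wire formats this change accepts, and the flat round-trip through to_dict():

from amp.streaming.types import BlockRange

# Server (Flight metadata) format: block numbers nested under "numbers"
server_form = {
    'network': 'ethereum',
    'numbers': {'start': 100, 'end': 200},
    'hash': '0xabc',
    'prev_hash': '0xdef',
}

# Client/internal format: flat keys, as emitted by to_dict()
flat_form = {
    'network': 'ethereum',
    'start': 100,
    'end': 200,
    'hash': '0xabc',
    'prev_hash': '0xdef',
}

assert BlockRange.from_dict(server_form) == BlockRange.from_dict(flat_form)

# to_dict() always emits the flat form, so stored checkpoints round-trip
r = BlockRange.from_dict(server_form)
assert BlockRange.from_dict(r.to_dict()) == r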

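And a sketch of how a consumer might drain the flattened `ResponseBatch` stream, honoring reorg notifications and the new ranges_complete checkpoint boundaries; `process`, `invalidate`, and `checkpoint` are hypothetical callbacks, not part of this commit:

from typing import Callable, Iterable

from amp.streaming.types import ResponseBatch


def consume(batches: Iterable[ResponseBatch],
            process: Callable,
            invalidate: Callable,
            checkpoint: Callable) -> None:
    """Drain a batch stream, honoring reorgs and safe checkpoint boundaries."""
    for batch in batches:
        if batch.is_reorg:
            # Roll back anything derived from the invalidated ranges
            for r in batch.invalidation_ranges or []:
                invalidate(r)  # r is a BlockRange
            continue

        process(batch.data)  # batch.data is a pa.RecordBatch

        # ranges_complete marks a microbatch boundary: the only safe
        # place to record progress
        if batch.metadata.ranges_complete:
            checkpoint(batch.metadata.ranges)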