Commit 9896813

bosd and claude committed
feat: add health-aware throttling module
Add adaptive throttling based on server response times.

Server Health Levels:
- HEALTHY: Normal operation, no throttling
- DEGRADED: Slight slowdown, add small delays
- STRESSED: Significant load, reduce batch sizes
- OVERLOADED: Critical, aggressive throttling

Features:
- Rolling average response time monitoring
- Automatic delay adjustment between requests
- Dynamic batch size scaling based on health
- Hysteresis for health recovery (prevents flapping)
- Error recording for server errors (5xx)
- Comprehensive statistics tracking

Configuration:
- Customizable thresholds for each health level
- Configurable delays and batch multipliers
- Aggressive mode for sensitive servers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
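For illustration only (not part of the commit message), a minimal sketch of the health transitions and hysteresis described above, assuming the package is importable as odoo_data_flow and using made-up response times:

from odoo_data_flow.lib.throttle import ServerHealth, ThrottleController

controller = ThrottleController()  # default ThrottleConfig thresholds

# Slow responses (rolling average between 5s and 10s) degrade health immediately.
for rt in [6.0, 6.5, 7.0, 6.8, 7.2]:
    controller.record_response(rt)
assert controller.current_health is ServerHealth.STRESSED

# Fast responses: recovery only happens once the rolling average is healthy
# (< 2s) for 3 consecutive readings, so health briefly stays STRESSED before
# dropping back to HEALTHY (this is the anti-flapping hysteresis).
for rt in [0.4, 0.5, 0.3, 0.4, 0.5, 0.4]:
    controller.record_response(rt)
assert controller.current_health is ServerHealth.HEALTHY
print(controller.get_health_status())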
1 parent 442f61c commit 9896813

File tree

2 files changed: +558, -0 lines changed


src/odoo_data_flow/lib/throttle.py

Lines changed: 310 additions & 0 deletions
@@ -0,0 +1,310 @@
"""Health-aware throttling module for adaptive batch processing.

This module provides functionality to monitor server health and automatically
adjust batch sizes and delays to prevent overloading the Odoo server.
"""

import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional

from ..logging_config import log


class ServerHealth(Enum):
    """Server health status levels (ordered by severity)."""

    HEALTHY = 0
    DEGRADED = 1
    STRESSED = 2
    OVERLOADED = 3


@dataclass
class ThrottleConfig:
    """Configuration for throttling behavior."""

    # Response time thresholds (seconds)
    healthy_threshold: float = 2.0  # Below this = healthy
    degraded_threshold: float = 5.0  # Below this = degraded
    stressed_threshold: float = 10.0  # Below this = stressed
    # Above stressed_threshold = overloaded

    # Base delays for each health level (seconds)
    healthy_delay: float = 0.0
    degraded_delay: float = 0.5
    stressed_delay: float = 2.0
    overloaded_delay: float = 5.0

    # Batch size multipliers for each health level
    healthy_batch_multiplier: float = 1.0
    degraded_batch_multiplier: float = 0.75
    stressed_batch_multiplier: float = 0.5
    overloaded_batch_multiplier: float = 0.25

    # Rolling average window for response times
    window_size: int = 5

    # Recovery settings
    recovery_requests: int = 3  # Consecutive fast responses to improve health
    min_batch_size: int = 1


@dataclass
class ThrottleStats:
    """Statistics for throttling operations."""

    total_requests: int = 0
    healthy_requests: int = 0
    degraded_requests: int = 0
    stressed_requests: int = 0
    overloaded_requests: int = 0
    total_delay_added: float = 0.0
    batch_size_reductions: int = 0
    health_recoveries: int = 0
    min_response_time: float = float("inf")
    max_response_time: float = 0.0
    total_response_time: float = 0.0

    @property
    def avg_response_time(self) -> float:
        """Calculate average response time."""
        if self.total_requests == 0:
            return 0.0
        return self.total_response_time / self.total_requests


class ThrottleController:
    """Controller for health-aware throttling."""

    def __init__(self, config: Optional[ThrottleConfig] = None):
        """Initialize the throttle controller.

        Args:
            config: Throttling configuration.
        """
        self.config = config or ThrottleConfig()
        self.stats = ThrottleStats()
        self.response_times: list[float] = []
        self.current_health = ServerHealth.HEALTHY
        self.consecutive_fast_responses = 0
        self.current_delay = 0.0
        self.batch_size_factor = 1.0

    def record_response(self, response_time: float) -> None:
        """Record a response time and update health status.

        Args:
            response_time: Time taken for the request in seconds.
        """
        self.stats.total_requests += 1
        self.stats.total_response_time += response_time
        self.stats.min_response_time = min(
            self.stats.min_response_time, response_time
        )
        self.stats.max_response_time = max(
            self.stats.max_response_time, response_time
        )

        # Add to rolling window
        self.response_times.append(response_time)
        if len(self.response_times) > self.config.window_size:
            self.response_times.pop(0)

        # Update health based on average
        self._update_health()

    def _update_health(self) -> None:
        """Update health status based on rolling average response time."""
        if not self.response_times:
            return

        avg_time = sum(self.response_times) / len(self.response_times)
        old_health = self.current_health

        # Determine new health level
        if avg_time < self.config.healthy_threshold:
            new_health = ServerHealth.HEALTHY
            self.consecutive_fast_responses += 1
        elif avg_time < self.config.degraded_threshold:
            new_health = ServerHealth.DEGRADED
            self.consecutive_fast_responses = 0
        elif avg_time < self.config.stressed_threshold:
            new_health = ServerHealth.STRESSED
            self.consecutive_fast_responses = 0
        else:
            new_health = ServerHealth.OVERLOADED
            self.consecutive_fast_responses = 0

        # Track health level in stats
        if new_health == ServerHealth.HEALTHY:
            self.stats.healthy_requests += 1
        elif new_health == ServerHealth.DEGRADED:
            self.stats.degraded_requests += 1
        elif new_health == ServerHealth.STRESSED:
            self.stats.stressed_requests += 1
        else:
            self.stats.overloaded_requests += 1

        # Update current health (with hysteresis for recovery)
        if new_health.value > old_health.value:
            # Health degraded - update immediately
            self.current_health = new_health
            self._update_throttle_params()
            log.debug(
                f"Server health degraded: {old_health.value} -> {new_health.value}"
            )
        elif (
            new_health.value < old_health.value
            and self.consecutive_fast_responses >= self.config.recovery_requests
        ):
            # Health improved and we have enough consecutive fast responses
            self.current_health = new_health
            self._update_throttle_params()
            self.consecutive_fast_responses = 0
            self.stats.health_recoveries += 1
            log.debug(
                f"Server health recovered: {old_health.value} -> {new_health.value}"
            )

    def _update_throttle_params(self) -> None:
        """Update delay and batch size based on current health."""
        if self.current_health == ServerHealth.HEALTHY:
            self.current_delay = self.config.healthy_delay
            self.batch_size_factor = self.config.healthy_batch_multiplier
        elif self.current_health == ServerHealth.DEGRADED:
            self.current_delay = self.config.degraded_delay
            self.batch_size_factor = self.config.degraded_batch_multiplier
        elif self.current_health == ServerHealth.STRESSED:
            self.current_delay = self.config.stressed_delay
            self.batch_size_factor = self.config.stressed_batch_multiplier
        else:
            self.current_delay = self.config.overloaded_delay
            self.batch_size_factor = self.config.overloaded_batch_multiplier

    def get_delay(self) -> float:
        """Get the recommended delay before next request.

        Returns:
            Delay in seconds.
        """
        return self.current_delay

    def get_batch_size(self, original_batch_size: int) -> int:
        """Get the recommended batch size.

        Args:
            original_batch_size: The original configured batch size.

        Returns:
            Adjusted batch size.
        """
        adjusted = int(original_batch_size * self.batch_size_factor)
        if adjusted < original_batch_size:
            self.stats.batch_size_reductions += 1
        return max(self.config.min_batch_size, adjusted)

    def apply_delay(self) -> None:
        """Apply the current delay (sleep)."""
        if self.current_delay > 0:
            self.stats.total_delay_added += self.current_delay
            time.sleep(self.current_delay)

    def get_health_status(self) -> dict:
        """Get current health status as a dict.

        Returns:
            Dict with health status information.
        """
        return {
            "health": self.current_health,
            "avg_response_time": (
                sum(self.response_times) / len(self.response_times)
                if self.response_times
                else 0
            ),
            "current_delay": self.current_delay,
            "batch_size_factor": self.batch_size_factor,
        }

    def record_error(self, is_server_error: bool = False) -> None:
        """Record an error and adjust throttling if needed.

        Args:
            is_server_error: True if error indicates server overload (5xx).
        """
        if is_server_error:
            # Treat server errors as very slow responses
            self.record_response(self.config.stressed_threshold * 2)
            log.debug("Server error recorded, increasing throttle")


def create_throttle_controller(
    base_delay: float = 0.0,
    aggressive: bool = False,
) -> ThrottleController:
    """Create a throttle controller with preset configurations.

    Args:
        base_delay: Base delay to add to all operations.
        aggressive: If True, use more aggressive throttling.

    Returns:
        Configured ThrottleController.
    """
    if aggressive:
        config = ThrottleConfig(
            healthy_threshold=1.0,
            degraded_threshold=3.0,
            stressed_threshold=5.0,
            healthy_delay=base_delay,
            degraded_delay=base_delay + 1.0,
            stressed_delay=base_delay + 3.0,
            overloaded_delay=base_delay + 10.0,
            healthy_batch_multiplier=1.0,
            degraded_batch_multiplier=0.5,
            stressed_batch_multiplier=0.25,
            overloaded_batch_multiplier=0.1,
        )
    else:
        config = ThrottleConfig(
            healthy_delay=base_delay,
            degraded_delay=base_delay + 0.5,
            stressed_delay=base_delay + 2.0,
            overloaded_delay=base_delay + 5.0,
        )
    return ThrottleController(config)


def display_throttle_stats(stats: ThrottleStats) -> None:
    """Display throttling statistics."""
    from rich.console import Console
    from rich.panel import Panel

    console = Console()

    lines = [
        f"Total requests: {stats.total_requests}",
        f"Avg response time: {stats.avg_response_time:.2f}s",
        f"Min/Max response: {stats.min_response_time:.2f}s / "
        f"{stats.max_response_time:.2f}s",
        "",
        "Health distribution:",
        f" Healthy: {stats.healthy_requests}",
        f" Degraded: {stats.degraded_requests}",
        f" Stressed: {stats.stressed_requests}",
        f" Overloaded: {stats.overloaded_requests}",
        "",
        f"Total delay added: {stats.total_delay_added:.2f}s",
        f"Batch size reductions: {stats.batch_size_reductions}",
        f"Health recoveries: {stats.health_recoveries}",
    ]

    console.print(
        Panel(
            "\n".join(lines),
            title="[bold cyan]Throttling Statistics[/bold cyan]",
            expand=False,
        )
    )
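For context, a hedged sketch (not part of this commit) of how the controller and display helper above might be wired into a batch submission loop; all_records and submit_batch are placeholders for the real import pipeline, and the import path assumes the package installs as odoo_data_flow:

import random
import time

from odoo_data_flow.lib.throttle import (
    create_throttle_controller,
    display_throttle_stats,
)

# Placeholder data and submit function, standing in for the real pipeline.
all_records = [{"id": i} for i in range(1000)]

def submit_batch(batch: list[dict]) -> None:
    """Stand-in for the call that writes a batch of records to Odoo."""
    time.sleep(random.uniform(0.05, 0.2))

controller = create_throttle_controller(base_delay=0.0, aggressive=False)
configured_batch_size = 200
position = 0

while position < len(all_records):
    # Shrink the batch when the server looks unhealthy.
    batch_size = controller.get_batch_size(configured_batch_size)
    batch = all_records[position : position + batch_size]

    start = time.monotonic()
    try:
        submit_batch(batch)
    except Exception:
        # A real integration would only flag genuine 5xx-style server errors.
        controller.record_error(is_server_error=True)
    else:
        controller.record_response(time.monotonic() - start)
        position += len(batch)

    # Sleep for the health-based delay (0s while HEALTHY with the default config).
    controller.apply_delay()

display_throttle_stats(controller.stats)

Sharing one controller instance across the whole run keeps the rolling window and statistics continuous from batch to batch.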

0 commit comments
