Skip to content

Commit e5d7b4d

Browse files
author
User
committed
feat(high): Add OpenAI API rate limiting with Token Bucket algorithm
Implemented comprehensive rate limiting:
- Token Bucket algorithm for RPM and TPM limits
- Separate buckets for requests/minute and tokens/minute
- Automatic token refilling at a constant rate
- Graceful degradation with queuing when limits are reached
- Concurrent request limiting with a semaphore

Features:
- Per-model rate limits (gpt-4o: 30K TPM, gpt-4o-mini: 200K TPM)
- Automatic wait-time calculation
- Statistics tracking (throttle rate, wait times, etc.)
- Thread-safe with asyncio locks
- Context-manager API for easy usage

Integration:
- Integrated into the Vision API (_call_openai_vision_api_with_retry)
- Automatic token estimation based on image and schema size
- Added to the DI container with lazy initialization
- Global rate-limiter registry for the singleton pattern

Benefits:
- Prevents 429 rate-limit errors from OpenAI
- Automatic queuing instead of failing requests
- Better resource utilization with concurrent limiting
- Detailed metrics for monitoring and optimization
- Configurable limits per model tier

Tests:
- 19 comprehensive unit tests for the rate limiter
- Test the token-bucket algorithm, refilling, and wait times
- Test concurrent request limiting
- Test per-model configuration
- All tests passing

Resolves: openai#5 (HIGH priority)
Impact: Eliminates rate-limit errors and improves API reliability
1 parent d28af64 commit e5d7b4d

File tree

4 files changed

+686
-7
lines changed

4 files changed

+686
-7
lines changed

examples/erni-foto-agency/erni_foto_agency/di_container.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from .performance.cache_manager import ErniCacheManager
2727
from .performance.circuit_breaker import CircuitBreaker
2828
from .performance.cost_optimizer import CostBudget, CostOptimizer
29+
from .performance.rate_limiter import RateLimiter, get_rate_limiter
2930
from .utils.image_processor import ImageProcessor
3031

3132
logger = structlog.get_logger(__name__)
@@ -62,11 +63,18 @@ def record_request(self, agent: str, duration: float, success: bool) -> None: ..
6263

6364
class IImageProcessor(Protocol):
6465
"""Interface for image processing"""
65-
66+
6667
async def process_image(self, image_path: str) -> bytes: ...
6768
async def validate_image(self, image_path: str) -> bool: ...
6869

6970

71+
class IRateLimiter(Protocol):
72+
"""Interface for rate limiting"""
73+
74+
async def acquire(self, estimated_tokens: int) -> Any: ...
75+
def get_stats(self) -> dict[str, Any]: ...
76+
77+
7078
# ============================================================================
7179
# Dependency Injection Container
7280
# ============================================================================
@@ -113,6 +121,8 @@ def __init__(self, config: ErniConfig | None = None):
113121
self._image_processor: ImageProcessor | None = None
114122
self._batch_processor: BatchProcessor | None = None
115123
self._cost_optimizer: CostOptimizer | None = None
124+
self._rate_limiter_gpt4o: RateLimiter | None = None
125+
self._rate_limiter_gpt4o_mini: RateLimiter | None = None
116126

117127
# Agents (lazy-initialized)
118128
self._schema_extractor: SharePointSchemaExtractorAgent | None = None
@@ -238,6 +248,32 @@ def cost_optimizer(self, value: CostOptimizer) -> None:
238248
"""Set cost optimizer (for testing)"""
239249
self._cost_optimizer = value
240250

251+
@property
252+
def rate_limiter_gpt4o(self) -> RateLimiter:
253+
"""Get rate limiter for GPT-4o (singleton)"""
254+
if self._rate_limiter_gpt4o is None:
255+
self._rate_limiter_gpt4o = get_rate_limiter("gpt-4o")
256+
logger.debug("Rate limiter for gpt-4o created")
257+
return self._rate_limiter_gpt4o
258+
259+
@rate_limiter_gpt4o.setter
260+
def rate_limiter_gpt4o(self, value: RateLimiter) -> None:
261+
"""Set rate limiter for GPT-4o (for testing)"""
262+
self._rate_limiter_gpt4o = value
263+
264+
@property
265+
def rate_limiter_gpt4o_mini(self) -> RateLimiter:
266+
"""Get rate limiter for GPT-4o-mini (singleton)"""
267+
if self._rate_limiter_gpt4o_mini is None:
268+
self._rate_limiter_gpt4o_mini = get_rate_limiter("gpt-4o-mini")
269+
logger.debug("Rate limiter for gpt-4o-mini created")
270+
return self._rate_limiter_gpt4o_mini
271+
272+
@rate_limiter_gpt4o_mini.setter
273+
def rate_limiter_gpt4o_mini(self, value: RateLimiter) -> None:
274+
"""Set rate limiter for GPT-4o-mini (for testing)"""
275+
self._rate_limiter_gpt4o_mini = value
276+
241277
# ========================================================================
242278
# AI Agents
243279
# ========================================================================
@@ -381,6 +417,8 @@ def reset(self) -> None:
381417
self._image_processor = None
382418
self._batch_processor = None
383419
self._cost_optimizer = None
420+
self._rate_limiter_gpt4o = None
421+
self._rate_limiter_gpt4o_mini = None
384422
self._schema_extractor = None
385423
self._vision_analyzer = None
386424
self._sharepoint_uploader = None

examples/erni-foto-agency/erni_foto_agency/erni_agents/structured_vision_analyzer.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from ..performance.cache_manager import ErniCacheManager
3939
from ..performance.circuit_breaker import call_openai_with_circuit_breaker
4040
from ..performance.cost_optimizer import CostBudget, CostOptimizer, ModelType
41+
from ..performance.rate_limiter import get_rate_limiter
4142
from ..utils.image_processor import ImageProcessor
4243
from ..utils.pii_detector import PIIDetector
4344

@@ -563,14 +564,19 @@ async def _call_openai_vision_api_with_retry(
563564
api_key: str,
564565
) -> Any:
565566
"""
566-
Call OpenAI Vision API with retry logic and exponential backoff
567+
Call OpenAI Vision API with retry logic, rate limiting, and exponential backoff
567568
568569
Retry strategy:
569570
- Max 3 attempts
570571
- Exponential backoff: 2s, 4s, 8s (capped at 30s)
571572
- Retry on: timeout, connection errors, rate limits, 5xx errors
572573
- No retry on: 4xx errors (except 429 rate limit)
573574
575+
Rate limiting:
576+
- Token bucket algorithm for RPM and TPM limits
577+
- Automatic queuing when limits reached
578+
- Per-model rate limits (gpt-4o vs gpt-4o-mini)
579+
574580
Args:
575581
base64_image: Base64-encoded image
576582
json_schema: JSON schema for structured output
@@ -585,14 +591,24 @@ async def _call_openai_vision_api_with_retry(
585591
RetryableAPIError: For 5xx and 429 errors
586592
openai.APIStatusError: For non-retryable 4xx errors
587593
"""
594+
# Get rate limiter for this model
595+
rate_limiter = get_rate_limiter(model)
596+
597+
# Estimate tokens for rate limiting
598+
# Vision API typically uses ~1000-2000 tokens per image
599+
# Add tokens for prompt and schema
600+
estimated_tokens = 1500 + len(json_schema) // 4 # Rough estimate
601+
588602
try:
589-
# Create specialized prompt based on fields
590-
system_prompt = _create_system_prompt(fields_to_analyze)
591-
user_prompt = _create_user_prompt(fields_to_analyze)
603+
# Acquire rate limit before making request
604+
async with rate_limiter.acquire(estimated_tokens=estimated_tokens):
605+
# Create specialized prompt based on fields
606+
system_prompt = _create_system_prompt(fields_to_analyze)
607+
user_prompt = _create_user_prompt(fields_to_analyze)
592608

593-
client = openai.AsyncOpenAI(api_key=api_key)
609+
client = openai.AsyncOpenAI(api_key=api_key)
594610

595-
response = await client.chat.completions.create(
611+
response = await client.chat.completions.create(
596612
model=model,
597613
messages=[
598614
{"role": "system", "content": system_prompt},

0 commit comments

Comments
 (0)