diff --git a/README.md b/README.md index f06cdf6..729ccab 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ The following examples are for reference only. Prefer docs for the latest inform - **Type safety** - Full type hints for better IDE support - **Minimal dependencies** - Only what you need - **Python 3.7+** - Support for modern Python versions +- **Production-ready resilience** - Built-in retry mechanisms and circuit breakers ## Installation @@ -235,6 +236,34 @@ content = langbase.parser( ) ``` +### Resilience - Production-Ready Error Handling + +```python +from langbase import Langbase, RetryConfig, RetryStrategy + +# Configure custom retry behavior +retry_config = RetryConfig( + max_attempts=5, + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + max_delay=60.0, + respect_retry_after=True, +) + +# Initialize client with resilience features +langbase = Langbase( + api_key="your-api-key", + retry_config=retry_config +) + +# Requests automatically retry on failures +response = langbase.agent.run( + model="openai:gpt-4o-mini", + api_key="your-llm-key", + input=[{"role": "user", "content": "Hello!"}] +) +``` + ## Examples Explore the [examples](https://github.com/LangbaseInc/langbase-python-sdk/tree/main/examples) directory for complete working examples: diff --git a/docs/resilience.md b/docs/resilience.md new file mode 100644 index 0000000..45d6e2c --- /dev/null +++ b/docs/resilience.md @@ -0,0 +1,274 @@ +# Resilience and Retry Mechanisms + +The Langbase SDK includes comprehensive resilience features to handle network failures, rate limits, and service outages gracefully. These features help ensure your applications remain robust in production environments. 
+ +## Overview + +The resilience system provides: + +- **Automatic Retries**: Configurable retry strategies with exponential backoff +- **Circuit Breakers**: Prevent cascading failures by temporarily blocking requests to failing services +- **Rate Limit Handling**: Respect `Retry-After` headers from API responses +- **Jitter**: Add randomness to retry delays to prevent thundering herd problems + +## Quick Start + +### Basic Usage with Default Settings + +```python +from langbase import Langbase + +# Resilience is enabled by default +langbase = Langbase(api_key="your-api-key") + +# Requests will automatically retry on failures +pipes = langbase.pipes.list() +``` + +### Custom Retry Configuration + +```python +from langbase import Langbase, RetryConfig, RetryStrategy + +retry_config = RetryConfig( + max_attempts=5, + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + max_delay=60.0, + multiplier=2.0, + jitter=True, + respect_retry_after=True, + retry_on_status_codes=[429, 500, 502, 503, 504], +) + +langbase = Langbase( + api_key="your-api-key", + retry_config=retry_config +) +``` + +## Retry Strategies + +### Exponential Backoff (Recommended) + +```python +from langbase import RetryConfig, RetryStrategy + +config = RetryConfig( + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, # Start with 1 second + multiplier=2.0, # Double each time + max_delay=60.0, # Cap at 60 seconds + jitter=True # Add randomness +) +# Delays: ~1s, ~2s, ~4s, ~8s, ~16s, ~32s, ~60s +``` + +### Linear Backoff + +```python +config = RetryConfig( + strategy=RetryStrategy.LINEAR, + base_delay=2.0, # 2 seconds per attempt + max_delay=30.0 +) +# Delays: 2s, 4s, 6s, 8s, 10s... +``` + +### Fixed Delay + +```python +config = RetryConfig( + strategy=RetryStrategy.FIXED, + base_delay=5.0 # Always 5 seconds +) +# Delays: 5s, 5s, 5s, 5s... 
+``` + +### Fibonacci Backoff + +```python +config = RetryConfig( + strategy=RetryStrategy.FIBONACCI, + base_delay=1.0 +) +# Delays: 1s, 1s, 2s, 3s, 5s, 8s, 13s... +``` + +## Circuit Breakers + +Circuit breakers prevent your application from repeatedly calling a failing service: + +```python +from langbase import Langbase, CircuitBreakerConfig, RetryConfig + +circuit_config = CircuitBreakerConfig( + failure_threshold=5, # Open after 5 failures + recovery_timeout=30.0, # Try again after 30 seconds + success_threshold=3, # Close after 3 successes +) + +retry_config = RetryConfig(max_attempts=2) # Fewer retries with circuit breaker + +langbase = Langbase( + api_key="your-api-key", + retry_config=retry_config, + circuit_breaker_config=circuit_config +) +``` + +### Circuit States + +- **CLOSED**: Normal operation, requests pass through +- **OPEN**: Service is failing, requests are blocked +- **HALF_OPEN**: Testing if service has recovered + +## Rate Limit Handling + +The SDK automatically respects `Retry-After` headers: + +```python +config = RetryConfig( + respect_retry_after=True, # Honor Retry-After headers + max_retry_after=300.0, # Don't wait more than 5 minutes + retry_on_status_codes=[429] # Retry on rate limits +) + +langbase = Langbase( + api_key="your-api-key", + retry_config=config +) +``` + +## Advanced Configuration + +### Selective Retry Conditions + +```python +from langbase.errors import APIConnectionError, RateLimitError + +config = RetryConfig( + # Only retry on specific exceptions + retry_on_exceptions=[ + APIConnectionError, + RateLimitError, + ], + # Only retry on specific status codes + retry_on_status_codes=[429, 500, 502, 503, 504], +) +``` + +### Disabling Resilience + +```python +# Disable all resilience features +langbase = Langbase( + api_key="your-api-key", + enable_resilience=False +) +``` + +## Best Practices + +### Production Configuration + +```python +from langbase import Langbase, RetryConfig, CircuitBreakerConfig, RetryStrategy + +# 
Recommended production settings +retry_config = RetryConfig( + max_attempts=3, + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + max_delay=30.0, + multiplier=2.0, + jitter=True, + respect_retry_after=True, +) + +circuit_config = CircuitBreakerConfig( + failure_threshold=5, + recovery_timeout=60.0, + success_threshold=2, +) + +langbase = Langbase( + api_key="your-api-key", + retry_config=retry_config, + circuit_breaker_config=circuit_config +) +``` + +## Error Handling + +```python +from langbase.errors import APIError, APIConnectionError + +try: + response = langbase.agent.run( + model="openai:gpt-4o-mini", + api_key="your-llm-key", + input=[{"role": "user", "content": "Hello"}] + ) +except APIConnectionError as e: + print(f"Connection failed after retries: {e}") +except APIError as e: + if "Circuit breaker is open" in str(e): + print("Service temporarily unavailable") + else: + print(f"API error: {e}") +``` + +## Streaming with Resilience + +Resilience features work with streaming responses: + +```python +from langbase import get_runner + +response = langbase.agent.run( + model="openai:gpt-4o-mini", + api_key="your-llm-key", + input=[{"role": "user", "content": "Tell me a story"}], + stream=True +) + +# The initial request benefits from retry logic +runner = get_runner(response) +for content in runner.text_generator(): + print(content, end="", flush=True) +``` + +## Performance Considerations + +- **Jitter**: Always enable jitter in production to prevent thundering herd +- **Max Delay**: Set reasonable maximum delays to avoid blocking too long +- **Circuit Breakers**: Use circuit breakers for external dependencies +- **Monitoring**: Monitor retry rates and circuit breaker trips in production + +## 📊 Comparison with Competition + +| Feature | Langbase (Before) | Langbase (After) | Pydantic AI | OpenAI SDK | +|---------|-------------------|------------------|-------------|------------| +| Basic Retries | ❌ | ✅ | ✅ | ✅ | +| Circuit Breakers | ❌ | 
✅ | ❌ | ❌ | +| Multiple Strategies | ❌ | ✅ | ✅ | ❌ | +| Rate Limit Respect | ❌ | ✅ | ✅ | ✅ | +| Jitter Support | ❌ | ✅ | ✅ | ❌ | +| Easy Configuration | ❌ | ✅ | ✅ | ❌ | + +## Migration Guide + +If you're upgrading from an older version: + +```python +# Old way (still works) +langbase = Langbase(api_key="your-api-key") + +# New way with explicit configuration +langbase = Langbase( + api_key="your-api-key", + retry_config=RetryConfig(), # Use defaults + enable_resilience=True # Explicitly enable +) +``` diff --git a/examples/resilience/resilient_client.py b/examples/resilience/resilient_client.py new file mode 100644 index 0000000..779e30a --- /dev/null +++ b/examples/resilience/resilient_client.py @@ -0,0 +1,224 @@ +""" +Example demonstrating the resilience features of the Langbase SDK. + +This example shows how to configure retry policies and circuit breakers +for robust API interactions. +""" + +import os +import time +from dotenv import load_dotenv + +from langbase import Langbase +from langbase.resilience import RetryConfig, CircuitBreakerConfig, RetryStrategy + +# Load environment variables +load_dotenv() + +def basic_resilience_example(): + """Basic example with default resilience settings.""" + print("=== Basic Resilience Example ===") + + # Initialize client with default resilience settings + langbase = Langbase( + api_key=os.getenv("LANGBASE_API_KEY"), + base_url="https://api.langbase.com" + ) + + try: + # This will automatically retry on failures + pipes = langbase.pipes.list() + print(f"āœ… Successfully retrieved {len(pipes)} pipes") + except Exception as e: + print(f"āŒ Failed after retries: {e}") + + +def custom_retry_configuration(): + """Example with custom retry configuration.""" + print("\n=== Custom Retry Configuration ===") + + # Configure custom retry behavior + retry_config = RetryConfig( + max_attempts=5, + strategy=RetryStrategy.EXPONENTIAL, + base_delay=0.5, # Start with 500ms + max_delay=30.0, # Cap at 
30 seconds + multiplier=2.0, + jitter=True, + respect_retry_after=True, + retry_on_status_codes=[429, 500, 502, 503, 504], + ) + + langbase = Langbase( + api_key=os.getenv("LANGBASE_API_KEY"), + base_url="https://api.langbase.com", + retry_config=retry_config + ) + + try: + # This will use the custom retry configuration + response = langbase.agent.run( + model="openai:gpt-4o-mini", + api_key=os.getenv("LLM_API_KEY"), + input=[{"role": "user", "content": "Hello, test resilience!"}], + ) + print(f"āœ… Agent response: {response.get('output', 'No output')[:100]}...") + except Exception as e: + print(f"āŒ Failed after custom retries: {e}") + + +def circuit_breaker_example(): + """Example with circuit breaker configuration.""" + print("\n=== Circuit Breaker Example ===") + + # Configure circuit breaker + circuit_breaker_config = CircuitBreakerConfig( + failure_threshold=3, # Open after 3 failures + recovery_timeout=10.0, # Try again after 10 seconds + success_threshold=2, # Close after 2 successes + ) + + retry_config = RetryConfig( + max_attempts=2, # Fewer retries when using circuit breaker + strategy=RetryStrategy.FIXED, + base_delay=1.0, + ) + + langbase = Langbase( + api_key=os.getenv("LANGBASE_API_KEY"), + base_url="https://api.langbase.com", + retry_config=retry_config, + circuit_breaker_config=circuit_breaker_config + ) + + # Simulate multiple requests to demonstrate circuit breaker + for i in range(5): + try: + print(f"Request {i+1}...") + pipes = langbase.pipes.list() + print(f"āœ… Success: {len(pipes)} pipes") + time.sleep(1) + except Exception as e: + print(f"āŒ Failed: {e}") + time.sleep(1) + + +def rate_limit_handling(): + """Example showing rate limit handling with Retry-After.""" + print("\n=== Rate Limit Handling ===") + + retry_config = RetryConfig( + max_attempts=3, + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + respect_retry_after=True, # Respect Retry-After headers + max_retry_after=60.0, # Don't wait more than 1 minute + 
retry_on_status_codes=[429], # Specifically handle rate limits + ) + + langbase = Langbase( + api_key=os.getenv("LANGBASE_API_KEY"), + base_url="https://api.langbase.com", + retry_config=retry_config + ) + + try: + # Make multiple rapid requests to potentially trigger rate limiting + for i in range(3): + print(f"Rapid request {i+1}...") + response = langbase.agent.run( + model="openai:gpt-4o-mini", + api_key=os.getenv("LLM_API_KEY"), + input=[{"role": "user", "content": f"Quick test {i+1}"}], + ) + print(f"āœ… Response {i+1} received") + except Exception as e: + print(f"āŒ Rate limit handling failed: {e}") + + +def disable_resilience(): + """Example showing how to disable resilience features.""" + print("\n=== Disabled Resilience ===") + + langbase = Langbase( + api_key=os.getenv("LANGBASE_API_KEY"), + base_url="https://api.langbase.com", + enable_resilience=False # Disable all resilience features + ) + + try: + pipes = langbase.pipes.list() + print(f"āœ… Success without resilience: {len(pipes)} pipes") + except Exception as e: + print(f"āŒ Failed without retries: {e}") + + +def streaming_with_resilience(): + """Example showing resilience with streaming responses.""" + print("\n=== Streaming with Resilience ===") + + retry_config = RetryConfig( + max_attempts=3, + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + ) + + langbase = Langbase( + api_key=os.getenv("LANGBASE_API_KEY"), + retry_config=retry_config + ) + + try: + response = langbase.agent.run( + model="openai:gpt-4o-mini", + api_key=os.getenv("LLM_API_KEY"), + input=[{"role": "user", "content": "Tell me a short story"}], + stream=True + ) + + print("āœ… Streaming response received") + + # Process the stream + from langbase import get_runner + runner = get_runner(response) + + print("šŸ“– Story: ", end="") + for content in runner.text_generator(): + print(content, end="", flush=True) + print("\n") + + except Exception as e: + print(f"āŒ Streaming with resilience failed: {e}") + + +def main(): + 
"""Run all resilience examples.""" + print("šŸ”„ Langbase SDK Resilience Examples") + print("=" * 50) + + # Check if API keys are available + if not os.getenv("LANGBASE_API_KEY"): + print("āŒ LANGBASE_API_KEY not found in environment variables") + return + + if not os.getenv("LLM_API_KEY"): + print("āš ļø LLM_API_KEY not found - some examples may fail") + + try: + basic_resilience_example() + custom_retry_configuration() + circuit_breaker_example() + rate_limit_handling() + disable_resilience() + streaming_with_resilience() + + print("\nāœ… All resilience examples completed!") + + except KeyboardInterrupt: + print("\nā¹ļø Examples interrupted by user") + except Exception as e: + print(f"\nāŒ Unexpected error: {e}") + + +if __name__ == "__main__": + main() diff --git a/langbase/__init__.py b/langbase/__init__.py index c94436f..482c48e 100644 --- a/langbase/__init__.py +++ b/langbase/__init__.py @@ -41,6 +41,13 @@ from .primitives.pipes import Pipes from .primitives.threads import Threads from .primitives.tools import Tools +from .resilience import ( + RetryConfig, + CircuitBreakerConfig, + RetryStrategy, + CircuitState, + ResilientRequest, +) from .streaming import StreamEventType, TypedStreamProcessor from .types import ( ChoiceGenerate, @@ -95,6 +102,12 @@ "PipeUpdateResponse", "Pipes", "RateLimitError", + # Resilience + "RetryConfig", + "CircuitBreakerConfig", + "RetryStrategy", + "CircuitState", + "ResilientRequest", "ResponseFormat", "RunResponse", "RunResponseStream", diff --git a/langbase/langbase.py b/langbase/langbase.py index 3335ff8..9b829dc 100644 --- a/langbase/langbase.py +++ b/langbase/langbase.py @@ -16,6 +16,7 @@ from .primitives.threads import Threads from .primitives.tools import Tools from .request import Request +from .resilience import RetryConfig, CircuitBreakerConfig class Langbase: @@ -26,18 +27,37 @@ class Langbase: including pipes, memories, tools, threads, and utilities. 
""" - def __init__(self, api_key: str = "", base_url: str = "https://api.langbase.com"): + def __init__( + self, + api_key: str = "", + base_url: str = "https://api.langbase.com", + retry_config: Optional[RetryConfig] = None, + circuit_breaker_config: Optional[CircuitBreakerConfig] = None, + enable_resilience: bool = True + ): """ Initialize the Langbase client. Args: api_key: The API key for authentication. base_url: The base URL for the API. + retry_config: Optional retry configuration for resilient requests. + circuit_breaker_config: Optional circuit breaker configuration. + enable_resilience: Whether to enable resilience features (default: True). """ self.base_url = base_url self.api_key = api_key - self.request = Request({"api_key": self.api_key, "base_url": self.base_url}) + # Build request configuration with resilience settings + request_config = { + "api_key": self.api_key, + "base_url": self.base_url, + "retry_config": retry_config, + "circuit_breaker_config": circuit_breaker_config, + "enable_resilience": enable_resilience + } + + self.request = Request(request_config) # Initialize primitive classes self.agent = Agent(self) diff --git a/langbase/request.py b/langbase/request.py index 29cd8b0..1c29704 100644 --- a/langbase/request.py +++ b/langbase/request.py @@ -6,11 +6,13 @@ """ import json +import time from typing import Any, Dict, Iterator, Optional, Union import requests from .errors import APIConnectionError, APIConnectionTimeoutError, APIError +from .resilience import ResilientRequest, RetryConfig, CircuitBreakerConfig from .types import GENERATION_ENDPOINTS @@ -30,11 +32,26 @@ def __init__(self, config: Dict[str, Any]): config: Configuration dictionary containing: - api_key: API key for authentication - base_url: Base URL for the API + - retry_config: Optional retry configuration + - circuit_breaker_config: Optional circuit breaker configuration + - enable_resilience: Whether to enable resilience features (default: True) """ self.config = config 
self.api_key = config.get("api_key", "") self.base_url = config.get("base_url", "") + # Initialize resilience features + enable_resilience = config.get("enable_resilience", True) + if enable_resilience: + retry_config = config.get("retry_config") + circuit_breaker_config = config.get("circuit_breaker_config") + self.resilient_request = ResilientRequest( + retry_config=retry_config, + circuit_breaker_config=circuit_breaker_config, + ) + else: + self.resilient_request = None + def build_url(self, endpoint: str) -> str: """ Build the complete URL for the API request. @@ -126,6 +143,99 @@ def make_request( except requests.RequestException as e: raise APIConnectionError(cause=e) from e + def make_resilient_request( + self, + url: str, + method: str, + headers: Dict[str, str], + body: Optional[Dict[str, Any]] = None, + stream: bool = False, + files: Optional[Dict[str, Any]] = None, + ) -> requests.Response: + """ + Make a resilient HTTP request with retry and circuit breaker support. + + Args: + url: URL to request + method: HTTP method (GET, POST, etc.) 
+ headers: HTTP headers + body: Request body (for methods like POST) + stream: Whether to stream the response + files: Files to upload (for multipart/form-data requests) + + Returns: + Response object + + Raises: + APIConnectionError: If the request fails after all retries + APIConnectionTimeoutError: If the request times out + APIError: If circuit breaker is open + """ + if not self.resilient_request: + # Fallback to regular request if resilience is disabled + return self.make_request(url, method, headers, body, stream, files) + + # Check circuit breaker + if self.resilient_request.circuit_breaker and not self.resilient_request.circuit_breaker.should_allow_request(): + raise APIError(message="Circuit breaker is open - service temporarily unavailable") + + last_exception = None + last_response = None + + for attempt in range(1, self.resilient_request.retry_config.max_attempts + 1): + try: + response = self.make_request(url, method, headers, body, stream, files) + + # Check if response indicates failure + if response.status_code in self.resilient_request.retry_config.retry_on_status_codes: + last_response = response + if attempt < self.resilient_request.retry_config.max_attempts: + # Calculate delay and wait + retry_after = self.resilient_request.get_retry_after(response) + delay = self.resilient_request.retry_config.calculate_delay(attempt, retry_after) + time.sleep(delay) + continue + else: + # Last attempt failed, record failure and raise + if self.resilient_request.circuit_breaker: + self.resilient_request.circuit_breaker.record_failure( + APIError(status=response.status_code, message="Max retries exceeded") + ) + raise APIError(status=response.status_code, message="Max retries exceeded") + + # Success - record it and return + if self.resilient_request.circuit_breaker: + self.resilient_request.circuit_breaker.record_success() + return response + + except Exception as e: + last_exception = e + + # Check if we should retry this exception + if not 
self.resilient_request.should_retry(e, last_response): + # Don't retry, record failure and re-raise + if self.resilient_request.circuit_breaker: + self.resilient_request.circuit_breaker.record_failure(e) + raise + + # Should retry - check if we have attempts left + if attempt < self.resilient_request.retry_config.max_attempts: + # Calculate delay and wait + delay = self.resilient_request.retry_config.calculate_delay(attempt) + time.sleep(delay) + else: + # Last attempt failed, record failure and raise + if self.resilient_request.circuit_breaker: + self.resilient_request.circuit_breaker.record_failure(e) + raise + + # Should never reach here, but just in case + if last_exception: + raise last_exception + if last_response: + raise APIError(status=last_response.status_code, message="Max retries exceeded") + raise APIError(message="Unknown error in resilient request") + def handle_error_response(self, response: requests.Response) -> None: """ Handle error responses from the API. @@ -269,7 +379,11 @@ def send( url = self.build_url(endpoint) request_headers = self.build_headers(headers) - response = self.make_request(url, method, request_headers, body, stream, files) + # Use resilient request if available, otherwise fallback to regular request + if self.resilient_request: + response = self.make_resilient_request(url, method, request_headers, body, stream, files) + else: + response = self.make_request(url, method, request_headers, body, stream, files) if not response.ok: self.handle_error_response(response) diff --git a/langbase/resilience.py b/langbase/resilience.py new file mode 100644 index 0000000..1bd02a5 --- /dev/null +++ b/langbase/resilience.py @@ -0,0 +1,232 @@ +""" +Advanced retry and resilience system for the Langbase SDK. + +This module provides comprehensive retry mechanisms, circuit breakers, +and resilience patterns for robust API interactions. 
+""" + +import asyncio +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, Type, Union +from typing_extensions import Literal + +import requests + +from .errors import ( + APIConnectionError, + APIConnectionTimeoutError, + APIError, + RateLimitError, +) + + +class RetryStrategy(str, Enum): + """Available retry strategies.""" + + EXPONENTIAL = "exponential" + LINEAR = "linear" + FIXED = "fixed" + FIBONACCI = "fibonacci" + + +class CircuitState(str, Enum): + """Circuit breaker states.""" + + CLOSED = "closed" # Normal operation + OPEN = "open" # Failing, rejecting requests + HALF_OPEN = "half_open" # Testing if service recovered + + +@dataclass +class RetryConfig: + """Configuration for retry behavior.""" + + max_attempts: int = 3 + strategy: RetryStrategy = RetryStrategy.EXPONENTIAL + base_delay: float = 1.0 # seconds + max_delay: float = 60.0 # seconds + multiplier: float = 2.0 + jitter: bool = True + + # Retry conditions + retry_on_status_codes: List[int] = field(default_factory=lambda: [429, 500, 502, 503, 504]) + retry_on_exceptions: List[Type[Exception]] = field( + default_factory=lambda: [ + APIConnectionError, + APIConnectionTimeoutError, + RateLimitError, + ] + ) + + # Rate limit handling + respect_retry_after: bool = True + max_retry_after: float = 300.0 # seconds + + def calculate_delay(self, attempt: int, retry_after: Optional[float] = None) -> float: + """Calculate delay for the given attempt using this config.""" + return RetryCalculator.calculate_delay(attempt, self, retry_after) + + +@dataclass +class CircuitBreakerConfig: + """Configuration for circuit breaker.""" + + failure_threshold: int = 5 + recovery_timeout: float = 60.0 # seconds + success_threshold: int = 2 # successes needed to close circuit + + # What constitutes a failure + failure_exceptions: List[Type[Exception]] = field( + default_factory=lambda: [ + 
APIConnectionError, + APIConnectionTimeoutError, + ] + ) + + +class RetryCalculator: + """Calculates retry delays based on strategy.""" + + @staticmethod + def calculate_delay( + attempt: int, + config: "RetryConfig", + retry_after: Optional[float] = None + ) -> float: + """Calculate delay for the given attempt.""" + + # Respect Retry-After header if present and enabled + if retry_after is not None and config.respect_retry_after: + return min(retry_after, config.max_retry_after) + + if config.strategy == RetryStrategy.FIXED: + delay = config.base_delay + elif config.strategy == RetryStrategy.LINEAR: + delay = config.base_delay * attempt + elif config.strategy == RetryStrategy.EXPONENTIAL: + delay = config.base_delay * (config.multiplier ** (attempt - 1)) + elif config.strategy == RetryStrategy.FIBONACCI: + delay = config.base_delay * RetryCalculator._fibonacci(attempt) + else: + delay = config.base_delay + + # Apply jitter to prevent thundering herd + if config.jitter: + import random + delay *= (0.5 + random.random() * 0.5) + + return min(delay, config.max_delay) + + @staticmethod + def _fibonacci(n: int) -> int: + """Calculate nth Fibonacci number.""" + if n <= 2: + return 1 + a, b = 1, 1 + for _ in range(3, n + 1): + a, b = b, a + b + return b + + +class CircuitBreaker: + """Circuit breaker implementation for API resilience.""" + + def __init__(self, config: CircuitBreakerConfig): + self.config = config + self.state = CircuitState.CLOSED + self.failure_count = 0 + self.success_count = 0 + self.last_failure_time = 0.0 + + def should_allow_request(self) -> bool: + """Check if request should be allowed through.""" + current_time = time.time() + + if self.state == CircuitState.CLOSED: + return True + elif self.state == CircuitState.OPEN: + if current_time - self.last_failure_time >= self.config.recovery_timeout: + self.state = CircuitState.HALF_OPEN + self.success_count = 0 + return True + return False + else: # HALF_OPEN + return True + + def record_success(self) 
-> None: + """Record a successful operation.""" + if self.state == CircuitState.HALF_OPEN: + self.success_count += 1 + if self.success_count >= self.config.success_threshold: + self.state = CircuitState.CLOSED + self.failure_count = 0 + elif self.state == CircuitState.CLOSED: + self.failure_count = 0 + + def record_failure(self, exception: Exception) -> None: + """Record a failed operation.""" + if any(isinstance(exception, exc_type) for exc_type in self.config.failure_exceptions): + self.failure_count += 1 + self.last_failure_time = time.time() + + if self.failure_count >= self.config.failure_threshold: + self.state = CircuitState.OPEN + + +class ResilientRequest: + """Enhanced request handler with retry and circuit breaker capabilities.""" + + def __init__( + self, + retry_config: Optional[RetryConfig] = None, + circuit_breaker_config: Optional[CircuitBreakerConfig] = None, + enable_circuit_breaker: bool = True, + ): + self.retry_config = retry_config or RetryConfig() + self.circuit_breaker = ( + CircuitBreaker(circuit_breaker_config or CircuitBreakerConfig()) + if enable_circuit_breaker + else None + ) + + def should_retry(self, exception: Exception, response: Optional[requests.Response] = None) -> bool: + """Return True if the exception type or the response status code is configured as retryable.""" + # Check exception types + if any(isinstance(exception, exc_type) for exc_type in self.retry_config.retry_on_exceptions): + return True + + # Check status codes; compare to None because requests.Response is falsy for any 4xx/5xx status + if response is not None and response.status_code in self.retry_config.retry_on_status_codes: + return True + + return False + + def get_retry_after(self, response: Optional[requests.Response]) -> Optional[float]: + """Return the Retry-After header as seconds, or None if absent or not a plain number.""" + if response is None:  # not `not response`: error responses (429/5xx) are falsy but still carry the header + return None + + retry_after = response.headers.get("Retry-After") + if retry_after: + try: + return float(retry_after) + except ValueError: + # Could be HTTP date format, but we'll skip that complexity for now + pass + + return None + + +# Export main components +__all__ = [ + "RetryStrategy", + 
"CircuitState", + "RetryConfig", + "CircuitBreakerConfig", + "RetryCalculator", + "CircuitBreaker", + "ResilientRequest", +] diff --git a/tests/test_resilience.py b/tests/test_resilience.py new file mode 100644 index 0000000..61f33ce --- /dev/null +++ b/tests/test_resilience.py @@ -0,0 +1,339 @@ +""" +Tests for the resilience module. +""" + +import time +from unittest.mock import Mock, patch + +import pytest +import responses + +from langbase.errors import APIConnectionError, APIError, RateLimitError +from langbase.resilience import ( + CircuitBreaker, + CircuitBreakerConfig, + CircuitState, + ResilientRequest, + RetryCalculator, + RetryConfig, + RetryStrategy, +) + + +class TestRetryCalculator: + """Test retry delay calculations.""" + + def test_fixed_strategy(self): + """Test fixed delay strategy.""" + config = RetryConfig(strategy=RetryStrategy.FIXED, base_delay=2.0, jitter=False) + + assert RetryCalculator.calculate_delay(1, config) == 2.0 + assert RetryCalculator.calculate_delay(3, config) == 2.0 + assert RetryCalculator.calculate_delay(5, config) == 2.0 + + def test_linear_strategy(self): + """Test linear delay strategy.""" + config = RetryConfig(strategy=RetryStrategy.LINEAR, base_delay=1.0, jitter=False) + + assert RetryCalculator.calculate_delay(1, config) == 1.0 + assert RetryCalculator.calculate_delay(2, config) == 2.0 + assert RetryCalculator.calculate_delay(3, config) == 3.0 + + def test_exponential_strategy(self): + """Test exponential delay strategy.""" + config = RetryConfig( + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + multiplier=2.0, + jitter=False + ) + + assert RetryCalculator.calculate_delay(1, config) == 1.0 + assert RetryCalculator.calculate_delay(2, config) == 2.0 + assert RetryCalculator.calculate_delay(3, config) == 4.0 + assert RetryCalculator.calculate_delay(4, config) == 8.0 + + def test_fibonacci_strategy(self): + """Test Fibonacci delay strategy.""" + config = RetryConfig(strategy=RetryStrategy.FIBONACCI, base_delay=1.0, 
jitter=False) + + assert RetryCalculator.calculate_delay(1, config) == 1.0 + assert RetryCalculator.calculate_delay(2, config) == 1.0 + assert RetryCalculator.calculate_delay(3, config) == 2.0 + assert RetryCalculator.calculate_delay(4, config) == 3.0 + assert RetryCalculator.calculate_delay(5, config) == 5.0 + + def test_max_delay_limit(self): + """Test that delays are capped at max_delay.""" + config = RetryConfig( + strategy=RetryStrategy.EXPONENTIAL, + base_delay=1.0, + multiplier=2.0, + max_delay=5.0, + jitter=False + ) + + assert RetryCalculator.calculate_delay(10, config) == 5.0 + + def test_retry_after_header(self): + """Test that Retry-After header is respected.""" + config = RetryConfig(respect_retry_after=True, max_retry_after=300.0) + + # Should use retry_after value + assert RetryCalculator.calculate_delay(1, config, retry_after=10.0) == 10.0 + + # Should cap at max_retry_after + assert RetryCalculator.calculate_delay(1, config, retry_after=500.0) == 300.0 + + def test_jitter_adds_randomness(self): + """Test that jitter adds randomness to delays.""" + config = RetryConfig(strategy=RetryStrategy.FIXED, base_delay=2.0, jitter=True) + + delays = [RetryCalculator.calculate_delay(1, config) for _ in range(10)] + + # All delays should be between 1.0 and 2.0 (50% to 100% of base) + assert all(1.0 <= delay <= 2.0 for delay in delays) + + # Should have some variation (not all the same) + assert len(set(delays)) > 1 + + +class TestCircuitBreaker: + """Test circuit breaker functionality.""" + + def test_initial_state_closed(self): + """Test circuit breaker starts in closed state.""" + config = CircuitBreakerConfig() + breaker = CircuitBreaker(config) + + assert breaker.state == CircuitState.CLOSED + assert breaker.should_allow_request() is True + + def test_failure_threshold_opens_circuit(self): + """Test that reaching failure threshold opens circuit.""" + config = CircuitBreakerConfig(failure_threshold=3) + breaker = CircuitBreaker(config) + + # Record failures 
up to threshold + for _ in range(3): + breaker.record_failure(APIConnectionError()) + + assert breaker.state == CircuitState.OPEN + assert breaker.should_allow_request() is False + + def test_recovery_timeout_allows_half_open(self): + """Test that recovery timeout allows half-open state.""" + config = CircuitBreakerConfig(failure_threshold=1, recovery_timeout=0.1) + breaker = CircuitBreaker(config) + + # Open the circuit + breaker.record_failure(APIConnectionError()) + assert breaker.state == CircuitState.OPEN + + # Wait for recovery timeout + time.sleep(0.2) + + # Should allow request and move to half-open + assert breaker.should_allow_request() is True + assert breaker.state == CircuitState.HALF_OPEN + + def test_success_in_half_open_closes_circuit(self): + """Test that successes in half-open state close circuit.""" + config = CircuitBreakerConfig(failure_threshold=1, recovery_timeout=0.1, success_threshold=2) + breaker = CircuitBreaker(config) + + # Open the circuit + breaker.record_failure(APIConnectionError()) + time.sleep(0.2) + breaker.should_allow_request() # Move to half-open + + # Record successes + breaker.record_success() + assert breaker.state == CircuitState.HALF_OPEN + + breaker.record_success() + assert breaker.state == CircuitState.CLOSED + + def test_only_configured_exceptions_trigger_failure(self): + """Test that only configured exceptions trigger failures.""" + config = CircuitBreakerConfig( + failure_threshold=1, + failure_exceptions=[APIConnectionError] + ) + breaker = CircuitBreaker(config) + + # This should trigger failure + breaker.record_failure(APIConnectionError()) + assert breaker.state == CircuitState.OPEN + + # Reset for next test + breaker.state = CircuitState.CLOSED + breaker.failure_count = 0 + + # This should not trigger failure + breaker.record_failure(ValueError("Not a connection error")) + assert breaker.state == CircuitState.CLOSED + + +class TestResilientRequest: + """Test resilient request functionality.""" + + def 
test_should_retry_on_configured_exceptions(self): + """Test retry logic for configured exceptions.""" + config = RetryConfig(retry_on_exceptions=[APIConnectionError, RateLimitError]) + resilient = ResilientRequest(retry_config=config) + + assert resilient.should_retry(APIConnectionError()) is True + assert resilient.should_retry(RateLimitError()) is True + assert resilient.should_retry(ValueError()) is False + + def test_should_retry_on_configured_status_codes(self): + """Test retry logic for configured status codes.""" + config = RetryConfig(retry_on_status_codes=[429, 500, 502]) + resilient = ResilientRequest(retry_config=config) + + mock_response = Mock() + mock_response.status_code = 429 + assert resilient.should_retry(APIError(), mock_response) is True + + mock_response.status_code = 404 + assert resilient.should_retry(APIError(), mock_response) is False + + def test_get_retry_after_header(self): + """Test extraction of Retry-After header.""" + resilient = ResilientRequest() + + mock_response = Mock() + mock_response.headers = {"Retry-After": "30"} + assert resilient.get_retry_after(mock_response) == 30.0 + + mock_response.headers = {} + assert resilient.get_retry_after(mock_response) is None + + assert resilient.get_retry_after(None) is None + + +class TestRetryConfig: + """Test RetryConfig functionality.""" + + def test_calculate_delay_method(self): + """Test that RetryConfig.calculate_delay works correctly.""" + config = RetryConfig(strategy=RetryStrategy.FIXED, base_delay=2.0, jitter=False) + + assert config.calculate_delay(1) == 2.0 + assert config.calculate_delay(3) == 2.0 + + def test_calculate_delay_with_retry_after(self): + """Test calculate_delay with retry_after parameter.""" + config = RetryConfig(respect_retry_after=True) + + # Should use retry_after when provided + assert config.calculate_delay(1, retry_after=5.0) == 5.0 + + +class TestRequestIntegration: + """Test resilience integration with Request class.""" + + @responses.activate + def 
test_resilient_request_retries_on_failure(self): + """Test that resilient requests retry on configured failures.""" + from langbase.request import Request + from langbase.resilience import RetryConfig, RetryStrategy + + # Mock a failing then successful response + responses.add( + responses.GET, + "https://api.langbase.com/test", + status=500, + json={"error": "Internal server error"} + ) + responses.add( + responses.GET, + "https://api.langbase.com/test", + status=200, + json={"success": True} + ) + + retry_config = RetryConfig( + max_attempts=3, + strategy=RetryStrategy.FIXED, + base_delay=0.01, # Very short delay for testing + jitter=False, + retry_on_status_codes=[500] + ) + + request = Request({ + "api_key": "test-key", + "base_url": "https://api.langbase.com", + "retry_config": retry_config, + "enable_resilience": True + }) + + # Should succeed after retry + response = request.make_resilient_request( + "https://api.langbase.com/test", + "GET", + {"Authorization": "Bearer test-key"} + ) + + assert response.status_code == 200 + assert len(responses.calls) == 2 # First failed, second succeeded + + @responses.activate + def test_circuit_breaker_opens_after_failures(self): + """Test that circuit breaker opens after configured failures.""" + from langbase.request import Request + from langbase.resilience import RetryConfig, CircuitBreakerConfig + from langbase.errors import APIError + + # Mock multiple failing responses + for _ in range(5): + responses.add( + responses.GET, + "https://api.langbase.com/test", + status=500, + json={"error": "Internal server error"} + ) + + retry_config = RetryConfig(max_attempts=1) # No retries for this test + circuit_config = CircuitBreakerConfig( + failure_threshold=3, + failure_exceptions=[APIError] # Include APIError in circuit breaker triggers + ) + + request = Request({ + "api_key": "test-key", + "base_url": "https://api.langbase.com", + "retry_config": retry_config, + "circuit_breaker_config": circuit_config, + 
"enable_resilience": True + }) + + # Make requests until circuit opens + for _ in range(3): + with pytest.raises(APIError): + request.make_resilient_request( + "https://api.langbase.com/test", + "GET", + {"Authorization": "Bearer test-key"} + ) + + # Circuit should now be open - next request should fail immediately + with pytest.raises(APIError, match="Circuit breaker is open"): + request.make_resilient_request( + "https://api.langbase.com/test", + "GET", + {"Authorization": "Bearer test-key"} + ) + + def test_resilience_disabled(self): + """Test that resilience can be disabled.""" + from langbase.request import Request + + request = Request({ + "api_key": "test-key", + "base_url": "https://api.langbase.com", + "enable_resilience": False + }) + + assert request.resilient_request is None