Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion optillm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Version information
__version__ = "0.2.5"
__version__ = "0.2.6"

# Import from server module
from .server import (
Expand Down
28 changes: 28 additions & 0 deletions optillm/plugins/proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ providers:

routing:
strategy: weighted # Options: weighted, round_robin, failover

timeouts:
request: 30 # Maximum seconds to wait for a provider response
connect: 5 # Maximum seconds to wait for connection

queue:
max_concurrent: 100 # Maximum concurrent requests to prevent overload
timeout: 60 # Maximum seconds a request can wait in queue
```

### 2. Start OptiLLM Server
Expand Down Expand Up @@ -161,6 +169,26 @@ routing:
timeout: 5 # Timeout for health check requests
```

### Timeout and Queue Management

Prevent request queue backup and handle slow/unresponsive backends:

```yaml
timeouts:
request: 30 # Maximum seconds to wait for provider response (default: 30)
connect: 5 # Maximum seconds for initial connection (default: 5)

queue:
max_concurrent: 100 # Maximum concurrent requests (default: 100)
timeout: 60 # Maximum seconds in queue before rejection (default: 60)
```

**How it works:**
- **Request Timeout**: Each request to a provider has a maximum time limit. If exceeded, the request is cancelled and the next provider is tried.
- **Queue Management**: Limits concurrent requests to prevent memory exhaustion. New requests wait up to `queue.timeout` seconds before being rejected.
- **Automatic Failover**: When a provider times out, the request automatically fails over to the next available provider; the timed-out provider is also marked unhealthy when `monitoring.track_errors` is enabled.
- **Protection**: Prevents slow backends from causing queue buildup that can crash the proxy server.

### Environment Variables

The configuration supports flexible environment variable interpolation:
Expand Down
182 changes: 116 additions & 66 deletions optillm/plugins/proxy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import logging
import random
from typing import Dict, List, Any, Optional
import concurrent.futures
import threading
from openai import OpenAI, AzureOpenAI
from optillm.plugins.proxy.routing import RouterFactory
from optillm.plugins.proxy.health import HealthChecker
Expand Down Expand Up @@ -34,13 +36,15 @@ def client(self):
self._client = AzureOpenAI(
api_key=self.api_key,
azure_endpoint=self.base_url,
api_version="2024-02-01"
api_version="2024-02-01",
max_retries=0 # Disable client retries - we handle them
)
else:
# Standard OpenAI-compatible client
self._client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
base_url=self.base_url,
max_retries=0 # Disable client retries - we handle them
)
return self._client

Expand Down Expand Up @@ -97,6 +101,17 @@ def __init__(self, config: Dict, fallback_client=None):
# Start health checking
self.health_checker.start()

# Timeout settings
timeout_config = config.get('timeouts', {})
self.request_timeout = timeout_config.get('request', 30) # Default 30 seconds
self.connect_timeout = timeout_config.get('connect', 5) # Default 5 seconds

# Queue management settings
queue_config = config.get('queue', {})
self.max_concurrent_requests = queue_config.get('max_concurrent', 100)
self.queue_timeout = queue_config.get('timeout', 60) # Max time in queue
self._request_semaphore = threading.Semaphore(self.max_concurrent_requests)

# Monitoring settings
monitoring = config.get('monitoring', {})
self.track_latency = monitoring.get('track_latency', True)
Expand All @@ -123,74 +138,109 @@ def _filter_kwargs(self, kwargs: dict) -> dict:
}
return {k: v for k, v in kwargs.items() if k not in optillm_params}

def _make_request_with_timeout(self, provider, request_kwargs):
    """Send one chat-completion request to ``provider`` and normalize timeouts.

    This method does not start a timer of its own: ``request_kwargs`` is
    forwarded unchanged to ``provider.client.chat.completions.create``.
    Its job is to translate a timeout-looking failure into the built-in
    ``TimeoutError``.

    Args:
        provider: Object exposing ``name`` and ``client``; the request is
            issued via ``provider.client.chat.completions.create``.
        request_kwargs: Keyword arguments forwarded verbatim to ``create``.

    Returns:
        Whatever ``create`` returns.

    Raises:
        TimeoutError: If the underlying call raises an exception whose
            message contains "timeout" or "timed out" (case-insensitive).
            The original exception is attached as ``__cause__``.
        Exception: Any other failure is re-raised unchanged.
    """
    try:
        return provider.client.chat.completions.create(**request_kwargs)
    except Exception as e:
        # Timeouts are recognized by message text only.
        message = str(e).lower()
        if "timeout" in message or "timed out" in message:
            # Chain explicitly so the provider's original error is kept as
            # the cause of the normalized TimeoutError.
            raise TimeoutError(
                f"Request to {provider.name} timed out after {self.proxy_client.request_timeout}s"
            ) from e
        # Bare raise re-raises the active exception as-is.
        raise

def create(self, **kwargs):
"""Create completion with load balancing and failover"""
model = kwargs.get('model', 'unknown')
attempted_providers = set()
errors = []

# Get healthy providers
healthy_providers = [
p for p in self.proxy_client.active_providers
if p.is_healthy
]
"""Create completion with load balancing, failover, and timeout handling"""
# Check queue capacity
if not self.proxy_client._request_semaphore.acquire(blocking=True, timeout=self.proxy_client.queue_timeout):
raise TimeoutError(f"Request queue timeout after {self.proxy_client.queue_timeout}s - server overloaded")

if not healthy_providers:
logger.warning("No healthy providers, trying fallback providers")
healthy_providers = self.proxy_client.fallback_providers

# Try routing through healthy providers
while healthy_providers:
available_providers = [p for p in healthy_providers if p not in attempted_providers]
if not available_providers:
break

provider = self.proxy_client.router.select(available_providers)
logger.info(f"Router selected provider: {provider.name if provider else 'None'}")
try:
model = kwargs.get('model', 'unknown')
attempted_providers = set()
errors = []

if not provider:
break

attempted_providers.add(provider)
# Get healthy providers
healthy_providers = [
p for p in self.proxy_client.active_providers
if p.is_healthy
]

try:
# Map model name if needed and filter out OptiLLM-specific parameters
request_kwargs = self._filter_kwargs(kwargs.copy())
request_kwargs['model'] = provider.map_model(model)

# Track timing
start_time = time.time()

# Make request
logger.debug(f"Routing to {provider.name}")
response = provider.client.chat.completions.create(**request_kwargs)

# Track success
latency = time.time() - start_time
if self.proxy_client.track_latency:
provider.track_latency(latency)

logger.info(f"Request succeeded via {provider.name} in {latency:.2f}s")
return response
if not healthy_providers:
logger.warning("No healthy providers, trying fallback providers")
healthy_providers = self.proxy_client.fallback_providers

# Try routing through healthy providers
while healthy_providers:
available_providers = [p for p in healthy_providers if p not in attempted_providers]
if not available_providers:
break

provider = self.proxy_client.router.select(available_providers)
logger.info(f"Router selected provider: {provider.name if provider else 'None'}")

except Exception as e:
logger.error(f"Provider {provider.name} failed: {e}")
errors.append((provider.name, str(e)))
if not provider:
break

attempted_providers.add(provider)

# Mark provider as unhealthy
if self.proxy_client.track_errors:
provider.is_healthy = False
provider.last_error = str(e)
try:
# Map model name if needed and filter out OptiLLM-specific parameters
request_kwargs = self._filter_kwargs(kwargs.copy())
request_kwargs['model'] = provider.map_model(model)

# Add timeout to client if supported
request_kwargs['timeout'] = self.proxy_client.request_timeout

# Track timing
start_time = time.time()

# Make request with timeout
logger.debug(f"Routing to {provider.name} with {self.proxy_client.request_timeout}s timeout")
response = self._make_request_with_timeout(provider, request_kwargs)

# Track success
latency = time.time() - start_time
if self.proxy_client.track_latency:
provider.track_latency(latency)

logger.info(f"Request succeeded via {provider.name} in {latency:.2f}s")
return response

except TimeoutError as e:
logger.error(f"Provider {provider.name} timed out: {e}")
errors.append((provider.name, str(e)))

# Mark provider as unhealthy on timeout
if self.proxy_client.track_errors:
provider.is_healthy = False
provider.last_error = f"Timeout: {str(e)}"

except Exception as e:
logger.error(f"Provider {provider.name} failed: {e}")
errors.append((provider.name, str(e)))

# Mark provider as unhealthy
if self.proxy_client.track_errors:
provider.is_healthy = False
provider.last_error = str(e)

# All providers failed, try fallback client
if self.proxy_client.fallback_client:
logger.warning("All proxy providers failed, using fallback client")
try:
return self.proxy_client.fallback_client.chat.completions.create(**self._filter_kwargs(kwargs))
except Exception as e:
errors.append(("fallback_client", str(e)))

# Complete failure
error_msg = f"All providers failed. Errors: {errors}"
logger.error(error_msg)
raise Exception(error_msg)
# All providers failed, try fallback client
if self.proxy_client.fallback_client:
logger.warning("All proxy providers failed, using fallback client")
try:
fallback_kwargs = self._filter_kwargs(kwargs)
fallback_kwargs['timeout'] = self.proxy_client.request_timeout
return self.proxy_client.fallback_client.chat.completions.create(**fallback_kwargs)
except Exception as e:
errors.append(("fallback_client", str(e)))

# Complete failure
error_msg = f"All providers failed. Errors: {errors}"
logger.error(error_msg)
raise Exception(error_msg)

finally:
# Release semaphore to allow next request
self.proxy_client._request_semaphore.release()
28 changes: 28 additions & 0 deletions optillm/plugins/proxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def _apply_defaults(config: Dict) -> Dict:
config.setdefault('providers', [])
config.setdefault('routing', {})
config.setdefault('monitoring', {})
config.setdefault('timeouts', {})
config.setdefault('queue', {})

# Routing defaults
routing = config['routing']
Expand All @@ -154,6 +156,16 @@ def _apply_defaults(config: Dict) -> Dict:
monitoring.setdefault('track_latency', True)
monitoring.setdefault('track_errors', True)

# Timeout defaults
timeouts = config['timeouts']
timeouts.setdefault('request', 30) # 30 seconds for requests
timeouts.setdefault('connect', 5) # 5 seconds for connection

# Queue management defaults
queue = config['queue']
queue.setdefault('max_concurrent', 100) # Max concurrent requests
queue.setdefault('timeout', 60) # Max time waiting in queue

# Provider defaults
for i, provider in enumerate(config['providers']):
provider.setdefault('name', f"provider_{i}")
Expand Down Expand Up @@ -224,6 +236,14 @@ def _create_default(path: Path):
interval: 30 # seconds
timeout: 5 # seconds

timeouts:
request: 30 # Maximum time for a request (seconds)
connect: 5 # Maximum time for connection (seconds)

queue:
max_concurrent: 100 # Maximum concurrent requests
timeout: 60 # Maximum time in queue (seconds)

monitoring:
log_level: INFO
track_latency: true
Expand All @@ -244,6 +264,14 @@ def _get_minimal_config() -> Dict:
'strategy': 'round_robin',
'health_check': {'enabled': False}
},
'timeouts': {
'request': 30,
'connect': 5
},
'queue': {
'max_concurrent': 100,
'timeout': 60
},
'monitoring': {
'log_level': 'INFO',
'track_latency': False,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "optillm"
version = "0.2.5"
version = "0.2.6"
description = "An optimizing inference proxy for LLMs."
readme = "README.md"
license = "Apache-2.0"
Expand Down
Loading