diff --git a/README.md b/README.md index 5505427d..47f4aa83 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,7 @@ pip install -r requirements.txt | Mixture of Agents | `moa` | Combines responses from multiple critiques | | Monte Carlo Tree Search | `mcts` | Uses MCTS for decision-making in chat responses | | PV Game | `pvg` | Applies a prover-verifier game approach at inference time | +| [Deep Confidence](optillm/deepconf) | N/A for proxy | Implements confidence-guided reasoning with multiple intensity levels for enhanced accuracy | | CoT Decoding | N/A for proxy | Implements chain-of-thought decoding to elicit reasoning without explicit prompting | | Entropy Decoding | N/A for proxy | Implements adaptive sampling based on the uncertainty of tokens during generation | | Thinkdeeper | N/A for proxy | Implements the `reasoning_effort` param from OpenAI for reasoning models like DeepSeek R1 | @@ -161,6 +162,7 @@ pip install -r requirements.txt | GenSelect | `genselect` | Generative Solution Selection - generates multiple candidates and selects the best based on quality criteria | | Web Search | `web_search` | Performs Google searches using Chrome automation (Selenium) to gather search results and URLs | | [Deep Research](optillm/plugins/deep_research) | `deep_research` | Implements Test-Time Diffusion Deep Researcher (TTD-DR) for comprehensive research reports using iterative refinement | +| [Proxy](optillm/plugins/proxy) | `proxy` | Load balancing and failover across multiple LLM providers with health monitoring and round-robin routing | We support all major LLM providers and models for inference. You need to set the correct environment variable and the proxy will pick the corresponding client. @@ -703,6 +705,7 @@ python -m pytest tests/ ## References - [Eliciting Fine-Tuned Transformer Capabilities via Inference-Time Techniques](https://arxiv.org/abs/2506.08060) - [AutoThink: efficient inference for reasoning LLMs](https://dx.doi.org/10.2139/ssrn.5253327) - [Implementation](optillm/autothink) +- [Deep Think with Confidence: Confidence-guided reasoning and inference-time scaling](https://arxiv.org/abs/2508.15260) - [Implementation](optillm/deepconf) - [Self-Discover: Large Language Models Self-Compose Reasoning Structures ](https://arxiv.org/abs/2402.03620) - [Implementation](optillm/plugings/deepthink) - [CePO: Empowering Llama with Reasoning using Test-Time Compute](https://cerebras.ai/blog/cepo) - [Implementation](optillm/cepo) diff --git a/optillm/__init__.py b/optillm/__init__.py index 129977a6..090f917a 100644 --- a/optillm/__init__.py +++ b/optillm/__init__.py @@ -1,5 +1,5 @@ # Version information -__version__ = "0.2.2" +__version__ = "0.2.3" # Import from server module from .server import ( diff --git a/optillm/plugins/proxy/README.md b/optillm/plugins/proxy/README.md new file mode 100644 index 00000000..395e3e47 --- /dev/null +++ b/optillm/plugins/proxy/README.md @@ -0,0 +1,422 @@ +# OptiLLM Proxy Plugin + +A sophisticated load balancing and failover plugin for OptiLLM that distributes requests across multiple LLM providers. 
+ +## Features + +- πŸ”„ **Load Balancing**: Distribute requests across multiple providers using weighted, round-robin, or failover strategies +- πŸ₯ **Health Monitoring**: Automatic health checks with provider failover +- πŸ”Œ **Universal Compatibility**: Works with any OptiLLM approach or plugin +- 🌍 **Environment Variables**: Secure configuration with environment variable support +- πŸ“Š **Performance Tracking**: Monitor latency and errors per provider +- πŸ—ΊοΈ **Model Mapping**: Map model names to provider-specific deployments + +## Quick Start + +### 1. Basic Setup + +Create `~/.optillm/proxy_config.yaml`: + +```yaml +providers: + - name: primary + base_url: https://api.openai.com/v1 + api_key: ${OPENAI_API_KEY} + weight: 2 + + - name: backup + base_url: https://api.openai.com/v1 + api_key: ${OPENAI_API_KEY_BACKUP} + weight: 1 + +routing: + strategy: weighted +``` + +### 2. Usage Examples + +#### Standalone Proxy +```bash +# Route requests through proxy +curl -X POST http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "proxy-gpt-4", + "messages": [{"role": "user", "content": "Hello"}] + }' +``` + +#### Proxy with Approach/Plugin +```bash +# Use MOA approach with proxy load balancing +curl -X POST http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4", + "messages": [{"role": "user", "content": "Solve this problem"}], + "extra_body": { + "optillm_approach": "proxy", + "proxy_wrap": "moa" + } + }' + +# Use memory plugin with proxy +curl -X POST http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4", + "messages": [{"role": "user", "content": "Remember this"}], + "extra_body": { + "optillm_approach": "proxy", + "proxy_wrap": "memory" + } + }' +``` + +#### Combined Approaches +```bash +# Apply BON sampling, then route through proxy +curl -X POST http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "bon&proxy-gpt-4", + "messages": [{"role": "user", "content": "Generate ideas"}] + }' +``` + +## Configuration Reference + +### Provider Configuration + +Each provider supports the following options: + +```yaml +providers: + - name: provider_name # Required: Unique identifier + base_url: https://api.url/v1 # Required: API endpoint + api_key: ${ENV_VAR} # Required: API key (supports env vars) + weight: 2 # Optional: Weight for weighted routing (default: 1) + fallback_only: false # Optional: Use only when primary providers fail + model_map: # Optional: Map model names + gpt-4: gpt-4-deployment + gpt-3.5-turbo: gpt-35-turbo +``` + +### Routing Strategies + +```yaml +routing: + strategy: weighted # Options: weighted, round_robin, failover + + # Health check configuration + health_check: + enabled: true # Enable/disable health checks + interval: 30 # Seconds between checks + timeout: 5 # Timeout for health check requests +``` + +### Environment Variables + +The configuration supports flexible environment variable interpolation: + +```yaml +# Simple substitution +api_key: ${OPENAI_API_KEY} + +# With default value +base_url: ${CUSTOM_ENDPOINT:-https://api.openai.com/v1} + +# Nested variables +api_key: ${ENV_PREFIX}_API_KEY +``` + +## Advanced Usage + +### Provider Priority + +Control provider selection priority using weights: + +```yaml +providers: + - name: premium + base_url: https://premium.api/v1 + api_key: ${PREMIUM_KEY} + weight: 5 # Gets 5x more traffic + + - name: standard + 
base_url: https://standard.api/v1 + api_key: ${STANDARD_KEY} + weight: 1 # Baseline traffic +``` + +### Model-Specific Routing + +Different providers may use different model names: + +```yaml +providers: + - name: azure + base_url: ${AZURE_ENDPOINT} + api_key: ${AZURE_KEY} + model_map: + # Request -> Provider mapping + gpt-4: gpt-4-deployment-001 + gpt-4-turbo: gpt-4-turbo-latest + gpt-3.5-turbo: gpt-35-turbo-deployment +``` + +### Failover Configuration + +Set up primary and backup providers: + +```yaml +providers: + # Primary providers (normal traffic) + - name: primary_1 + base_url: https://api1.com/v1 + api_key: ${KEY_1} + weight: 3 + + - name: primary_2 + base_url: https://api2.com/v1 + api_key: ${KEY_2} + weight: 2 + + # Backup provider (only on failure) + - name: emergency_backup + base_url: https://backup.api/v1 + api_key: ${BACKUP_KEY} + fallback_only: true # Only used when all primary providers fail +``` + +## Monitoring and Debugging + +### Logging + +Enable detailed logging for debugging: + +```yaml +monitoring: + log_level: DEBUG # Options: DEBUG, INFO, WARNING, ERROR + track_latency: true + track_errors: true +``` + +### Health Status + +The proxy automatically monitors provider health. Failed providers are: +1. Marked as unhealthy after errors +2. Excluded from routing +3. Periodically rechecked for recovery +4. Automatically restored when healthy + +### Performance Metrics + +When `track_latency` is enabled, the proxy logs: +- Request latency per provider +- Success/failure rates +- Provider selection patterns + +## Troubleshooting + +### Common Issues + +#### 1. "No healthy providers available" +- Check your API keys are correctly set +- Verify base URLs are accessible +- Review health check logs for specific errors +- Ensure at least one provider is configured + +#### 2. "Provider X constantly failing" +- Check provider-specific API limits +- Verify model names in model_map +- Test the provider's endpoint directly +- Review error logs for details + +#### 3. "Proxy not wrapping approach" +- Ensure using correct extra_body format +- Verify approach/plugin name is correct +- Check that target approach is installed + +### Debug Mode + +Enable debug logging to see detailed routing decisions: + +```bash +export OPTILLM_LOG_LEVEL=DEBUG +python optillm.py +``` + +## Best Practices + +1. **Multiple API Keys**: Use different API keys per provider for better rate limit distribution +2. **Weight Tuning**: Adjust weights based on provider performance and cost +3. **Health Intervals**: Balance between quick failure detection (short) and API overhead (long) +4. **Fallback Providers**: Always configure at least one fallback provider +5. 
**Environment Security**: Never commit API keys; always use environment variables + +## Performance Tuning + +### For High Throughput +```yaml +routing: + strategy: weighted # Better distribution than round_robin + health_check: + interval: 60 # Reduce health check frequency + timeout: 10 # Allow longer timeout for stability +``` + +### For Low Latency +```yaml +routing: + strategy: failover # Always use fastest provider + health_check: + interval: 10 # Quick failure detection + timeout: 2 # Fast timeout +``` + +### For Cost Optimization +```yaml +providers: + - name: cheap_provider + weight: 10 # Prefer cheaper provider + + - name: expensive_provider + weight: 1 # Minimize usage + fallback_only: true # Or only use on failure +``` + +## Integration Examples + +### With Python SDK +```python +from openai import OpenAI + +client = OpenAI( + base_url="http://localhost:8000/v1", + api_key="dummy" +) + +# Proxy wrapping MOA approach +response = client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": "Hello"}], + extra_body={ + "optillm_approach": "proxy", + "proxy_wrap": "moa" + } +) +``` + +### With LangChain +```python +from langchain.llms import OpenAI + +llm = OpenAI( + openai_api_base="http://localhost:8000/v1", + model_name="proxy-gpt-4" +) + +response = llm("What is the meaning of life?") +``` + +## Supported Providers + +The proxy works with any OpenAI-compatible API: + +- βœ… OpenAI +- βœ… Azure OpenAI +- βœ… Anthropic (via LiteLLM) +- βœ… Google AI (via LiteLLM) +- βœ… Cohere (via LiteLLM) +- βœ… Together AI +- βœ… Anyscale +- βœ… Local models (Ollama, LM Studio, llama.cpp) +- βœ… Any OpenAI-compatible endpoint + +## Configuration Examples + +### Multi-Cloud Setup +```yaml +providers: + - name: openai + base_url: https://api.openai.com/v1 + api_key: ${OPENAI_API_KEY} + weight: 3 + + - name: azure + base_url: ${AZURE_ENDPOINT} + api_key: ${AZURE_API_KEY} + weight: 2 + model_map: + gpt-4: gpt-4-deployment + + - name: together + base_url: https://api.together.xyz/v1 + api_key: ${TOGETHER_API_KEY} + weight: 1 +``` + +### Local Development Setup +```yaml +providers: + - name: local_primary + base_url: http://localhost:8080/v1 + api_key: local + weight: 1 + + - name: local_backup + base_url: http://localhost:8081/v1 + api_key: local + weight: 1 + +routing: + strategy: round_robin + health_check: + enabled: false # Disable for local dev +``` + +### Production Setup +```yaml +providers: + - name: prod_primary + base_url: https://api.openai.com/v1 + api_key: ${PROD_OPENAI_KEY_1} + weight: 5 + + - name: prod_secondary + base_url: https://api.openai.com/v1 + api_key: ${PROD_OPENAI_KEY_2} + weight: 3 + + - name: prod_fallback + base_url: ${FALLBACK_ENDPOINT} + api_key: ${FALLBACK_KEY} + weight: 1 + fallback_only: true + +routing: + strategy: weighted + health_check: + enabled: true + interval: 30 + timeout: 5 + +monitoring: + log_level: WARNING + track_latency: true + track_errors: true +``` + +## Contributing + +To add new routing strategies or features: + +1. Implement new strategy in `routing.py` +2. Add strategy to RouterFactory +3. Update documentation +4. Add tests + +## License + +Part of OptiLLM - see main project license. 
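
## Example: Adding a Routing Strategy

The Contributing steps above can be illustrated with a hypothetical latency-aware strategy. This is only a sketch: `LowestLatencyRouter` and the `lowest_latency` strategy name are not part of the shipped plugin; the example assumes the `Router` base class defined in `routing.py` and the `Provider.avg_latency()` helper defined in `client.py`.

```python
# Sketch: a new strategy added inside optillm/plugins/proxy/routing.py
from typing import List, Optional


class LowestLatencyRouter(Router):
    """Prefer the available provider with the lowest rolling average latency."""

    def __init__(self, providers: List):
        self.providers = providers

    def select(self, providers: List) -> Optional:
        if not providers:
            return None
        # Provider.avg_latency() (see client.py) returns 0 until a provider
        # has served a request, so untried providers are picked first.
        return min(providers, key=lambda p: p.avg_latency())


# Step 2: register it in RouterFactory.create() by extending the strategies map:
#     strategies['lowest_latency'] = LowestLatencyRouter
# Then select it in proxy_config.yaml like any built-in strategy:
#     routing:
#       strategy: lowest_latency
```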
\ No newline at end of file diff --git a/optillm/plugins/proxy/__init__.py b/optillm/plugins/proxy/__init__.py new file mode 100644 index 00000000..17adbc82 --- /dev/null +++ b/optillm/plugins/proxy/__init__.py @@ -0,0 +1,8 @@ +"""OptiLLM Proxy Plugin - Load balancing and failover for LLM providers""" + +from optillm.plugins.proxy.config import ProxyConfig +from optillm.plugins.proxy.client import ProxyClient +from optillm.plugins.proxy.routing import RouterFactory +from optillm.plugins.proxy.health import HealthChecker + +__all__ = ['ProxyConfig', 'ProxyClient', 'RouterFactory', 'HealthChecker'] \ No newline at end of file diff --git a/optillm/plugins/proxy/approach_handler.py b/optillm/plugins/proxy/approach_handler.py new file mode 100644 index 00000000..4dc5976b --- /dev/null +++ b/optillm/plugins/proxy/approach_handler.py @@ -0,0 +1,182 @@ +""" +Dynamic handler for approaches and plugins - no hardcoding. +""" +import importlib +import importlib.util +import logging +import inspect +from typing import Optional, Tuple, Dict, Any +from pathlib import Path + +logger = logging.getLogger(__name__) + +class ApproachHandler: + """Dynamically handles both approaches and plugins""" + + def __init__(self): + self._approaches_cache = {} + self._plugins_cache = {} + self._discovered = False + + def handle(self, name: str, system_prompt: str, initial_query: str, + client, model: str, request_config: dict = None) -> Optional[Tuple[str, int]]: + """ + Try to handle the given name as an approach or plugin. + Returns None if not found, otherwise returns (response, tokens) + """ + # Lazy discovery + if not self._discovered: + self._discover_handlers() + self._discovered = True + + # Check if it's an approach + if name in self._approaches_cache: + logger.info(f"Routing approach '{name}' through proxy") + handler = self._approaches_cache[name] + return self._execute_handler( + handler, system_prompt, initial_query, client, model, request_config + ) + + # Check if it's a plugin + if name in self._plugins_cache: + logger.info(f"Routing plugin '{name}' through proxy") + handler = self._plugins_cache[name] + return self._execute_handler( + handler, system_prompt, initial_query, client, model, request_config + ) + + logger.debug(f"'{name}' not recognized as approach or plugin") + return None + + def _discover_handlers(self): + """Discover available approaches and plugins dynamically""" + + # Discover approaches + self._discover_approaches() + + # Discover plugins + self._discover_plugins() + + logger.info(f"Discovered {len(self._approaches_cache)} approaches, " + f"{len(self._plugins_cache)} plugins") + + def _discover_approaches(self): + """Discover built-in approaches from optillm package""" + approach_modules = { + 'mcts': ('optillm.mcts', 'chat_with_mcts'), + 'bon': ('optillm.bon', 'best_of_n_sampling'), + 'moa': ('optillm.moa', 'mixture_of_agents'), + 'rto': ('optillm.rto', 'round_trip_optimization'), + 'self_consistency': ('optillm.self_consistency', 'advanced_self_consistency_approach'), + 'pvg': ('optillm.pvg', 'inference_time_pv_game'), + 'z3': ('optillm.z3_solver', None), # Special case + 'rstar': ('optillm.rstar', None), # Special case + 'cot_reflection': ('optillm.cot_reflection', 'cot_reflection'), + 'plansearch': ('optillm.plansearch', 'plansearch'), + 'leap': ('optillm.leap', 'leap'), + 're2': ('optillm.reread', 're2_approach'), + 'cepo': ('optillm.cepo.cepo', 'cepo'), # CEPO approach + } + + for name, (module_path, func_name) in approach_modules.items(): + try: + module = 
importlib.import_module(module_path) + + if name == 'z3': + # Special handling for Z3 + solver_class = getattr(module, 'Z3SymPySolverSystem') + self._approaches_cache[name] = lambda s, q, c, m, **kw: \ + solver_class(s, c, m).process_query(q) + elif name == 'rstar': + # Special handling for RStar + rstar_class = getattr(module, 'RStar') + self._approaches_cache[name] = lambda s, q, c, m, **kw: \ + rstar_class(s, c, m, **kw).solve(q) + elif name == 'cepo': + # Special handling for CEPO which needs special config + cepo_func = getattr(module, func_name) + # We'll pass empty CepoConfig for now - it can be enhanced later + self._approaches_cache[name] = cepo_func + else: + if func_name: + self._approaches_cache[name] = getattr(module, func_name) + + except (ImportError, AttributeError) as e: + logger.debug(f"Could not load approach '{name}': {e}") + + def _discover_plugins(self): + """Discover available plugins dynamically""" + try: + import optillm + import os + import glob + + # Get plugin directories + package_dir = Path(optillm.__file__).parent / 'plugins' + + # Find all Python files in plugins directory + plugin_files = [] + if package_dir.exists(): + plugin_files.extend(glob.glob(str(package_dir / '*.py'))) + + for plugin_file in plugin_files: + if '__pycache__' in plugin_file or '__init__' in plugin_file: + continue + + try: + # Extract module name + module_name = Path(plugin_file).stem + + # Skip self + if module_name == 'proxy_plugin': + continue + + # Import module + spec = importlib.util.spec_from_file_location(module_name, plugin_file) + if spec and spec.loader: + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Check if it has required attributes + if hasattr(module, 'SLUG') and hasattr(module, 'run'): + slug = getattr(module, 'SLUG') + run_func = getattr(module, 'run') + self._plugins_cache[slug] = run_func + + except Exception as e: + logger.debug(f"Could not load plugin from {plugin_file}: {e}") + + except Exception as e: + logger.debug(f"Error discovering plugins: {e}") + + def _execute_handler(self, handler, system_prompt: str, initial_query: str, + client, model: str, request_config: dict = None) -> Tuple[str, int]: + """Execute a handler function with proper signature detection""" + try: + # Check function signature + sig = inspect.signature(handler) + params = sig.parameters + + # Build arguments based on signature + args = [system_prompt, initial_query, client, model] + kwargs = {} + + # Check if handler accepts request_config + if 'request_config' in params: + kwargs['request_config'] = request_config + + # Some handlers may accept additional kwargs + if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()): + # Only add safe kwargs that won't conflict + if request_config: + # Filter out parameters that might conflict + safe_kwargs = {k: v for k, v in request_config.items() + if k not in ['model', 'messages', 'system_prompt', 'initial_query']} + kwargs.update(safe_kwargs) + + # Execute handler + return handler(*args, **kwargs) + + except Exception as e: + logger.error(f"Error executing handler: {e}") + raise \ No newline at end of file diff --git a/optillm/plugins/proxy/client.py b/optillm/plugins/proxy/client.py new file mode 100644 index 00000000..3b5a7e0a --- /dev/null +++ b/optillm/plugins/proxy/client.py @@ -0,0 +1,187 @@ +""" +ProxyClient implementation for load balancing across multiple LLM providers. 
+""" +import time +import logging +import random +from typing import Dict, List, Any, Optional +from openai import OpenAI, AzureOpenAI +from optillm.plugins.proxy.routing import RouterFactory +from optillm.plugins.proxy.health import HealthChecker + +logger = logging.getLogger(__name__) + +class Provider: + """Wrapper for a provider configuration and client""" + def __init__(self, config: Dict): + self.name = config['name'] + self.base_url = config['base_url'] + self.api_key = config['api_key'] + self.weight = config.get('weight', 1) + self.fallback_only = config.get('fallback_only', False) + self.model_map = config.get('model_map', {}) + self._client = None + self.is_healthy = True + self.last_error = None + self.latencies = [] # Track recent latencies + + @property + def client(self): + """Lazy initialization of OpenAI client""" + if not self._client: + if 'azure' in self.base_url.lower(): + # Handle Azure OpenAI + self._client = AzureOpenAI( + api_key=self.api_key, + azure_endpoint=self.base_url, + api_version="2024-02-01" + ) + else: + # Standard OpenAI-compatible client + self._client = OpenAI( + api_key=self.api_key, + base_url=self.base_url + ) + return self._client + + def map_model(self, model: str) -> str: + """Map requested model to provider-specific name""" + return self.model_map.get(model, model) + + def track_latency(self, latency: float): + """Track request latency""" + self.latencies.append(latency) + if len(self.latencies) > 10: + self.latencies.pop(0) + + def avg_latency(self) -> float: + """Get average latency""" + if not self.latencies: + return 0 + return sum(self.latencies) / len(self.latencies) + +class ProxyClient: + """OpenAI-compatible client that proxies to multiple providers""" + + def __init__(self, config: Dict, fallback_client=None): + self.config = config + self.fallback_client = fallback_client + + # Initialize providers + self.providers = [ + Provider(p) for p in config.get('providers', []) + ] + + # Filter out fallback-only providers for normal routing + self.active_providers = [ + p for p in self.providers if not p.fallback_only + ] + + self.fallback_providers = [ + p for p in self.providers if p.fallback_only + ] + + # Initialize router + strategy = config.get('routing', {}).get('strategy', 'round_robin') + self.router = RouterFactory.create(strategy, self.active_providers) + + # Initialize health checker + health_config = config.get('routing', {}).get('health_check', {}) + self.health_checker = HealthChecker( + providers=self.providers, + enabled=health_config.get('enabled', True), + interval=health_config.get('interval', 30), + timeout=health_config.get('timeout', 5) + ) + + # Start health checking + self.health_checker.start() + + # Monitoring settings + monitoring = config.get('monitoring', {}) + self.track_latency = monitoring.get('track_latency', True) + self.track_errors = monitoring.get('track_errors', True) + + # Create chat namespace + self.chat = self._Chat(self) + + class _Chat: + def __init__(self, proxy_client): + self.proxy_client = proxy_client + self.completions = proxy_client._Completions(proxy_client) + + class _Completions: + def __init__(self, proxy_client): + self.proxy_client = proxy_client + + def create(self, **kwargs): + """Create completion with load balancing and failover""" + model = kwargs.get('model', 'unknown') + attempted_providers = set() + errors = [] + + # Get healthy providers + healthy_providers = [ + p for p in self.proxy_client.active_providers + if p.is_healthy + ] + + if not healthy_providers: + 
logger.warning("No healthy providers, trying fallback providers") + healthy_providers = self.proxy_client.fallback_providers + + # Try routing through healthy providers + while healthy_providers: + available_providers = [p for p in healthy_providers if p not in attempted_providers] + if not available_providers: + break + + provider = self.proxy_client.router.select(available_providers) + logger.info(f"Router selected provider: {provider.name if provider else 'None'}") + + if not provider: + break + + attempted_providers.add(provider) + + try: + # Map model name if needed + request_kwargs = kwargs.copy() + request_kwargs['model'] = provider.map_model(model) + + # Track timing + start_time = time.time() + + # Make request + logger.debug(f"Routing to {provider.name}") + response = provider.client.chat.completions.create(**request_kwargs) + + # Track success + latency = time.time() - start_time + if self.proxy_client.track_latency: + provider.track_latency(latency) + + logger.info(f"Request succeeded via {provider.name} in {latency:.2f}s") + return response + + except Exception as e: + logger.error(f"Provider {provider.name} failed: {e}") + errors.append((provider.name, str(e))) + + # Mark provider as unhealthy + if self.proxy_client.track_errors: + provider.is_healthy = False + provider.last_error = str(e) + + # All providers failed, try fallback client + if self.proxy_client.fallback_client: + logger.warning("All proxy providers failed, using fallback client") + try: + return self.proxy_client.fallback_client.chat.completions.create(**kwargs) + except Exception as e: + errors.append(("fallback_client", str(e))) + + # Complete failure + error_msg = f"All providers failed. Errors: {errors}" + logger.error(error_msg) + raise Exception(error_msg) \ No newline at end of file diff --git a/optillm/plugins/proxy/config.py b/optillm/plugins/proxy/config.py new file mode 100644 index 00000000..38031f3e --- /dev/null +++ b/optillm/plugins/proxy/config.py @@ -0,0 +1,252 @@ +""" +Configuration management for the proxy plugin. +Handles loading, validation, and environment variable interpolation. +""" +import os +import re +import yaml +import logging +from pathlib import Path +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + +class ProxyConfig: + """Manages proxy configuration with caching and validation.""" + + _cached_config: Optional[Dict[str, Any]] = None + _config_path: Optional[Path] = None + + @classmethod + def load(cls, path: str = None, force_reload: bool = False) -> Dict[str, Any]: + """ + Load and cache configuration. 
+ + Args: + path: Optional path to config file + force_reload: Force reload even if cached + + Returns: + Loaded and validated configuration dictionary + """ + if cls._cached_config and not force_reload: + return cls._cached_config + + if not path: + # Priority order for config files + config_locations = [ + Path.home() / ".optillm" / "proxy_config.yaml", + Path.home() / ".optillm" / "proxy_config.yml", + Path(__file__).parent / "example_config.yaml", + ] + + for config_path in config_locations: + if config_path.exists(): + path = config_path + logger.info(f"Using config from: {path}") + break + else: + # No config found, create default + path = config_locations[0] + cls._create_default(path) + + cls._config_path = Path(path) + + try: + with open(path, 'r') as f: + config = yaml.safe_load(f) or {} + + # Validate structure + if not isinstance(config, dict): + raise ValueError("Configuration must be a dictionary") + + # Interpolate environment variables + config = cls._interpolate_env_vars(config) + + # Apply defaults and validate + config = cls._apply_defaults(config) + config = cls._validate_config(config) + + cls._cached_config = config + logger.debug(f"Loaded config with {len(config.get('providers', []))} providers") + return config + + except Exception as e: + logger.error(f"Failed to load proxy config from {path}: {e}") + return cls._get_minimal_config() + + @classmethod + def reload(cls) -> Dict[str, Any]: + """Force reload configuration from disk.""" + return cls.load(force_reload=True) + + @staticmethod + def _interpolate_env_vars(obj: Any) -> Any: + """ + Recursively replace ${VAR} and ${VAR:-default} with environment values. + + Args: + obj: Object to process (dict, list, str, or other) + + Returns: + Processed object with environment variables replaced + """ + if isinstance(obj, str): + # Pattern for ${VAR} or ${VAR:-default} + pattern = re.compile(r'\$\{([^}]+)\}') + + def replacer(match): + var_expr = match.group(1) + + # Handle ${VAR:-default} syntax + if ':-' in var_expr: + var_name, default = var_expr.split(':-', 1) + value = os.environ.get(var_name.strip(), default) + else: + var_name = var_expr.strip() + value = os.environ.get(var_name) + + if value is None: + logger.warning(f"Environment variable ${{{var_name}}} not set") + return match.group(0) # Keep original + + return value + + return pattern.sub(replacer, obj) + + elif isinstance(obj, dict): + return {k: ProxyConfig._interpolate_env_vars(v) for k, v in obj.items()} + + elif isinstance(obj, list): + return [ProxyConfig._interpolate_env_vars(item) for item in obj] + + return obj + + @staticmethod + def _apply_defaults(config: Dict) -> Dict: + """ + Apply sensible defaults to configuration. 
+ + Args: + config: Configuration dictionary + + Returns: + Configuration with defaults applied + """ + # Ensure main sections exist + config.setdefault('providers', []) + config.setdefault('routing', {}) + config.setdefault('monitoring', {}) + + # Routing defaults + routing = config['routing'] + routing.setdefault('strategy', 'round_robin') + routing.setdefault('health_check', {}) + + health_check = routing['health_check'] + health_check.setdefault('enabled', True) + health_check.setdefault('interval', 30) + health_check.setdefault('timeout', 5) + + # Monitoring defaults + monitoring = config['monitoring'] + monitoring.setdefault('log_level', 'INFO') + monitoring.setdefault('track_latency', True) + monitoring.setdefault('track_errors', True) + + # Provider defaults + for i, provider in enumerate(config['providers']): + provider.setdefault('name', f"provider_{i}") + provider.setdefault('weight', 1) + provider.setdefault('fallback_only', False) + provider.setdefault('model_map', {}) + + return config + + @staticmethod + def _validate_config(config: Dict) -> Dict: + """ + Validate configuration structure and values. + + Args: + config: Configuration to validate + + Returns: + Validated configuration + + Raises: + ValueError: If configuration is invalid + """ + # Validate providers + for provider in config.get('providers', []): + if 'base_url' not in provider: + raise ValueError(f"Provider {provider.get('name', 'unknown')} missing base_url") + if 'api_key' not in provider: + raise ValueError(f"Provider {provider.get('name', 'unknown')} missing api_key") + + # Validate weight + if provider['weight'] <= 0: + logger.warning(f"Provider {provider['name']} has invalid weight {provider['weight']}, setting to 1") + provider['weight'] = 1 + + # Validate routing strategy + valid_strategies = ['weighted', 'round_robin', 'failover'] + strategy = config['routing']['strategy'] + if strategy not in valid_strategies: + logger.warning(f"Invalid routing strategy '{strategy}', using 'round_robin'") + config['routing']['strategy'] = 'round_robin' + + return config + + @staticmethod + def _create_default(path: Path): + """Create default configuration file.""" + path.parent.mkdir(parents=True, exist_ok=True) + + default = """# OptiLLM Proxy Plugin Configuration +# +# This is an auto-generated configuration file. +# Add your LLM provider endpoints and API keys below. 
+# +# Environment variables are supported: ${VAR_NAME} or ${VAR_NAME:-default_value} + +providers: + # Example OpenAI provider (uncomment and configure) + # - name: openai_primary + # base_url: https://api.openai.com/v1 + # api_key: ${OPENAI_API_KEY} + # weight: 1 + +routing: + strategy: round_robin # Options: weighted, round_robin, failover + health_check: + enabled: true + interval: 30 # seconds + timeout: 5 # seconds + +monitoring: + log_level: INFO + track_latency: true + track_errors: true + +# See proxy/README.md for full documentation +""" + path.write_text(default) + logger.info(f"Created default proxy config at {path}") + logger.info("Please configure your providers in this file") + + @staticmethod + def _get_minimal_config() -> Dict: + """Return minimal working config as fallback.""" + return { + 'providers': [], + 'routing': { + 'strategy': 'round_robin', + 'health_check': {'enabled': False} + }, + 'monitoring': { + 'log_level': 'INFO', + 'track_latency': False, + 'track_errors': True + } + } \ No newline at end of file diff --git a/optillm/plugins/proxy/example_config.yaml b/optillm/plugins/proxy/example_config.yaml new file mode 100644 index 00000000..a1b89346 --- /dev/null +++ b/optillm/plugins/proxy/example_config.yaml @@ -0,0 +1,81 @@ +# OptiLLM Proxy Plugin - Example Configuration +# +# Copy this file to ~/.optillm/proxy_config.yaml and customize for your setup +# Environment variables are supported using ${VAR} or ${VAR:-default} syntax + +# List of LLM providers to load balance across +providers: + # Primary OpenAI endpoint + - name: openai_primary + base_url: https://api.openai.com/v1 + api_key: ${OPENAI_API_KEY} + weight: 3 # Higher weight = more traffic + + # Secondary OpenAI with different API key + - name: openai_secondary + base_url: https://api.openai.com/v1 + api_key: ${OPENAI_API_KEY_2:-${OPENAI_API_KEY}} # Fallback to primary key + weight: 2 + + # Azure OpenAI endpoint + - name: azure_gpt4 + base_url: ${AZURE_ENDPOINT:-https://your-resource.openai.azure.com} + api_key: ${AZURE_API_KEY} + weight: 2 + # Map model names for Azure deployments + model_map: + gpt-4: gpt-4-deployment + gpt-4-turbo: gpt-4-turbo-deployment + gpt-3.5-turbo: gpt-35-turbo-deployment + + # Local LLM server (Ollama, LM Studio, llama.cpp, etc.) 
+ - name: local_llm + base_url: http://localhost:8080/v1 + api_key: dummy # Some servers require a dummy key + weight: 1 + fallback_only: true # Only use when primary providers fail + + # Together AI + - name: together_ai + base_url: https://api.together.xyz/v1 + api_key: ${TOGETHER_API_KEY} + weight: 1 + + # Anyscale Endpoints + - name: anyscale + base_url: https://api.endpoints.anyscale.com/v1 + api_key: ${ANYSCALE_API_KEY} + weight: 1 + +# Routing configuration +routing: + # Strategy for selecting providers + # Options: weighted, round_robin, failover + strategy: weighted + + # Health check configuration + health_check: + enabled: true + interval: 30 # Check every 30 seconds + timeout: 5 # Timeout for health checks + +# Monitoring and logging +monitoring: + log_level: INFO # DEBUG, INFO, WARNING, ERROR + track_latency: true # Track request latencies + track_errors: true # Track and log errors + +# Example usage: +# +# Direct proxy routing: +# model: "proxy-gpt-4" +# +# Proxy with approaches (using extra_body): +# model: "gpt-4" +# extra_body: { +# "optillm_approach": "proxy", +# "proxy_wrap": "moa" +# } +# +# Combined approaches: +# model: "bon&proxy-gpt-4" \ No newline at end of file diff --git a/optillm/plugins/proxy/health.py b/optillm/plugins/proxy/health.py new file mode 100644 index 00000000..40286d0e --- /dev/null +++ b/optillm/plugins/proxy/health.py @@ -0,0 +1,64 @@ +""" +Health checking functionality for providers. +""" +import time +import logging +import threading +from typing import List + +logger = logging.getLogger(__name__) + +class HealthChecker: + """Background health checker for providers""" + + def __init__(self, providers: List, enabled: bool = True, + interval: int = 30, timeout: int = 5): + self.providers = providers + self.enabled = enabled + self.interval = interval + self.timeout = timeout + self.running = False + self.thread = None + + def start(self): + """Start health checking in background""" + if not self.enabled: + return + + self.running = True + self.thread = threading.Thread(target=self._check_loop, daemon=True) + self.thread.start() + logger.info(f"Health checker started (interval: {self.interval}s)") + + def stop(self): + """Stop health checking""" + self.running = False + if self.thread: + self.thread.join(timeout=1) + + def _check_loop(self): + """Main health check loop""" + while self.running: + for provider in self.providers: + self._check_provider(provider) + time.sleep(self.interval) + + def _check_provider(self, provider): + """Check health of a single provider""" + try: + # Simple health check - try to get models + # This creates a minimal request to verify the endpoint is responsive + response = provider.client.models.list() + + # Mark as healthy + if not provider.is_healthy: + logger.info(f"Provider {provider.name} is now healthy") + provider.is_healthy = True + provider.last_error = None + + except Exception as e: + # Mark as unhealthy + if provider.is_healthy: + logger.warning(f"Provider {provider.name} failed health check: {e}") + provider.is_healthy = False + provider.last_error = str(e) \ No newline at end of file diff --git a/optillm/plugins/proxy/routing.py b/optillm/plugins/proxy/routing.py new file mode 100644 index 00000000..e330ab06 --- /dev/null +++ b/optillm/plugins/proxy/routing.py @@ -0,0 +1,116 @@ +""" +Routing strategies for load balancing across providers. 
+""" +import random +import logging +from typing import List, Optional +from abc import ABC, abstractmethod + +# Configure logging for this module +logger = logging.getLogger(__name__) +# Ensure we show debug messages +logging.basicConfig() +logger.setLevel(logging.DEBUG) + +class Router(ABC): + """Abstract base class for routing strategies""" + + @abstractmethod + def select(self, providers: List) -> Optional: + """Select a provider from the list""" + pass + +class RoundRobinRouter(Router): + """Round-robin routing strategy""" + + def __init__(self, providers: List): + self.all_providers = providers + self.index = 0 + + def select(self, providers: List) -> Optional: + if not providers: + logger.debug("Round-robin: No providers available") + return None + + # If only one provider available, return it + if len(providers) == 1: + logger.debug(f"Round-robin: Only one provider: {providers[0].name}") + return providers[0] + + logger.debug(f"Round-robin: Starting selection, index={self.index}, providers={[p.name for p in providers]}") + + # Find next available provider in round-robin fashion + start_index = self.index + attempts = 0 + while attempts < len(self.all_providers): + # Get provider at current index from all providers + current_provider = self.all_providers[self.index % len(self.all_providers)] + next_index = (self.index + 1) % len(self.all_providers) + + logger.debug(f"Round-robin: Checking provider {current_provider.name} at index {self.index}") + + # Update index for next call + self.index = next_index + + # If this provider is in the available list, use it + if current_provider in providers: + logger.debug(f"Round-robin: Selected provider {current_provider.name}") + return current_provider + + attempts += 1 + + # If we've cycled through all providers, just return first available + logger.debug(f"Round-robin: Fallback to first available: {providers[0].name}") + return providers[0] + +class WeightedRouter(Router): + """Weighted random routing strategy""" + + def __init__(self, providers: List): + self.providers = providers + + def select(self, providers: List) -> Optional: + if not providers: + return None + + # Calculate weights + weights = [p.weight for p in providers] + total_weight = sum(weights) + + if total_weight == 0: + return random.choice(providers) + + # Weighted random selection + rand = random.uniform(0, total_weight) + cumulative = 0 + + for provider, weight in zip(providers, weights): + cumulative += weight + if rand <= cumulative: + return provider + + return providers[-1] + +class FailoverRouter(Router): + """Failover routing - always use first healthy provider""" + + def __init__(self, providers: List): + self.providers = providers + + def select(self, providers: List) -> Optional: + # Return first available provider + return providers[0] if providers else None + +class RouterFactory: + """Factory for creating routers""" + + @staticmethod + def create(strategy: str, providers: List) -> Router: + strategies = { + 'round_robin': RoundRobinRouter, + 'weighted': WeightedRouter, + 'failover': FailoverRouter + } + + router_class = strategies.get(strategy, RoundRobinRouter) + return router_class(providers) \ No newline at end of file diff --git a/optillm/plugins/proxy_plugin.py b/optillm/plugins/proxy_plugin.py new file mode 100644 index 00000000..426ae39d --- /dev/null +++ b/optillm/plugins/proxy_plugin.py @@ -0,0 +1,144 @@ +""" +Proxy Plugin for OptiLLM - Load balancing and failover for LLM providers + +This plugin provides intelligent request routing across multiple LLM 
providers +with health monitoring, failover, and support for wrapping other approaches. +""" +import logging +from typing import Tuple, Optional +from optillm.plugins.proxy.config import ProxyConfig +from optillm.plugins.proxy.client import ProxyClient +from optillm.plugins.proxy.approach_handler import ApproachHandler + +SLUG = "proxy" +logger = logging.getLogger(__name__) + +# Configure logging based on environment +import os +log_level = os.environ.get('OPTILLM_LOG_LEVEL', 'INFO') +logging.basicConfig(level=getattr(logging, log_level)) + +# Global proxy client cache to maintain state between requests +_proxy_client_cache = {} + +def run(system_prompt: str, initial_query: str, client, model: str, + request_config: dict = None) -> Tuple[str, int]: + """ + Main proxy plugin entry point. + + Supports three usage modes: + 1. Standalone proxy: model="proxy-gpt-4" + 2. Wrapping approach: extra_body={"optillm_approach": "proxy", "proxy_wrap": "moa"} + 3. Combined approach: model="bon&proxy-gpt-4" + + Args: + system_prompt: System message for the LLM + initial_query: User's query + client: Original OpenAI client (used as fallback) + model: Model identifier + request_config: Additional request configuration + + Returns: + Tuple of (response_text, token_count) + """ + try: + # Load configuration + config = ProxyConfig.load() + + if not config.get('providers'): + logger.warning("No providers configured, falling back to original client") + response = client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": initial_query} + ] + ) + return response.choices[0].message.content, response.usage.completion_tokens + + # Create or reuse proxy client to maintain state (important for round-robin) + config_key = str(config) # Simple config-based cache key + if config_key not in _proxy_client_cache: + logger.debug("Creating new proxy client instance") + _proxy_client_cache[config_key] = ProxyClient( + config=config, + fallback_client=client + ) + else: + logger.debug("Reusing existing proxy client instance") + + proxy_client = _proxy_client_cache[config_key] + + # Check for wrapped approach in extra_body (recommended method) + wrapped_approach = None + if request_config: + # Support multiple field names for flexibility + wrapped_approach = ( + request_config.get('proxy_wrap') or + request_config.get('wrapped_approach') or + request_config.get('wrap') + ) + + if wrapped_approach: + logger.info(f"Proxy wrapping approach/plugin: {wrapped_approach}") + handler = ApproachHandler() + result = handler.handle( + wrapped_approach, + system_prompt, + initial_query, + proxy_client, # Use proxy client instead of original + model, + request_config + ) + + if result is not None: + return result + else: + logger.warning(f"Approach/plugin '{wrapped_approach}' not found, using direct proxy") + + # Check if model contains an approach pattern (backward compatibility) + if '-' in model and not wrapped_approach: + parts = model.split('-', 1) + potential_approach = parts[0] + actual_model = parts[1] if len(parts) > 1 else model + + # Try to handle as approach or plugin + handler = ApproachHandler() + result = handler.handle( + potential_approach, + system_prompt, + initial_query, + proxy_client, + actual_model, + request_config + ) + + if result is not None: + logger.info(f"Proxy routing approach/plugin: {potential_approach}") + return result + + # Direct proxy execution + logger.info(f"Direct proxy routing for model: {model}") + response = 
proxy_client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": initial_query} + ], + **(request_config or {}) + ) + + return response.choices[0].message.content, response.usage.completion_tokens + + except Exception as e: + logger.error(f"Proxy plugin error: {e}", exc_info=True) + # Fallback to original client + logger.info("Falling back to original client") + response = client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": initial_query} + ] + ) + return response.choices[0].message.content, response.usage.completion_tokens \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index df090553..74c6479b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "optillm" -version = "0.2.2" +version = "0.2.3" description = "An optimizing inference proxy for LLMs." readme = "README.md" license = "Apache-2.0" @@ -49,6 +49,7 @@ dependencies = [ "mcp", "adaptive-classifier", "datasets", + "PyYAML", "selenium", "webdriver-manager", # MLX support for Apple Silicon optimization @@ -79,6 +80,9 @@ include = ["optillm*"] [tool.setuptools.package-data] optillm = [ "plugins/*.py", + "plugins/proxy/*.py", + "plugins/proxy/*.yaml", + "plugins/proxy/*.md", "cepo/*.py", "cepo/configs/*.yaml", ] \ No newline at end of file