diff --git a/src/rotator_library/client.py b/src/rotator_library/client.py index 098fff09..8b18dee5 100644 --- a/src/rotator_library/client.py +++ b/src/rotator_library/client.py @@ -3122,7 +3122,9 @@ async def get_quota_stats( ) else: group_stats["total_requests_remaining"] = 0 - group_stats["total_remaining_pct"] = None + # Fallback to avg_remaining_pct when max_requests unavailable + # This handles providers like Firmware that only provide percentage + group_stats["total_remaining_pct"] = group_stats.get("avg_remaining_pct") prov_stats["quota_groups"][group_name] = group_stats @@ -3188,14 +3190,22 @@ async def get_quota_stats( requests_remaining = ( max(0, max_req - req_count) if max_req else 0 ) + + # Determine display format + # Priority: requests (if max known) > percentage (if baseline available) > unknown + if max_req: + display = f"{requests_remaining}/{max_req}" + elif remaining_pct is not None: + display = f"{remaining_pct}%" + else: + display = "?/?" + cred["model_groups"][group_name] = { "remaining_pct": remaining_pct, "requests_used": req_count, "requests_remaining": requests_remaining, "requests_max": max_req, - "display": f"{requests_remaining}/{max_req}" - if max_req - else f"?/?", + "display": display, "is_exhausted": is_exhausted, "reset_time_iso": reset_iso, "models": group_models, diff --git a/src/rotator_library/providers/firmware_provider.py b/src/rotator_library/providers/firmware_provider.py new file mode 100644 index 00000000..e71316fa --- /dev/null +++ b/src/rotator_library/providers/firmware_provider.py @@ -0,0 +1,209 @@ +""" +Firmware.ai Provider with Quota Tracking + +Provider implementation for the Firmware.ai API with 5-hour rolling window quota tracking. +Uses the FirmwareQuotaTracker mixin to fetch quota usage from their API. + +Environment variables: + FIRMWARE_API_BASE: API base URL (default: https://app.firmware.ai/api/v1) + FIRMWARE_API_KEY: API key for authentication + FIRMWARE_QUOTA_REFRESH_INTERVAL: Quota refresh interval in seconds (default: 300) +""" + +import asyncio +import httpx +import os +from typing import Any, Dict, List, Optional, TYPE_CHECKING + +from .provider_interface import ProviderInterface +from .utilities.firmware_quota_tracker import FirmwareQuotaTracker + +if TYPE_CHECKING: + from ..usage_manager import UsageManager + +import logging + +lib_logger = logging.getLogger("rotator_library") + +# Concurrency limit for parallel quota fetches +QUOTA_FETCH_CONCURRENCY = 5 + + +class FirmwareProvider(FirmwareQuotaTracker, ProviderInterface): + """ + Provider implementation for the Firmware.ai API with quota tracking. + """ + + # Quota groups for tracking 5-hour rolling window limits + # Uses a virtual model "firmware/_quota" for credential-level quota tracking + model_quota_groups = { + "firmware_global": ["firmware/_quota"], + } + + def __init__(self, *args, **kwargs): + """Initialize FirmwareProvider with quota tracking.""" + super().__init__(*args, **kwargs) + + # Quota tracking cache and refresh interval + self._quota_cache: Dict[str, Dict[str, Any]] = {} + try: + self._quota_refresh_interval = int( + os.environ.get("FIRMWARE_QUOTA_REFRESH_INTERVAL", "300") + ) + except ValueError: + lib_logger.warning( + "Invalid FIRMWARE_QUOTA_REFRESH_INTERVAL value, using default 300" + ) + self._quota_refresh_interval = 300 + + # API base URL (default to Firmware.ai) + self.api_base = os.environ.get( + "FIRMWARE_API_BASE", "https://app.firmware.ai/api/v1" + ) + + def get_model_quota_group(self, model: str) -> Optional[str]: + """ + Get the quota group for a model. + + All Firmware.ai models share the same credential-level quota pool, + so they all belong to the same quota group. + + Args: + model: Model name (ignored - all models share quota) + + Returns: + Quota group identifier for shared credential-level tracking + """ + return "firmware_global" + + def get_models_in_quota_group(self, group: str) -> List[str]: + """ + Get all models in a quota group. + + For Firmware.ai, we use a virtual model "firmware/_quota" to track the + credential-level 5-hour rolling window quota. + + Args: + group: Quota group name + + Returns: + List of model names in the group + """ + if group == "firmware_global": + return ["firmware/_quota"] + return [] + + def get_usage_reset_config(self, credential: str) -> Optional[Dict[str, Any]]: + """ + Return usage reset configuration for Firmware.ai credentials. + + Firmware.ai uses per_model mode to track usage at the model level, + with 5-hour rolling window quotas managed via the background job. + + Args: + credential: The API key (unused, same config for all) + + Returns: + Configuration with per_model mode and 5-hour window + """ + return { + "mode": "per_model", + "window_seconds": 18000, # 5 hours (5-hour rolling window) + "field_name": "models", + } + + async def get_models(self, api_key: str, client: httpx.AsyncClient) -> List[str]: + """ + Fetch available models from the Firmware.ai API. + + Args: + api_key: Firmware.ai API key + client: HTTP client + + Returns: + List of model names prefixed with 'firmware/' + """ + try: + response = await client.get( + f"{self.api_base.rstrip('/')}/models", + headers={"Authorization": f"Bearer {api_key}"}, + ) + response.raise_for_status() + return [ + f"firmware/{model['id']}" for model in response.json().get("data", []) + ] + except (httpx.RequestError, httpx.HTTPStatusError) as e: + lib_logger.error(f"Failed to fetch Firmware.ai models: {e}") + return [] + + # ========================================================================= + # BACKGROUND JOB CONFIGURATION + # ========================================================================= + + def get_background_job_config(self) -> Optional[Dict[str, Any]]: + """ + Configure periodic quota usage refresh. + + Returns: + Background job configuration for quota refresh + """ + return { + "interval": self._quota_refresh_interval, + "name": "firmware_quota_refresh", + "run_on_start": True, + } + + async def run_background_job( + self, + usage_manager: "UsageManager", + credentials: List[str], + ) -> None: + """ + Refresh quota usage for all credentials in parallel. + + Args: + usage_manager: UsageManager instance + credentials: List of API keys + """ + semaphore = asyncio.Semaphore(QUOTA_FETCH_CONCURRENCY) + + async def refresh_single_credential( + api_key: str, client: httpx.AsyncClient + ) -> None: + async with semaphore: + try: + usage_data = await self.fetch_quota_usage(api_key, client) + + if usage_data.get("status") == "success": + # Update quota cache + self._quota_cache[api_key] = usage_data + + # Calculate values for usage manager + remaining_fraction = usage_data.get("remaining_fraction", 0.0) + reset_ts = usage_data.get("reset_at") + + # Store baseline in usage manager + # Since Firmware.ai uses credential-level quota, we use a virtual model name + await usage_manager.update_quota_baseline( + api_key, + "firmware/_quota", # Virtual model for credential-level tracking + remaining_fraction, + # No max_requests - Firmware.ai doesn't expose this + reset_timestamp=reset_ts, + ) + + lib_logger.debug( + f"Updated Firmware.ai quota baseline: " + f"{remaining_fraction * 100:.1f}% remaining, " + f"active_window={usage_data.get('has_active_window', False)}" + ) + + except Exception as e: + lib_logger.warning(f"Failed to refresh Firmware.ai quota usage: {e}") + + # Fetch all credentials in parallel with shared HTTP client + async with httpx.AsyncClient(timeout=30.0) as client: + tasks = [ + refresh_single_credential(api_key, client) for api_key in credentials + ] + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/src/rotator_library/providers/utilities/firmware_quota_tracker.py b/src/rotator_library/providers/utilities/firmware_quota_tracker.py new file mode 100644 index 00000000..195c48b3 --- /dev/null +++ b/src/rotator_library/providers/utilities/firmware_quota_tracker.py @@ -0,0 +1,255 @@ +""" +Firmware.ai Quota Tracking Mixin + +Provides quota tracking for the Firmware.ai provider using their quota usage API. +Firmware.ai uses a 5-hour rolling window quota system where: +- `used` is already a ratio (0 to 1) indicating quota utilization +- `reset` is an ISO 8601 UTC timestamp, or null when no active window + +API Details: +- Endpoint: GET https://app.firmware.ai/api/v1/quota +- Auth: Authorization: Bearer +- Response: { used: float, reset: string|null } + +Required from provider: + - self.api_base: str (API base URL) + - self._quota_cache: Dict[str, Dict[str, Any]] = {} + - self._quota_refresh_interval: int = 300 +""" + +import logging +import time +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +import httpx + +# Use the shared rotator_library logger +lib_logger = logging.getLogger("rotator_library") + + +class FirmwareQuotaTracker: + """ + Mixin class providing quota tracking functionality for Firmware.ai provider. + + This mixin adds the following capabilities: + - Fetch quota usage from the Firmware.ai API + - Track 5-hour rolling window quota limits + - Parse ISO 8601 reset timestamps + + Usage: + class FirmwareProvider(FirmwareQuotaTracker, ProviderInterface): + ... + + The provider class must initialize these instance attributes in __init__: + self.api_base: str = "https://app.firmware.ai/api/v1" + self._quota_cache: Dict[str, Dict[str, Any]] = {} + self._quota_refresh_interval: int = 300 # 5 min default + """ + + # Type hints for attributes from provider + api_base: str + _quota_cache: Dict[str, Dict[str, Any]] + _quota_refresh_interval: int + + def _get_quota_url(self) -> str: + """Get the quota API URL based on configured api_base.""" + return f"{self.api_base.rstrip('/')}/quota" + + # ========================================================================= + # QUOTA USAGE API + # ========================================================================= + + async def fetch_quota_usage( + self, + api_key: str, + client: Optional[httpx.AsyncClient] = None, + ) -> Dict[str, Any]: + """ + Fetch quota usage from the Firmware.ai API. + + Args: + api_key: Firmware.ai API key + client: Optional HTTP client for connection reuse + + Returns: + { + "status": "success" | "error", + "error": str | None, + "used": float, # 0.0 to 1.0 (from API directly) + "remaining_fraction": float, # 1.0 - used + "reset_at": float | None, # Unix timestamp (seconds) + "has_active_window": bool, # True if reset is not null + "fetched_at": float, + } + """ + try: + headers = { + "accept": "application/json", + "Authorization": f"Bearer {api_key}", + } + + quota_url = self._get_quota_url() + + if client is not None: + response = await client.get( + quota_url, headers=headers, timeout=30 + ) + else: + async with httpx.AsyncClient() as new_client: + response = await new_client.get( + quota_url, headers=headers, timeout=30 + ) + response.raise_for_status() + data = response.json() + + # Parse response - API returns ratio directly + used_raw = data.get("used") + # Validate used is numeric + if not isinstance(used_raw, (int, float)): + lib_logger.warning( + f"Firmware.ai quota API returned non-numeric 'used' value: {used_raw}" + ) + used = 0.0 + else: + used = float(used_raw) + reset_iso = data.get("reset") + + # Calculate remaining (inverse of used), clamped to 0.0-1.0 + remaining_fraction = max(0.0, min(1.0, 1.0 - used)) + + # Parse ISO 8601 reset timestamp + reset_at = None + if reset_iso is not None: + reset_at = self._parse_iso_timestamp(reset_iso) + # Only mark active window if we successfully parsed the timestamp + has_active_window = reset_at is not None + + return { + "status": "success", + "error": None, + "used": used, + "remaining_fraction": remaining_fraction, + "reset_at": reset_at, + "has_active_window": has_active_window, + "fetched_at": time.time(), + } + + except httpx.HTTPStatusError as e: + # Only log status code - error body may contain sensitive data + error_msg = f"HTTP {e.response.status_code}" + lib_logger.warning(f"Failed to fetch Firmware.ai quota: {error_msg}") + return { + "status": "error", + "error": error_msg, + "used": None, + "remaining_fraction": None, # None preserves cached value + "reset_at": None, + "has_active_window": False, + "fetched_at": time.time(), + } + except Exception as e: + # Log exception type only - message may contain sensitive data + lib_logger.warning(f"Failed to fetch Firmware.ai quota: {type(e).__name__}") + return { + "status": "error", + "error": type(e).__name__, + "used": None, + "remaining_fraction": None, # None preserves cached value + "reset_at": None, + "has_active_window": False, + "fetched_at": time.time(), + } + + def _parse_iso_timestamp(self, iso_string: str) -> Optional[float]: + """ + Parse ISO 8601 timestamp to Unix timestamp. + + Args: + iso_string: ISO 8601 formatted timestamp (e.g., "2026-01-20T18:12:03.000Z") + + Returns: + Unix timestamp in seconds, or None if parsing fails + """ + try: + # Handle 'Z' suffix by replacing with UTC offset + if iso_string.endswith("Z"): + iso_string = iso_string.replace("Z", "+00:00") + + dt = datetime.fromisoformat(iso_string) + # Ensure timezone-aware + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + except Exception as e: + lib_logger.warning(f"Failed to parse ISO timestamp '{iso_string}': {e}") + return None + + def get_remaining_fraction(self, usage_data: Dict[str, Any]) -> float: + """ + Calculate remaining quota fraction from usage data. + + Args: + usage_data: Response from fetch_quota_usage() + + Returns: + Remaining fraction (0.0 to 1.0) + """ + return usage_data.get("remaining_fraction", 0.0) + + def get_reset_timestamp(self, usage_data: Dict[str, Any]) -> Optional[float]: + """ + Get the next reset timestamp from usage data. + + Args: + usage_data: Response from fetch_quota_usage() + + Returns: + Unix timestamp when quota resets, or None if no active window + """ + return usage_data.get("reset_at") + + # ========================================================================= + # BACKGROUND JOB SUPPORT + # ========================================================================= + + async def refresh_quota_usage( + self, + api_key: str, + credential_identifier: str, + ) -> Dict[str, Any]: + """ + Refresh and cache quota usage for a credential. + + Args: + api_key: Firmware.ai API key + credential_identifier: Identifier for caching + + Returns: + Usage data from fetch_quota_usage() + """ + usage_data = await self.fetch_quota_usage(api_key) + + if usage_data.get("status") == "success": + self._quota_cache[credential_identifier] = usage_data + + lib_logger.debug( + f"Firmware.ai quota for {credential_identifier}: " + f"{usage_data['remaining_fraction'] * 100:.1f}% remaining, " + f"active_window={usage_data['has_active_window']}" + ) + + return usage_data + + def get_cached_usage(self, credential_identifier: str) -> Optional[Dict[str, Any]]: + """ + Get cached quota usage for a credential. + + Args: + credential_identifier: Identifier used in caching + + Returns: + Copy of cached usage data or None + """ + cached = self._quota_cache.get(credential_identifier) + return dict(cached) if cached else None diff --git a/src/rotator_library/usage_manager.py b/src/rotator_library/usage_manager.py index f349f7bb..dccddaf2 100644 --- a/src/rotator_library/usage_manager.py +++ b/src/rotator_library/usage_manager.py @@ -841,7 +841,7 @@ def _get_provider_from_credential(self, credential: str) -> Optional[str]: - OAuth: "oauth_creds/antigravity_oauth_15.json" -> "antigravity" - OAuth: "C:\\...\\oauth_creds\\gemini_cli_oauth_1.json" -> "gemini_cli" - OAuth filename only: "antigravity_oauth_1.json" -> "antigravity" - - API key style: stored with provider prefix metadata + - API key style: extracted from model names in usage data (e.g., "firmware/model" -> "firmware") Args: credential: The credential identifier (path or key) @@ -889,6 +889,30 @@ def _get_provider_from_credential(self, credential: str) -> Optional[str]: if credential.startswith(prefix): return provider + # Fallback: For raw API keys, extract provider from model names in usage data + # This handles providers like firmware, chutes, nanogpt that use credential-level quota + if self._usage_data and credential in self._usage_data: + cred_data = self._usage_data[credential] + + # Check "models" section first (for per_model mode and quota tracking) + models_data = cred_data.get("models", {}) + if models_data: + # Get first model name and extract provider prefix + first_model = next(iter(models_data.keys()), None) + if first_model and "/" in first_model: + provider = first_model.split("/")[0].lower() + return provider + + # Fallback to "daily" section (legacy structure) + daily_data = cred_data.get("daily", {}) + daily_models = daily_data.get("models", {}) + if daily_models: + # Get first model name and extract provider prefix + first_model = next(iter(daily_models.keys()), None) + if first_model and "/" in first_model: + provider = first_model.split("/")[0].lower() + return provider + return None def _get_provider_instance(self, provider: str) -> Optional[Any]: