diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index 5dafe6b8..f7ebbde5 100644
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -10,6 +10,7 @@ The project is a monorepo containing two primary components:
* **Batch Manager**: Optimizes high-volume embedding requests.
* **Detailed Logger**: Provides per-request file logging for debugging.
* **OpenAI-Compatible Endpoints**: `/v1/chat/completions`, `/v1/embeddings`, etc.
+ * **Anthropic-Compatible Endpoints**: `/v1/messages`, `/v1/messages/count_tokens` for Claude Code and other Anthropic API clients.
* **Model Filter GUI**: Visual interface for configuring model ignore/whitelist rules per provider (see Section 6).
2. **The Resilience Library (`rotator_library`)**: This is the core engine that provides high availability. It is consumed by the proxy app to manage a pool of API keys, handle errors gracefully, and ensure requests are completed successfully even when individual keys or provider endpoints face issues.
@@ -816,6 +817,108 @@ When a custom cap triggers a cooldown longer than the exhaustion threshold, it a
**Defaults:** See `src/rotator_library/config/defaults.py` for all configurable defaults.
+### 2.21. Anthropic API Compatibility (`anthropic_compat/`)
+
+A translation layer that enables Anthropic API clients (like Claude Code) to use any OpenAI-compatible provider through the proxy.
+
+#### Architecture
+
+The module consists of three components:
+
+| File | Purpose |
+|------|---------|
+| `models.py` | Pydantic models for Anthropic request/response formats (`AnthropicMessagesRequest`, `AnthropicMessage`, `AnthropicTool`, etc.) |
+| `translator.py` | Bidirectional format translation functions |
+| `streaming.py` | SSE format conversion for streaming responses |
+
+#### Request Translation (`translate_anthropic_request`)
+
+Converts Anthropic Messages API requests to OpenAI Chat Completions format:
+
+**Message Conversion:**
+- Anthropic `system` field → OpenAI system message
+- `content` blocks (text, image, tool_use, tool_result) → OpenAI format
+- Image blocks with base64 data → OpenAI `image_url` with data URI
+- Document blocks (PDF, etc.) → OpenAI `image_url` format
+
+**Tool Conversion:**
+- Anthropic `tools` with `input_schema` → OpenAI `tools` with `parameters`
+- `tool_choice.type: "any"` → `"required"`
+- `tool_choice.type: "tool"` → `{"type": "function", "function": {"name": ...}}`
+
+**Thinking Configuration:**
+- `thinking.type: "enabled"` → `reasoning_effort: "high"` + `thinking_budget`
+- `thinking.type: "disabled"` → `reasoning_effort: "disable"`
+- Opus models default to thinking enabled
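+
+The `budget_tokens` → `reasoning_effort` mapping follows `THINKING_BUDGET_THRESHOLDS` in `translator.py`; a quick illustration using the internal helper (model names are placeholders):
+
+```python
+from rotator_library.anthropic_compat.translator import _budget_to_reasoning_effort
+
+# <=4096 "minimal", <=8192 "low", <=12288 "low_medium", <=16384 "medium",
+# <=24576 "medium_high", else "high"; providers outside
+# GRANULAR_REASONING_PROVIDERS are simplified to low/medium/high.
+_budget_to_reasoning_effort(4096, "gemini/gemini-2.5-flash")       # "low"
+_budget_to_reasoning_effort(20000, "gemini/gemini-2.5-flash")      # "high"
+_budget_to_reasoning_effort(20000, "antigravity/claude-opus-4.5")  # "medium_high"
+```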
+
+**Special Handling:**
+- Reorders assistant content blocks: thinking → text → tool_use
+- Injects `[Continue]` prompt for fresh thinking turns
+- Preserves thinking signatures for multi-turn conversations
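+
+A minimal sketch tying these rules together (model and tool names are illustrative):
+
+```python
+from rotator_library.anthropic_compat import (
+    AnthropicMessagesRequest,
+    translate_anthropic_request,
+)
+
+request = AnthropicMessagesRequest(
+    model="gemini/gemini-2.5-flash",
+    max_tokens=512,
+    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
+    tools=[{
+        "name": "get_weather",
+        "description": "Look up current weather for a city.",
+        "input_schema": {
+            "type": "object",
+            "properties": {"city": {"type": "string"}},
+            "required": ["city"],
+        },
+    }],
+    tool_choice={"type": "any"},
+)
+
+openai_request = translate_anthropic_request(request)
+# openai_request carries OpenAI-style keys: model, messages, max_tokens, stream,
+# tools (with "parameters" instead of "input_schema"), and tool_choice="required".
+```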
+
+#### Response Translation (`openai_to_anthropic_response`)
+
+Converts OpenAI Chat Completions responses to Anthropic Messages format:
+
+**Content Blocks:**
+- `reasoning_content` → thinking block with signature
+- `content` → text block
+- `tool_calls` → tool_use blocks with parsed JSON input
+
+**Field Mapping:**
+- `finish_reason: "stop"` → `stop_reason: "end_turn"`
+- `finish_reason: "length"` → `stop_reason: "max_tokens"`
+- `finish_reason: "tool_calls"` → `stop_reason: "tool_use"`
+
+**Usage Translation:**
+- `prompt_tokens` minus `cached_tokens` → `input_tokens`
+- `completion_tokens` → `output_tokens`
+- `prompt_tokens_details.cached_tokens` → `cache_read_input_tokens`
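+
+A small example of the field and usage mapping (numbers are illustrative):
+
+```python
+from rotator_library.anthropic_compat import openai_to_anthropic_response
+
+openai_response = {
+    "id": "chatcmpl-123",
+    "choices": [{
+        "message": {"role": "assistant", "content": "Hello!"},
+        "finish_reason": "stop",
+    }],
+    "usage": {
+        "prompt_tokens": 120,
+        "completion_tokens": 5,
+        "prompt_tokens_details": {"cached_tokens": 100},
+    },
+}
+
+anthropic_response = openai_to_anthropic_response(openai_response, "gemini/gemini-2.5-flash")
+# content: [{"type": "text", "text": "Hello!"}], stop_reason: "end_turn",
+# usage: {"input_tokens": 20, "output_tokens": 5,
+#         "cache_read_input_tokens": 100, "cache_creation_input_tokens": 0}
+```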
+
+#### Streaming Wrapper (`anthropic_streaming_wrapper`)
+
+Converts OpenAI SSE streaming format to Anthropic's event-based format:
+
+**Event Types Generated:**
+```
+message_start → Initial message metadata
+content_block_start → Start of text/thinking/tool_use block
+content_block_delta → Incremental content (text_delta, thinking_delta, input_json_delta)
+content_block_stop → End of content block
+message_delta → Final metadata (stop_reason, usage)
+message_stop → End of message
+```
+
+**Features:**
+- Accumulates tool call arguments across chunks
+- Handles thinking/reasoning content from `delta.reasoning_content`
+- Proper block indexing for multiple content blocks
+- Cache token handling in usage statistics
+- Error recovery with proper message structure
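+
+A self-contained sketch of the conversion, driven by a stubbed OpenAI SSE stream (chunk shapes are illustrative):
+
+```python
+import asyncio
+from rotator_library.anthropic_compat import anthropic_streaming_wrapper
+
+async def fake_openai_stream():
+    yield 'data: {"choices": [{"delta": {"content": "Hel"}}]}\n\n'
+    yield 'data: {"choices": [{"delta": {"content": "lo!"}}], "usage": {"prompt_tokens": 10, "completion_tokens": 2}}\n\n'
+    yield "data: [DONE]\n\n"
+
+async def main():
+    async for event in anthropic_streaming_wrapper(
+        fake_openai_stream(), original_model="gemini/gemini-2.5-flash"
+    ):
+        # Prints message_start, content_block_start, text_delta chunks,
+        # content_block_stop, message_delta, and message_stop events.
+        print(event, end="")
+
+asyncio.run(main())
+```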
+
+#### Client Integration
+
+The `RotatingClient` provides two methods for Anthropic compatibility:
+
+```python
+async def anthropic_messages(self, request, raw_request=None, pre_request_callback=None):
+ """Handle Anthropic Messages API requests."""
+ # 1. Translate Anthropic request to OpenAI format
+ # 2. Call acompletion() with translated request
+ # 3. Convert response back to Anthropic format
+ # 4. For streaming: wrap with anthropic_streaming_wrapper
+
+async def anthropic_count_tokens(self, request):
+ """Count tokens for Anthropic-format request."""
+ # Translates messages and tools, then uses token_count()
+```
+
+#### Authentication
+
+The proxy accepts both Anthropic and OpenAI authentication styles:
+- `x-api-key` header (Anthropic style)
+- `Authorization: Bearer` header (OpenAI style)
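+
+A quick way to verify both styles against a running proxy (a sketch using `httpx`; the model name and key are placeholders):
+
+```python
+import httpx
+
+BASE = "http://127.0.0.1:8000"
+payload = {
+    "model": "gemini/gemini-2.5-flash",
+    "max_tokens": 64,
+    "messages": [{"role": "user", "content": "ping"}],
+}
+
+# Anthropic style
+r1 = httpx.post(f"{BASE}/v1/messages", json=payload,
+                headers={"x-api-key": "your-proxy-api-key"})
+# OpenAI style works on the same endpoint
+r2 = httpx.post(f"{BASE}/v1/messages", json=payload,
+                headers={"Authorization": "Bearer your-proxy-api-key"})
+
+print(r1.json()["content"][0]["text"])
+```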
+
### 3.5. Antigravity (`antigravity_provider.py`)
The most sophisticated provider implementation, supporting Google's internal Antigravity API for Gemini 3 and Claude models (including **Claude Opus 4.5**, Anthropic's most powerful model).
diff --git a/README.md b/README.md
index cd650e5e..a7c3c438 100644
--- a/README.md
+++ b/README.md
@@ -4,19 +4,20 @@
**One proxy. Any LLM provider. Zero code changes.**
-A self-hosted proxy that provides a single, OpenAI-compatible API endpoint for all your LLM providers. Works with any application that supports custom OpenAI base URLs—no code changes required in your existing tools.
+A self-hosted proxy that provides OpenAI- and Anthropic-compatible API endpoints for all your LLM providers. Works with any application that supports custom OpenAI or Anthropic base URLs—including Claude Code, Opencode, and more—no code changes required in your existing tools.
This project consists of two components:
-1. **The API Proxy** — A FastAPI application providing a universal `/v1/chat/completions` endpoint
+1. **The API Proxy** — A FastAPI application providing universal `/v1/chat/completions` (OpenAI) and `/v1/messages` (Anthropic) endpoints
2. **The Resilience Library** — A reusable Python library for intelligent API key management, rotation, and failover
---
## Why Use This?
-- **Universal Compatibility** — Works with any app supporting OpenAI-compatible APIs: Opencode, Continue, Roo/Kilo Code, JanitorAI, SillyTavern, custom applications, and more
+- **Universal Compatibility** — Works with any app supporting OpenAI or Anthropic APIs: Claude Code, Opencode, Continue, Roo/Kilo Code, Cursor, JanitorAI, SillyTavern, custom applications, and more
- **One Endpoint, Many Providers** — Configure Gemini, OpenAI, Anthropic, and [any LiteLLM-supported provider](https://docs.litellm.ai/docs/providers) once. Access them all through a single API key
+- **Anthropic API Compatible** — Use Claude Code or any Anthropic SDK client with non-Anthropic providers like Gemini, OpenAI, or custom models
- **Built-in Resilience** — Automatic key rotation, failover on errors, rate limit handling, and intelligent cooldowns
- **Exclusive Provider Support** — Includes custom providers not available elsewhere: **Antigravity** (Gemini 3 + Claude Sonnet/Opus 4.5), **Gemini CLI**, **Qwen Code**, and **iFlow**
@@ -177,12 +178,57 @@ In your configuration file (e.g., `config.json`):
+
+**Claude Code**
+
+Claude Code natively supports custom Anthropic API endpoints. The recommended setup is to edit your Claude Code `settings.json`:
+
+```json
+{
+ "env": {
+ "ANTHROPIC_AUTH_TOKEN": "your-proxy-api-key",
+ "ANTHROPIC_BASE_URL": "http://127.0.0.1:8000",
+ "ANTHROPIC_DEFAULT_OPUS_MODEL": "gemini/gemini-3-pro",
+ "ANTHROPIC_DEFAULT_SONNET_MODEL": "gemini/gemini-3-flash",
+ "ANTHROPIC_DEFAULT_HAIKU_MODEL": "openai/gpt-5-mini"
+ }
+}
+```
+
+Now you can use Claude Code with Gemini, OpenAI, or any other configured provider.
+
+
+
+
+**Anthropic Python SDK**
+
+```python
+from anthropic import Anthropic
+
+client = Anthropic(
+ base_url="http://127.0.0.1:8000",
+ api_key="your-proxy-api-key"
+)
+
+# Use any provider through Anthropic's API format
+response = client.messages.create(
+ model="gemini/gemini-3-flash", # provider/model format
+ max_tokens=1024,
+ messages=[{"role": "user", "content": "Hello!"}]
+)
+print(response.content[0].text)
+```
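+
+Streaming works through the proxy as well; this sketch reuses the `client` from above and assumes the standard Anthropic SDK streaming helper:
+
+```python
+with client.messages.stream(
+    model="gemini/gemini-3-flash",
+    max_tokens=1024,
+    messages=[{"role": "user", "content": "Write a haiku about proxies."}],
+) as stream:
+    for text in stream.text_stream:
+        print(text, end="", flush=True)
+```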
+
+
+
### API Endpoints
| Endpoint | Description |
|----------|-------------|
| `GET /` | Status check — confirms proxy is running |
-| `POST /v1/chat/completions` | Chat completions (main endpoint) |
+| `POST /v1/chat/completions` | Chat completions (OpenAI format) |
+| `POST /v1/messages` | Chat completions (Anthropic format) — Claude Code compatible |
+| `POST /v1/messages/count_tokens` | Count tokens for Anthropic-format requests |
| `POST /v1/embeddings` | Text embeddings |
| `GET /v1/models` | List all available models with pricing & capabilities |
| `GET /v1/models/{model_id}` | Get details for a specific model |
diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py
index a2b20bcf..4d8dba99 100644
--- a/src/proxy_app/main.py
+++ b/src/proxy_app/main.py
@@ -1,4 +1,5 @@
import time
+import uuid
# Phase 1: Minimal imports for arg parsing and TUI
import asyncio
@@ -106,7 +107,7 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import StreamingResponse
+ from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import APIKeyHeader
print(" → Loading core dependencies...")
@@ -220,6 +221,13 @@ class EnrichedModelList(BaseModel):
data: List[EnrichedModelCard]
+# --- Anthropic API Models (imported from library) ---
+from rotator_library.anthropic_compat import (
+ AnthropicMessagesRequest,
+ AnthropicCountTokensRequest,
+)
+
+
# Calculate total loading time
_elapsed = time.time() - _start_time
print(
@@ -680,6 +688,27 @@ async def verify_api_key(auth: str = Depends(api_key_header)):
return auth
+# --- Anthropic API Key Header ---
+anthropic_api_key_header = APIKeyHeader(name="x-api-key", auto_error=False)
+
+
+async def verify_anthropic_api_key(
+ x_api_key: str = Depends(anthropic_api_key_header),
+ auth: str = Depends(api_key_header),
+):
+ """
+ Dependency to verify API key for Anthropic endpoints.
+ Accepts either x-api-key header (Anthropic style) or Authorization Bearer (OpenAI style).
+ """
+ # Check x-api-key first (Anthropic style)
+ if x_api_key and x_api_key == PROXY_API_KEY:
+ return x_api_key
+ # Fall back to Bearer token (OpenAI style)
+ if auth and auth == f"Bearer {PROXY_API_KEY}":
+ return auth
+ raise HTTPException(status_code=401, detail="Invalid or missing API Key")
+
+
async def streaming_response_wrapper(
request: Request,
request_data: dict,
@@ -980,6 +1009,163 @@ async def chat_completions(
raise HTTPException(status_code=500, detail=str(e))
+# --- Anthropic Messages API Endpoint ---
+@app.post("/v1/messages")
+async def anthropic_messages(
+ request: Request,
+ body: AnthropicMessagesRequest,
+ client: RotatingClient = Depends(get_rotating_client),
+ _=Depends(verify_anthropic_api_key),
+):
+ """
+ Anthropic-compatible Messages API endpoint.
+
+ Accepts requests in Anthropic's format and returns responses in Anthropic's format.
+ Internally translates to OpenAI format for processing via LiteLLM.
+
+ This endpoint is compatible with Claude Code and other Anthropic API clients.
+ """
+ # Initialize raw I/O logger if enabled (for debugging proxy boundary)
+ logger = RawIOLogger() if ENABLE_RAW_LOGGING else None
+
+ # Log raw Anthropic request if raw logging is enabled
+ if logger:
+ logger.log_request(
+ headers=dict(request.headers),
+ body=body.model_dump(exclude_none=True),
+ )
+
+ try:
+ # Log the request to console
+ log_request_to_console(
+ url=str(request.url),
+ headers=dict(request.headers),
+ client_info=(
+ request.client.host if request.client else "unknown",
+ request.client.port if request.client else 0,
+ ),
+ request_data=body.model_dump(exclude_none=True),
+ )
+
+ # Use the library method to handle the request
+ result = await client.anthropic_messages(body, raw_request=request)
+
+ if body.stream:
+ # Streaming response
+ return StreamingResponse(
+ result,
+ media_type="text/event-stream",
+ headers={
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "X-Accel-Buffering": "no",
+ },
+ )
+ else:
+ # Non-streaming response
+ if logger:
+ logger.log_final_response(
+ status_code=200,
+ headers=None,
+ body=result,
+ )
+ return JSONResponse(content=result)
+
+ except (
+ litellm.InvalidRequestError,
+ ValueError,
+ litellm.ContextWindowExceededError,
+ ) as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "invalid_request_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=400, detail=error_response)
+ except litellm.AuthenticationError as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "authentication_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=401, detail=error_response)
+ except litellm.RateLimitError as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "rate_limit_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=429, detail=error_response)
+ except (litellm.ServiceUnavailableError, litellm.APIConnectionError) as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "api_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=503, detail=error_response)
+ except litellm.Timeout as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "api_error", "message": f"Request timed out: {str(e)}"},
+ }
+ raise HTTPException(status_code=504, detail=error_response)
+ except Exception as e:
+ logging.error(f"Anthropic messages endpoint error: {e}")
+ if logger:
+ logger.log_final_response(
+ status_code=500,
+ headers=None,
+ body={"error": str(e)},
+ )
+ error_response = {
+ "type": "error",
+ "error": {"type": "api_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=500, detail=error_response)
+
+
+# --- Anthropic Count Tokens Endpoint ---
+@app.post("/v1/messages/count_tokens")
+async def anthropic_count_tokens(
+ request: Request,
+ body: AnthropicCountTokensRequest,
+ client: RotatingClient = Depends(get_rotating_client),
+ _=Depends(verify_anthropic_api_key),
+):
+ """
+ Anthropic-compatible count_tokens endpoint.
+
+ Counts the number of tokens that would be used by a Messages API request.
+ This is useful for estimating costs and managing context windows.
+
+ Accepts requests in Anthropic's format and returns token count in Anthropic's format.
+ """
+ try:
+ # Use the library method to handle the request
+ result = await client.anthropic_count_tokens(body)
+ return JSONResponse(content=result)
+
+ except (
+ litellm.InvalidRequestError,
+ ValueError,
+ litellm.ContextWindowExceededError,
+ ) as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "invalid_request_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=400, detail=error_response)
+ except litellm.AuthenticationError as e:
+ error_response = {
+ "type": "error",
+ "error": {"type": "authentication_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=401, detail=error_response)
+ except Exception as e:
+ logging.error(f"Anthropic count_tokens endpoint error: {e}")
+ error_response = {
+ "type": "error",
+ "error": {"type": "api_error", "message": str(e)},
+ }
+ raise HTTPException(status_code=500, detail=error_response)
+
+
@app.post("/v1/embeddings")
async def embeddings(
request: Request,
diff --git a/src/rotator_library/README.md b/src/rotator_library/README.md
index c7b3c866..22d2bf6e 100644
--- a/src/rotator_library/README.md
+++ b/src/rotator_library/README.md
@@ -5,6 +5,7 @@ A robust, asynchronous, and thread-safe Python library for managing a pool of AP
## Key Features
- **Asynchronous by Design**: Built with `asyncio` and `httpx` for high-performance, non-blocking I/O.
+- **Anthropic API Compatibility**: Built-in translation layer (`anthropic_compat`) enables Anthropic API clients (like Claude Code) to use any supported provider.
- **Advanced Concurrency Control**: A single API key can be used for multiple concurrent requests. By default, it supports concurrent requests to *different* models. With configuration (`MAX_CONCURRENT_REQUESTS_PER_KEY_`), it can also support multiple concurrent requests to the *same* model using the same key.
- **Smart Key Management**: Selects the optimal key for each request using a tiered, model-aware locking strategy to distribute load evenly and maximize availability.
- **Configurable Rotation Strategy**: Choose between deterministic least-used selection (perfect balance) or default weighted random selection (unpredictable, harder to fingerprint).
@@ -173,6 +174,61 @@ Fetches a list of available models for a specific provider, applying any configu
Fetches a dictionary of all available models, grouped by provider, or as a single flat list if `grouped=False`.
+#### `async def anthropic_messages(self, request, raw_request=None, pre_request_callback=None) -> Any:`
+
+Handle Anthropic Messages API requests. Accepts requests in Anthropic's format, translates them to OpenAI format internally, processes them through `acompletion`, and returns responses in Anthropic's format.
+
+- **Parameters**:
+ - `request`: An `AnthropicMessagesRequest` object (from `anthropic_compat.models`)
+ - `raw_request`: Optional raw request object for client disconnect checks
+ - `pre_request_callback`: Optional async callback before each API request
+- **Returns**:
+ - For non-streaming: dict in Anthropic Messages format
+ - For streaming: AsyncGenerator yielding Anthropic SSE format strings
+
+#### `async def anthropic_count_tokens(self, request) -> dict:`
+
+Handle Anthropic count_tokens API requests. Counts the number of tokens that would be used by a Messages API request.
+
+- **Parameters**: `request` - An `AnthropicCountTokensRequest` object
+- **Returns**: Dict with `input_tokens` count in Anthropic format
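+
+A short sketch (assumes `api_keys` is configured as in the other examples):
+
+```python
+from rotator_library.anthropic_compat import AnthropicCountTokensRequest
+
+count_request = AnthropicCountTokensRequest(
+    model="gemini/gemini-2.5-flash",
+    messages=[{"role": "user", "content": "Hello!"}],
+)
+
+async with RotatingClient(api_keys=api_keys) as client:
+    result = await client.anthropic_count_tokens(count_request)
+    print(result["input_tokens"])
+```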
+
+## Anthropic API Compatibility
+
+The library includes a translation layer (`anthropic_compat`) that enables Anthropic API clients to use any OpenAI-compatible provider.
+
+### Usage
+
+```python
+from rotator_library.anthropic_compat import (
+ AnthropicMessagesRequest,
+ AnthropicCountTokensRequest,
+ translate_anthropic_request,
+ openai_to_anthropic_response,
+ anthropic_streaming_wrapper,
+)
+
+# Create an Anthropic-format request
+request = AnthropicMessagesRequest(
+ model="gemini/gemini-2.5-flash",
+ max_tokens=1024,
+ messages=[{"role": "user", "content": "Hello!"}]
+)
+
+# Use with RotatingClient
+async with RotatingClient(api_keys=api_keys) as client:
+ response = await client.anthropic_messages(request)
+ print(response["content"][0]["text"])
+```
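+
+For streaming, set `stream=True`; `anthropic_messages` then returns an async generator of Anthropic SSE strings (a sketch reusing the names above):
+
+```python
+streaming_request = AnthropicMessagesRequest(
+    model="gemini/gemini-2.5-flash",
+    max_tokens=1024,
+    messages=[{"role": "user", "content": "Hello!"}],
+    stream=True,
+)
+
+async with RotatingClient(api_keys=api_keys) as client:
+    stream = await client.anthropic_messages(streaming_request)
+    async for sse_event in stream:
+        print(sse_event, end="")
+```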
+
+### Features
+
+- **Full Message Translation**: Converts between Anthropic and OpenAI message formats including text, images, tool_use, and tool_result blocks
+- **Extended Thinking Support**: Translates Anthropic's `thinking` configuration to `reasoning_effort` for providers that support it
+- **Streaming SSE Conversion**: Converts OpenAI streaming chunks to Anthropic's SSE event format (`message_start`, `content_block_delta`, etc.)
+- **Cache Token Handling**: Properly translates `prompt_tokens_details.cached_tokens` to Anthropic's `cache_read_input_tokens`
+- **Tool Call Support**: Full support for tool definitions and tool use/result blocks
+
## Credential Tool
The library includes a utility to manage credentials easily:
diff --git a/src/rotator_library/__init__.py b/src/rotator_library/__init__.py
index 7944443f..b05e4707 100644
--- a/src/rotator_library/__init__.py
+++ b/src/rotator_library/__init__.py
@@ -8,6 +8,7 @@
from .providers import PROVIDER_PLUGINS
from .providers.provider_interface import ProviderInterface
from .model_info_service import ModelInfoService, ModelInfo, ModelMetadata
+ from . import anthropic_compat
__all__ = [
"RotatingClient",
@@ -15,11 +16,12 @@
"ModelInfoService",
"ModelInfo",
"ModelMetadata",
+ "anthropic_compat",
]
def __getattr__(name):
- """Lazy-load PROVIDER_PLUGINS and ModelInfoService to speed up module import."""
+ """Lazy-load PROVIDER_PLUGINS, ModelInfoService, and anthropic_compat to speed up module import."""
if name == "PROVIDER_PLUGINS":
from .providers import PROVIDER_PLUGINS
@@ -36,4 +38,8 @@ def __getattr__(name):
from .model_info_service import ModelMetadata
return ModelMetadata
+ if name == "anthropic_compat":
+ from . import anthropic_compat
+
+ return anthropic_compat
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/src/rotator_library/anthropic_compat/__init__.py b/src/rotator_library/anthropic_compat/__init__.py
new file mode 100644
index 00000000..8572ac79
--- /dev/null
+++ b/src/rotator_library/anthropic_compat/__init__.py
@@ -0,0 +1,67 @@
+"""
+Anthropic API compatibility module for rotator_library.
+
+This module provides format translation between Anthropic's Messages API
+and OpenAI's Chat Completions API, enabling any OpenAI-compatible provider
+to work with Anthropic clients like Claude Code.
+
+Usage:
+ from rotator_library.anthropic_compat import (
+ AnthropicMessagesRequest,
+ AnthropicMessagesResponse,
+ translate_anthropic_request,
+ openai_to_anthropic_response,
+ anthropic_streaming_wrapper,
+ )
+"""
+
+from .models import (
+ AnthropicTextBlock,
+ AnthropicImageSource,
+ AnthropicImageBlock,
+ AnthropicToolUseBlock,
+ AnthropicToolResultBlock,
+ AnthropicMessage,
+ AnthropicTool,
+ AnthropicThinkingConfig,
+ AnthropicMessagesRequest,
+ AnthropicUsage,
+ AnthropicMessagesResponse,
+ AnthropicCountTokensRequest,
+ AnthropicCountTokensResponse,
+)
+
+from .translator import (
+ anthropic_to_openai_messages,
+ anthropic_to_openai_tools,
+ anthropic_to_openai_tool_choice,
+ openai_to_anthropic_response,
+ translate_anthropic_request,
+)
+
+from .streaming import anthropic_streaming_wrapper
+
+__all__ = [
+ # Models
+ "AnthropicTextBlock",
+ "AnthropicImageSource",
+ "AnthropicImageBlock",
+ "AnthropicToolUseBlock",
+ "AnthropicToolResultBlock",
+ "AnthropicMessage",
+ "AnthropicTool",
+ "AnthropicThinkingConfig",
+ "AnthropicMessagesRequest",
+ "AnthropicUsage",
+ "AnthropicMessagesResponse",
+ "AnthropicCountTokensRequest",
+ "AnthropicCountTokensResponse",
+ # Translator functions
+ "anthropic_to_openai_messages",
+ "anthropic_to_openai_tools",
+ "anthropic_to_openai_tool_choice",
+ "openai_to_anthropic_response",
+ "translate_anthropic_request",
+ # Streaming
+ "anthropic_streaming_wrapper",
+]
diff --git a/src/rotator_library/anthropic_compat/models.py b/src/rotator_library/anthropic_compat/models.py
new file mode 100644
index 00000000..c579f2e2
--- /dev/null
+++ b/src/rotator_library/anthropic_compat/models.py
@@ -0,0 +1,144 @@
+"""
+Pydantic models for the Anthropic Messages API.
+
+These models define the request and response formats for Anthropic's Messages API,
+enabling compatibility with Claude Code and other Anthropic API clients.
+"""
+
+from typing import Any, List, Optional, Union
+from pydantic import BaseModel
+
+
+# --- Content Blocks ---
+class AnthropicTextBlock(BaseModel):
+ """Anthropic text content block."""
+
+ type: str = "text"
+ text: str
+
+
+class AnthropicImageSource(BaseModel):
+ """Anthropic image source for base64 images."""
+
+ type: str = "base64"
+ media_type: str
+ data: str
+
+
+class AnthropicImageBlock(BaseModel):
+ """Anthropic image content block."""
+
+ type: str = "image"
+ source: AnthropicImageSource
+
+
+class AnthropicToolUseBlock(BaseModel):
+ """Anthropic tool use content block."""
+
+ type: str = "tool_use"
+ id: str
+ name: str
+ input: dict
+
+
+class AnthropicToolResultBlock(BaseModel):
+ """Anthropic tool result content block."""
+
+ type: str = "tool_result"
+ tool_use_id: str
+ content: Union[str, List[Any]]
+ is_error: Optional[bool] = None
+
+
+# --- Message and Tool Definitions ---
+class AnthropicMessage(BaseModel):
+ """Anthropic message format."""
+
+ role: str
+ content: Union[
+ str,
+ List[
+ Union[
+ AnthropicTextBlock,
+ AnthropicImageBlock,
+ AnthropicToolUseBlock,
+ AnthropicToolResultBlock,
+ dict,
+ ]
+ ],
+ ]
+
+
+class AnthropicTool(BaseModel):
+ """Anthropic tool definition."""
+
+ name: str
+ description: Optional[str] = None
+ input_schema: dict
+
+
+class AnthropicThinkingConfig(BaseModel):
+ """Anthropic thinking configuration."""
+
+ type: str # "enabled" or "disabled"
+ budget_tokens: Optional[int] = None
+
+
+# --- Messages Request ---
+class AnthropicMessagesRequest(BaseModel):
+ """Anthropic Messages API request format."""
+
+ model: str
+ messages: List[AnthropicMessage]
+ max_tokens: int
+ system: Optional[Union[str, List[dict]]] = None
+ temperature: Optional[float] = None
+ top_p: Optional[float] = None
+ top_k: Optional[int] = None
+ stop_sequences: Optional[List[str]] = None
+ stream: Optional[bool] = False
+ tools: Optional[List[AnthropicTool]] = None
+ tool_choice: Optional[dict] = None
+ metadata: Optional[dict] = None
+ thinking: Optional[AnthropicThinkingConfig] = None
+
+
+# --- Messages Response ---
+class AnthropicUsage(BaseModel):
+ """Anthropic usage statistics."""
+
+ input_tokens: int
+ output_tokens: int
+ cache_creation_input_tokens: Optional[int] = None
+ cache_read_input_tokens: Optional[int] = None
+
+
+class AnthropicMessagesResponse(BaseModel):
+ """Anthropic Messages API response format."""
+
+ id: str
+ type: str = "message"
+ role: str = "assistant"
+ content: List[Union[AnthropicTextBlock, AnthropicToolUseBlock, dict]]
+ model: str
+ stop_reason: Optional[str] = None
+ stop_sequence: Optional[str] = None
+ usage: AnthropicUsage
+
+
+# --- Count Tokens ---
+class AnthropicCountTokensRequest(BaseModel):
+ """Anthropic count_tokens API request format."""
+
+ model: str
+ messages: List[AnthropicMessage]
+ system: Optional[Union[str, List[dict]]] = None
+ tools: Optional[List[AnthropicTool]] = None
+ tool_choice: Optional[dict] = None
+ thinking: Optional[AnthropicThinkingConfig] = None
+
+
+class AnthropicCountTokensResponse(BaseModel):
+ """Anthropic count_tokens API response format."""
+
+ input_tokens: int
diff --git a/src/rotator_library/anthropic_compat/streaming.py b/src/rotator_library/anthropic_compat/streaming.py
new file mode 100644
index 00000000..870dae17
--- /dev/null
+++ b/src/rotator_library/anthropic_compat/streaming.py
@@ -0,0 +1,430 @@
+"""
+Streaming wrapper for converting OpenAI streaming format to Anthropic streaming format.
+
+This module provides a framework-agnostic streaming wrapper that converts
+OpenAI SSE (Server-Sent Events) format to Anthropic's streaming format.
+"""
+
+import json
+import logging
+import uuid
+from typing import AsyncGenerator, Callable, Optional, Awaitable, Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from ..transaction_logger import TransactionLogger
+
+logger = logging.getLogger("rotator_library.anthropic_compat")
+
+
+async def anthropic_streaming_wrapper(
+ openai_stream: AsyncGenerator[str, None],
+ original_model: str,
+ request_id: Optional[str] = None,
+ is_disconnected: Optional[Callable[[], Awaitable[bool]]] = None,
+ transaction_logger: Optional["TransactionLogger"] = None,
+) -> AsyncGenerator[str, None]:
+ """
+ Convert OpenAI streaming format to Anthropic streaming format.
+
+ This is a framework-agnostic wrapper that can be used with any async web framework.
+ Instead of taking a FastAPI Request object, it accepts an optional callback function
+ to check for client disconnection.
+
+ Anthropic SSE events:
+ - message_start: Initial message metadata
+ - content_block_start: Start of a content block
+ - content_block_delta: Content chunk
+ - content_block_stop: End of a content block
+ - message_delta: Final message metadata (stop_reason, usage)
+ - message_stop: End of message
+
+ Args:
+ openai_stream: AsyncGenerator yielding OpenAI SSE format strings
+ original_model: The model name to include in responses
+ request_id: Optional request ID (auto-generated if not provided)
+ is_disconnected: Optional async callback that returns True if client disconnected
+ transaction_logger: Optional TransactionLogger for logging the final Anthropic response
+
+ Yields:
+ SSE format strings in Anthropic's streaming format
+ """
+ if request_id is None:
+ request_id = f"msg_{uuid.uuid4().hex[:24]}"
+
+ message_started = False
+ content_block_started = False
+ thinking_block_started = False
+ current_block_index = 0
+ tool_calls_by_index = {} # Track tool calls by their index
+ tool_block_indices = {} # Track which block index each tool call uses
+ input_tokens = 0
+ output_tokens = 0
+ cached_tokens = 0 # Track cached tokens for proper Anthropic format
+ accumulated_text = "" # Track accumulated text for logging
+ accumulated_thinking = "" # Track accumulated thinking for logging
+ stop_reason_final = "end_turn" # Track final stop reason for logging
+
+ try:
+ async for chunk_str in openai_stream:
+ # Check for client disconnection if callback provided
+ if is_disconnected is not None and await is_disconnected():
+ break
+
+ if not chunk_str.strip() or not chunk_str.startswith("data:"):
+ continue
+
+ data_content = chunk_str[len("data:") :].strip()
+ if data_content == "[DONE]":
+ # CRITICAL: Send message_start if we haven't yet (e.g., empty response)
+ # Claude Code and other clients require message_start before message_stop
+ if not message_started:
+ # Build usage with cached tokens properly handled
+ usage_dict = {
+ "input_tokens": input_tokens - cached_tokens,
+ "output_tokens": 0,
+ }
+ if cached_tokens > 0:
+ usage_dict["cache_read_input_tokens"] = cached_tokens
+ usage_dict["cache_creation_input_tokens"] = 0
+
+ message_start = {
+ "type": "message_start",
+ "message": {
+ "id": request_id,
+ "type": "message",
+ "role": "assistant",
+ "content": [],
+ "model": original_model,
+ "stop_reason": None,
+ "stop_sequence": None,
+ "usage": usage_dict,
+ },
+ }
+ yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n"
+ message_started = True
+
+ # Close any open thinking block
+ if thinking_block_started:
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n'
+ current_block_index += 1
+ thinking_block_started = False
+
+ # Close any open text block
+ if content_block_started:
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n'
+ current_block_index += 1
+ content_block_started = False
+
+ # Close all open tool_use blocks
+ for tc_index in sorted(tool_block_indices.keys()):
+ block_idx = tool_block_indices[tc_index]
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {block_idx}}}\n\n'
+
+ # Determine stop_reason based on whether we had tool calls
+ stop_reason = "tool_use" if tool_calls_by_index else "end_turn"
+ stop_reason_final = stop_reason
+
+ # Build final usage dict with cached tokens
+ final_usage = {"output_tokens": output_tokens}
+ if cached_tokens > 0:
+ final_usage["cache_read_input_tokens"] = cached_tokens
+ final_usage["cache_creation_input_tokens"] = 0
+
+ # Send message_delta with final info
+ yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "{stop_reason}", "stop_sequence": null}}, "usage": {json.dumps(final_usage)}}}\n\n'
+
+ # Send message_stop
+ yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n'
+
+ # Log final Anthropic response if logger provided
+ if transaction_logger:
+ # Build content blocks for logging
+ content_blocks = []
+ if accumulated_thinking:
+ content_blocks.append(
+ {
+ "type": "thinking",
+ "thinking": accumulated_thinking,
+ }
+ )
+ if accumulated_text:
+ content_blocks.append(
+ {
+ "type": "text",
+ "text": accumulated_text,
+ }
+ )
+ # Add tool use blocks
+ for tc_index in sorted(tool_calls_by_index.keys()):
+ tc = tool_calls_by_index[tc_index]
+ # Parse arguments JSON string to dict
+ try:
+ input_data = json.loads(tc.get("arguments", "{}"))
+ except json.JSONDecodeError:
+ input_data = {}
+ content_blocks.append(
+ {
+ "type": "tool_use",
+ "id": tc.get("id", ""),
+ "name": tc.get("name", ""),
+ "input": input_data,
+ }
+ )
+
+ # Build usage for logging
+ log_usage = {
+ "input_tokens": input_tokens - cached_tokens,
+ "output_tokens": output_tokens,
+ }
+ if cached_tokens > 0:
+ log_usage["cache_read_input_tokens"] = cached_tokens
+ log_usage["cache_creation_input_tokens"] = 0
+
+ anthropic_response = {
+ "id": request_id,
+ "type": "message",
+ "role": "assistant",
+ "content": content_blocks,
+ "model": original_model,
+ "stop_reason": stop_reason_final,
+ "stop_sequence": None,
+ "usage": log_usage,
+ }
+ transaction_logger.log_response(
+ anthropic_response,
+ filename="anthropic_response.json",
+ )
+
+ break
+
+ try:
+ chunk = json.loads(data_content)
+ except json.JSONDecodeError:
+ continue
+
+ # Extract usage if present
+ # Note: Google's promptTokenCount INCLUDES cached tokens, but Anthropic's
+ # input_tokens EXCLUDES cached tokens. We extract cached tokens and subtract.
+ if "usage" in chunk and chunk["usage"]:
+ usage = chunk["usage"]
+ input_tokens = usage.get("prompt_tokens", input_tokens)
+ output_tokens = usage.get("completion_tokens", output_tokens)
+ # Extract cached tokens from prompt_tokens_details
+ if usage.get("prompt_tokens_details"):
+ cached_tokens = usage["prompt_tokens_details"].get(
+ "cached_tokens", cached_tokens
+ )
+
+ # Send message_start on first chunk
+ if not message_started:
+ # Build usage with cached tokens properly handled for Anthropic format
+ usage_dict = {
+ "input_tokens": input_tokens - cached_tokens,
+ "output_tokens": 0,
+ }
+ if cached_tokens > 0:
+ usage_dict["cache_read_input_tokens"] = cached_tokens
+ usage_dict["cache_creation_input_tokens"] = 0
+
+ message_start = {
+ "type": "message_start",
+ "message": {
+ "id": request_id,
+ "type": "message",
+ "role": "assistant",
+ "content": [],
+ "model": original_model,
+ "stop_reason": None,
+ "stop_sequence": None,
+ "usage": usage_dict,
+ },
+ }
+ yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n"
+ message_started = True
+
+ choices = chunk.get("choices") or []
+ if not choices:
+ continue
+
+ delta = choices[0].get("delta", {})
+
+ # Handle reasoning/thinking content (from OpenAI-style reasoning_content)
+ reasoning_content = delta.get("reasoning_content")
+ if reasoning_content:
+ if not thinking_block_started:
+ # Start a thinking content block
+ block_start = {
+ "type": "content_block_start",
+ "index": current_block_index,
+ "content_block": {"type": "thinking", "thinking": ""},
+ }
+ yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n"
+ thinking_block_started = True
+
+ # Send thinking delta
+ block_delta = {
+ "type": "content_block_delta",
+ "index": current_block_index,
+ "delta": {"type": "thinking_delta", "thinking": reasoning_content},
+ }
+ yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n"
+ # Accumulate thinking for logging
+ accumulated_thinking += reasoning_content
+
+ # Handle text content
+ content = delta.get("content")
+ if content:
+ # If we were in a thinking block, close it first
+ if thinking_block_started and not content_block_started:
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n'
+ current_block_index += 1
+ thinking_block_started = False
+
+ if not content_block_started:
+ # Start a text content block
+ block_start = {
+ "type": "content_block_start",
+ "index": current_block_index,
+ "content_block": {"type": "text", "text": ""},
+ }
+ yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n"
+ content_block_started = True
+
+ # Send content delta
+ block_delta = {
+ "type": "content_block_delta",
+ "index": current_block_index,
+ "delta": {"type": "text_delta", "text": content},
+ }
+ yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n"
+ # Accumulate text for logging
+ accumulated_text += content
+
+ # Handle tool calls
+ # Use `or []` to handle providers that send "tool_calls": null
+ tool_calls = delta.get("tool_calls") or []
+ for tc in tool_calls:
+ tc_index = tc.get("index", 0)
+
+ if tc_index not in tool_calls_by_index:
+ # Close previous thinking block if open
+ if thinking_block_started:
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n'
+ current_block_index += 1
+ thinking_block_started = False
+
+ # Close previous text block if open
+ if content_block_started:
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n'
+ current_block_index += 1
+ content_block_started = False
+
+ # Start new tool use block
+ tool_calls_by_index[tc_index] = {
+ "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"),
+ "name": tc.get("function", {}).get("name", ""),
+ "arguments": "",
+ }
+ # Track which block index this tool call uses
+ tool_block_indices[tc_index] = current_block_index
+
+ block_start = {
+ "type": "content_block_start",
+ "index": current_block_index,
+ "content_block": {
+ "type": "tool_use",
+ "id": tool_calls_by_index[tc_index]["id"],
+ "name": tool_calls_by_index[tc_index]["name"],
+ "input": {},
+ },
+ }
+ yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n"
+ # Increment for the next block
+ current_block_index += 1
+
+ # Accumulate arguments
+ func = tc.get("function", {})
+ if func.get("name"):
+ tool_calls_by_index[tc_index]["name"] = func["name"]
+ if func.get("arguments"):
+ tool_calls_by_index[tc_index]["arguments"] += func["arguments"]
+
+ # Send partial JSON delta using the correct block index for this tool
+ block_delta = {
+ "type": "content_block_delta",
+ "index": tool_block_indices[tc_index],
+ "delta": {
+ "type": "input_json_delta",
+ "partial_json": func["arguments"],
+ },
+ }
+ yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n"
+
+ # Note: We intentionally ignore finish_reason here.
+ # Block closing is handled when we receive [DONE] to avoid
+ # premature closes with providers that send finish_reason on each chunk.
+
+ except Exception as e:
+ logger.error(f"Error in Anthropic streaming wrapper: {e}")
+
+ # If we haven't sent message_start yet, send it now so the client can display the error
+ # Claude Code and other clients may ignore events that come before message_start
+ if not message_started:
+ # Build usage with cached tokens properly handled
+ usage_dict = {
+ "input_tokens": input_tokens - cached_tokens,
+ "output_tokens": 0,
+ }
+ if cached_tokens > 0:
+ usage_dict["cache_read_input_tokens"] = cached_tokens
+ usage_dict["cache_creation_input_tokens"] = 0
+
+ message_start = {
+ "type": "message_start",
+ "message": {
+ "id": request_id,
+ "type": "message",
+ "role": "assistant",
+ "content": [],
+ "model": original_model,
+ "stop_reason": None,
+ "stop_sequence": None,
+ "usage": usage_dict,
+ },
+ }
+ yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n"
+
+ # Send the error as a text content block so it's visible to the user
+ error_message = f"Error: {str(e)}"
+ error_block_start = {
+ "type": "content_block_start",
+ "index": current_block_index,
+ "content_block": {"type": "text", "text": ""},
+ }
+ yield f"event: content_block_start\ndata: {json.dumps(error_block_start)}\n\n"
+
+ error_block_delta = {
+ "type": "content_block_delta",
+ "index": current_block_index,
+ "delta": {"type": "text_delta", "text": error_message},
+ }
+ yield f"event: content_block_delta\ndata: {json.dumps(error_block_delta)}\n\n"
+
+ yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n'
+
+ # Build final usage with cached tokens
+ final_usage = {"output_tokens": 0}
+ if cached_tokens > 0:
+ final_usage["cache_read_input_tokens"] = cached_tokens
+ final_usage["cache_creation_input_tokens"] = 0
+
+ # Send message_delta and message_stop to properly close the stream
+ yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "end_turn", "stop_sequence": null}}, "usage": {json.dumps(final_usage)}}}\n\n'
+ yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n'
+
+ # Also send the formal error event for clients that handle it
+ error_event = {
+ "type": "error",
+ "error": {"type": "api_error", "message": str(e)},
+ }
+ yield f"event: error\ndata: {json.dumps(error_event)}\n\n"
diff --git a/src/rotator_library/anthropic_compat/translator.py b/src/rotator_library/anthropic_compat/translator.py
new file mode 100644
index 00000000..875d19b6
--- /dev/null
+++ b/src/rotator_library/anthropic_compat/translator.py
@@ -0,0 +1,626 @@
+"""
+Format translation functions between Anthropic and OpenAI API formats.
+
+This module provides functions to convert requests and responses between
+Anthropic's Messages API format and OpenAI's Chat Completions API format.
+This enables any OpenAI-compatible provider to work with Anthropic clients.
+"""
+
+import json
+import uuid
+from typing import Any, Dict, List, Optional, Union
+
+from .models import AnthropicMessagesRequest
+
+MIN_THINKING_SIGNATURE_LENGTH = 100
+
+# =============================================================================
+# THINKING BUDGET TO REASONING EFFORT MAPPING
+# =============================================================================
+
+# Budget thresholds for reasoning effort levels (based on token counts)
+# These map Anthropic's budget_tokens to OpenAI-style reasoning_effort levels
+THINKING_BUDGET_THRESHOLDS = {
+ "minimal": 4096,
+ "low": 8192,
+ "low_medium": 12288,
+ "medium": 16384,
+ "medium_high": 24576,
+ "high": 32768,
+}
+
+# Providers that support granular reasoning effort levels (low_medium, medium_high, etc.)
+# Other providers will receive simplified levels (low, medium, high)
+GRANULAR_REASONING_PROVIDERS = {"antigravity"}
+
+
+def _budget_to_reasoning_effort(budget_tokens: int, model: str) -> str:
+ """
+ Map Anthropic thinking budget_tokens to a reasoning_effort level.
+
+ Args:
+ budget_tokens: The thinking budget in tokens from the Anthropic request
+ model: The model name (used to determine if provider supports granular levels)
+
+ Returns:
+ A reasoning_effort level string (e.g., "low", "medium", "high")
+ """
+ # Determine granular level based on budget
+ if budget_tokens <= THINKING_BUDGET_THRESHOLDS["minimal"]:
+ granular_level = "minimal"
+ elif budget_tokens <= THINKING_BUDGET_THRESHOLDS["low"]:
+ granular_level = "low"
+ elif budget_tokens <= THINKING_BUDGET_THRESHOLDS["low_medium"]:
+ granular_level = "low_medium"
+ elif budget_tokens <= THINKING_BUDGET_THRESHOLDS["medium"]:
+ granular_level = "medium"
+ elif budget_tokens <= THINKING_BUDGET_THRESHOLDS["medium_high"]:
+ granular_level = "medium_high"
+ else:
+ granular_level = "high"
+
+ # Check if provider supports granular levels
+ provider = model.split("/")[0].lower() if "/" in model else ""
+ if provider in GRANULAR_REASONING_PROVIDERS:
+ return granular_level
+
+ # Simplify to basic levels for non-granular providers
+ simplify_map = {
+ "minimal": "low",
+ "low": "low",
+ "low_medium": "medium",
+ "medium": "medium",
+ "medium_high": "high",
+ "high": "high",
+ }
+ return simplify_map.get(granular_level, "medium")
+
+
+def _reorder_assistant_content(content: List[dict]) -> List[dict]:
+ """
+ Reorder assistant message content blocks to ensure correct order:
+ 1. Thinking blocks come first (required when thinking is enabled)
+ 2. Text blocks come in the middle (filtering out empty ones)
+ 3. Tool_use blocks come at the end (required before tool_result)
+
+ This matches Anthropic's expected ordering and prevents API errors.
+ """
+ if not isinstance(content, list) or len(content) <= 1:
+ return content
+
+ thinking_blocks = []
+ text_blocks = []
+ tool_use_blocks = []
+ other_blocks = []
+
+ for block in content:
+ if not isinstance(block, dict):
+ other_blocks.append(block)
+ continue
+
+ block_type = block.get("type", "")
+
+ if block_type in ("thinking", "redacted_thinking"):
+ # Sanitize thinking blocks - remove cache_control and other extra fields
+ sanitized = {
+ "type": block_type,
+ "thinking": block.get("thinking", ""),
+ }
+ if block.get("signature"):
+ sanitized["signature"] = block["signature"]
+ thinking_blocks.append(sanitized)
+
+ elif block_type == "tool_use":
+ tool_use_blocks.append(block)
+
+ elif block_type == "text":
+ # Only keep text blocks with meaningful content
+ text = block.get("text", "")
+ if text and text.strip():
+ text_blocks.append(block)
+
+ else:
+ # Other block types (images, documents, etc.) go in the text position
+ other_blocks.append(block)
+
+ # Reorder: thinking → other → text → tool_use
+ return thinking_blocks + other_blocks + text_blocks + tool_use_blocks
+
+
+def anthropic_to_openai_messages(
+ anthropic_messages: List[dict], system: Optional[Union[str, List[dict]]] = None
+) -> List[dict]:
+ """
+ Convert Anthropic message format to OpenAI format.
+
+ Key differences:
+ - Anthropic: system is a separate field, content can be string or list of blocks
+ - OpenAI: system is a message with role="system", content is usually string
+
+ Args:
+ anthropic_messages: List of messages in Anthropic format
+ system: Optional system message (string or list of text blocks)
+
+ Returns:
+ List of messages in OpenAI format
+ """
+ openai_messages = []
+
+ # Handle system message
+ if system:
+ if isinstance(system, str):
+ openai_messages.append({"role": "system", "content": system})
+ elif isinstance(system, list):
+ # System can be list of text blocks in Anthropic format
+ system_text = " ".join(
+ block.get("text", "")
+ for block in system
+ if isinstance(block, dict) and block.get("type") == "text"
+ )
+ if system_text:
+ openai_messages.append({"role": "system", "content": system_text})
+
+ for msg in anthropic_messages:
+ role = msg.get("role", "user")
+ content = msg.get("content", "")
+
+ if isinstance(content, str):
+ openai_messages.append({"role": role, "content": content})
+ elif isinstance(content, list):
+ # Reorder assistant content blocks to ensure correct order:
+ # thinking → text → tool_use
+ if role == "assistant":
+ content = _reorder_assistant_content(content)
+
+ # Handle content blocks
+ openai_content = []
+ tool_calls = []
+ reasoning_content = ""
+ thinking_signature = ""
+
+ for block in content:
+ if isinstance(block, dict):
+ block_type = block.get("type", "text")
+
+ if block_type == "text":
+ openai_content.append(
+ {"type": "text", "text": block.get("text", "")}
+ )
+ elif block_type == "image":
+ # Convert Anthropic image format to OpenAI
+ source = block.get("source", {})
+ if source.get("type") == "base64":
+ openai_content.append(
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}"
+ },
+ }
+ )
+ elif source.get("type") == "url":
+ openai_content.append(
+ {
+ "type": "image_url",
+ "image_url": {"url": source.get("url", "")},
+ }
+ )
+ elif block_type == "document":
+ # Convert Anthropic document format (e.g. PDF) to OpenAI
+ # Documents are treated similarly to images with appropriate mime type
+ source = block.get("source", {})
+ if source.get("type") == "base64":
+ openai_content.append(
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:{source.get('media_type', 'application/pdf')};base64,{source.get('data', '')}"
+ },
+ }
+ )
+ elif source.get("type") == "url":
+ openai_content.append(
+ {
+ "type": "image_url",
+ "image_url": {"url": source.get("url", "")},
+ }
+ )
+ elif block_type == "thinking":
+ signature = block.get("signature", "")
+ if (
+ signature
+ and len(signature) >= MIN_THINKING_SIGNATURE_LENGTH
+ ):
+ thinking_text = block.get("thinking", "")
+ if thinking_text:
+ reasoning_content += thinking_text
+ thinking_signature = signature
+ elif block_type == "redacted_thinking":
+ signature = block.get("signature", "")
+ if (
+ signature
+ and len(signature) >= MIN_THINKING_SIGNATURE_LENGTH
+ ):
+ thinking_signature = signature
+ elif block_type == "tool_use":
+ # Anthropic tool_use -> OpenAI tool_calls
+ tool_calls.append(
+ {
+ "id": block.get("id", ""),
+ "type": "function",
+ "function": {
+ "name": block.get("name", ""),
+ "arguments": json.dumps(block.get("input", {})),
+ },
+ }
+ )
+ elif block_type == "tool_result":
+ # Tool results become separate messages in OpenAI format
+ # Content can be string, or list of text/image blocks
+ tool_content = block.get("content", "")
+ if isinstance(tool_content, str):
+ # Simple string content
+ openai_messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": block.get("tool_use_id", ""),
+ "content": tool_content,
+ }
+ )
+ elif isinstance(tool_content, list):
+ # List of content blocks - may include text and images
+ tool_content_parts = []
+ for b in tool_content:
+ if not isinstance(b, dict):
+ continue
+ b_type = b.get("type", "")
+ if b_type == "text":
+ tool_content_parts.append(
+ {"type": "text", "text": b.get("text", "")}
+ )
+ elif b_type == "image":
+ # Convert Anthropic image format to OpenAI format
+ source = b.get("source", {})
+ if source.get("type") == "base64":
+ tool_content_parts.append(
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}"
+ },
+ }
+ )
+ elif source.get("type") == "url":
+ tool_content_parts.append(
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": source.get("url", "")
+ },
+ }
+ )
+
+ # If we only have text parts, join them as a string for compatibility
+ # Otherwise use the array format for multimodal content
+ if all(p.get("type") == "text" for p in tool_content_parts):
+ combined_text = " ".join(
+ p.get("text", "") for p in tool_content_parts
+ )
+ openai_messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": block.get("tool_use_id", ""),
+ "content": combined_text,
+ }
+ )
+ elif tool_content_parts:
+ # Multimodal content (includes images)
+ openai_messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": block.get("tool_use_id", ""),
+ "content": tool_content_parts,
+ }
+ )
+ else:
+ # Empty content
+ openai_messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": block.get("tool_use_id", ""),
+ "content": "",
+ }
+ )
+ else:
+ # Fallback for unexpected content type
+ openai_messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": block.get("tool_use_id", ""),
+ "content": str(tool_content)
+ if tool_content
+ else "",
+ }
+ )
+ continue # Don't add to current message
+
+ # Build the message
+ if tool_calls:
+ # Assistant message with tool calls
+ msg_dict = {"role": role}
+ if openai_content:
+ # If there's text content alongside tool calls
+ text_parts = [
+ c.get("text", "")
+ for c in openai_content
+ if c.get("type") == "text"
+ ]
+ msg_dict["content"] = " ".join(text_parts) if text_parts else None
+ else:
+ msg_dict["content"] = None
+ if reasoning_content:
+ msg_dict["reasoning_content"] = reasoning_content
+ if thinking_signature:
+ msg_dict["thinking_signature"] = thinking_signature
+ msg_dict["tool_calls"] = tool_calls
+ openai_messages.append(msg_dict)
+ elif openai_content:
+ # Check if it's just text or mixed content
+ if len(openai_content) == 1 and openai_content[0].get("type") == "text":
+ msg_dict = {
+ "role": role,
+ "content": openai_content[0].get("text", ""),
+ }
+ if reasoning_content:
+ msg_dict["reasoning_content"] = reasoning_content
+ if thinking_signature:
+ msg_dict["thinking_signature"] = thinking_signature
+ openai_messages.append(msg_dict)
+ else:
+ msg_dict = {"role": role, "content": openai_content}
+ if reasoning_content:
+ msg_dict["reasoning_content"] = reasoning_content
+ if thinking_signature:
+ msg_dict["thinking_signature"] = thinking_signature
+ openai_messages.append(msg_dict)
+ elif reasoning_content:
+ msg_dict = {"role": role, "content": ""}
+ msg_dict["reasoning_content"] = reasoning_content
+ if thinking_signature:
+ msg_dict["thinking_signature"] = thinking_signature
+ openai_messages.append(msg_dict)
+
+ return openai_messages
+
+
+def anthropic_to_openai_tools(
+ anthropic_tools: Optional[List[dict]],
+) -> Optional[List[dict]]:
+ """
+ Convert Anthropic tool definitions to OpenAI format.
+
+ Args:
+ anthropic_tools: List of tools in Anthropic format
+
+ Returns:
+ List of tools in OpenAI format, or None if no tools provided
+ """
+ if not anthropic_tools:
+ return None
+
+ openai_tools = []
+ for tool in anthropic_tools:
+ openai_tools.append(
+ {
+ "type": "function",
+ "function": {
+ "name": tool.get("name", ""),
+ "description": tool.get("description", ""),
+ "parameters": tool.get("input_schema", {}),
+ },
+ }
+ )
+ return openai_tools
+
+
+def anthropic_to_openai_tool_choice(
+ anthropic_tool_choice: Optional[dict],
+) -> Optional[Union[str, dict]]:
+ """
+ Convert Anthropic tool_choice to OpenAI format.
+
+ Args:
+ anthropic_tool_choice: Tool choice in Anthropic format
+
+ Returns:
+ Tool choice in OpenAI format
+ """
+ if not anthropic_tool_choice:
+ return None
+
+ choice_type = anthropic_tool_choice.get("type", "auto")
+
+ if choice_type == "auto":
+ return "auto"
+ elif choice_type == "any":
+ return "required"
+ elif choice_type == "tool":
+ return {
+ "type": "function",
+ "function": {"name": anthropic_tool_choice.get("name", "")},
+ }
+ elif choice_type == "none":
+ return "none"
+
+ return "auto"
+
+
+def openai_to_anthropic_response(openai_response: dict, original_model: str) -> dict:
+ """
+ Convert OpenAI chat completion response to Anthropic Messages format.
+
+ Args:
+ openai_response: Response from OpenAI-compatible API
+ original_model: The model name requested by the client
+
+ Returns:
+ Response in Anthropic Messages format
+ """
+ choice = openai_response.get("choices", [{}])[0]
+ message = choice.get("message", {})
+ usage = openai_response.get("usage", {})
+
+ # Build content blocks
+ content_blocks = []
+
+ # Add thinking content block if reasoning_content is present
+ reasoning_content = message.get("reasoning_content")
+ if reasoning_content:
+ thinking_signature = message.get("thinking_signature", "")
+ signature = (
+ thinking_signature
+ if thinking_signature
+ and len(thinking_signature) >= MIN_THINKING_SIGNATURE_LENGTH
+ else ""
+ )
+ content_blocks.append(
+ {
+ "type": "thinking",
+ "thinking": reasoning_content,
+ "signature": signature,
+ }
+ )
+
+ # Add text content if present
+ text_content = message.get("content")
+ if text_content:
+ content_blocks.append({"type": "text", "text": text_content})
+
+ # Add tool use blocks if present
+ tool_calls = message.get("tool_calls") or []
+ for tc in tool_calls:
+ func = tc.get("function", {})
+ try:
+ input_data = json.loads(func.get("arguments", "{}"))
+ except json.JSONDecodeError:
+ input_data = {}
+
+ content_blocks.append(
+ {
+ "type": "tool_use",
+ "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"),
+ "name": func.get("name", ""),
+ "input": input_data,
+ }
+ )
+
+ # Map finish_reason to stop_reason
+ finish_reason = choice.get("finish_reason", "end_turn")
+ stop_reason_map = {
+ "stop": "end_turn",
+ "length": "max_tokens",
+ "tool_calls": "tool_use",
+ "content_filter": "end_turn",
+ "function_call": "tool_use",
+ }
+ stop_reason = stop_reason_map.get(finish_reason, "end_turn")
+
+ # Build usage
+ # Note: Google's promptTokenCount INCLUDES cached tokens, but Anthropic's
+ # input_tokens EXCLUDES cached tokens. We need to subtract cached tokens.
+ prompt_tokens = usage.get("prompt_tokens", 0)
+ cached_tokens = 0
+
+ # Extract cached tokens if present
+ if usage.get("prompt_tokens_details"):
+ details = usage["prompt_tokens_details"]
+ cached_tokens = details.get("cached_tokens", 0)
+
+ anthropic_usage = {
+ "input_tokens": prompt_tokens - cached_tokens, # Subtract cached tokens
+ "output_tokens": usage.get("completion_tokens", 0),
+ }
+
+ # Add cache tokens if present
+ if cached_tokens > 0:
+ anthropic_usage["cache_read_input_tokens"] = cached_tokens
+ anthropic_usage["cache_creation_input_tokens"] = 0
+
+ return {
+ "id": openai_response.get("id", f"msg_{uuid.uuid4().hex[:24]}"),
+ "type": "message",
+ "role": "assistant",
+ "content": content_blocks,
+ "model": original_model,
+ "stop_reason": stop_reason,
+ "stop_sequence": None,
+ "usage": anthropic_usage,
+ }
+
+
+def translate_anthropic_request(request: AnthropicMessagesRequest) -> Dict[str, Any]:
+ """
+ Translate a complete Anthropic Messages API request to OpenAI format.
+
+ This is a high-level function that handles all aspects of request translation,
+ including messages, tools, tool_choice, and thinking configuration.
+
+ Args:
+ request: An AnthropicMessagesRequest object
+
+ Returns:
+ Dictionary containing the OpenAI-compatible request parameters
+ """
+ anthropic_request = request.model_dump(exclude_none=True)
+
+ messages = anthropic_request.get("messages", [])
+ openai_messages = anthropic_to_openai_messages(
+ messages, anthropic_request.get("system")
+ )
+
+ openai_tools = anthropic_to_openai_tools(anthropic_request.get("tools"))
+ openai_tool_choice = anthropic_to_openai_tool_choice(
+ anthropic_request.get("tool_choice")
+ )
+
+ # Build OpenAI-compatible request
+ openai_request = {
+ "model": request.model,
+ "messages": openai_messages,
+ "max_tokens": request.max_tokens,
+ "stream": request.stream or False,
+ }
+
+ if request.temperature is not None:
+ openai_request["temperature"] = request.temperature
+ if request.top_p is not None:
+ openai_request["top_p"] = request.top_p
+ if request.top_k is not None:
+ openai_request["top_k"] = request.top_k
+ if request.stop_sequences:
+ openai_request["stop"] = request.stop_sequences
+ if openai_tools:
+ openai_request["tools"] = openai_tools
+ if openai_tool_choice:
+ openai_request["tool_choice"] = openai_tool_choice
+
+ # Note: request.metadata is intentionally not mapped.
+ # OpenAI's API doesn't have an equivalent field for client-side metadata.
+ # The metadata is typically used by Anthropic clients for tracking purposes
+ # and doesn't affect the model's behavior.
+
+ # Handle Anthropic thinking config -> reasoning_effort translation
+ # Only set reasoning_effort if thinking is explicitly configured
+ if request.thinking:
+ if request.thinking.type == "enabled":
+ # Only set reasoning_effort if budget_tokens was specified
+ if request.thinking.budget_tokens is not None:
+ openai_request["reasoning_effort"] = _budget_to_reasoning_effort(
+ request.thinking.budget_tokens, request.model
+ )
+ # If thinking is enabled but no budget is specified, set nothing and
+ # let the provider decide the default.
+ elif request.thinking.type == "disabled":
+ openai_request["reasoning_effort"] = "disable"
+
+ return openai_request
diff --git a/src/rotator_library/client.py b/src/rotator_library/client.py
index 554b6d2a..a0ec4dfa 100644
--- a/src/rotator_library/client.py
+++ b/src/rotator_library/client.py
@@ -1244,13 +1244,22 @@ async def _execute_with_retry(
f"No API keys or OAuth credentials configured for provider: {provider}"
)
+ # Extract internal logging parameters (not passed to API)
+ parent_log_dir = kwargs.pop("_parent_log_dir", None)
+
# Establish a global deadline for the entire request lifecycle.
deadline = time.time() + self.global_timeout
# Create transaction logger if request logging is enabled
transaction_logger = None
if self.enable_request_logging:
- transaction_logger = TransactionLogger(provider, model, enabled=True)
+ transaction_logger = TransactionLogger(
+ provider,
+ model,
+ enabled=True,
+ api_format="oai",
+ parent_dir=parent_log_dir,
+ )
transaction_logger.log_request(kwargs)
# Create a mutable copy of the keys and shuffle it to ensure
@@ -2000,6 +2009,9 @@ async def _streaming_acompletion_with_retry(
model = kwargs.get("model")
provider = model.split("/")[0]
+ # Extract internal logging parameters (not passed to API)
+ parent_log_dir = kwargs.pop("_parent_log_dir", None)
+
# Create a mutable copy of the keys and shuffle it.
credentials_for_provider = list(self.all_credentials[provider])
random.shuffle(credentials_for_provider)
@@ -2022,7 +2034,13 @@ async def _streaming_acompletion_with_retry(
# Create transaction logger if request logging is enabled
transaction_logger = None
if self.enable_request_logging:
- transaction_logger = TransactionLogger(provider, model, enabled=True)
+ transaction_logger = TransactionLogger(
+ provider,
+ model,
+ enabled=True,
+ api_format="oai",
+ parent_dir=parent_log_dir,
+ )
transaction_logger.log_request(kwargs)
tried_creds = set()
@@ -2893,7 +2911,12 @@ def aembedding(
)
def token_count(self, **kwargs) -> int:
- """Calculates the number of tokens for a given text or list of messages."""
+ """Calculates the number of tokens for a given text or list of messages.
+
+ For Antigravity provider models, this also includes the preprompt tokens that are
+ injected during actual API calls (agent instruction + identity override), so the
+ reported count matches actual usage.
+ """
kwargs = self._convert_model_params(**kwargs)
model = kwargs.get("model")
text = kwargs.get("text")
@@ -2901,13 +2924,35 @@ def token_count(self, **kwargs) -> int:
if not model:
raise ValueError("'model' is a required parameter.")
+
+ # Calculate base token count
if messages:
- return token_counter(model=model, messages=messages)
+ base_count = token_counter(model=model, messages=messages)
elif text:
- return token_counter(model=model, text=text)
+ base_count = token_counter(model=model, text=text)
else:
raise ValueError("Either 'text' or 'messages' must be provided.")
+ # Add preprompt tokens for Antigravity provider
+ # The Antigravity provider injects system instructions during actual API calls,
+ # so we need to account for those tokens in the count
+ provider = model.split("/")[0] if "/" in model else ""
+ if provider == "antigravity":
+ try:
+ from .providers.antigravity_provider import (
+ get_antigravity_preprompt_text,
+ )
+
+ preprompt_text = get_antigravity_preprompt_text()
+ if preprompt_text:
+ preprompt_tokens = token_counter(model=model, text=preprompt_text)
+ base_count += preprompt_tokens
+ except ImportError:
+ # Provider not available, skip preprompt token counting
+ pass
+
+ return base_count
+
async def get_available_models(self, provider: str) -> List[str]:
"""Returns a list of available models for a specific provider, with caching."""
lib_logger.info(f"Getting available models for provider: {provider}")
@@ -3417,3 +3462,168 @@ async def force_refresh_quota(
result["duration_ms"] = int((time.time() - start_time) * 1000)
return result
+
+ # --- Anthropic API Compatibility Methods ---
+
+ async def anthropic_messages(
+ self,
+ request: "AnthropicMessagesRequest",
+ raw_request: Optional[Any] = None,
+ pre_request_callback: Optional[callable] = None,
+ ) -> Any:
+ """
+ Handle Anthropic Messages API requests.
+
+ This method accepts requests in Anthropic's format, translates them to
+ OpenAI format internally, processes them through the existing acompletion
+ method, and returns responses in Anthropic's format.
+
+ Args:
+ request: An AnthropicMessagesRequest object
+ raw_request: Optional raw request object for disconnect checks
+ pre_request_callback: Optional async callback before each API request
+
+ Returns:
+ For non-streaming: dict in Anthropic Messages format
+ For streaming: AsyncGenerator yielding Anthropic SSE format strings
+ """
+ from .anthropic_compat import (
+ translate_anthropic_request,
+ openai_to_anthropic_response,
+ anthropic_streaming_wrapper,
+ )
+ import uuid
+
+ request_id = f"msg_{uuid.uuid4().hex[:24]}"
+ original_model = request.model
+
+ # Extract provider from model for logging
+ provider = original_model.split("/")[0] if "/" in original_model else "unknown"
+
+ # Create Anthropic transaction logger if request logging is enabled
+ anthropic_logger = None
+ if self.enable_request_logging:
+ anthropic_logger = TransactionLogger(
+ provider,
+ original_model,
+ enabled=True,
+ api_format="ant",
+ )
+ # Log original Anthropic request
+ anthropic_logger.log_request(
+ request.model_dump(exclude_none=True),
+ filename="anthropic_request.json",
+ )
+
+ # Translate Anthropic request to OpenAI format
+ openai_request = translate_anthropic_request(request)
+
+ # Pass parent log directory to acompletion for nested logging
+ if anthropic_logger and anthropic_logger.log_dir:
+ openai_request["_parent_log_dir"] = anthropic_logger.log_dir
+
+ if request.stream:
+ # Streaming response
+ response_generator = self.acompletion(
+ request=raw_request,
+ pre_request_callback=pre_request_callback,
+ **openai_request,
+ )
+
+ # Create disconnect checker if raw_request provided
+ is_disconnected = None
+ if raw_request is not None and hasattr(raw_request, "is_disconnected"):
+ is_disconnected = raw_request.is_disconnected
+
+ # Return the streaming wrapper
+ # Note: for streaming, the Anthropic response logging happens in the wrapper
+ return anthropic_streaming_wrapper(
+ openai_stream=response_generator,
+ original_model=original_model,
+ request_id=request_id,
+ is_disconnected=is_disconnected,
+ transaction_logger=anthropic_logger,
+ )
+ else:
+ # Non-streaming response
+ response = await self.acompletion(
+ request=raw_request,
+ pre_request_callback=pre_request_callback,
+ **openai_request,
+ )
+
+ # Convert OpenAI response to Anthropic format
+ openai_response = (
+ response.model_dump()
+ if hasattr(response, "model_dump")
+ else dict(response)
+ )
+ anthropic_response = openai_to_anthropic_response(
+ openai_response, original_model
+ )
+
+ # Override the ID with our request ID
+ anthropic_response["id"] = request_id
+
+ # Log Anthropic response
+ if anthropic_logger:
+ anthropic_logger.log_response(
+ anthropic_response,
+ filename="anthropic_response.json",
+ )
+
+ return anthropic_response
+
+ async def anthropic_count_tokens(
+ self,
+ request: "AnthropicCountTokensRequest",
+ ) -> dict:
+ """
+ Handle Anthropic count_tokens API requests.
+
+ Counts the number of tokens that would be used by a Messages API request.
+ This is useful for estimating costs and managing context windows.
+
+ Args:
+ request: An AnthropicCountTokensRequest object
+
+ Returns:
+ Dict with input_tokens count in Anthropic format
+ """
+ from .anthropic_compat import (
+ anthropic_to_openai_messages,
+ anthropic_to_openai_tools,
+ )
+ import json
+
+ anthropic_request = request.model_dump(exclude_none=True)
+
+ openai_messages = anthropic_to_openai_messages(
+ anthropic_request.get("messages", []), anthropic_request.get("system")
+ )
+
+ # Count tokens for messages
+ message_tokens = self.token_count(
+ model=request.model,
+ messages=openai_messages,
+ )
+
+ # Count tokens for tools if present
+ tool_tokens = 0
+ if request.tools:
+ # Tool definitions also consume tokens; approximate their contribution by
+ # serializing the converted definitions to JSON and counting them as text.
+ openai_tools = anthropic_to_openai_tools(
+ [tool.model_dump() for tool in request.tools]
+ )
+ if openai_tools:
+ # Serialize tools to count their token contribution
+ tools_text = json.dumps(openai_tools)
+ tool_tokens = self.token_count(
+ model=request.model,
+ text=tools_text,
+ )
+
+ total_tokens = message_tokens + tool_tokens
+
+ return {"input_tokens": total_tokens}
diff --git a/src/rotator_library/providers/antigravity_provider.py b/src/rotator_library/providers/antigravity_provider.py
index 077b45a6..102d7a95 100644
--- a/src/rotator_library/providers/antigravity_provider.py
+++ b/src/rotator_library/providers/antigravity_provider.py
@@ -445,12 +445,47 @@ def _get_claude_thinking_cache_file():
# Exact prompt from CLIProxyAPI commit 1b2f9076715b62610f9f37d417e850832b3c7ed1
ANTIGRAVITY_AGENT_SYSTEM_INSTRUCTION_SHORT = """You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**"""
-
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
+def get_antigravity_preprompt_text() -> str:
+ """
+ Get the combined Antigravity preprompt text that gets injected into requests.
+
+ This function returns the exact text that gets prepended to system instructions
+ during actual API calls. It respects the current configuration settings:
+ - PREPEND_INSTRUCTION: Whether to include any preprompt at all
+ - USE_SHORT_ANTIGRAVITY_PROMPTS: Whether to use short or full versions
+ - INJECT_IDENTITY_OVERRIDE: Whether to include the identity override
+
+ This is useful for accurate token counting: token-count endpoints should include
+ these preprompts so their counts match what is actually sent to the API.
+
+ Returns:
+ The combined preprompt text, or empty string if prepending is disabled.
+ """
+ if not PREPEND_INSTRUCTION:
+ return ""
+
+ # Choose prompt versions based on USE_SHORT_ANTIGRAVITY_PROMPTS setting
+ if USE_SHORT_ANTIGRAVITY_PROMPTS:
+ agent_instruction = ANTIGRAVITY_AGENT_SYSTEM_INSTRUCTION_SHORT
+ override_instruction = ANTIGRAVITY_IDENTITY_OVERRIDE_INSTRUCTION_SHORT
+ else:
+ agent_instruction = ANTIGRAVITY_AGENT_SYSTEM_INSTRUCTION
+ override_instruction = ANTIGRAVITY_IDENTITY_OVERRIDE_INSTRUCTION
+
+ # Build the combined preprompt
+ parts = [agent_instruction]
+
+ if INJECT_IDENTITY_OVERRIDE:
+ parts.append(override_instruction)
+
+ return "\n".join(parts)
+
+
def _sanitize_headers(headers: Dict[str, str]) -> Dict[str, str]:
"""
Strip identifiable client headers for privacy/security.
@@ -2262,7 +2297,9 @@ def _close_tool_loop_for_thinking(
# =========================================================================
def _get_thinking_config(
- self, reasoning_effort: Optional[str], model: str
+ self,
+ reasoning_effort: Optional[str],
+ model: str,
) -> Optional[Dict[str, Any]]:
"""
Map reasoning_effort to thinking configuration.
@@ -2638,16 +2675,12 @@ def _transform_tool_message(
func_name = tool_id_to_name.get(tool_id, "unknown_function")
content = msg.get("content", "{}")
- # Log ID lookup
if tool_id not in tool_id_to_name:
lib_logger.warning(
f"[ID Mismatch] Tool response has ID '{tool_id}' which was not found in tool_id_to_name map. "
f"Available IDs: {list(tool_id_to_name.keys())}"
)
- # else:
- # lib_logger.debug(f"[ID Mapping] Tool response matched: id={tool_id}, name={func_name}")
- # Add prefix for Gemini 3 (and rename problematic tools)
if self._is_gemini_3(model) and self._enable_gemini3_tool_fix:
func_name = GEMINI3_TOOL_RENAMES.get(func_name, func_name)
func_name = f"{self._gemini3_tool_prefix}{func_name}"
diff --git a/src/rotator_library/transaction_logger.py b/src/rotator_library/transaction_logger.py
index 58b3155f..6b95234a 100644
--- a/src/rotator_library/transaction_logger.py
+++ b/src/rotator_library/transaction_logger.py
@@ -98,11 +98,19 @@ class TransactionLogger:
"provider",
"model",
"streaming",
+ "api_format",
"_dir_available",
"_context",
)
- def __init__(self, provider: str, model: str, enabled: bool = True):
+ def __init__(
+ self,
+ provider: str,
+ model: str,
+ enabled: bool = True,
+ api_format: str = "oai",
+ parent_dir: Optional[Path] = None,
+ ):
"""
Initialize transaction logger.
@@ -110,11 +118,14 @@ def __init__(self, provider: str, model: str, enabled: bool = True):
provider: Provider name (e.g., 'antigravity', 'openai')
model: Model name (will be sanitized for filesystem)
enabled: Whether logging is enabled
+ api_format: API format prefix ('oai' for OpenAI, 'ant' for Anthropic)
+ parent_dir: Optional parent directory for nested logging
"""
self.enabled = enabled
self.start_time = time.time()
self.request_id = str(uuid.uuid4())[:8] # 8-char short ID
self.provider = provider
+ self.api_format = api_format
# Strip provider prefix from model if present
# e.g., "antigravity/claude-opus-4.5" → "claude-opus-4.5"
@@ -131,12 +142,19 @@ def __init__(self, provider: str, model: str, enabled: bool = True):
if not enabled:
return
- # Create directory: MMDD_HHMMSS_{provider}_{model}_{request_id}
+ # Create directory based on whether we have a parent directory
timestamp = datetime.now().strftime("%m%d_%H%M%S")
safe_provider = _sanitize_name(provider)
- dir_name = f"{timestamp}_{safe_provider}_{self.model}_{self.request_id}"
- self.log_dir = _get_transactions_dir() / dir_name
+ if parent_dir:
+ # Nested logging: create subdirectory inside parent
+ # e.g., parent_dir/openai/ for OpenAI translation layer
+ subdir_name = "openai" if api_format == "oai" else api_format
+ self.log_dir = parent_dir / subdir_name
+ else:
+ # Root-level logging: MMDD_HHMMSS_{api_format}_{provider}_{model}_{request_id}
+ dir_name = f"{timestamp}_{api_format}_{safe_provider}_{self.model}_{self.request_id}"
+ self.log_dir = _get_transactions_dir() / dir_name
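+ # Illustrative name (hypothetical values): an Anthropic-format request for
+ # "antigravity/claude-opus-4.5" might log to a directory like
+ # "1231_235959_ant_antigravity_claude-opus-4.5_a1b2c3d4".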
try:
self.log_dir.mkdir(parents=True, exist_ok=True)
@@ -162,12 +180,15 @@ def get_context(self) -> TransactionContext:
)
return self._context
- def log_request(self, request_data: Dict[str, Any]) -> None:
+ def log_request(
+ self, request_data: Dict[str, Any], filename: str = "request.json"
+ ) -> None:
"""
- Log the OpenAI-compatible request received by client.py.
+ Log the request received by client.py.
Args:
request_data: The request data dict (messages, model, etc.)
+ filename: Custom filename for the log file (default: request.json)
"""
if not self.enabled or not self._dir_available:
return
@@ -179,7 +200,7 @@ def log_request(self, request_data: Dict[str, Any]) -> None:
"timestamp_utc": datetime.utcnow().isoformat(),
"data": request_data,
}
- self._write_json("request.json", data)
+ self._write_json(filename, data)
def log_stream_chunk(self, chunk: Dict[str, Any]) -> None:
"""
@@ -203,14 +224,16 @@ def log_response(
response_data: Dict[str, Any],
status_code: int = 200,
headers: Optional[Dict[str, Any]] = None,
+ filename: str = "response.json",
) -> None:
"""
- Log the OpenAI-compatible response returned by client.py.
+ Log the response returned by client.py.
Args:
response_data: The response data dict
status_code: HTTP status code (default 200)
headers: Optional response headers
+ filename: Custom filename for the log file (default: response.json)
"""
if not self.enabled or not self._dir_available:
return
@@ -226,7 +249,7 @@ def log_response(
"headers": dict(headers) if headers else None,
"data": response_data,
}
- self._write_json("response.json", data)
+ self._write_json(filename, data)
# Also write metadata
self._log_metadata(response_data, status_code, duration_ms)