diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 7a558a3e..e5f28891 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -1,4 +1,5 @@ import time +import uuid # Phase 1: Minimal imports for arg parsing and TUI import asyncio @@ -99,7 +100,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware - from fastapi.responses import StreamingResponse + from fastapi.responses import StreamingResponse, JSONResponse from fastapi.security import APIKeyHeader print(" → Loading core dependencies...") @@ -124,6 +125,18 @@ from rotator_library.credential_manager import CredentialManager from rotator_library.background_refresher import BackgroundRefresher from rotator_library.model_info_service import init_model_info_service + from rotator_library.anthropic_compat import ( + AnthropicMessagesRequest, + AnthropicMessagesResponse, + AnthropicCountTokensRequest, + AnthropicCountTokensResponse, + anthropic_streaming_wrapper, + anthropic_to_openai_messages, + anthropic_to_openai_tools, + anthropic_to_openai_tool_choice, + openai_to_anthropic_response, + translate_anthropic_request, + ) from proxy_app.request_logger import log_request_to_console from proxy_app.batch_manager import EmbeddingBatcher from proxy_app.detailed_logger import DetailedLogger @@ -214,6 +227,9 @@ class EnrichedModelList(BaseModel): data: List[EnrichedModelCard] +# Anthropic API Models are imported from rotator_library.anthropic_compat + + # Calculate total loading time _elapsed = time.time() - _start_time print( @@ -665,6 +681,33 @@ async def verify_api_key(auth: str = Depends(api_key_header)): return auth +# --- Anthropic API Key Header --- +anthropic_api_key_header = APIKeyHeader(name="x-api-key", auto_error=False) + + +async def verify_anthropic_api_key( + x_api_key: str = Depends(anthropic_api_key_header), + auth: str = Depends(api_key_header), +): + """ + Dependency to verify API key for Anthropic endpoints. + Accepts either x-api-key header (Anthropic style) or Authorization Bearer (OpenAI style). + """ + # If PROXY_API_KEY is not set or empty, skip verification (open access) + if not PROXY_API_KEY: + return auth or x_api_key + # Check x-api-key first (Anthropic style) + if x_api_key and x_api_key == PROXY_API_KEY: + return x_api_key + # Fall back to Bearer token (OpenAI style) + if auth and auth == f"Bearer {PROXY_API_KEY}": + return auth + raise HTTPException(status_code=401, detail="Invalid or missing API Key") + + +# Format translation functions are now in rotator_library.anthropic_compat + + async def streaming_response_wrapper( request: Request, request_data: dict, @@ -967,6 +1010,259 @@ async def chat_completions( raise HTTPException(status_code=500, detail=str(e)) +# --- Anthropic Messages API Endpoint --- +@app.post("/v1/messages") +async def anthropic_messages( + request: Request, + body: AnthropicMessagesRequest, + client: RotatingClient = Depends(get_rotating_client), + _=Depends(verify_anthropic_api_key), +): + """ + Anthropic-compatible Messages API endpoint. + + Accepts requests in Anthropic's format and returns responses in Anthropic's format. + Internally translates to OpenAI format for processing via LiteLLM. + + This endpoint is compatible with Claude Code and other Anthropic API clients. 
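+
+    Authentication accepts either the Anthropic-style x-api-key header or an
+    OpenAI-style Authorization: Bearer token (see verify_anthropic_api_key).
+    An Anthropic "thinking" config is translated to the proxy's reasoning_effort
+    parameter: budget_tokens >= 32000 maps to "high" with custom_reasoning_budget,
+    >= 10000 to "high", >= 5000 to "medium", anything lower to "low", and
+    type "disabled" maps to "disable".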
+ """ + request_id = f"msg_{uuid.uuid4().hex[:24]}" + original_model = body.model + + # Initialize logger if enabled + logger = DetailedLogger() if ENABLE_REQUEST_LOGGING else None + + try: + # Convert Anthropic request to OpenAI format + anthropic_request = body.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + openai_tools = anthropic_to_openai_tools(anthropic_request.get("tools")) + openai_tool_choice = anthropic_to_openai_tool_choice( + anthropic_request.get("tool_choice") + ) + + # Build OpenAI-compatible request + openai_request = { + "model": body.model, + "messages": openai_messages, + "max_tokens": body.max_tokens, + "stream": body.stream or False, + } + + if body.temperature is not None: + openai_request["temperature"] = body.temperature + if body.top_p is not None: + openai_request["top_p"] = body.top_p + if body.stop_sequences: + openai_request["stop"] = body.stop_sequences + if openai_tools: + openai_request["tools"] = openai_tools + if openai_tool_choice: + openai_request["tool_choice"] = openai_tool_choice + + # Handle Anthropic thinking config -> reasoning_effort translation + if body.thinking: + if body.thinking.type == "enabled": + # Map budget_tokens to reasoning_effort level + # Default to "medium" if enabled but budget not specified + budget = body.thinking.budget_tokens or 10000 + if budget >= 32000: + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + elif budget >= 10000: + openai_request["reasoning_effort"] = "high" + elif budget >= 5000: + openai_request["reasoning_effort"] = "medium" + else: + openai_request["reasoning_effort"] = "low" + elif body.thinking.type == "disabled": + openai_request["reasoning_effort"] = "disable" + elif "opus" in body.model.lower(): + # Force high thinking for Opus models when no thinking config is provided + # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget + # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + + log_request_to_console( + url=str(request.url), + headers=dict(request.headers), + client_info=( + request.client.host if request.client else "unknown", + request.client.port if request.client else 0, + ), + request_data=openai_request, + ) + + if body.stream: + # Streaming response - acompletion returns a generator for streaming + response_generator = client.acompletion(request=request, **openai_request) + + return StreamingResponse( + anthropic_streaming_wrapper( + request, response_generator, original_model, request_id + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + else: + # Non-streaming response + response = await client.acompletion(request=request, **openai_request) + + # Convert OpenAI response to Anthropic format + openai_response = ( + response.model_dump() + if hasattr(response, "model_dump") + else dict(response) + ) + anthropic_response = openai_to_anthropic_response( + openai_response, original_model + ) + + # Override the ID with our request ID + anthropic_response["id"] = request_id + + if logger: + logger.log_final_response( + status_code=200, + headers=None, + body=anthropic_response, + ) + + return JSONResponse(content=anthropic_response) + + except ( + litellm.InvalidRequestError, + 
ValueError, + litellm.ContextWindowExceededError, + ) as e: + error_response = { + "type": "error", + "error": {"type": "invalid_request_error", "message": str(e)}, + } + raise HTTPException(status_code=400, detail=error_response) + except litellm.AuthenticationError as e: + error_response = { + "type": "error", + "error": {"type": "authentication_error", "message": str(e)}, + } + raise HTTPException(status_code=401, detail=error_response) + except litellm.RateLimitError as e: + error_response = { + "type": "error", + "error": {"type": "rate_limit_error", "message": str(e)}, + } + raise HTTPException(status_code=429, detail=error_response) + except (litellm.ServiceUnavailableError, litellm.APIConnectionError) as e: + error_response = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + raise HTTPException(status_code=503, detail=error_response) + except litellm.Timeout as e: + error_response = { + "type": "error", + "error": {"type": "api_error", "message": f"Request timed out: {str(e)}"}, + } + raise HTTPException(status_code=504, detail=error_response) + except Exception as e: + logging.error(f"Anthropic messages endpoint error: {e}") + if logger: + logger.log_final_response( + status_code=500, + headers=None, + body={"error": str(e)}, + ) + error_response = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + raise HTTPException(status_code=500, detail=error_response) + + +# --- Anthropic Count Tokens Endpoint --- +@app.post("/v1/messages/count_tokens") +async def anthropic_count_tokens( + request: Request, + body: AnthropicCountTokensRequest, + client: RotatingClient = Depends(get_rotating_client), + _=Depends(verify_anthropic_api_key), +): + """ + Anthropic-compatible count_tokens endpoint. + + Counts the number of tokens that would be used by a Messages API request. + This is useful for estimating costs and managing context windows. + + Accepts requests in Anthropic's format and returns token count in Anthropic's format. 
+ """ + try: + # Convert Anthropic request to OpenAI format for token counting + anthropic_request = body.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + # Count tokens for messages + message_tokens = client.token_count( + model=body.model, + messages=openai_messages, + ) + + # Count tokens for tools if present + tool_tokens = 0 + if body.tools: + # Tools add tokens based on their definitions + # Convert to JSON string and count tokens for tool definitions + openai_tools = anthropic_to_openai_tools( + [tool.model_dump() for tool in body.tools] + ) + if openai_tools: + # Serialize tools to count their token contribution + tools_text = json.dumps(openai_tools) + tool_tokens = client.token_count( + model=body.model, + text=tools_text, + ) + + total_tokens = message_tokens + tool_tokens + + return JSONResponse(content={"input_tokens": total_tokens}) + + except ( + litellm.InvalidRequestError, + ValueError, + litellm.ContextWindowExceededError, + ) as e: + error_response = { + "type": "error", + "error": {"type": "invalid_request_error", "message": str(e)}, + } + raise HTTPException(status_code=400, detail=error_response) + except litellm.AuthenticationError as e: + error_response = { + "type": "error", + "error": {"type": "authentication_error", "message": str(e)}, + } + raise HTTPException(status_code=401, detail=error_response) + except Exception as e: + logging.error(f"Anthropic count_tokens endpoint error: {e}") + error_response = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + raise HTTPException(status_code=500, detail=error_response) + + @app.post("/v1/embeddings") async def embeddings( request: Request, diff --git a/src/rotator_library/__init__.py b/src/rotator_library/__init__.py index 7944443f..1955f57a 100644 --- a/src/rotator_library/__init__.py +++ b/src/rotator_library/__init__.py @@ -8,6 +8,12 @@ from .providers import PROVIDER_PLUGINS from .providers.provider_interface import ProviderInterface from .model_info_service import ModelInfoService, ModelInfo, ModelMetadata + from .anthropic_compat import ( + AnthropicMessagesRequest, + AnthropicMessagesResponse, + AnthropicCountTokensRequest, + AnthropicCountTokensResponse, + ) __all__ = [ "RotatingClient", @@ -15,11 +21,16 @@ "ModelInfoService", "ModelInfo", "ModelMetadata", + # Anthropic compatibility + "AnthropicMessagesRequest", + "AnthropicMessagesResponse", + "AnthropicCountTokensRequest", + "AnthropicCountTokensResponse", ] def __getattr__(name): - """Lazy-load PROVIDER_PLUGINS and ModelInfoService to speed up module import.""" + """Lazy-load PROVIDER_PLUGINS, ModelInfoService, and Anthropic compat to speed up module import.""" if name == "PROVIDER_PLUGINS": from .providers import PROVIDER_PLUGINS @@ -36,4 +47,21 @@ def __getattr__(name): from .model_info_service import ModelMetadata return ModelMetadata + # Anthropic compatibility models + if name == "AnthropicMessagesRequest": + from .anthropic_compat import AnthropicMessagesRequest + + return AnthropicMessagesRequest + if name == "AnthropicMessagesResponse": + from .anthropic_compat import AnthropicMessagesResponse + + return AnthropicMessagesResponse + if name == "AnthropicCountTokensRequest": + from .anthropic_compat import AnthropicCountTokensRequest + + return AnthropicCountTokensRequest + if name == "AnthropicCountTokensResponse": + from .anthropic_compat import AnthropicCountTokensResponse + + return 
AnthropicCountTokensResponse raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/rotator_library/anthropic_compat/__init__.py b/src/rotator_library/anthropic_compat/__init__.py new file mode 100644 index 00000000..8572ac79 --- /dev/null +++ b/src/rotator_library/anthropic_compat/__init__.py @@ -0,0 +1,67 @@ +""" +Anthropic API compatibility module for rotator_library. + +This module provides format translation between Anthropic's Messages API +and OpenAI's Chat Completions API, enabling any OpenAI-compatible provider +to work with Anthropic clients like Claude Code. + +Usage: + from rotator_library.anthropic_compat import ( + AnthropicMessagesRequest, + AnthropicMessagesResponse, + translate_anthropic_request, + openai_to_anthropic_response, + anthropic_streaming_wrapper, + ) +""" + +from .models import ( + AnthropicTextBlock, + AnthropicImageSource, + AnthropicImageBlock, + AnthropicToolUseBlock, + AnthropicToolResultBlock, + AnthropicMessage, + AnthropicTool, + AnthropicThinkingConfig, + AnthropicMessagesRequest, + AnthropicUsage, + AnthropicMessagesResponse, + AnthropicCountTokensRequest, + AnthropicCountTokensResponse, +) + +from .translator import ( + anthropic_to_openai_messages, + anthropic_to_openai_tools, + anthropic_to_openai_tool_choice, + openai_to_anthropic_response, + translate_anthropic_request, +) + +from .streaming import anthropic_streaming_wrapper + +__all__ = [ + # Models + "AnthropicTextBlock", + "AnthropicImageSource", + "AnthropicImageBlock", + "AnthropicToolUseBlock", + "AnthropicToolResultBlock", + "AnthropicMessage", + "AnthropicTool", + "AnthropicThinkingConfig", + "AnthropicMessagesRequest", + "AnthropicUsage", + "AnthropicMessagesResponse", + "AnthropicCountTokensRequest", + "AnthropicCountTokensResponse", + # Translator functions + "anthropic_to_openai_messages", + "anthropic_to_openai_tools", + "anthropic_to_openai_tool_choice", + "openai_to_anthropic_response", + "translate_anthropic_request", + # Streaming + "anthropic_streaming_wrapper", +] diff --git a/src/rotator_library/anthropic_compat/models.py b/src/rotator_library/anthropic_compat/models.py new file mode 100644 index 00000000..c579f2e2 --- /dev/null +++ b/src/rotator_library/anthropic_compat/models.py @@ -0,0 +1,144 @@ +""" +Pydantic models for the Anthropic Messages API. + +These models define the request and response formats for Anthropic's Messages API, +enabling compatibility with Claude Code and other Anthropic API clients. 
+""" + +from typing import Any, List, Optional, Union +from pydantic import BaseModel + + +# --- Content Blocks --- +class AnthropicTextBlock(BaseModel): + """Anthropic text content block.""" + + type: str = "text" + text: str + + +class AnthropicImageSource(BaseModel): + """Anthropic image source for base64 images.""" + + type: str = "base64" + media_type: str + data: str + + +class AnthropicImageBlock(BaseModel): + """Anthropic image content block.""" + + type: str = "image" + source: AnthropicImageSource + + +class AnthropicToolUseBlock(BaseModel): + """Anthropic tool use content block.""" + + type: str = "tool_use" + id: str + name: str + input: dict + + +class AnthropicToolResultBlock(BaseModel): + """Anthropic tool result content block.""" + + type: str = "tool_result" + tool_use_id: str + content: Union[str, List[Any]] + is_error: Optional[bool] = None + + +# --- Message and Tool Definitions --- +class AnthropicMessage(BaseModel): + """Anthropic message format.""" + + role: str + content: Union[ + str, + List[ + Union[ + AnthropicTextBlock, + AnthropicImageBlock, + AnthropicToolUseBlock, + AnthropicToolResultBlock, + dict, + ] + ], + ] + + +class AnthropicTool(BaseModel): + """Anthropic tool definition.""" + + name: str + description: Optional[str] = None + input_schema: dict + + +class AnthropicThinkingConfig(BaseModel): + """Anthropic thinking configuration.""" + + type: str # "enabled" or "disabled" + budget_tokens: Optional[int] = None + + +# --- Messages Request --- +class AnthropicMessagesRequest(BaseModel): + """Anthropic Messages API request format.""" + + model: str + messages: List[AnthropicMessage] + max_tokens: int + system: Optional[Union[str, List[dict]]] = None + temperature: Optional[float] = None + top_p: Optional[float] = None + top_k: Optional[int] = None + stop_sequences: Optional[List[str]] = None + stream: Optional[bool] = False + tools: Optional[List[AnthropicTool]] = None + tool_choice: Optional[dict] = None + metadata: Optional[dict] = None + thinking: Optional[AnthropicThinkingConfig] = None + + +# --- Messages Response --- +class AnthropicUsage(BaseModel): + """Anthropic usage statistics.""" + + input_tokens: int + output_tokens: int + cache_creation_input_tokens: Optional[int] = None + cache_read_input_tokens: Optional[int] = None + + +class AnthropicMessagesResponse(BaseModel): + """Anthropic Messages API response format.""" + + id: str + type: str = "message" + role: str = "assistant" + content: List[Union[AnthropicTextBlock, AnthropicToolUseBlock, dict]] + model: str + stop_reason: Optional[str] = None + stop_sequence: Optional[str] = None + usage: AnthropicUsage + + +# --- Count Tokens --- +class AnthropicCountTokensRequest(BaseModel): + """Anthropic count_tokens API request format.""" + + model: str + messages: List[AnthropicMessage] + system: Optional[Union[str, List[dict]]] = None + tools: Optional[List[AnthropicTool]] = None + tool_choice: Optional[dict] = None + thinking: Optional[AnthropicThinkingConfig] = None + + +class AnthropicCountTokensResponse(BaseModel): + """Anthropic count_tokens API response format.""" + + input_tokens: int diff --git a/src/rotator_library/anthropic_compat/streaming.py b/src/rotator_library/anthropic_compat/streaming.py new file mode 100644 index 00000000..5ceb7145 --- /dev/null +++ b/src/rotator_library/anthropic_compat/streaming.py @@ -0,0 +1,308 @@ +""" +Streaming wrapper for converting OpenAI streaming format to Anthropic streaming format. 
+ +This module provides a framework-agnostic streaming wrapper that converts +OpenAI SSE (Server-Sent Events) format to Anthropic's streaming format. +""" + +import json +import logging +import uuid +from typing import AsyncGenerator, Callable, Optional, Awaitable + +logger = logging.getLogger("rotator_library.anthropic_compat") + + +async def anthropic_streaming_wrapper( + openai_stream: AsyncGenerator[str, None], + original_model: str, + request_id: Optional[str] = None, + is_disconnected: Optional[Callable[[], Awaitable[bool]]] = None, +) -> AsyncGenerator[str, None]: + """ + Convert OpenAI streaming format to Anthropic streaming format. + + This is a framework-agnostic wrapper that can be used with any async web framework. + Instead of taking a FastAPI Request object, it accepts an optional callback function + to check for client disconnection. + + Anthropic SSE events: + - message_start: Initial message metadata + - content_block_start: Start of a content block + - content_block_delta: Content chunk + - content_block_stop: End of a content block + - message_delta: Final message metadata (stop_reason, usage) + - message_stop: End of message + + Args: + openai_stream: AsyncGenerator yielding OpenAI SSE format strings + original_model: The model name to include in responses + request_id: Optional request ID (auto-generated if not provided) + is_disconnected: Optional async callback that returns True if client disconnected + + Yields: + SSE format strings in Anthropic's streaming format + """ + if request_id is None: + request_id = f"msg_{uuid.uuid4().hex[:24]}" + + message_started = False + content_block_started = False + thinking_block_started = False + current_block_index = 0 + tool_calls_by_index = {} # Track tool calls by their index + tool_block_indices = {} # Track which block index each tool call uses + input_tokens = 0 + output_tokens = 0 + + try: + async for chunk_str in openai_stream: + # Check for client disconnection if callback provided + if is_disconnected is not None and await is_disconnected(): + break + + if not chunk_str.strip() or not chunk_str.startswith("data:"): + continue + + data_content = chunk_str[len("data:") :].strip() + if data_content == "[DONE]": + # CRITICAL: Send message_start if we haven't yet (e.g., empty response) + # Claude Code and other clients require message_start before message_stop + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": input_tokens, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + message_started = True + + # Close any open thinking block + if thinking_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + # Close any open text block + if content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + content_block_started = False + + # Close all open tool_use blocks + for tc_index in sorted(tool_block_indices.keys()): + block_idx = tool_block_indices[tc_index] + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {block_idx}}}\n\n' + + # Determine stop_reason based on whether we had tool calls + 
stop_reason = "tool_use" if tool_calls_by_index else "end_turn" + + # Send message_delta with final info + yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "{stop_reason}", "stop_sequence": null}}, "usage": {{"output_tokens": {output_tokens}}}}}\n\n' + + # Send message_stop + yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' + break + + try: + chunk = json.loads(data_content) + except json.JSONDecodeError: + continue + + # Extract usage if present + if "usage" in chunk and chunk["usage"]: + input_tokens = chunk["usage"].get("prompt_tokens", input_tokens) + output_tokens = chunk["usage"].get("completion_tokens", output_tokens) + + # Send message_start on first chunk + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": input_tokens, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + message_started = True + + choices = chunk.get("choices", []) + if not choices: + continue + + delta = choices[0].get("delta", {}) + + # Handle reasoning/thinking content (from OpenAI-style reasoning_content) + reasoning_content = delta.get("reasoning_content") + if reasoning_content: + if not thinking_block_started: + # Start a thinking content block + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "thinking", "thinking": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + thinking_block_started = True + + # Send thinking delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "thinking_delta", "thinking": reasoning_content}, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Handle text content + content = delta.get("content") + if content: + # If we were in a thinking block, close it first + if thinking_block_started and not content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + if not content_block_started: + # Start a text content block + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "text", "text": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + content_block_started = True + + # Send content delta + block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "text_delta", "text": content}, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Handle tool calls + tool_calls = delta.get("tool_calls", []) + for tc in tool_calls: + tc_index = tc.get("index", 0) + + if tc_index not in tool_calls_by_index: + # Close previous thinking block if open + if thinking_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + thinking_block_started = False + + # Close previous text block if open + if content_block_started: + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + current_block_index += 1 + content_block_started = False + + # Start new tool 
use block + tool_calls_by_index[tc_index] = { + "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"), + "name": tc.get("function", {}).get("name", ""), + "arguments": "", + } + # Track which block index this tool call uses + tool_block_indices[tc_index] = current_block_index + + block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "tool_use", + "id": tool_calls_by_index[tc_index]["id"], + "name": tool_calls_by_index[tc_index]["name"], + "input": {}, + }, + } + yield f"event: content_block_start\ndata: {json.dumps(block_start)}\n\n" + # Increment for the next block + current_block_index += 1 + + # Accumulate arguments + func = tc.get("function", {}) + if func.get("name"): + tool_calls_by_index[tc_index]["name"] = func["name"] + if func.get("arguments"): + tool_calls_by_index[tc_index]["arguments"] += func["arguments"] + + # Send partial JSON delta using the correct block index for this tool + block_delta = { + "type": "content_block_delta", + "index": tool_block_indices[tc_index], + "delta": { + "type": "input_json_delta", + "partial_json": func["arguments"], + }, + } + yield f"event: content_block_delta\ndata: {json.dumps(block_delta)}\n\n" + + # Note: We intentionally ignore finish_reason here. + # Block closing is handled when we receive [DONE] to avoid + # premature closes with providers that send finish_reason on each chunk. + + except Exception as e: + logger.error(f"Error in Anthropic streaming wrapper: {e}") + + # If we haven't sent message_start yet, send it now so the client can display the error + # Claude Code and other clients may ignore events that come before message_start + if not message_started: + message_start = { + "type": "message_start", + "message": { + "id": request_id, + "type": "message", + "role": "assistant", + "content": [], + "model": original_model, + "stop_reason": None, + "stop_sequence": None, + "usage": {"input_tokens": 0, "output_tokens": 0}, + }, + } + yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + + # Send the error as a text content block so it's visible to the user + error_message = f"Error: {str(e)}" + error_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": {"type": "text", "text": ""}, + } + yield f"event: content_block_start\ndata: {json.dumps(error_block_start)}\n\n" + + error_block_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": {"type": "text_delta", "text": error_message}, + } + yield f"event: content_block_delta\ndata: {json.dumps(error_block_delta)}\n\n" + + yield f'event: content_block_stop\ndata: {{"type": "content_block_stop", "index": {current_block_index}}}\n\n' + + # Send message_delta and message_stop to properly close the stream + yield f'event: message_delta\ndata: {{"type": "message_delta", "delta": {{"stop_reason": "end_turn", "stop_sequence": null}}, "usage": {{"output_tokens": 0}}}}\n\n' + yield 'event: message_stop\ndata: {"type": "message_stop"}\n\n' + + # Also send the formal error event for clients that handle it + error_event = { + "type": "error", + "error": {"type": "api_error", "message": str(e)}, + } + yield f"event: error\ndata: {json.dumps(error_event)}\n\n" diff --git a/src/rotator_library/anthropic_compat/translator.py b/src/rotator_library/anthropic_compat/translator.py new file mode 100644 index 00000000..eca6f8da --- /dev/null +++ b/src/rotator_library/anthropic_compat/translator.py @@ -0,0 +1,364 @@ +""" +Format translation functions between 
Anthropic and OpenAI API formats. + +This module provides functions to convert requests and responses between +Anthropic's Messages API format and OpenAI's Chat Completions API format. +This enables any OpenAI-compatible provider to work with Anthropic clients. +""" + +import json +import time +import uuid +from typing import Any, Dict, List, Optional, Union + +from .models import AnthropicMessagesRequest + + +def anthropic_to_openai_messages( + anthropic_messages: List[dict], system: Optional[Union[str, List[dict]]] = None +) -> List[dict]: + """ + Convert Anthropic message format to OpenAI format. + + Key differences: + - Anthropic: system is a separate field, content can be string or list of blocks + - OpenAI: system is a message with role="system", content is usually string + + Args: + anthropic_messages: List of messages in Anthropic format + system: Optional system message (string or list of text blocks) + + Returns: + List of messages in OpenAI format + """ + openai_messages = [] + + # Handle system message + if system: + if isinstance(system, str): + openai_messages.append({"role": "system", "content": system}) + elif isinstance(system, list): + # System can be list of text blocks in Anthropic format + system_text = " ".join( + block.get("text", "") + for block in system + if isinstance(block, dict) and block.get("type") == "text" + ) + if system_text: + openai_messages.append({"role": "system", "content": system_text}) + + for msg in anthropic_messages: + role = msg.get("role", "user") + content = msg.get("content", "") + + if isinstance(content, str): + openai_messages.append({"role": role, "content": content}) + elif isinstance(content, list): + # Handle content blocks + openai_content = [] + tool_calls = [] + + for block in content: + if isinstance(block, dict): + block_type = block.get("type", "text") + + if block_type == "text": + openai_content.append( + {"type": "text", "text": block.get("text", "")} + ) + elif block_type == "image": + # Convert Anthropic image format to OpenAI + source = block.get("source", {}) + if source.get("type") == "base64": + openai_content.append( + { + "type": "image_url", + "image_url": { + "url": f"data:{source.get('media_type', 'image/png')};base64,{source.get('data', '')}" + }, + } + ) + elif source.get("type") == "url": + openai_content.append( + { + "type": "image_url", + "image_url": {"url": source.get("url", "")}, + } + ) + elif block_type == "tool_use": + # Anthropic tool_use -> OpenAI tool_calls + tool_calls.append( + { + "id": block.get("id", ""), + "type": "function", + "function": { + "name": block.get("name", ""), + "arguments": json.dumps(block.get("input", {})), + }, + } + ) + elif block_type == "tool_result": + # Tool results become separate messages in OpenAI format + tool_content = block.get("content", "") + if isinstance(tool_content, list): + tool_content = " ".join( + b.get("text", "") + for b in tool_content + if isinstance(b, dict) and b.get("type") == "text" + ) + openai_messages.append( + { + "role": "tool", + "tool_call_id": block.get("tool_use_id", ""), + "content": str(tool_content), + } + ) + continue # Don't add to current message + + # Build the message + if tool_calls: + # Assistant message with tool calls + msg_dict = {"role": role} + if openai_content: + # If there's text content alongside tool calls + text_parts = [ + c.get("text", "") + for c in openai_content + if c.get("type") == "text" + ] + msg_dict["content"] = " ".join(text_parts) if text_parts else None + else: + msg_dict["content"] = None + 
msg_dict["tool_calls"] = tool_calls + openai_messages.append(msg_dict) + elif openai_content: + # Check if it's just text or mixed content + if len(openai_content) == 1 and openai_content[0].get("type") == "text": + openai_messages.append( + {"role": role, "content": openai_content[0].get("text", "")} + ) + else: + openai_messages.append({"role": role, "content": openai_content}) + + return openai_messages + + +def anthropic_to_openai_tools( + anthropic_tools: Optional[List[dict]], +) -> Optional[List[dict]]: + """ + Convert Anthropic tool definitions to OpenAI format. + + Args: + anthropic_tools: List of tools in Anthropic format + + Returns: + List of tools in OpenAI format, or None if no tools provided + """ + if not anthropic_tools: + return None + + openai_tools = [] + for tool in anthropic_tools: + openai_tools.append( + { + "type": "function", + "function": { + "name": tool.get("name", ""), + "description": tool.get("description", ""), + "parameters": tool.get("input_schema", {}), + }, + } + ) + return openai_tools + + +def anthropic_to_openai_tool_choice( + anthropic_tool_choice: Optional[dict], +) -> Optional[Union[str, dict]]: + """ + Convert Anthropic tool_choice to OpenAI format. + + Args: + anthropic_tool_choice: Tool choice in Anthropic format + + Returns: + Tool choice in OpenAI format + """ + if not anthropic_tool_choice: + return None + + choice_type = anthropic_tool_choice.get("type", "auto") + + if choice_type == "auto": + return "auto" + elif choice_type == "any": + return "required" + elif choice_type == "tool": + return { + "type": "function", + "function": {"name": anthropic_tool_choice.get("name", "")}, + } + elif choice_type == "none": + return "none" + + return "auto" + + +def openai_to_anthropic_response(openai_response: dict, original_model: str) -> dict: + """ + Convert OpenAI chat completion response to Anthropic Messages format. 
+ + Args: + openai_response: Response from OpenAI-compatible API + original_model: The model name requested by the client + + Returns: + Response in Anthropic Messages format + """ + choice = openai_response.get("choices", [{}])[0] + message = choice.get("message", {}) + usage = openai_response.get("usage", {}) + + # Build content blocks + content_blocks = [] + + # Add thinking content block if reasoning_content is present + reasoning_content = message.get("reasoning_content") + if reasoning_content: + content_blocks.append( + { + "type": "thinking", + "thinking": reasoning_content, + "signature": "", # Signature is typically empty for proxied responses + } + ) + + # Add text content if present + text_content = message.get("content") + if text_content: + content_blocks.append({"type": "text", "text": text_content}) + + # Add tool use blocks if present + tool_calls = message.get("tool_calls") or [] + for tc in tool_calls: + func = tc.get("function", {}) + try: + input_data = json.loads(func.get("arguments", "{}")) + except json.JSONDecodeError: + input_data = {} + + content_blocks.append( + { + "type": "tool_use", + "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"), + "name": func.get("name", ""), + "input": input_data, + } + ) + + # Map finish_reason to stop_reason + finish_reason = choice.get("finish_reason", "end_turn") + stop_reason_map = { + "stop": "end_turn", + "length": "max_tokens", + "tool_calls": "tool_use", + "content_filter": "end_turn", + "function_call": "tool_use", + } + stop_reason = stop_reason_map.get(finish_reason, "end_turn") + + # Build usage + anthropic_usage = { + "input_tokens": usage.get("prompt_tokens", 0), + "output_tokens": usage.get("completion_tokens", 0), + } + + # Add cache tokens if present + if usage.get("prompt_tokens_details"): + details = usage["prompt_tokens_details"] + if details.get("cached_tokens"): + anthropic_usage["cache_read_input_tokens"] = details["cached_tokens"] + + return { + "id": openai_response.get("id", f"msg_{uuid.uuid4().hex[:24]}"), + "type": "message", + "role": "assistant", + "content": content_blocks, + "model": original_model, + "stop_reason": stop_reason, + "stop_sequence": None, + "usage": anthropic_usage, + } + + +def translate_anthropic_request(request: AnthropicMessagesRequest) -> Dict[str, Any]: + """ + Translate a complete Anthropic Messages API request to OpenAI format. + + This is a high-level function that handles all aspects of request translation, + including messages, tools, tool_choice, and thinking configuration. 
+ + Args: + request: An AnthropicMessagesRequest object + + Returns: + Dictionary containing the OpenAI-compatible request parameters + """ + anthropic_request = request.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + openai_tools = anthropic_to_openai_tools(anthropic_request.get("tools")) + openai_tool_choice = anthropic_to_openai_tool_choice( + anthropic_request.get("tool_choice") + ) + + # Build OpenAI-compatible request + openai_request = { + "model": request.model, + "messages": openai_messages, + "max_tokens": request.max_tokens, + "stream": request.stream or False, + } + + if request.temperature is not None: + openai_request["temperature"] = request.temperature + if request.top_p is not None: + openai_request["top_p"] = request.top_p + if request.top_k is not None: + openai_request["top_k"] = request.top_k + if request.stop_sequences: + openai_request["stop"] = request.stop_sequences + if openai_tools: + openai_request["tools"] = openai_tools + if openai_tool_choice: + openai_request["tool_choice"] = openai_tool_choice + + # Handle Anthropic thinking config -> reasoning_effort translation + if request.thinking: + if request.thinking.type == "enabled": + # Map budget_tokens to reasoning_effort level + # Default to "medium" if enabled but budget not specified + budget = request.thinking.budget_tokens or 10000 + if budget >= 32000: + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + elif budget >= 10000: + openai_request["reasoning_effort"] = "high" + elif budget >= 5000: + openai_request["reasoning_effort"] = "medium" + else: + openai_request["reasoning_effort"] = "low" + elif request.thinking.type == "disabled": + openai_request["reasoning_effort"] = "disable" + elif "opus" in request.model.lower(): + # Force high thinking for Opus models when no thinking config is provided + # Opus 4.5 always uses the -thinking variant, so we want maximum thinking budget + # Without this, the backend defaults to thinkingBudget: -1 (auto) instead of high + openai_request["reasoning_effort"] = "high" + openai_request["custom_reasoning_budget"] = True + + return openai_request diff --git a/src/rotator_library/client.py b/src/rotator_library/client.py index c115f883..f2d458fe 100644 --- a/src/rotator_library/client.py +++ b/src/rotator_library/client.py @@ -2465,6 +2465,140 @@ def aembedding( **kwargs, ) + async def anthropic_messages( + self, + request: "AnthropicMessagesRequest", + raw_request: Optional[Any] = None, + pre_request_callback: Optional[callable] = None, + ) -> Any: + """ + Handle Anthropic Messages API requests. + + This method accepts requests in Anthropic's format, translates them to + OpenAI format internally, processes them through the existing acompletion + method, and returns responses in Anthropic's format. 
+ + Args: + request: An AnthropicMessagesRequest object + raw_request: Optional raw request object for disconnect checks + pre_request_callback: Optional async callback before each API request + + Returns: + For non-streaming: AnthropicMessagesResponse dict + For streaming: AsyncGenerator yielding Anthropic SSE format strings + """ + from .anthropic_compat import ( + translate_anthropic_request, + openai_to_anthropic_response, + anthropic_streaming_wrapper, + ) + import uuid + + request_id = f"msg_{uuid.uuid4().hex[:24]}" + original_model = request.model + + # Translate Anthropic request to OpenAI format + openai_request = translate_anthropic_request(request) + + if request.stream: + # Streaming response + response_generator = self.acompletion( + request=raw_request, + pre_request_callback=pre_request_callback, + **openai_request, + ) + + # Create disconnect checker if raw_request provided + is_disconnected = None + if raw_request is not None and hasattr(raw_request, "is_disconnected"): + is_disconnected = raw_request.is_disconnected + + # Return the streaming wrapper + return anthropic_streaming_wrapper( + openai_stream=response_generator, + original_model=original_model, + request_id=request_id, + is_disconnected=is_disconnected, + ) + else: + # Non-streaming response + response = await self.acompletion( + request=raw_request, + pre_request_callback=pre_request_callback, + **openai_request, + ) + + # Convert OpenAI response to Anthropic format + if hasattr(response, "model_dump"): + openai_response = response.model_dump() + elif hasattr(response, "dict"): + openai_response = response.dict() + else: + openai_response = dict(response) if response else {} + + anthropic_response = openai_to_anthropic_response( + openai_response, original_model + ) + + # Override the ID with our request ID + anthropic_response["id"] = request_id + + return anthropic_response + + async def anthropic_count_tokens( + self, + request: "AnthropicCountTokensRequest", + ) -> dict: + """ + Handle Anthropic count_tokens API requests. + + Counts the number of tokens that would be used by a Messages API request. + This is useful for estimating costs and managing context windows. 
+ + Args: + request: An AnthropicCountTokensRequest object + + Returns: + Dict with input_tokens count in Anthropic format + """ + from .anthropic_compat import ( + anthropic_to_openai_messages, + anthropic_to_openai_tools, + ) + import json + + anthropic_request = request.model_dump(exclude_none=True) + + openai_messages = anthropic_to_openai_messages( + anthropic_request.get("messages", []), anthropic_request.get("system") + ) + + # Count tokens for messages + message_tokens = self.token_count( + model=request.model, + messages=openai_messages, + ) + + # Count tokens for tools if present + tool_tokens = 0 + if request.tools: + # Tools add tokens based on their definitions + # Convert to JSON string and count tokens for tool definitions + openai_tools = anthropic_to_openai_tools( + [tool.model_dump() for tool in request.tools] + ) + if openai_tools: + # Serialize tools to count their token contribution + tools_text = json.dumps(openai_tools) + tool_tokens = self.token_count( + model=request.model, + text=tools_text, + ) + + total_tokens = message_tokens + tool_tokens + + return {"input_tokens": total_tokens} + def token_count(self, **kwargs) -> int: """Calculates the number of tokens for a given text or list of messages.""" kwargs = self._convert_model_params(**kwargs) diff --git a/src/rotator_library/providers/antigravity_provider.py b/src/rotator_library/providers/antigravity_provider.py index 02f36a1a..733245bb 100644 --- a/src/rotator_library/providers/antigravity_provider.py +++ b/src/rotator_library/providers/antigravity_provider.py @@ -2655,6 +2655,28 @@ def _transform_to_antigravity_format( gen_config["maxOutputTokens"] = DEFAULT_MAX_OUTPUT_TOKENS # For non-Claude models without explicit max_tokens, don't set it + # CRITICAL: For Claude with extended thinking, max_tokens MUST be > thinking.budget_tokens + # Per Claude docs: https://docs.claude.com/en/docs/build-with-claude/extended-thinking + # If this constraint is violated, the API returns 400 INVALID_ARGUMENT + thinking_config = gen_config.get("thinkingConfig", {}) + thinking_budget = thinking_config.get("thinkingBudget", 0) + current_max_tokens = gen_config.get("maxOutputTokens") + + if ( + is_claude + and thinking_budget + and thinking_budget > 0 + and current_max_tokens is not None + ): + # Ensure max_tokens > thinkingBudget (add buffer for actual response content) + min_required_tokens = thinking_budget + 1024 # 1024 buffer for response + if current_max_tokens <= thinking_budget: + lib_logger.warning( + f"max_tokens ({current_max_tokens}) must be > thinkingBudget ({thinking_budget}). " + f"Adjusting to {min_required_tokens}" + ) + gen_config["maxOutputTokens"] = min_required_tokens + antigravity_payload["request"]["generationConfig"] = gen_config # Set toolConfig based on tool_choice parameter