29 changes: 27 additions & 2 deletions .env.example
@@ -20,14 +20,39 @@ SMALL_MODEL="gpt-4o-mini"
# Optional: Server settings
HOST="0.0.0.0"
PORT="8082"
LOG_LEVEL="INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL

# Logging settings
LOG_FILE_PATH="logs/proxy.log" # Path to log file (default: logs/proxy.log)
LOG_FILE_MAX_BYTES="10485760" # Maximum size of each log file in bytes (default: 10MB)
LOG_FILE_BACKUP_COUNT="5" # Number of backup log files to keep (default: 5)
LOG_TO_CONSOLE="true" # Whether to also output logs to console (default: true)
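
These variables correspond to Python's standard `logging` rotation facilities. The logging-module changes themselves are not shown in this diff, so the following is only a minimal sketch assuming a `RotatingFileHandler`-based setup; the `setup_logging` helper name is illustrative:

```python
import logging
import os
from logging.handlers import RotatingFileHandler

def setup_logging() -> logging.Logger:
    """Sketch: build the proxy logger from the env vars above (assumed wiring)."""
    log_path = os.environ.get("LOG_FILE_PATH", "logs/proxy.log")
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    logger = logging.getLogger("proxy")
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

    # Rotate once the file reaches LOG_FILE_MAX_BYTES, keeping LOG_FILE_BACKUP_COUNT old files.
    logger.addHandler(RotatingFileHandler(
        log_path,
        maxBytes=int(os.environ.get("LOG_FILE_MAX_BYTES", "10485760")),
        backupCount=int(os.environ.get("LOG_FILE_BACKUP_COUNT", "5")),
    ))

    # Optionally mirror everything to the console as well.
    if os.environ.get("LOG_TO_CONSOLE", "true").lower() in ("true", "1", "yes"):
        logger.addHandler(logging.StreamHandler())
    return logger
```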

# Optional: Performance settings
MAX_TOKENS_LIMIT="4096"
# Minimum token limit for requests (to avoid errors with thinking models)
MIN_TOKENS_LIMIT="4096"

# Timeout settings (in seconds)
# Legacy timeout setting for backward compatibility (not used if fine-grained settings are provided)
REQUEST_TIMEOUT="90"

# Fine-grained timeout settings (recommended for better control)
# Connect timeout: Time to establish connection (fail fast on network issues)
CONNECT_TIMEOUT="10"
# Read timeout: Time to wait for response data (longer for streaming)
# Set to 600 (10 minutes) for long Claude streaming responses
READ_TIMEOUT="600"
# Write timeout: Time to send request data
WRITE_TIMEOUT="10"
# Pool timeout: Time to acquire connection from pool
POOL_TIMEOUT="10"
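
Taken together, the four values above map one-to-one onto httpx's timeout model, which is how the client change later in this diff consumes them; a minimal sketch:

```python
import httpx

# Fail fast on connection problems, but tolerate long streamed responses.
timeout_config = httpx.Timeout(connect=10, read=600, write=10, pool=10)
```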

MAX_RETRIES="2"

# Examples for other providers:
5 changes: 4 additions & 1 deletion .gitignore
@@ -174,4 +174,7 @@ poetry.toml
pyrightconfig.json

# End of https://www.toptal.com/developers/gitignore/api/python

# Application logs
logs/
*.log
27 changes: 22 additions & 5 deletions src/api/endpoints.py
@@ -26,6 +26,10 @@
    config.request_timeout,
    api_version=config.azure_api_version,
    custom_headers=custom_headers,
    connect_timeout=config.connect_timeout,
    read_timeout=config.read_timeout,
    write_timeout=config.write_timeout,
    pool_timeout=config.pool_timeout,
)

async def validate_api_key(x_api_key: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
@@ -52,13 +56,14 @@ async def validate_api_key(x_api_key: Optional[str] = Header(None), authorizatio

@router.post("/v1/messages")
async def create_message(request: ClaudeMessagesRequest, http_request: Request, _: None = Depends(validate_api_key)):
    request_id = str(uuid.uuid4())
    try:
        logger.info(
            f"[{request_id}] Processing Claude request: model={request.model}, stream={request.stream}, "
            f"messages_count={len(request.messages)}, max_tokens={request.max_tokens}"
        )

        # Convert Claude request to OpenAI format
        openai_request = convert_claude_to_openai(request, model_manager)
@@ -92,7 +97,8 @@ async def create_message(request: ClaudeMessagesRequest, http_request: Request,
                )
            except HTTPException as e:
                # Convert to proper error response for streaming
                logger.error(f"[{request_id}] Streaming error (status {e.status_code}): {e.detail}")
                logger.error(f"[{request_id}] Request - model={request.model}, stream=True, messages_count={len(request.messages)}")
                import traceback

                logger.error(traceback.format_exc())
@@ -116,7 +122,10 @@ async def create_message(request: ClaudeMessagesRequest, http_request: Request,
    except Exception as e:
        import traceback

        logger.error(f"[{request_id}] Unexpected error processing request: {e}")
        logger.error(f"[{request_id}] Error type: {type(e).__name__}")
        logger.error(f"[{request_id}] Request - model={request.model}, stream={request.stream}, messages_count={len(request.messages)}")
        logger.error(f"[{request_id}] Original Claude request model: {request.model}")
        logger.error(traceback.format_exc())
        error_message = openai_client.classify_openai_error(str(e))
        raise HTTPException(status_code=500, detail=error_message)
@@ -156,7 +165,11 @@ async def count_tokens(request: ClaudeTokenCountRequest, _: None = Depends(valid
        return {"input_tokens": estimated_tokens}

    except Exception as e:
        import traceback
        logger.error(f"Error counting tokens: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Request model: {request.model}, messages_count: {len(request.messages)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))


@@ -194,7 +207,11 @@ async def test_connection():
        }

    except Exception as e:
        import traceback
        logger.error(f"API connectivity test failed: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Test model: {config.small_model}")
        logger.error(traceback.format_exc())
        return JSONResponse(
            status_code=503,
            content={
18 changes: 15 additions & 3 deletions src/conversion/response_converter.py
@@ -9,10 +9,15 @@ def convert_openai_to_claude_response(
    openai_response: dict, original_request: ClaudeMessagesRequest
) -> dict:
    """Convert OpenAI response to Claude format."""
    from src.core.logging import logger
    import traceback

    # Extract response data
    choices = openai_response.get("choices", [])
    if not choices:
        logger.error("No choices in OpenAI response")
        logger.error(f"OpenAI response: {openai_response}")
        logger.error(f"Original request model: {original_request.model}")
        raise HTTPException(status_code=500, detail="No choices in OpenAI response")

    choice = choices[0]
@@ -190,7 +195,9 @@ async def convert_openai_streaming_to_claude(

    except Exception as e:
        # Handle any streaming errors gracefully
        logger.error(f"[Converter] Streaming error: {e}")
        logger.error(f"[Converter] Error type: {type(e).__name__}")
        logger.error(f"[Converter] Original request model: {original_request.model}")
        import traceback

        logger.error(traceback.format_exc())
@@ -349,7 +356,8 @@ async def convert_openai_streaming_to_claude_with_cancellation(
    except HTTPException as e:
        # Handle cancellation
        if e.status_code == 499:
            logger.info(f"[{request_id}] Request was cancelled")
            logger.info(f"[{request_id}] Request model: {original_request.model}")
            error_event = {
                "type": "error",
                "error": {
@@ -360,10 +368,14 @@
yield f"event: error\ndata: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return
else:
logger.error(f"[{request_id}] HTTPException in streaming (status {e.status_code}): {e.detail}")
logger.error(f"[{request_id}] Request model: {original_request.model}")
raise
except Exception as e:
# Handle any streaming errors gracefully
logger.error(f"Streaming error: {e}")
logger.error(f"[Converter] Streaming error: {e}")
logger.error(f"[Converter] Error type: {type(e).__name__}")
logger.error(f"[Converter] Original request model: {original_request.model}")
import traceback

logger.error(traceback.format_exc())
81 changes: 73 additions & 8 deletions src/core/client.py
@@ -1,42 +1,69 @@
import asyncio
import json
import httpx
import traceback
from fastapi import HTTPException
from typing import Optional, AsyncGenerator, Dict, Any
from openai import AsyncOpenAI, AsyncAzureOpenAI
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai._exceptions import APIError, RateLimitError, AuthenticationError, BadRequestError
from src.core.logging import logger

class OpenAIClient:
    """Async OpenAI client with cancellation support."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        timeout: int = 90,
        api_version: Optional[str] = None,
        custom_headers: Optional[Dict[str, str]] = None,
        connect_timeout: int = 10,
        read_timeout: int = 600,
        write_timeout: int = 10,
        pool_timeout: int = 10,
    ):
        self.api_key = api_key
        self.base_url = base_url
        self.custom_headers = custom_headers or {}

        # Prepare default headers
        default_headers = {
            "Content-Type": "application/json",
            "User-Agent": "claude-proxy/1.0.0"
        }

        # Merge custom headers with default headers
        all_headers = {**default_headers, **self.custom_headers}

        # Create fine-grained timeout configuration
        # This allows us to:
        # - Quickly detect connection failures (connect_timeout)
        # - Allow long streaming responses (read_timeout)
        # - Prevent slow request uploads (write_timeout)
        # - Avoid pool starvation (pool_timeout)
        timeout_config = httpx.Timeout(
            connect=connect_timeout,
            read=read_timeout,
            write=write_timeout,
            pool=pool_timeout
        )

        # Detect if using Azure and instantiate the appropriate client
        if api_version:
            self.client = AsyncAzureOpenAI(
                api_key=api_key,
                azure_endpoint=base_url,
                api_version=api_version,
                timeout=timeout_config,
                default_headers=all_headers
            )
        else:
            self.client = AsyncOpenAI(
                api_key=api_key,
                base_url=base_url,
                timeout=timeout_config,
                default_headers=all_headers
            )
        self.active_requests: Dict[str, asyncio.Event] = {}
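
The `active_requests` map is what backs the client's cancellation support: each in-flight request registers an `asyncio.Event` keyed by its request ID, and setting that event signals the streaming loop to stop. The diff does not show those methods, so this is only a hedged sketch of the pattern, with illustrative method names:

```python
import asyncio
from typing import Dict

class CancellationRegistry:
    """Sketch of event-based request cancellation (names are illustrative)."""

    def __init__(self):
        self.active_requests: Dict[str, asyncio.Event] = {}

    def register(self, request_id: str) -> asyncio.Event:
        # Called when a request starts; the event doubles as a cancel flag.
        event = asyncio.Event()
        self.active_requests[request_id] = event
        return event

    def cancel(self, request_id: str) -> None:
        # Called e.g. when the HTTP client disconnects mid-stream.
        if request_id in self.active_requests:
            self.active_requests[request_id].set()

    def finish(self, request_id: str) -> None:
        # Cleanup in the finally block once the request completes.
        self.active_requests.pop(request_id, None)
```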
@@ -82,17 +109,36 @@ async def create_chat_completion(self, request: Dict[str, Any], request_id: Opti

            # Convert to dict format that matches the original interface
            return completion.model_dump()

        except AuthenticationError as e:
            logger.error(f"Authentication error for request {request_id}: {str(e)}")
            logger.error(f"Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.debug(f"Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            raise HTTPException(status_code=401, detail=self.classify_openai_error(str(e)))
        except RateLimitError as e:
            logger.error(f"Rate limit error for request {request_id}: {str(e)}")
            logger.error(f"Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.debug(f"Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            raise HTTPException(status_code=429, detail=self.classify_openai_error(str(e)))
        except BadRequestError as e:
            logger.error(f"Bad request error for request {request_id}: {str(e)}")
            logger.error(f"Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.error(f"Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=400, detail=self.classify_openai_error(str(e)))
        except APIError as e:
            status_code = getattr(e, 'status_code', 500)
            logger.error(f"API error (status {status_code}) for request {request_id}: {str(e)}")
            logger.error(f"Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.error(f"Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=status_code, detail=self.classify_openai_error(str(e)))
        except Exception as e:
            logger.error(f"Unexpected error for request {request_id}: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            logger.error(f"Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.error(f"Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")

        finally:
@@ -131,17 +177,36 @@ async def create_chat_completion_stream(self, request: Dict[str, Any], request_i

            # Signal end of stream
            yield "data: [DONE]"

        except AuthenticationError as e:
            logger.error(f"[Stream] Authentication error for request {request_id}: {str(e)}")
            logger.error(f"[Stream] Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.debug(f"[Stream] Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            raise HTTPException(status_code=401, detail=self.classify_openai_error(str(e)))
        except RateLimitError as e:
            logger.error(f"[Stream] Rate limit error for request {request_id}: {str(e)}")
            logger.error(f"[Stream] Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.debug(f"[Stream] Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            raise HTTPException(status_code=429, detail=self.classify_openai_error(str(e)))
        except BadRequestError as e:
            logger.error(f"[Stream] Bad request error for request {request_id}: {str(e)}")
            logger.error(f"[Stream] Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.error(f"[Stream] Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=400, detail=self.classify_openai_error(str(e)))
        except APIError as e:
            status_code = getattr(e, 'status_code', 500)
            logger.error(f"[Stream] API error (status {status_code}) for request {request_id}: {str(e)}")
            logger.error(f"[Stream] Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.error(f"[Stream] Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=status_code, detail=self.classify_openai_error(str(e)))
        except Exception as e:
            logger.error(f"[Stream] Unexpected error for request {request_id}: {str(e)}")
            logger.error(f"[Stream] Error type: {type(e).__name__}")
            logger.error(f"[Stream] Request details - Model: {request.get('model')}, Messages count: {len(request.get('messages', []))}")
            logger.error(f"[Stream] Full request: {json.dumps(request, ensure_ascii=False, default=str)}")
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")

        finally:
21 changes: 20 additions & 1 deletion src/core/config.py
@@ -18,11 +18,30 @@ def __init__(self):
        self.host = os.environ.get("HOST", "0.0.0.0")
        self.port = int(os.environ.get("PORT", "8082"))
        self.log_level = os.environ.get("LOG_LEVEL", "INFO")

        # Logging settings
        self.log_file_path = os.environ.get("LOG_FILE_PATH", "logs/proxy.log")
        self.log_file_max_bytes = int(os.environ.get("LOG_FILE_MAX_BYTES", str(10 * 1024 * 1024)))  # 10MB default
        self.log_file_backup_count = int(os.environ.get("LOG_FILE_BACKUP_COUNT", "5"))  # Keep 5 backup files
        self.log_to_console = os.environ.get("LOG_TO_CONSOLE", "true").lower() in ("true", "1", "yes")

        self.max_tokens_limit = int(os.environ.get("MAX_TOKENS_LIMIT", "4096"))
        self.min_tokens_limit = int(os.environ.get("MIN_TOKENS_LIMIT", "100"))

        # Connection settings - Fine-grained timeout configuration
        # Legacy timeout for backward compatibility
        self.request_timeout = int(os.environ.get("REQUEST_TIMEOUT", "90"))

        # New fine-grained timeout settings
        # connect: timeout for establishing a connection (short)
        # read: timeout for reading response data (long for streaming)
        # write: timeout for sending request data (short)
        # pool: timeout for acquiring a connection from the pool (short)
        self.connect_timeout = int(os.environ.get("CONNECT_TIMEOUT", "10"))
        self.read_timeout = int(os.environ.get("READ_TIMEOUT", "600"))  # 10 minutes for long streaming responses
        self.write_timeout = int(os.environ.get("WRITE_TIMEOUT", "10"))
        self.pool_timeout = int(os.environ.get("POOL_TIMEOUT", "10"))

        self.max_retries = int(os.environ.get("MAX_RETRIES", "2"))

# Model settings - BIG and SMALL models