Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions litellm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,16 +955,6 @@ def responses_api_bridge_check(
model_info["mode"] = "responses"
model = model.replace("responses/", "")

# OpenAI gpt-5.4+ chat-completions calls with both tools + reasoning_effort
# must be bridged to Responses API.
if (
custom_llm_provider == "openai"
and OpenAIGPT5Config.is_model_gpt_5_4_plus_model(model)
and tools
and reasoning_effort is not None
):
model_info["mode"] = "responses"
model = model.replace("responses/", "")
except Exception as e:
verbose_logger.debug("Error getting model info: {}".format(e))

Expand All @@ -974,6 +964,19 @@ def responses_api_bridge_check(
model = model.replace("responses/", "")
mode = "responses"
model_info["mode"] = mode

# OpenAI/Azure gpt-5.4+ chat-completions calls with both tools + reasoning_effort
# must be bridged to Responses API.
if (
custom_llm_provider in ("openai", "azure")
and OpenAIGPT5Config.is_model_gpt_5_4_plus_model(model)
and tools
and reasoning_effort is not None
and model_info.get("mode") != "responses"
):
model_info["mode"] = "responses"
model = model.replace("responses/", "")

return model_info, model


Expand Down Expand Up @@ -6718,6 +6721,10 @@ def speech( # noqa: PLR0915
kwargs=kwargs,
)

stream_format = kwargs.get("stream_format")
if stream_format is not None:
optional_params["stream_format"] = stream_format
Comment on lines +6724 to +6726
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 No automated test for the core stream_format fix

The PR description mentions manual verification only. There are no automated tests that assert stream_format reaches optional_params or that the proxy returns Content-Type: text/event-stream for SSE requests. A unit/integration test would prevent this from regressing silently.

Rule Used: What: Ensure that any PR claiming to fix an issue ... (source)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, though the TTS codepath needs real provider credentials to test end-to-end. happy to add a unit test for the param forwarding if a mock pattern exists in the test suite.


logging_obj: LiteLLMLoggingObj = cast(
LiteLLMLoggingObj, kwargs.get("litellm_logging_obj")
)
Expand Down Expand Up @@ -7533,9 +7540,7 @@ def stream_chunk_builder( # noqa: PLR0915
# the final chunk.
all_annotations: list = []
for ac in annotation_chunks:
all_annotations.extend(
ac["choices"][0]["delta"]["annotations"]
)
all_annotations.extend(ac["choices"][0]["delta"]["annotations"])
response["choices"][0]["message"]["annotations"] = all_annotations

audio_chunks = [
Expand Down
219 changes: 215 additions & 4 deletions litellm/proxy/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ def generate_feedback_box():
AnthropicResponseUsageBlock,
)
from litellm.types.llms.openai import HttpxBinaryResponseContent
from litellm.types.proxy.control_plane_endpoints import WorkerRegistryEntry
from litellm.types.proxy.management_endpoints.model_management_endpoints import (
ModelGroupInfoProxy,
)
Expand Down Expand Up @@ -1546,6 +1547,7 @@ async def root_redirect():
# Sentinel: prevents PKCE-no-Redis advisory from re-logging on config hot-reload.
# Tests that need to reset it can patch 'litellm.proxy.proxy_server._pkce_no_redis_warning_emitted'.
_pkce_no_redis_warning_emitted: bool = False
_cp_no_redis_warning_emitted: bool = False
user_custom_sso = None
user_custom_ui_sso_sign_in_handler = None
use_background_health_checks = None
Expand Down Expand Up @@ -2295,6 +2297,7 @@ def __init__(self) -> None:
self.config: Dict[str, Any] = {}
self._last_semantic_filter_config: Optional[Dict[str, Any]] = None
self._last_hashicorp_vault_config: Optional[Dict[str, Any]] = None
self.worker_registry: List["WorkerRegistryEntry"] = []

def is_yaml(self, config_file_path: str) -> bool:
if not os.path.isfile(config_file_path):
Expand Down Expand Up @@ -2958,6 +2961,29 @@ async def load_config( # noqa: PLR0915
print( # noqa
f"{blue_color_code} Initialized Failure Callbacks - {litellm.failure_callback} {reset_color_code}"
) # noqa
elif key == "audit_log_callbacks":
litellm.audit_log_callbacks = []

for callback in value:
if "." in callback:
litellm.audit_log_callbacks.append(
get_instance_fn(value=callback)
)
else:
litellm.audit_log_callbacks.append(callback)

_store_audit_logs = litellm_settings.get(
"store_audit_logs", litellm.store_audit_logs
)
if _store_audit_logs:
print( # noqa
f"{blue_color_code} Initialized Audit Log Callbacks - {litellm.audit_log_callbacks} {reset_color_code}"
) # noqa
else:
verbose_proxy_logger.warning(
"'audit_log_callbacks' is configured but 'store_audit_logs' is not enabled. "
"Audit log callbacks will not fire until 'store_audit_logs: true' is added to litellm_settings."
)
elif key == "cache_params":
# this is set in the cache branch
# see usage here: https://docs.litellm.ai/docs/proxy/caching
Expand Down Expand Up @@ -3095,6 +3121,21 @@ async def load_config( # noqa: PLR0915
"Set PKCE_STRICT_CACHE_MISS=true to fail fast with a 401 on cache misses "
"instead of continuing without a code_verifier."
)

### CONTROL PLANE CODE-EXCHANGE PREREQUISITE CHECK ###
cp_url = general_settings.get("control_plane_url")
if cp_url and redis_usage_cache is None:
global _cp_no_redis_warning_emitted
if not _cp_no_redis_warning_emitted:
_cp_no_redis_warning_emitted = True
verbose_proxy_logger.warning(
"control_plane_url is configured but Redis is not configured for LiteLLM caching. "
"Login codes (SSO and /v3/login) will not be shared across instances — "
"the /v3/login/exchange call may land on a different pod and fail with 401. "
"Configure Redis via the 'cache' section in your proxy config, "
"or ensure sticky sessions for single-instance deployments."
)

### STORE MODEL IN DB ### feature flag for `/model/new`
store_model_in_db = general_settings.get("store_model_in_db", False)
if store_model_in_db is None:
Expand Down Expand Up @@ -3303,7 +3344,7 @@ async def load_config( # noqa: PLR0915
async_only_mode=True # only init async clients
),
ignore_invalid_deployments=True, # don't raise an error if a deployment is invalid
) # type:ignore
) # type: ignore

if redis_usage_cache is not None and router.cache.redis_cache is None:
router._update_redis_cache(cache=redis_usage_cache)
Expand Down Expand Up @@ -3385,7 +3426,15 @@ async def _init_non_llm_configs(self, config: dict):
litellm.vector_store_registry.load_vector_stores_from_config(
vector_store_registry_config
)
pass

## WORKER REGISTRY (Control Plane)
worker_registry_config = config.get("worker_registry", None)
if worker_registry_config:
self.worker_registry = [
WorkerRegistryEntry(**e) for e in worker_registry_config
]
else:
self.worker_registry = []

async def _init_policy_engine(
self,
Expand Down Expand Up @@ -5357,8 +5406,8 @@ async def initialize( # noqa: PLR0915
litellm.add_function_to_prompt = True
dynamic_config["general"]["add_function_to_prompt"] = True
if max_budget: # litellm-specific param
litellm.max_budget = max_budget
dynamic_config["general"]["max_budget"] = max_budget
litellm.max_budget = float(max_budget)
dynamic_config["general"]["max_budget"] = litellm.max_budget
if experimental:
pass
user_telemetry = telemetry
Expand Down Expand Up @@ -7446,6 +7495,9 @@ async def audio_speech(
"audio/wav" # Gemini TTS returns WAV format after conversion
)

if data.get("stream_format") == "sse":
media_type = "text/event-stream"
Comment on lines +7498 to +7499
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 SSE check unconditionally overrides Gemini audio/wav media type

The stream_format == "sse" block runs after the Gemini-specific audio/wav assignment, so a Gemini TTS request that also carries stream_format=sse would return Content-Type: text/event-stream instead of audio/wav. Gemini TTS doesn't currently support SSE, so this will likely mislead any client that inspects the content type.

Consider guarding the SSE override to non-Gemini providers:

Suggested change
if data.get("stream_format") == "sse":
media_type = "text/event-stream"
if data.get("stream_format") == "sse" and "gemini" not in (data.get("model") or "").lower():
media_type = "text/event-stream"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intentional — if the user explicitly passes stream_format=sse, they want SSE content type. Gemini TTS doesn't support SSE, so that param shouldn't be set for Gemini requests in the first place. if a guard is needed, it belongs in the caller, not here.


return StreamingResponse(
_audio_speech_chunk_generator(response), # type: ignore[arg-type]
media_type=media_type,
Expand Down Expand Up @@ -11095,6 +11147,165 @@ async def login_v2(request: Request): # noqa: PLR0915
)


@router.post(
    "/v3/login", include_in_schema=False
)  # control-plane login — always returns token in body for cross-origin use
async def login_v3(request: Request):  # noqa: PLR0915
    """
    Control-plane login endpoint.

    Authenticates a username/password pair and returns a short-lived,
    single-use opaque code (60s TTL) that the caller exchanges for the UI JWT
    via ``/v3/login/exchange``. The JWT itself is never returned here, so it
    is not exposed in cross-origin redirect URLs.

    Only available on workers where ``control_plane_url`` is configured;
    otherwise responds 404.

    Raises:
        ProxyException: 404 if control_plane_url is not configured,
            400 if credentials are missing, auth/500 errors otherwise.
    """
    global premium_user, general_settings, master_key
    from litellm.proxy.auth.login_utils import authenticate_user, create_ui_token_object
    from litellm.proxy.utils import get_custom_url

    try:
        if not general_settings.get("control_plane_url"):
            raise ProxyException(
                message="/v3/login is only available on workers with control_plane_url configured",
                type=ProxyErrorTypes.not_found_error,
                param="control_plane_url",
                code=status.HTTP_404_NOT_FOUND,
            )

        body = await request.json()
        # Guard against absent credentials: str(None) would otherwise become
        # the literal string "None" and be handed to authenticate_user,
        # producing a confusing auth failure instead of a clear 400.
        raw_username = body.get("username")
        raw_password = body.get("password")
        if raw_username is None or raw_password is None:
            raise ProxyException(
                message="Both 'username' and 'password' are required",
                type=ProxyErrorTypes.auth_error,
                param="username" if raw_username is None else "password",
                code=status.HTTP_400_BAD_REQUEST,
            )
        username = str(raw_username)
        password = str(raw_password)

        login_result = await authenticate_user(
            username=username,
            password=password,
            master_key=master_key,
            prisma_client=prisma_client,
        )

        returned_ui_token_object = create_ui_token_object(
            login_result=login_result,
            general_settings=general_settings,
            premium_user=premium_user,
        )

        import jwt

        jwt_token = jwt.encode(
            cast(dict, returned_ui_token_object),
            cast(str, master_key),
            algorithm="HS256",
        )

        litellm_dashboard_ui = get_custom_url(str(request.base_url))
        if litellm_dashboard_ui.endswith("/"):
            litellm_dashboard_ui += "ui/"
        else:
            litellm_dashboard_ui += "/ui/"
        litellm_dashboard_ui += "?login=success"

        # Store JWT behind a single-use opaque code (60s TTL). Prefer Redis so
        # the exchange call can land on a different pod; fall back to the
        # in-process cache for single-instance deployments.
        code = secrets.token_urlsafe(32)
        cache_key = f"login_code:{code}"
        cache_value = {"token": jwt_token, "redirect_url": litellm_dashboard_ui}
        if redis_usage_cache is not None:
            await redis_usage_cache.async_set_cache(
                key=cache_key, value=cache_value, ttl=60
            )
        else:
            await user_api_key_cache.async_set_cache(
                key=cache_key, value=cache_value, ttl=60
            )

        return JSONResponse(
            content={"code": code, "expires_in": 60},
            status_code=status.HTTP_200_OK,
        )
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.login_v3(): Exception occurred - {}".format(
                str(e)
            )
        )
        if isinstance(e, ProxyException):
            raise e
        elif isinstance(e, HTTPException):
            raise ProxyException(
                message=getattr(e, "detail", str(e)),
                type=ProxyErrorTypes.auth_error,
                param=getattr(e, "param", "None"),
                code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
            )
        else:
            error_msg = f"{str(e)}"
            raise ProxyException(
                message=error_msg,
                type=ProxyErrorTypes.auth_error,
                param="None",
                code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )


@router.post(
    "/v3/login/exchange", include_in_schema=False
)  # exchange single-use opaque code for JWT
async def login_v3_exchange(request: Request):
    """
    Exchange a single-use login code (issued by ``/v3/login``) for the UI JWT.

    The code is deleted from the cache immediately after a successful lookup
    so it cannot be replayed. Returns the JWT and the dashboard redirect URL
    in the body, and also sets the token as a cookie on the response.

    Raises:
        ProxyException: 404 if control_plane_url is not configured, 400 for a
            missing/malformed body or 'code', 401 for an invalid/expired code.
    """
    try:
        if not general_settings.get("control_plane_url"):
            raise ProxyException(
                message="/v3/login/exchange is only available on workers with control_plane_url configured",
                type=ProxyErrorTypes.not_found_error,
                param="control_plane_url",
                code=status.HTTP_404_NOT_FOUND,
            )

        # A malformed body is a client error — surface 400, not the generic
        # 500 that the catch-all handler below would otherwise produce.
        try:
            body = await request.json()
        except Exception:
            raise ProxyException(
                message="Request body must be valid JSON",
                type=ProxyErrorTypes.auth_error,
                param="None",
                code=status.HTTP_400_BAD_REQUEST,
            )

        code = body.get("code")
        if not code:
            raise ProxyException(
                message="Missing 'code' parameter",
                type=ProxyErrorTypes.auth_error,
                param="code",
                code=status.HTTP_400_BAD_REQUEST,
            )

        cache_key = f"login_code:{code}"
        if redis_usage_cache is not None:
            cached_data = await redis_usage_cache.async_get_cache(key=cache_key)
        else:
            cached_data = await user_api_key_cache.async_get_cache(key=cache_key)

        if not cached_data or not isinstance(cached_data, dict):
            raise ProxyException(
                message="Invalid or expired login code",
                type=ProxyErrorTypes.auth_error,
                param="code",
                code=status.HTTP_401_UNAUTHORIZED,
            )

        # Single-use: delete immediately so the code cannot be replayed.
        if redis_usage_cache is not None:
            await redis_usage_cache.async_delete_cache(key=cache_key)
        else:
            await user_api_key_cache.async_delete_cache(key=cache_key)

        json_response = JSONResponse(
            content={
                "token": cached_data["token"],
                "redirect_url": cached_data["redirect_url"],
            },
            status_code=status.HTTP_200_OK,
        )
        # NOTE(review): cookie is set without httponly/secure/samesite flags —
        # presumably the dashboard reads it client-side; confirm before tightening.
        json_response.set_cookie(key="token", value=cached_data["token"])
        return json_response
    except ProxyException:
        raise
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.login_v3_exchange(): Exception occurred - {}".format(
                str(e)
            )
        )
        raise ProxyException(
            message=str(e),
            type=ProxyErrorTypes.auth_error,
            param="None",
            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )


@app.get("/onboarding/get_token", include_in_schema=False)
async def onboarding(invite_link: str, request: Request):
"""
Expand Down
Loading