166 changes: 164 additions & 2 deletions src/openai/lib/_parsing/_completions.py
@@ -1,12 +1,17 @@
from __future__ import annotations


import json
import logging
import threading
from typing import TYPE_CHECKING, Any, Iterable, cast
from typing_extensions import TypeVar, TypeGuard, assert_never
from functools import lru_cache


import pydantic


from .._tools import PydanticFunctionTool
from ..._types import Omit, omit
from ..._utils import is_dict, is_given
@@ -30,16 +35,153 @@
from ...types.chat.completion_create_params import ResponseFormat as ResponseFormatParam
from ...types.chat.chat_completion_message_function_tool_call import Function


ResponseFormatT = TypeVar(
"ResponseFormatT",
# if it isn't given then we don't do any parsing
default=None,
)
_default_response_format: None = None


log: logging.Logger = logging.getLogger("openai.lib.parsing")


# ============================================================================
# FIX for Issue #2672: Thread-safe bounded TypeAdapter cache
# ============================================================================

_MAX_TYPE_ADAPTER_CACHE_SIZE = 128
_type_adapter_lock = threading.Lock()
_thread_local = threading.local()


def _get_type_cache_key(type_: Any) -> str:
"""
Generate a stable cache key for a type.

    Uses module and qualified-name information to build a key that stays
    stable when a type is dynamically recreated, so structurally identical
    types do not accumulate as distinct cache entries.

Args:
type_: The type to generate a key for

Returns:
A string key that uniquely identifies the type
"""
try:
# For generic types, extract the origin and args
if hasattr(type_, '__origin__'):
origin = type_.__origin__
args = getattr(type_, '__args__', ())

origin_key = f"{origin.__module__}.{origin.__qualname__}"
if args:
args_keys = ','.join(_get_type_cache_key(arg) for arg in args)
return f"{origin_key}[{args_keys}]"
return origin_key
else:
# For regular types
return f"{type_.__module__}.{type_.__qualname__}"
except (AttributeError, TypeError):
# Fallback to repr for complex types
return repr(type_)
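
As a quick sketch of the keys this produces (the exact strings are an
assumption about CPython's builtin naming, shown for orientation only; the
prefix for `Example` depends on its defining module):

    from typing import Dict, List

    class Example:
        pass

    _get_type_cache_key(Example)         # "<module>.Example"
    _get_type_cache_key(List[int])       # "builtins.list[builtins.int]"
    _get_type_cache_key(Dict[str, int])  # "builtins.dict[builtins.str,builtins.int]"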


def _get_cached_type_adapter(type_: type[ResponseFormatT]) -> pydantic.TypeAdapter[ResponseFormatT]:
"""
Get a cached TypeAdapter for the given type.

Uses thread-local storage with bounded cache size to prevent
memory leaks in multi-threaded environments (Issue #2672).

Args:
type_: The type to create an adapter for

Returns:
A TypeAdapter instance for the given type
"""
# Get or create thread-local cache
if not hasattr(_thread_local, 'adapter_cache'):
_thread_local.adapter_cache = {}

cache = _thread_local.adapter_cache

# Use stable type name as key instead of type hash
cache_key = _get_type_cache_key(type_)

if cache_key not in cache:
# Implement simple FIFO eviction if cache exceeds limit
if len(cache) >= _MAX_TYPE_ADAPTER_CACHE_SIZE:
# Remove oldest entry
first_key = next(iter(cache))
del cache[first_key]
log.debug(
"TypeAdapter cache size limit reached (%d), evicted oldest entry",
_MAX_TYPE_ADAPTER_CACHE_SIZE
)

# Create new TypeAdapter
cache[cache_key] = pydantic.TypeAdapter(type_)
log.debug("Created new TypeAdapter for type: %s", cache_key)

return cache[cache_key]
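
A minimal behavioural sketch, assuming Pydantic v2 and direct access to this
module's internals: because the key is name-based, a dynamically recreated but
structurally identical dataclass resolves to the same cached adapter within a
thread.

    import threading
    from dataclasses import make_dataclass

    def worker() -> None:
        # Recreate "the same" dataclass twice, as a dynamic caller might.
        p1 = make_dataclass("Point", [("x", int), ("y", int)])
        p2 = make_dataclass("Point", [("x", int), ("y", int)])
        # The name-based key makes both resolve to one cached adapter.
        assert _get_cached_type_adapter(p1) is _get_cached_type_adapter(p2)

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()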


# Alternative: Global bounded cache with locking (use if thread-local has issues)
@lru_cache(maxsize=_MAX_TYPE_ADAPTER_CACHE_SIZE)
def _get_cached_type_adapter_global(cache_key: str, type_: Any) -> pydantic.TypeAdapter[Any]:
    """
    Global TypeAdapter factory backed by a bounded LRU cache.

    This is an alternative to thread-local caching. ``cache_key`` must be the
    stable key from ``_get_type_cache_key()`` so lookups do not depend on the
    type's hash alone; ``type_`` is the type the adapter is built for. Note
    that ``lru_cache`` keys on both arguments, so a recreated type object
    still produces a fresh entry.
    """
    return pydantic.TypeAdapter(type_)
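
A usage sketch for the lru-cached factory; `Coord` is a made-up example type,
and the stable key is computed first, as the function expects:

    from dataclasses import dataclass

    @dataclass
    class Coord:
        lat: float
        lon: float

    key = _get_type_cache_key(Coord)
    adapter = _get_cached_type_adapter_global(key, Coord)
    # Same arguments -> same cached adapter instance.
    assert adapter is _get_cached_type_adapter_global(key, Coord)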


def _get_cached_type_adapter_with_lock(type_: type[ResponseFormatT]) -> pydantic.TypeAdapter[ResponseFormatT]:
"""
Get a cached TypeAdapter using global cache with explicit locking.

Alternative implementation using a global cache protected by locks.
Use _get_cached_type_adapter() for better thread isolation.

Args:
type_: The type to create an adapter for

Returns:
A TypeAdapter instance for the given type
"""
    cache_key = _get_type_cache_key(type_)

    with _type_adapter_lock:
        # Create the shared cache under the lock so two threads cannot race
        # to initialize it and silently drop each other's entries.
        if not hasattr(_get_cached_type_adapter_with_lock, '_global_cache'):
            _get_cached_type_adapter_with_lock._global_cache = {}
        cache = _get_cached_type_adapter_with_lock._global_cache

if cache_key not in cache:
if len(cache) >= _MAX_TYPE_ADAPTER_CACHE_SIZE:
# Remove first entry (FIFO)
first_key = next(iter(cache))
del cache[first_key]

cache[cache_key] = pydantic.TypeAdapter(type_)

return cache[cache_key]
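
For contrast with the thread-local variant, a small sketch showing that the
locked global cache hands every thread the same adapter instance (`Shared` is
a made-up example type):

    import threading
    from dataclasses import dataclass

    @dataclass
    class Shared:
        x: int

    results = []

    def grab() -> None:
        results.append(_get_cached_type_adapter_with_lock(Shared))

    workers = [threading.Thread(target=grab) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    assert results[0] is results[1]  # one adapter shared across threads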


# ============================================================================
# End of fix for Issue #2672
# ============================================================================


def is_strict_chat_completion_tool_param(
tool: ChatCompletionToolUnionParam,
) -> TypeGuard[ChatCompletionFunctionToolParam]:
@@ -258,14 +400,33 @@ def is_parseable_tool(input_tool: ChatCompletionToolUnionParam) -> bool:


def _parse_content(response_format: type[ResponseFormatT], content: str) -> ResponseFormatT:
"""
Parse content string into the specified response format.

FIXED: Uses bounded thread-safe TypeAdapter cache to prevent memory leaks
in multi-threaded environments (Issue #2672).

Args:
response_format: The target type for parsing
content: The JSON string content to parse

Returns:
Parsed content of type ResponseFormatT

Raises:
TypeError: If the response format type is not supported
"""
if is_basemodel_type(response_format):
return cast(ResponseFormatT, model_parse_json(response_format, content))

if is_dataclass_like_type(response_format):
if PYDANTIC_V1:
raise TypeError(f"Non BaseModel types are only supported with Pydantic v2 - {response_format}")

-        return pydantic.TypeAdapter(response_format).validate_json(content)
+        # FIXED: Use cached TypeAdapter instead of creating new instances
+        # This prevents unbounded memory growth in multi-threaded scenarios
+        adapter = _get_cached_type_adapter(response_format)
+        return adapter.validate_json(content)

raise TypeError(f"Unable to automatically parse response format type {response_format}")
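
A sketch of the dataclass path this function now takes, assuming Pydantic v2
(`Weather` is a made-up example type; `_parse_content` is internal, so calling
it directly is for illustration only):

    from dataclasses import dataclass

    @dataclass
    class Weather:
        city: str
        temp_c: float

    parsed = _parse_content(Weather, '{"city": "Oslo", "temp_c": -3.5}')
    assert parsed == Weather(city="Oslo", temp_c=-3.5)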

@@ -291,7 +452,8 @@ def type_to_response_format_param(
json_schema_type = response_format
elif is_dataclass_like_type(response_format):
name = response_format.__name__
-        json_schema_type = pydantic.TypeAdapter(response_format)
+        # FIXED: Use cached TypeAdapter here as well
+        json_schema_type = _get_cached_type_adapter(response_format)
else:
raise TypeError(f"Unsupported response_format type - {response_format}")

112 changes: 112 additions & 0 deletions src/openai/lib/_parsing/_type_adapter_cache.py
@@ -0,0 +1,112 @@
"""Thread-safe TypeAdapter cache with bounded size."""

from __future__ import annotations

import threading
from typing import Any, TypeVar
from functools import lru_cache
from pydantic import TypeAdapter

T = TypeVar('T')

# Use a bounded cache instead of unbounded
_MAX_CACHE_SIZE = 128

# Thread-local storage for type adapters to prevent hash conflicts
_thread_local = threading.local()


def get_type_adapter(type_: type[T]) -> TypeAdapter[T]:
"""
Get a TypeAdapter instance for the given type.

Uses a thread-safe, bounded cache to prevent memory leaks
in multi-threaded environments.

Args:
type_: The type to create an adapter for

Returns:
A TypeAdapter instance for the given type
"""
# Get or create thread-local cache
if not hasattr(_thread_local, 'adapter_cache'):
_thread_local.adapter_cache = {}

cache = _thread_local.adapter_cache

    # Use the fully qualified name as the key instead of the type object itself.
    # Dynamically generated generic types hash differently on each recreation,
    # which would otherwise grow the cache without bound.
cache_key = _get_type_cache_key(type_)

if cache_key not in cache:
        # Implement simple FIFO eviction if the cache is too large
if len(cache) >= _MAX_CACHE_SIZE:
# Remove oldest item (simple FIFO for thread-local cache)
first_key = next(iter(cache))
del cache[first_key]

cache[cache_key] = TypeAdapter(type_)

return cache[cache_key]
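
A sketch of the bound in action, under the assumption that this thread's cache
starts empty: after `_MAX_CACHE_SIZE` further distinct types, the first entry
has been evicted and is rebuilt on next use.

    from dataclasses import make_dataclass

    first = make_dataclass("T0", [("v", int)])
    a0 = get_type_adapter(first)

    # Churn through enough distinct types to overflow the bound.
    for i in range(1, _MAX_CACHE_SIZE + 1):
        get_type_adapter(make_dataclass(f"T{i}", [("v", int)]))

    # "T0" was evicted (FIFO), so a fresh adapter is built for it.
    assert get_type_adapter(first) is not a0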


def _get_type_cache_key(type_: Any) -> str:
"""
Generate a stable cache key for a type.

Uses type name and module information to create a key that
remains consistent across type recreations.

Args:
type_: The type to generate a key for

Returns:
A string key that uniquely identifies the type
"""
try:
# For generic types, extract the origin and args
if hasattr(type_, '__origin__'):
origin = type_.__origin__
args = getattr(type_, '__args__', ())

            origin_key = f"{origin.__module__}.{origin.__qualname__}"
            if args:
                args_keys = ','.join(_get_type_cache_key(arg) for arg in args)
                return f"{origin_key}[{args_keys}]"
            return origin_key
else:
# For regular types
return f"{type_.__module__}.{type_.__qualname__}"
except (AttributeError, TypeError):
# Fallback to repr for complex types
return repr(type_)


# Alternative implementation using a global bounded LRU cache with locks
_cache_lock = threading.Lock()

@lru_cache(maxsize=_MAX_CACHE_SIZE)
def get_type_adapter_global(type_key: str, type_: type[T]) -> TypeAdapter[T]:
"""
    Global cached TypeAdapter factory with bounded size.

    Thread-safe via ``lru_cache``'s internal lock, but the cache keys on both
    ``type_key`` and the type object itself, so a dynamically recreated type
    still misses and gets its own entry. Use get_type_adapter() for better
    thread isolation.
"""
return TypeAdapter(type_)


def get_type_adapter_with_lock(type_: type[T]) -> TypeAdapter[T]:
"""
Get a TypeAdapter using a global cache with explicit locking.

Args:
type_: The type to create an adapter for

Returns:
A TypeAdapter instance for the given type
"""
cache_key = _get_type_cache_key(type_)

with _cache_lock:
return get_type_adapter_global(cache_key, type_)
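
Finally, a usage sketch for the locked global path (`Point` is a made-up
example type). Repeated calls with the same type object hit the cache, and
`lru_cache`'s standard `cache_info()` can confirm it:

    from dataclasses import dataclass

    @dataclass
    class Point:
        x: int
        y: int

    a1 = get_type_adapter_with_lock(Point)
    a2 = get_type_adapter_with_lock(Point)
    assert a1 is a2  # same type object -> one cached adapter

    # Standard lru_cache instrumentation; counts depend on prior calls.
    print(get_type_adapter_global.cache_info())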