15 changes: 14 additions & 1 deletion src/openai/lib/_parsing/_completions.py
@@ -2,6 +2,7 @@
 
 import json
 import logging
+import weakref
 from typing import TYPE_CHECKING, Any, Iterable, cast
 from typing_extensions import TypeVar, TypeGuard, assert_never
 
@@ -30,6 +31,9 @@
 from ...types.chat.completion_create_params import ResponseFormat as ResponseFormatParam
 from ...types.chat.chat_completion_message_function_tool_call import Function
 
+# Cache of schema params, keyed weakly by the response-format type
+_schema_cache: weakref.WeakKeyDictionary[type, ResponseFormatParam] = weakref.WeakKeyDictionary()
+
 ResponseFormatT = TypeVar(
     "ResponseFormatT",
     # if it isn't given then we don't do any parsing
@@ -284,6 +288,10 @@ def type_to_response_format_param(
     # can only be a `type`
     response_format = cast(type, response_format)
 
+    # Check if we already have a schema param for this type in the cache
+    if response_format in _schema_cache:
+        return _schema_cache[response_format]
+
     json_schema_type: type[pydantic.BaseModel] | pydantic.TypeAdapter[Any] | None = None
 
     if is_basemodel_type(response_format):
@@ -295,11 +303,16 @@
     else:
         raise TypeError(f"Unsupported response_format type - {response_format}")
 
-    return {
+    schema_param: ResponseFormatParam = {
         "type": "json_schema",
         "json_schema": {
             "schema": to_strict_json_schema(json_schema_type),
             "name": name,
             "strict": True,
         },
     }
+
+    # Cache the schema param; only the type key is held via weak reference
+    _schema_cache[response_format] = schema_param
+
+    return schema_param
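
The cache works because weakref.WeakKeyDictionary holds its keys weakly while keeping its values alive: an entry survives exactly as long as something else keeps the model class used as the key alive. Below is a minimal standalone sketch of that mechanism (independent of the openai package; the DemoModel name is purely illustrative):

import gc
import weakref

# Keys are held weakly, values strongly: an entry lives only as long as
# some other reference keeps the key class alive.
cache: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()

DemoModel = type("DemoModel", (), {})  # stand-in for a dynamically created pydantic model
cache[DemoModel] = {"type": "json_schema"}
assert DemoModel in cache

del DemoModel  # drop the last strong reference to the class
gc.collect()   # classes sit in reference cycles, so an explicit pass is needed
assert len(cache) == 0  # the entry vanished together with its key
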
50 changes: 50 additions & 0 deletions tests/lib/_parsing/test_memory_leak.py
@@ -0,0 +1,50 @@
import gc
from typing import List

import pytest
from pydantic import Field, create_model

from openai.lib._parsing import type_to_response_format_param
from openai.lib._parsing._completions import _schema_cache


@pytest.mark.asyncio
async def test_async_completions_parse_memory() -> None:
"""Test if AsyncCompletions.parse() doesn't leak memory with dynamic models"""
# Create a base step model
StepModel = create_model(
"Step",
explanation=(str, Field()),
output=(str, Field()),
)

# Clear the cache before testing
_schema_cache.clear()

# Simulate the issue by creating multiple models and making calls
models: list[type] = []
for i in range(10):
# Create a new dynamic model each time
new_model = create_model(
f"MathResponse{i}",
steps=(List[StepModel], Field()), # type: ignore[valid-type]
final_answer=(str, Field()),
)
models.append(new_model)

# Convert to response format and check if it's in the cache
type_to_response_format_param(new_model)
assert new_model in _schema_cache

# Record cache size with all models referenced
cache_size_with_references = len(_schema_cache)

# Let the models go out of scope and trigger garbage collection
del models
gc.collect()

# After garbage collection, the cache should be significantly reduced
cache_size_after_gc = len(_schema_cache)
assert cache_size_after_gc < cache_size_with_references
# The cache size should be close to the initial size (with some tolerance)
assert cache_size_after_gc < cache_size_with_references / 2
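
As a quick complement to the test, the caching can also be observed directly: because the patched function returns the stored dict on a cache hit, repeated conversions of the same model yield the identical object instead of rebuilding the schema. A hypothetical usage check, not part of the PR:

from pydantic import BaseModel

from openai.lib._parsing import type_to_response_format_param


class MathResponse(BaseModel):
    final_answer: str


first = type_to_response_format_param(MathResponse)
second = type_to_response_format_param(MathResponse)

# The second call hits _schema_cache and returns the same cached object.
assert first is second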