27 changes: 27 additions & 0 deletions libs/partners/openai/README.md
@@ -20,6 +20,33 @@ See a [usage example](https://python.langchain.com/docs/integrations/chat/openai)
from langchain_openai import ChatOpenAI
```

### High concurrency - optional OpenAI aiohttp backend

For improved throughput in high-concurrency scenarios (parallel chains, graphs, and agents), you can enable the optional OpenAI aiohttp backend, which avoids the connection-pool concurrency limits of the default httpx client.

**Installation:**
```bash
pip install "openai[aiohttp]"
```

**Usage:**
```python
from openai import DefaultAioHttpClient
from langchain_openai import ChatOpenAI

# Option 1: Pass explicitly
llm = ChatOpenAI(
    http_client=DefaultAioHttpClient(),
    http_async_client=DefaultAioHttpClient(),
)

# Option 2: Use environment variable
# Set LC_OPENAI_USE_AIOHTTP=1 in your environment
llm = ChatOpenAI() # Will automatically use aiohttp if available
```

For more details, see the [OpenAI Python library documentation](https://github.com/openai/openai-python#httpx-client).

If you are using a model hosted on `Azure`, you should use a different wrapper for that:

```python
from langchain_openai import AzureChatOpenAI
```
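Beyond what the README diff adds, here is a minimal sketch of the fan-out workload the new section targets; the model name and prompts are illustrative assumptions, not part of the PR:

```python
import asyncio

from openai import DefaultAioHttpClient

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o-mini",  # illustrative choice, not from the PR
    http_async_client=DefaultAioHttpClient(),
)


async def main() -> None:
    # Fan out many requests concurrently; this is the regime where the
    # aiohttp backend is meant to outperform the default httpx pool.
    prompts = [f"Summarize topic {i}" for i in range(100)]
    responses = await asyncio.gather(*(llm.ainvoke(p) for p in prompts))
    print(len(responses))


asyncio.run(main())
```

The same fan-out can also be expressed as `await llm.abatch(prompts)`, which handles the gathering for you.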
66 changes: 66 additions & 0 deletions libs/partners/openai/langchain_openai/chat_models/_client_utils.py
@@ -8,6 +8,7 @@

import asyncio
import os
import warnings
from functools import lru_cache
from typing import Any, Optional

@@ -105,3 +106,68 @@ def _get_default_async_httpx_client(
        return _build_async_httpx_client(base_url, timeout)
    else:
        return _cached_async_httpx_client(base_url, timeout)


def _get_aiohttp_client() -> Optional[Any]:
    """Get OpenAI DefaultAioHttpClient if available.

    Returns:
        DefaultAioHttpClient instance if openai[aiohttp] is installed, None otherwise.
    """
    try:
        from openai import DefaultAioHttpClient

        return DefaultAioHttpClient()
    except ImportError:
        return None


def _should_use_aiohttp() -> bool:
    """Check if the aiohttp backend should be used, based on an environment variable.

    Returns:
        True if the LC_OPENAI_USE_AIOHTTP environment variable is set to a truthy value.
    """
    return os.getenv("LC_OPENAI_USE_AIOHTTP", "").lower() in ("1", "true", "yes", "on")


def _get_http_client_for_aiohttp_env(
    provided_client: Optional[Any],
    base_url: Optional[str],
    timeout: Any,
    is_async: bool = False,
) -> Optional[Any]:
    """Get the appropriate HTTP client, considering the aiohttp environment variable.

    Args:
        provided_client: User-provided HTTP client (takes precedence).
        base_url: OpenAI API base URL.
        timeout: Request timeout.
        is_async: Whether to get an async client.

    Returns:
        Appropriate HTTP client, or None to use the OpenAI default.
    """
    # User-provided client takes precedence
    if provided_client is not None:
        return provided_client

    # Check if aiohttp should be used via environment variable
    if _should_use_aiohttp():
        aiohttp_client = _get_aiohttp_client()
        if aiohttp_client is not None:
            return aiohttp_client
        else:
            warnings.warn(
                "LC_OPENAI_USE_AIOHTTP is set but openai[aiohttp] is not installed. "
                "Install with 'pip install \"openai[aiohttp]\"' to use the aiohttp "
                "backend. Falling back to default httpx client.",
                UserWarning,
                stacklevel=3,
            )

    # Fall back to existing httpx client logic
    if is_async:
        return _get_default_async_httpx_client(base_url, timeout)
    else:
        return _get_default_httpx_client(base_url, timeout)
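To make the selection order concrete, here is a small sketch exercising the helper added above; the private module path comes from this diff, while the sentinel object and timeout values are arbitrary illustrations:

```python
import os

from langchain_openai.chat_models._client_utils import (
    _get_http_client_for_aiohttp_env,
)

# 1. An explicitly provided client wins, regardless of the environment.
sentinel = object()
os.environ["LC_OPENAI_USE_AIOHTTP"] = "1"
assert _get_http_client_for_aiohttp_env(sentinel, None, None) is sentinel

# 2. No explicit client + env var set: returns a DefaultAioHttpClient,
#    or warns and falls back to httpx if openai[aiohttp] is missing.
client = _get_http_client_for_aiohttp_env(None, None, 30.0, is_async=True)

# 3. Env var unset: the pre-existing httpx client logic applies.
del os.environ["LC_OPENAI_USE_AIOHTTP"]
fallback = _get_http_client_for_aiohttp_env(None, None, 30.0)
```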
37 changes: 25 additions & 12 deletions libs/partners/openai/langchain_openai/chat_models/base.py
@@ -102,10 +102,7 @@
 from pydantic.v1 import BaseModel as BaseModelV1
 from typing_extensions import Self
 
-from langchain_openai.chat_models._client_utils import (
-    _get_default_async_httpx_client,
-    _get_default_httpx_client,
-)
+from langchain_openai.chat_models._client_utils import _get_http_client_for_aiohttp_env
 from langchain_openai.chat_models._compat import (
     _convert_from_v03_ai_message,
     _convert_to_v03_ai_message,
@@ -555,13 +552,23 @@ class BaseChatOpenAI(BaseChatModel):
     # Configure a custom httpx client. See the
     # [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
     http_client: Union[Any, None] = Field(default=None, exclude=True)
-    """Optional ``httpx.Client``. Only used for sync invocations. Must specify
+    """Optional HTTP client. Only used for sync invocations. Must specify
     ``http_async_client`` as well if you'd like a custom client for async
     invocations.
+
+    Supports ``httpx.Client`` or ``openai.DefaultAioHttpClient`` for improved
+    concurrency. To use the aiohttp backend, install with
+    ``pip install "openai[aiohttp]"`` and pass ``DefaultAioHttpClient()`` or
+    set the environment variable ``LC_OPENAI_USE_AIOHTTP=1``.
     """
     http_async_client: Union[Any, None] = Field(default=None, exclude=True)
-    """Optional ``httpx.AsyncClient``. Only used for async invocations. Must specify
-    ``http_client`` as well if you'd like a custom client for sync invocations."""
+    """Optional HTTP client. Only used for async invocations. Must specify
+    ``http_client`` as well if you'd like a custom client for sync invocations.
+
+    Supports ``httpx.AsyncClient`` or ``openai.DefaultAioHttpClient`` for improved
+    concurrency. To use the aiohttp backend, install with
+    ``pip install "openai[aiohttp]"`` and pass ``DefaultAioHttpClient()`` or
+    set the environment variable ``LC_OPENAI_USE_AIOHTTP=1``."""
     stop: Optional[Union[list[str], str]] = Field(default=None, alias="stop_sequences")
     """Default stop sequences."""
     extra_body: Optional[Mapping[str, Any]] = None
@@ -787,8 +794,12 @@ def validate_environment(self) -> Self:
                     proxy=self.openai_proxy, verify=global_ssl_context
                 )
             sync_specific = {
-                "http_client": self.http_client
-                or _get_default_httpx_client(self.openai_api_base, self.request_timeout)
+                "http_client": _get_http_client_for_aiohttp_env(
+                    self.http_client,
+                    self.openai_api_base,
+                    self.request_timeout,
+                    is_async=False,
+                )
             }
             self.root_client = openai.OpenAI(**client_params, **sync_specific)  # type: ignore[arg-type]
             self.client = self.root_client.chat.completions
@@ -805,9 +816,11 @@ def validate_environment(self) -> Self:
                     proxy=self.openai_proxy, verify=global_ssl_context
                 )
             async_specific = {
-                "http_client": self.http_async_client
-                or _get_default_async_httpx_client(
-                    self.openai_api_base, self.request_timeout
+                "http_client": _get_http_client_for_aiohttp_env(
+                    self.http_async_client,
+                    self.openai_api_base,
+                    self.request_timeout,
+                    is_async=True,
                 )
             }
             self.root_async_client = openai.AsyncOpenAI(
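As an end-to-end sketch of the rewired `validate_environment` (a dummy key and an illustrative model name; construction makes no network call):

```python
import os

from langchain_openai import ChatOpenAI

# Opt in before the model is constructed; client selection happens inside
# validate_environment at construction time.
os.environ["LC_OPENAI_USE_AIOHTTP"] = "1"

# With openai[aiohttp] installed, both root clients receive
# DefaultAioHttpClient; otherwise a UserWarning is emitted and the default
# httpx clients are used instead.
llm = ChatOpenAI(model="gpt-4o-mini", api_key="sk-dummy-key")
```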
171 changes: 171 additions & 0 deletions libs/partners/openai/tests/unit_tests/chat_models/test_base.py
@@ -1,6 +1,7 @@
"""Test OpenAI Chat API wrapper."""

import json
import os
from functools import partial
from types import TracebackType
from typing import Any, Literal, Optional, Union, cast
@@ -2780,3 +2781,173 @@ def test_gpt_5_temperature(use_responses_api: bool) -> None:
    messages = [HumanMessage(content="Hello")]
    payload = llm._get_request_payload(messages)
    assert payload["temperature"] == 0.5  # gpt-5-chat is exception


def test_http_client_aiohttp_explicit() -> None:
    """Test that custom http_client flows through to OpenAI client construction."""
    mock_client = MagicMock()

    # Mock the OpenAI client constructor to capture the http_client parameter
    with (
        patch("openai.OpenAI") as mock_openai,
        patch("openai.AsyncOpenAI") as mock_async_openai,
    ):
        mock_openai.return_value = MagicMock()
        mock_async_openai.return_value = MagicMock()

        llm = ChatOpenAI(
            http_client=mock_client,
            http_async_client=mock_client,
            api_key=SecretStr("test-key"),
        )

        # Trigger client initialization by accessing a client property
        _ = llm.client

        # Verify the http_client was passed to OpenAI constructor
        mock_openai.assert_called_once()
        call_kwargs = mock_openai.call_args[1]
        assert call_kwargs["http_client"] == mock_client

        # Verify async client gets the same client
        mock_async_openai.assert_called_once()
        async_call_kwargs = mock_async_openai.call_args[1]
        assert async_call_kwargs["http_client"] == mock_client


def test_aiohttp_env_variable_enabled() -> None:
    """Test that LC_OPENAI_USE_AIOHTTP=1 uses aiohttp client when available."""
    # Mock successful import of DefaultAioHttpClient
    mock_aiohttp_client = MagicMock()

    with (
        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
        patch("openai.DefaultAioHttpClient", return_value=mock_aiohttp_client),
        patch("openai.OpenAI") as mock_openai,
        patch("openai.AsyncOpenAI") as mock_async_openai,
    ):
        mock_openai.return_value = MagicMock()
        mock_async_openai.return_value = MagicMock()

        llm = ChatOpenAI(api_key=SecretStr("test-key"))
        _ = llm.client  # Trigger client initialization

        # Verify aiohttp client was used for both sync and async
        mock_openai.assert_called_once()
        sync_call_kwargs = mock_openai.call_args[1]
        assert sync_call_kwargs["http_client"] == mock_aiohttp_client

        mock_async_openai.assert_called_once()
        async_call_kwargs = mock_async_openai.call_args[1]
        assert async_call_kwargs["http_client"] == mock_aiohttp_client


def test_aiohttp_env_variable_disabled() -> None:
    """Test that LC_OPENAI_USE_AIOHTTP=0 doesn't use the aiohttp client."""
    with (
        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "0"}),
        patch("openai.OpenAI") as mock_openai,
        patch("openai.AsyncOpenAI") as mock_async_openai,
    ):
        mock_openai.return_value = MagicMock()
        mock_async_openai.return_value = MagicMock()

        llm = ChatOpenAI(api_key=SecretStr("test-key"))
        _ = llm.client  # Trigger client initialization

        # Verify httpx clients were used (not aiohttp)
        mock_openai.assert_called_once()
        sync_call_kwargs = mock_openai.call_args[1]
        # Should be an httpx client wrapper
        assert hasattr(sync_call_kwargs["http_client"], "is_closed")

        mock_async_openai.assert_called_once()
        async_call_kwargs = mock_async_openai.call_args[1]
        assert hasattr(async_call_kwargs["http_client"], "is_closed")


def test_aiohttp_env_variable_fallback_with_warning() -> None:
    """Test graceful fallback when aiohttp is not available but the env var is set."""
    with (
        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
        patch(
            "openai.DefaultAioHttpClient",
            side_effect=ImportError("No module named aiohttp"),
        ),
        patch("warnings.warn") as mock_warn,
        patch("openai.OpenAI") as mock_openai,
        patch("openai.AsyncOpenAI") as mock_async_openai,
    ):
        mock_openai.return_value = MagicMock()
        mock_async_openai.return_value = MagicMock()

        llm = ChatOpenAI(api_key=SecretStr("test-key"))
        _ = llm.client  # Trigger client initialization

        # Verify warning was issued (called twice, once for sync and once for async)
        assert mock_warn.call_count == 2
        warning_msg = mock_warn.call_args_list[0][0][0]
        assert (
            "LC_OPENAI_USE_AIOHTTP is set but openai[aiohttp] is not installed"
            in warning_msg
        )
        assert 'pip install "openai[aiohttp]"' in warning_msg

        # Verify fallback to httpx clients
        mock_openai.assert_called_once()
        sync_call_kwargs = mock_openai.call_args[1]
        assert hasattr(sync_call_kwargs["http_client"], "is_closed")


def test_aiohttp_env_variable_truthy_values() -> None:
    """Test that various truthy values for LC_OPENAI_USE_AIOHTTP work."""
    mock_aiohttp_client = MagicMock()

    for env_value in ["1", "true", "TRUE", "yes", "YES", "on", "ON"]:
        with (
            patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": env_value}),
            patch("openai.DefaultAioHttpClient", return_value=mock_aiohttp_client),
            patch("openai.OpenAI") as mock_openai,
            patch("openai.AsyncOpenAI") as mock_async_openai,
        ):
            mock_openai.return_value = MagicMock()
            mock_async_openai.return_value = MagicMock()

            llm = ChatOpenAI(api_key=SecretStr("test-key"))
            _ = llm.client  # Trigger client initialization

            # Verify aiohttp client was used
            mock_openai.assert_called_once()
            call_kwargs = mock_openai.call_args[1]
            assert call_kwargs["http_client"] == mock_aiohttp_client


def test_explicit_http_client_overrides_env_variable() -> None:
    """Test that an explicitly provided http_client overrides the environment variable."""
    explicit_client = MagicMock()
    mock_aiohttp_client = MagicMock()

    with (
        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
        patch("openai.DefaultAioHttpClient", return_value=mock_aiohttp_client),
        patch("openai.OpenAI") as mock_openai,
        patch("openai.AsyncOpenAI") as mock_async_openai,
    ):
        mock_openai.return_value = MagicMock()
        mock_async_openai.return_value = MagicMock()

        llm = ChatOpenAI(
            http_client=explicit_client,
            http_async_client=explicit_client,
            api_key=SecretStr("test-key"),
        )
        _ = llm.client  # Trigger client initialization

        # Verify explicit client was used, not aiohttp
        mock_openai.assert_called_once()
        sync_call_kwargs = mock_openai.call_args[1]
        assert sync_call_kwargs["http_client"] == explicit_client

        mock_async_openai.assert_called_once()
        async_call_kwargs = mock_async_openai.call_args[1]
        assert async_call_kwargs["http_client"] == explicit_client
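As a usage note, the env-var parsing can also be exercised directly with pytest's `monkeypatch` fixture instead of `patch.dict`; a minimal sketch in the style of the tests above (not part of the diff):

```python
import pytest

from langchain_openai.chat_models import _client_utils


def test_should_use_aiohttp_env_parsing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Sketch: check truthy-value parsing without mutating os.environ globally."""
    monkeypatch.setenv("LC_OPENAI_USE_AIOHTTP", "on")
    assert _client_utils._should_use_aiohttp()

    monkeypatch.setenv("LC_OPENAI_USE_AIOHTTP", "0")
    assert not _client_utils._should_use_aiohttp()

    monkeypatch.delenv("LC_OPENAI_USE_AIOHTTP")
    assert not _client_utils._should_use_aiohttp()
```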