diff --git a/libs/partners/openai/README.md b/libs/partners/openai/README.md
index bcd17f8142456..bc1c720fd3813 100644
--- a/libs/partners/openai/README.md
+++ b/libs/partners/openai/README.md
@@ -20,6 +20,33 @@ See a [usage example](https://python.langchain.com/docs/integrations/chat/openai
 from langchain_openai import ChatOpenAI
 ```
 
+### High concurrency - optional OpenAI aiohttp backend
+
+For improved throughput in high-concurrency scenarios (parallel chains, graphs, and agents), you can enable the OpenAI aiohttp backend, which avoids the concurrency limits seen with the default httpx client.
+
+**Installation:**
+```bash
+pip install "openai[aiohttp]"
+```
+
+**Usage:**
+```python
+from openai import DefaultAioHttpClient
+from langchain_openai import ChatOpenAI
+
+# Option 1: Pass explicitly
+llm = ChatOpenAI(
+    http_client=DefaultAioHttpClient(),
+    http_async_client=DefaultAioHttpClient(),
+)
+
+# Option 2: Use environment variable
+# Set LC_OPENAI_USE_AIOHTTP=1 in your environment
+llm = ChatOpenAI()  # Will automatically use aiohttp if available
+```
+
+For more details, see the [OpenAI Python library documentation](https://github.com/openai/openai-python#httpx-client).
+
 If you are using a model hosted on `Azure`, you should use different wrapper for that:
 
 ```python
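The throughput claim in the new README section is easiest to see with a fan-out of async calls. Below is a minimal sketch (not part of the diff), assuming `openai[aiohttp]` is installed and `OPENAI_API_KEY` is set; the model name is an arbitrary placeholder:

```python
import asyncio

from openai import DefaultAioHttpClient

from langchain_openai import ChatOpenAI


async def main() -> None:
    # aiohttp-backed client for async calls; sync calls keep the default httpx client
    llm = ChatOpenAI(
        model="gpt-4o-mini",  # placeholder model, not prescribed by the PR
        http_async_client=DefaultAioHttpClient(),
    )
    # Fan out many concurrent requests: the high-concurrency case the
    # aiohttp backend is meant to speed up.
    replies = await asyncio.gather(*(llm.ainvoke(f"Say {i}") for i in range(50)))
    print(len(replies), "responses")


asyncio.run(main())
```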
diff --git a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py
index c41f500a454ef..26c90f9e8b947 100644
--- a/libs/partners/openai/langchain_openai/chat_models/_client_utils.py
+++ b/libs/partners/openai/langchain_openai/chat_models/_client_utils.py
@@ -8,6 +8,7 @@
 
 import asyncio
 import os
+import warnings
 from functools import lru_cache
 from typing import Any, Optional
 
@@ -105,3 +106,68 @@ def _get_default_async_httpx_client(
         return _build_async_httpx_client(base_url, timeout)
     else:
         return _cached_async_httpx_client(base_url, timeout)
+
+
+def _get_aiohttp_client() -> Optional[Any]:
+    """Get OpenAI DefaultAioHttpClient if available.
+
+    Returns:
+        DefaultAioHttpClient instance if openai[aiohttp] is installed, None otherwise.
+    """
+    try:
+        from openai import DefaultAioHttpClient
+
+        return DefaultAioHttpClient()
+    except ImportError:
+        return None
+
+
+def _should_use_aiohttp() -> bool:
+    """Check if the aiohttp backend should be used, based on an environment variable.
+
+    Returns:
+        True if LC_OPENAI_USE_AIOHTTP environment variable is set to a truthy value.
+    """
+    return os.getenv("LC_OPENAI_USE_AIOHTTP", "").lower() in ("1", "true", "yes", "on")
+
+
+def _get_http_client_for_aiohttp_env(
+    provided_client: Optional[Any],
+    base_url: Optional[str],
+    timeout: Any,
+    is_async: bool = False,
+) -> Optional[Any]:
+    """Get the appropriate HTTP client, honoring the aiohttp environment variable.
+
+    Args:
+        provided_client: User-provided HTTP client (takes precedence).
+        base_url: OpenAI API base URL.
+        timeout: Request timeout.
+        is_async: Whether to return an async client.
+
+    Returns:
+        The appropriate HTTP client, or None to use the OpenAI default.
+    """
+    # A user-provided client takes precedence
+    if provided_client is not None:
+        return provided_client
+
+    # Check if aiohttp should be used via the environment variable
+    if _should_use_aiohttp():
+        aiohttp_client = _get_aiohttp_client()
+        if aiohttp_client is not None:
+            return aiohttp_client
+        else:
+            warnings.warn(
+                "LC_OPENAI_USE_AIOHTTP is set but openai[aiohttp] is not installed. "
+                "Install with 'pip install \"openai[aiohttp]\"' to use the aiohttp "
+                "backend. Falling back to default httpx client.",
+                UserWarning,
+                stacklevel=3,
+            )
+
+    # Fall back to the existing httpx client logic
+    if is_async:
+        return _get_default_async_httpx_client(base_url, timeout)
+    else:
+        return _get_default_httpx_client(base_url, timeout)
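The helper above resolves clients in a fixed order: an explicitly provided client wins, then the `LC_OPENAI_USE_AIOHTTP` opt-in, then the cached httpx defaults. A sketch of that precedence exercising the private helpers directly (illustrative only; these are internal APIs and may change):

```python
import os

import httpx

from langchain_openai.chat_models._client_utils import (
    _get_http_client_for_aiohttp_env,
)

os.environ["LC_OPENAI_USE_AIOHTTP"] = "1"

# 1. An explicit client always wins, even with the env var set.
mine = httpx.AsyncClient()
assert _get_http_client_for_aiohttp_env(mine, None, 30.0, is_async=True) is mine

# 2. No explicit client: the env var opts into aiohttp, or warns and falls
#    back to httpx if openai[aiohttp] is not installed.
resolved = _get_http_client_for_aiohttp_env(None, None, 30.0, is_async=True)

# 3. Env var unset: the cached default httpx client is returned.
del os.environ["LC_OPENAI_USE_AIOHTTP"]
default = _get_http_client_for_aiohttp_env(None, None, 30.0, is_async=True)
print(type(resolved).__name__, type(default).__name__)
```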
diff --git a/libs/partners/openai/langchain_openai/chat_models/base.py b/libs/partners/openai/langchain_openai/chat_models/base.py
index bf2fe87746a58..36b8104bf7787 100644
--- a/libs/partners/openai/langchain_openai/chat_models/base.py
+++ b/libs/partners/openai/langchain_openai/chat_models/base.py
@@ -102,10 +102,7 @@
 from pydantic.v1 import BaseModel as BaseModelV1
 from typing_extensions import Self
 
-from langchain_openai.chat_models._client_utils import (
-    _get_default_async_httpx_client,
-    _get_default_httpx_client,
-)
+from langchain_openai.chat_models._client_utils import _get_http_client_for_aiohttp_env
 from langchain_openai.chat_models._compat import (
     _convert_from_v03_ai_message,
     _convert_to_v03_ai_message,
@@ -555,13 +552,23 @@ class BaseChatOpenAI(BaseChatModel):
     # Configure a custom httpx client. See the
     # [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
     http_client: Union[Any, None] = Field(default=None, exclude=True)
-    """Optional ``httpx.Client``. Only used for sync invocations. Must specify
-    ``http_async_client`` as well if you'd like a custom client for async
-    invocations.
+    """Optional HTTP client. Only used for sync invocations. Must specify
+    ``http_async_client`` as well if you'd like a custom client for async
+    invocations.
+
+    Supports ``httpx.Client`` or ``openai.DefaultAioHttpClient`` for improved
+    concurrency. To use the aiohttp backend, install with
+    ``pip install "openai[aiohttp]"`` and pass ``DefaultAioHttpClient()`` or
+    set the environment variable ``LC_OPENAI_USE_AIOHTTP=1``.
     """
     http_async_client: Union[Any, None] = Field(default=None, exclude=True)
-    """Optional ``httpx.AsyncClient``. Only used for async invocations. Must specify
-    ``http_client`` as well if you'd like a custom client for sync invocations."""
+    """Optional HTTP client. Only used for async invocations. Must specify
+    ``http_client`` as well if you'd like a custom client for sync invocations.
+
+    Supports ``httpx.AsyncClient`` or ``openai.DefaultAioHttpClient`` for improved
+    concurrency. To use the aiohttp backend, install with
+    ``pip install "openai[aiohttp]"`` and pass ``DefaultAioHttpClient()`` or
+    set the environment variable ``LC_OPENAI_USE_AIOHTTP=1``."""
     stop: Optional[Union[list[str], str]] = Field(default=None, alias="stop_sequences")
     """Default stop sequences."""
     extra_body: Optional[Mapping[str, Any]] = None
@@ -787,8 +794,12 @@ def validate_environment(self) -> Self:
                 proxy=self.openai_proxy, verify=global_ssl_context
             )
         sync_specific = {
-            "http_client": self.http_client
-            or _get_default_httpx_client(self.openai_api_base, self.request_timeout)
+            "http_client": _get_http_client_for_aiohttp_env(
+                self.http_client,
+                self.openai_api_base,
+                self.request_timeout,
+                is_async=False,
+            )
         }
         self.root_client = openai.OpenAI(**client_params, **sync_specific)  # type: ignore[arg-type]
         self.client = self.root_client.chat.completions
@@ -805,9 +816,11 @@ def validate_environment(self) -> Self:
                 proxy=self.openai_proxy, verify=global_ssl_context
             )
         async_specific = {
-            "http_client": self.http_async_client
-            or _get_default_async_httpx_client(
-                self.openai_api_base, self.request_timeout
+            "http_client": _get_http_client_for_aiohttp_env(
+                self.http_async_client,
+                self.openai_api_base,
+                self.request_timeout,
+                is_async=True,
             )
         }
         self.root_async_client = openai.AsyncOpenAI(
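Because `validate_environment` now resolves the sync and async clients independently, backends can be mixed, e.g. aiohttp for async traffic only. A minimal sketch (not part of the diff), assuming `openai[aiohttp]` is installed and `OPENAI_API_KEY` is set:

```python
from openai import DefaultAioHttpClient

from langchain_openai import ChatOpenAI

# Async invocations use aiohttp; sync invocations fall back to the default
# (cached) httpx client resolved inside validate_environment().
llm = ChatOpenAI(http_async_client=DefaultAioHttpClient())

llm.invoke("ping")  # sync -> default httpx client
# In async code: await llm.ainvoke("ping")  # async -> aiohttp backend
```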
diff --git a/libs/partners/openai/tests/unit_tests/chat_models/test_base.py b/libs/partners/openai/tests/unit_tests/chat_models/test_base.py
index 01a1e1cae9870..695617ce88021 100644
--- a/libs/partners/openai/tests/unit_tests/chat_models/test_base.py
+++ b/libs/partners/openai/tests/unit_tests/chat_models/test_base.py
@@ -1,6 +1,7 @@
 """Test OpenAI Chat API wrapper."""
 
 import json
+import os
 from functools import partial
 from types import TracebackType
 from typing import Any, Literal, Optional, Union, cast
@@ -2780,3 +2781,173 @@ def test_gpt_5_temperature(use_responses_api: bool) -> None:
     messages = [HumanMessage(content="Hello")]
     payload = llm._get_request_payload(messages)
     assert payload["temperature"] == 0.5  # gpt-5-chat is exception
+
+
+def test_http_client_aiohttp_explicit() -> None:
+    """Test that a custom http_client flows through to OpenAI client construction."""
+    mock_client = MagicMock()
+
+    # Mock the OpenAI client constructors to capture the http_client parameter
+    with (
+        patch("openai.OpenAI") as mock_openai,
+        patch("openai.AsyncOpenAI") as mock_async_openai,
+    ):
+        mock_openai.return_value = MagicMock()
+        mock_async_openai.return_value = MagicMock()
+
+        llm = ChatOpenAI(
+            http_client=mock_client,
+            http_async_client=mock_client,
+            api_key=SecretStr("test-key"),
+        )
+
+        # Trigger client initialization by accessing a client property
+        _ = llm.client
+
+        # Verify the http_client was passed to the OpenAI constructor
+        mock_openai.assert_called_once()
+        call_kwargs = mock_openai.call_args[1]
+        assert call_kwargs["http_client"] == mock_client
+
+        # Verify the async client gets the same client
+        mock_async_openai.assert_called_once()
+        async_call_kwargs = mock_async_openai.call_args[1]
+        assert async_call_kwargs["http_client"] == mock_client
+
+
+def test_aiohttp_env_variable_enabled() -> None:
+    """Test that LC_OPENAI_USE_AIOHTTP=1 uses the aiohttp client when available."""
+    # Mock successful import of DefaultAioHttpClient
+    mock_aiohttp_client = MagicMock()
+
+    with (
+        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
+        patch("openai.DefaultAioHttpClient", return_value=mock_aiohttp_client),
+        patch("openai.OpenAI") as mock_openai,
+        patch("openai.AsyncOpenAI") as mock_async_openai,
+    ):
+        mock_openai.return_value = MagicMock()
+        mock_async_openai.return_value = MagicMock()
+
+        llm = ChatOpenAI(api_key=SecretStr("test-key"))
+        _ = llm.client  # Trigger client initialization
+
+        # Verify the aiohttp client was used for both sync and async
+        mock_openai.assert_called_once()
+        sync_call_kwargs = mock_openai.call_args[1]
+        assert sync_call_kwargs["http_client"] == mock_aiohttp_client
+
+        mock_async_openai.assert_called_once()
+        async_call_kwargs = mock_async_openai.call_args[1]
+        assert async_call_kwargs["http_client"] == mock_aiohttp_client
+
+
+def test_aiohttp_env_variable_disabled() -> None:
+    """Test that LC_OPENAI_USE_AIOHTTP=0 doesn't use the aiohttp client."""
+    with (
+        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "0"}),
+        patch("openai.OpenAI") as mock_openai,
+        patch("openai.AsyncOpenAI") as mock_async_openai,
+    ):
+        mock_openai.return_value = MagicMock()
+        mock_async_openai.return_value = MagicMock()
+
+        llm = ChatOpenAI(api_key=SecretStr("test-key"))
+        _ = llm.client  # Trigger client initialization
+
+        # Verify httpx clients were used (not aiohttp)
+        mock_openai.assert_called_once()
+        sync_call_kwargs = mock_openai.call_args[1]
+        # Should be an httpx client wrapper
+        assert hasattr(sync_call_kwargs["http_client"], "is_closed")
+
+        mock_async_openai.assert_called_once()
+        async_call_kwargs = mock_async_openai.call_args[1]
+        assert hasattr(async_call_kwargs["http_client"], "is_closed")
+
+
+def test_aiohttp_env_variable_fallback_with_warning() -> None:
+    """Test graceful fallback when aiohttp is not available but the env var is set."""
+    with (
+        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
+        patch(
+            "openai.DefaultAioHttpClient",
+            side_effect=ImportError("No module named aiohttp"),
+        ),
+        patch("warnings.warn") as mock_warn,
+        patch("openai.OpenAI") as mock_openai,
+        patch("openai.AsyncOpenAI") as mock_async_openai,
+    ):
+        mock_openai.return_value = MagicMock()
+        mock_async_openai.return_value = MagicMock()
+
+        llm = ChatOpenAI(api_key=SecretStr("test-key"))
+        _ = llm.client  # Trigger client initialization
+
+        # Verify warning was issued (called twice, once for sync and once for async)
+        assert mock_warn.call_count == 2
+        warning_msg = mock_warn.call_args_list[0][0][0]
+        assert (
+            "LC_OPENAI_USE_AIOHTTP is set but openai[aiohttp] is not installed"
+            in warning_msg
+        )
+        assert 'pip install "openai[aiohttp]"' in warning_msg
+
+        # Verify fallback to httpx clients
+        mock_openai.assert_called_once()
+        sync_call_kwargs = mock_openai.call_args[1]
+        assert hasattr(sync_call_kwargs["http_client"], "is_closed")
+
+
+def test_aiohttp_env_variable_truthy_values() -> None:
+    """Test that various truthy values for LC_OPENAI_USE_AIOHTTP work."""
+    mock_aiohttp_client = MagicMock()
+
+    for env_value in ["1", "true", "TRUE", "yes", "YES", "on", "ON"]:
+        with (
+            patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": env_value}),
+            patch("openai.DefaultAioHttpClient", return_value=mock_aiohttp_client),
+            patch("openai.OpenAI") as mock_openai,
+            patch("openai.AsyncOpenAI") as mock_async_openai,
+        ):
+            mock_openai.return_value = MagicMock()
+            mock_async_openai.return_value = MagicMock()
+
+            llm = ChatOpenAI(api_key=SecretStr("test-key"))
+            _ = llm.client  # Trigger client initialization
+
+            # Verify the aiohttp client was used
+            mock_openai.assert_called_once()
+            call_kwargs = mock_openai.call_args[1]
+            assert call_kwargs["http_client"] == mock_aiohttp_client
+
+
+def test_explicit_http_client_overrides_env_variable() -> None:
+    """Test that explicitly provided http_client overrides environment variable."""
+    explicit_client = MagicMock()
+    mock_aiohttp_client = MagicMock()
+
+    with (
+        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
+        patch("openai.DefaultAioHttpClient", return_value=mock_aiohttp_client),
+        patch("openai.OpenAI") as mock_openai,
+        patch("openai.AsyncOpenAI") as mock_async_openai,
+    ):
+        mock_openai.return_value = MagicMock()
+        mock_async_openai.return_value = MagicMock()
+
+        llm = ChatOpenAI(
+            http_client=explicit_client,
+            http_async_client=explicit_client,
+            api_key=SecretStr("test-key"),
+        )
+        _ = llm.client  # Trigger client initialization
+
+        # Verify the explicit client was used, not aiohttp
+        mock_openai.assert_called_once()
+        sync_call_kwargs = mock_openai.call_args[1]
+        assert sync_call_kwargs["http_client"] == explicit_client
+
+        mock_async_openai.assert_called_once()
+        async_call_kwargs = mock_async_openai.call_args[1]
+        assert async_call_kwargs["http_client"] == explicit_client
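A design note on the fallback test: it patches `warnings.warn` directly, which also lets it count the sync/async call pair. An equivalent assertion with `pytest.warns` is arguably more idiomatic; a sketch under the same mocking assumptions (not part of the diff):

```python
import os
from unittest.mock import MagicMock, patch

import pytest
from pydantic import SecretStr

from langchain_openai import ChatOpenAI


def test_aiohttp_fallback_warns() -> None:
    """Same fallback scenario as above, asserted with pytest.warns."""
    with (
        patch.dict(os.environ, {"LC_OPENAI_USE_AIOHTTP": "1"}),
        patch("openai.DefaultAioHttpClient", side_effect=ImportError("no aiohttp")),
        patch("openai.OpenAI", return_value=MagicMock()),
        patch("openai.AsyncOpenAI", return_value=MagicMock()),
        pytest.warns(UserWarning, match=r"openai\[aiohttp\] is not installed"),
    ):
        llm = ChatOpenAI(api_key=SecretStr("test-key"))
        _ = llm.client  # Trigger client initialization, as in the tests above
```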