Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@
_ANTHROPIC_ENDPOINT = "https://api.anthropic.com/v1"
# Dotted paths handed to unittest.mock.patch to intercept httpx class
# construction. NOTE(review): the ".lazy.httpx." segment suggests httpx is
# reached through a lazy-import shim in http_model_client — confirm the module
# actually exposes these attributes at patch time.
_SYNC_CLIENT_PATCH = "data_designer.engine.models.clients.adapters.http_model_client.lazy.httpx.Client"
_ASYNC_CLIENT_PATCH = "data_designer.engine.models.clients.adapters.http_model_client.lazy.httpx.AsyncClient"
_HTTP_TRANSPORT_PATCH = "data_designer.engine.models.clients.adapters.http_model_client.lazy.httpx.HTTPTransport"
_ASYNC_HTTP_TRANSPORT_PATCH = (
    "data_designer.engine.models.clients.adapters.http_model_client.lazy.httpx.AsyncHTTPTransport"
)


def _make_openai_client(
*,
concurrency_mode: ClientConcurrencyMode = ClientConcurrencyMode.SYNC,
sync_client: MagicMock | None = None,
async_client: MagicMock | None = None,
**kwargs: Any,
) -> OpenAICompatibleClient:
return OpenAICompatibleClient(
provider_name=_OPENAI_PROVIDER,
Expand All @@ -38,6 +43,7 @@ def _make_openai_client(
concurrency_mode=concurrency_mode,
sync_client=sync_client,
async_client=async_client,
**kwargs,
)


Expand All @@ -46,6 +52,7 @@ def _make_anthropic_client(
concurrency_mode: ClientConcurrencyMode = ClientConcurrencyMode.SYNC,
sync_client: MagicMock | None = None,
async_client: MagicMock | None = None,
**kwargs: Any,
) -> AnthropicClient:
return AnthropicClient(
provider_name=_ANTHROPIC_PROVIDER,
Expand All @@ -54,6 +61,7 @@ def _make_anthropic_client(
concurrency_mode=concurrency_mode,
sync_client=sync_client,
async_client=async_client,
**kwargs,
)


Expand Down Expand Up @@ -295,17 +303,91 @@ async def test_acompletion_lazy_initializes_async_client(
# Connection pool size regression tests (issue #459)
# ---------------------------------------------------------------------------

# Factories for both client adapters, so the pool-limit regression tests run
# against every provider rather than only the OpenAI-compatible client.
_POOL_LIMITS_CASES = [
    pytest.param(_make_openai_client, id="openai"),
    pytest.param(_make_anthropic_client, id="anthropic"),
]

# (client factory, model name, canned JSON response) triples for the
# transport-wiring tests below. The sync and async lists are currently
# identical but kept separate so either path can diverge independently.
_SYNC_TRANSPORT_WIRING_CASES = [
    pytest.param(_make_openai_client, _OPENAI_MODEL, _make_openai_chat_response(), id="openai"),
    pytest.param(_make_anthropic_client, _ANTHROPIC_MODEL, _make_anthropic_chat_response(), id="anthropic"),
]

# NOTE(review): responses are built once at import time; if a test mutates its
# response_json the change would leak into sibling parametrized cases.
_ASYNC_TRANSPORT_WIRING_CASES = [
    pytest.param(_make_openai_client, _OPENAI_MODEL, _make_openai_chat_response(), id="openai"),
    pytest.param(_make_anthropic_client, _ANTHROPIC_MODEL, _make_anthropic_chat_response(), id="anthropic"),
]


@pytest.mark.parametrize("client_factory", _POOL_LIMITS_CASES)
def test_client_limits_respect_max_parallel_requests(client_factory: Callable[..., Any]) -> None:
    """Connection pool limits must reflect max_parallel_requests.

    Expected sizing rule under test:
    pool_max = max(32, 2 * max_parallel_requests) = max(32, 600) = 600
    """
    # Only the post-change, parametrized form of this test belongs here; the
    # scraped diff had interleaved the removed direct OpenAICompatibleClient(...)
    # construction with the new factory-based lines, which is not valid Python.
    client = client_factory(
        concurrency_mode=ClientConcurrencyMode.SYNC,
        max_parallel_requests=300,
    )
    assert client.limits.max_connections == 600


@pytest.mark.parametrize(("client_factory", "model_name", "response_json"), _SYNC_TRANSPORT_WIRING_CASES)
@patch(_HTTP_TRANSPORT_PATCH)
@patch(_SYNC_CLIENT_PATCH)
def test_sync_pool_limits_forwarded_to_transport(
    mock_client_cls: MagicMock,
    mock_transport_cls: MagicMock,
    client_factory: Callable[..., Any],
    model_name: str,
    response_json: dict[str, Any],
) -> None:
    """Regression for #459: the computed limits must be passed to HTTPTransport.

    httpx.Client silently ignores limits= once a custom transport= is supplied,
    so the pre-fix code (which only passed limits= to httpx.Client) left the
    transport at its defaults. The fixed code builds HTTPTransport itself with
    the correct limits before wrapping it in RetryTransport; on pre-fix code
    HTTPTransport is never constructed and assert_called_once() fails.
    """
    fake_http_client = MagicMock()
    fake_http_client.post = MagicMock(return_value=mock_httpx_response(response_json))
    mock_client_cls.return_value = fake_http_client

    client = client_factory(concurrency_mode=ClientConcurrencyMode.SYNC, max_parallel_requests=300)
    client.completion(_make_chat_request(model_name))

    mock_transport_cls.assert_called_once()
    transport_kwargs = mock_transport_cls.call_args.kwargs
    pool_limits = transport_kwargs["limits"]
    assert pool_limits.max_connections == 600
    assert pool_limits.max_keepalive_connections == 300


@pytest.mark.parametrize(("client_factory", "model_name", "response_json"), _ASYNC_TRANSPORT_WIRING_CASES)
@patch(_ASYNC_HTTP_TRANSPORT_PATCH)
@patch(_ASYNC_CLIENT_PATCH)
@pytest.mark.asyncio
async def test_async_pool_limits_forwarded_to_transport(
    mock_client_cls: MagicMock,
    mock_transport_cls: MagicMock,
    client_factory: Callable[..., Any],
    model_name: str,
    response_json: dict[str, Any],
) -> None:
    """Regression for #459: limits must be passed to AsyncHTTPTransport.

    Mirrors the sync test — before the fix AsyncHTTPTransport was never
    constructed explicitly, so RetryTransport fell back to a default pool of
    100 connections no matter what max_parallel_requests was set to.
    """
    fake_http_client = MagicMock()
    fake_http_client.post = AsyncMock(return_value=mock_httpx_response(response_json))
    mock_client_cls.return_value = fake_http_client

    client = client_factory(concurrency_mode=ClientConcurrencyMode.ASYNC, max_parallel_requests=300)
    await client.acompletion(_make_chat_request(model_name))

    mock_transport_cls.assert_called_once()
    transport_kwargs = mock_transport_cls.call_args.kwargs
    pool_limits = transport_kwargs["limits"]
    assert pool_limits.max_connections == 600
    assert pool_limits.max_keepalive_connections == 300
Loading