From ee52dcb5adec7699faeb69c8e02b909a4b67ae4c Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Mon, 22 Sep 2025 14:55:07 +0400 Subject: [PATCH 1/8] Support Trio on httpx backend The main limitation is that sniffing isn't supported, as the way it's currently designed (starting a task in the background and never collecting it) is not compatible with structured concurrency. --- elastic_transport/_async_transport.py | 25 ++++++--- elastic_transport/_node/_http_httpx.py | 4 +- setup.py | 2 + tests/async_/test_async_transport.py | 78 ++++++++++++++++---------- tests/async_/test_httpbin.py | 16 ++++-- tests/node/test_http_httpx.py | 2 +- tests/node/test_tls_versions.py | 25 +++++++-- tests/test_logging.py | 34 ++++++++--- 8 files changed, 125 insertions(+), 61 deletions(-) diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 33017147..50abb9cd 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -17,6 +17,8 @@ import asyncio import logging +import time +import sniffio from typing import ( Any, Awaitable, @@ -169,6 +171,7 @@ def __init__( # time it's needed. Gets set within '_async_call()' which should # precede all logic within async calls. self._loop: asyncio.AbstractEventLoop = None # type: ignore[assignment] + self._async_library: str = None # type: ignore[assignment] # AsyncTransport doesn't require a thread lock for # sniffing. Uses '_sniffing_task' instead. @@ -258,7 +261,7 @@ async def perform_request( # type: ignore[override, return] node_failure = False last_response: Optional[TransportApiResponse] = None node: BaseAsyncNode = self.node_pool.get() # type: ignore[assignment] - start_time = self._loop.time() + start_time = time.monotonic() try: otel_span.set_node_metadata( node.host, node.port, node.base_url, target, method @@ -277,7 +280,7 @@ async def perform_request( # type: ignore[override, return] node.base_url, target, resp.meta.status, - self._loop.time() - start_time, + time.monotonic() - start_time, ) ) @@ -300,7 +303,7 @@ async def perform_request( # type: ignore[override, return] node.base_url, target, "N/A", - self._loop.time() - start_time, + time.monotonic() - start_time, ) ) @@ -377,6 +380,10 @@ async def perform_request( # type: ignore[override, return] ) async def sniff(self, is_initial_sniff: bool = False) -> None: # type: ignore[override] + if sniffio.current_async_library() != "asyncio": + raise ValueError( + f"Asynchronous sniffing only works with the 'asyncio' library, got {sniffio.current_async_library}" + ) await self._async_call() task = self._create_sniffing_task(is_initial_sniff) @@ -409,8 +416,7 @@ def _should_sniff(self, is_initial_sniff: bool) -> bool: self._sniffing_task.result() return ( - self._loop.time() - self._last_sniffed_at - >= self._min_delay_between_sniffing + time.monotonic() - self._last_sniffed_at >= self._min_delay_between_sniffing ) def _create_sniffing_task( @@ -429,7 +435,7 @@ async def _sniffing_task_impl(self, is_initial_sniff: bool) -> None: """Implementation of the sniffing task""" previously_sniffed_at = self._last_sniffed_at try: - self._last_sniffed_at = self._loop.time() + self._last_sniffed_at = time.monotonic() options = SniffOptions( is_initial_sniff=is_initial_sniff, sniff_timeout=self._sniff_timeout ) @@ -466,8 +472,13 @@ async def _async_call(self) -> None: because we're not guaranteed to be within an active asyncio event loop when __init__() is called. """ - if self._loop is not None: + if self._async_library is not None: return # Call at most once! + + self._async_library = sniffio.current_async_library() + if self._async_library != "asyncio": + return + self._loop = asyncio.get_running_loop() if self._sniff_on_start: await self.sniff(True) diff --git a/elastic_transport/_node/_http_httpx.py b/elastic_transport/_node/_http_httpx.py index 04ceb60a..9ecc6c99 100644 --- a/elastic_transport/_node/_http_httpx.py +++ b/elastic_transport/_node/_http_httpx.py @@ -175,11 +175,11 @@ async def perform_request( # type: ignore[override] body=body, exception=err, ) - raise err from None + raise err from e meta = ApiResponseMeta( resp.status_code, - resp.http_version, + resp.http_version.lstrip("HTTP/"), HttpHeaders(resp.headers), duration, self.config, diff --git a/setup.py b/setup.py index 21832718..149033b7 100644 --- a/setup.py +++ b/setup.py @@ -52,6 +52,8 @@ install_requires=[ "urllib3>=1.26.2, <3", "certifi", + "sniffio", + "anyio", ], python_requires=">=3.8", extras_require={ diff --git a/tests/async_/test_async_transport.py b/tests/async_/test_async_transport.py index 24a869ce..046507da 100644 --- a/tests/async_/test_async_transport.py +++ b/tests/async_/test_async_transport.py @@ -24,6 +24,9 @@ import warnings from unittest import mock +import anyio +import sniffio + import pytest from elastic_transport import ( @@ -45,9 +48,11 @@ from tests.conftest import AsyncDummyNode -@pytest.mark.asyncio +@pytest.mark.anyio async def test_async_transport_httpbin(httpbin_node_config, httpbin): - t = AsyncTransport([httpbin_node_config], meta_header=False) + t = AsyncTransport( + [httpbin_node_config], meta_header=False, node_class=HttpxAsyncHttpNode + ) resp, data = await t.perform_request("GET", "/anything?key=value") assert resp.status == 200 @@ -57,6 +62,8 @@ async def test_async_transport_httpbin(httpbin_node_config, httpbin): data["headers"].pop("X-Amzn-Trace-Id", None) assert data["headers"] == { + "Accept": "*/*", + "Accept-Encoding": "gzip, deflate, br", "User-Agent": DEFAULT_USER_AGENT, "Connection": "keep-alive", "Host": f"{httpbin.host}:{httpbin.port}", @@ -66,7 +73,7 @@ async def test_async_transport_httpbin(httpbin_node_config, httpbin): @pytest.mark.skipif( sys.version_info < (3, 8), reason="Mock didn't support async before Python 3.8" ) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_transport_close_node_pool(): t = AsyncTransport([NodeConfig("http", "localhost", 443)]) with mock.patch.object(t.node_pool.all()[0], "close") as node_close: @@ -74,7 +81,7 @@ async def test_transport_close_node_pool(): node_close.assert_called_with() -@pytest.mark.asyncio +@pytest.mark.anyio async def test_request_with_custom_user_agent_header(): t = AsyncTransport( [NodeConfig("http", "localhost", 80)], @@ -91,7 +98,7 @@ async def test_request_with_custom_user_agent_header(): } == t.node_pool.get().calls[0][1] -@pytest.mark.asyncio +@pytest.mark.anyio async def test_body_gets_encoded_into_bytes(): t = AsyncTransport([NodeConfig("http", "localhost", 80)], node_class=AsyncDummyNode) @@ -105,7 +112,7 @@ async def test_body_gets_encoded_into_bytes(): assert kwargs["body"] == b'{"key":"\xe4\xbd\xa0\xe5\xa5\xbd"}' -@pytest.mark.asyncio +@pytest.mark.anyio async def test_body_bytes_get_passed_untouched(): t = AsyncTransport([NodeConfig("http", "localhost", 80)], node_class=AsyncDummyNode) @@ -131,7 +138,7 @@ def test_kwargs_passed_on_to_node_pool(): assert dt is t.node_pool.max_dead_node_backoff -@pytest.mark.asyncio +@pytest.mark.anyio async def test_request_will_fail_after_x_retries(): t = AsyncTransport( [ @@ -154,7 +161,7 @@ async def test_request_will_fail_after_x_retries(): @pytest.mark.parametrize("retry_on_timeout", [True, False]) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_retry_on_timeout(retry_on_timeout): t = AsyncTransport( [ @@ -189,7 +196,7 @@ async def test_retry_on_timeout(retry_on_timeout): assert len(e.value.errors) == 0 -@pytest.mark.asyncio +@pytest.mark.anyio async def test_retry_on_status(): t = AsyncTransport( [ @@ -233,7 +240,7 @@ async def test_retry_on_status(): ] -@pytest.mark.asyncio +@pytest.mark.anyio async def test_failed_connection_will_be_marked_as_dead(): t = AsyncTransport( [ @@ -262,7 +269,7 @@ async def test_failed_connection_will_be_marked_as_dead(): assert all(isinstance(error, ConnectionError) for error in e.value.errors) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_resurrected_connection_will_be_marked_as_live_on_success(): for method in ("GET", "HEAD"): t = AsyncTransport( @@ -283,7 +290,7 @@ async def test_resurrected_connection_will_be_marked_as_live_on_success(): assert 1 == len(t.node_pool._dead_nodes.queue) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_mark_dead_error_doesnt_raise(): t = AsyncTransport( [ @@ -303,7 +310,7 @@ async def test_mark_dead_error_doesnt_raise(): mark_dead.assert_called_with(bad_node) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_node_class_as_string(): t = AsyncTransport([NodeConfig("http", "localhost", 80)], node_class="aiohttp") assert isinstance(t.node_pool.get(), AiohttpHttpNode) @@ -320,7 +327,7 @@ async def test_node_class_as_string(): @pytest.mark.parametrize(["status", "boolean"], [(200, True), (299, True)]) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_head_response_true(status, boolean): t = AsyncTransport( [NodeConfig("http", "localhost", 80, _extras={"status": status, "body": b""})], @@ -331,7 +338,7 @@ async def test_head_response_true(status, boolean): assert data is None -@pytest.mark.asyncio +@pytest.mark.anyio async def test_head_response_false(): t = AsyncTransport( [NodeConfig("http", "localhost", 80, _extras={"status": 404, "body": b""})], @@ -353,7 +360,7 @@ async def test_head_response_false(): (HttpxAsyncHttpNode, "hx"), ], ) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_transport_client_meta_node_class(node_class, client_short_name): t = AsyncTransport([NodeConfig("http", "localhost", 80)], node_class=node_class) assert ( @@ -366,7 +373,7 @@ async def test_transport_client_meta_node_class(node_class, client_short_name): ) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_transport_default_client_meta_node_class(): # Defaults to aiohttp t = AsyncTransport( @@ -635,7 +642,7 @@ async def test_sniff_on_start_no_results_errors(sniff_callback): @pytest.mark.parametrize("pool_size", [1, 8]) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_multiple_tasks_test(pool_size): node_configs = [ NodeConfig("http", "localhost", 80), @@ -648,34 +655,45 @@ async def sniff_callback(*_): await asyncio.sleep(random.random()) return node_configs + kwargs = {} + if sniffio.current_async_library() == "asyncio": + kwargs = { + "sniff_on_start": True, + "sniff_before_requests": True, + "sniff_on_node_failure": True, + "sniff_callback": sniff_callback, + } + + print(kwargs) + t = AsyncTransport( node_configs, retry_on_status=[500], max_retries=5, node_class=AsyncDummyNode, - sniff_on_start=True, - sniff_before_requests=True, - sniff_on_node_failure=True, - sniff_callback=sniff_callback, + **kwargs, ) - loop = asyncio.get_running_loop() - start = loop.time() + start = time.monotonic() + + successful_requests = 0 async def run_requests(): - successful_requests = 0 - while loop.time() - start < 2: + nonlocal successful_requests + while time.monotonic() - start < 2: await t.perform_request("GET", "/") successful_requests += 1 return successful_requests - tasks = [loop.create_task(run_requests()) for _ in range(pool_size * 2)] - assert sum([await task for task in tasks]) >= 1000 + async with anyio.create_task_group() as tg: + for _ in range(pool_size * 2): + tg.start_soon(run_requests) + assert successful_requests >= 1000 -@pytest.mark.asyncio +@pytest.mark.anyio async def test_httpbin(httpbin_node_config): - t = AsyncTransport([httpbin_node_config]) + t = AsyncTransport([httpbin_node_config], node_class=HttpxAsyncHttpNode) resp = await t.perform_request("GET", "/anything") assert resp.meta.status == 200 assert isinstance(resp.body, dict) diff --git a/tests/async_/test_httpbin.py b/tests/async_/test_httpbin.py index f6cc747b..a3d8e6ed 100644 --- a/tests/async_/test_httpbin.py +++ b/tests/async_/test_httpbin.py @@ -20,15 +20,15 @@ import pytest -from elastic_transport import AiohttpHttpNode, AsyncTransport +from elastic_transport import HttpxAsyncHttpNode, AsyncTransport from elastic_transport._node._base import DEFAULT_USER_AGENT from ..test_httpbin import parse_httpbin -@pytest.mark.asyncio +@pytest.mark.anyio async def test_simple_request(httpbin_node_config, httpbin): - t = AsyncTransport([httpbin_node_config]) + t = AsyncTransport([httpbin_node_config], node_class=HttpxAsyncHttpNode) resp, data = await t.perform_request( "GET", @@ -59,10 +59,10 @@ async def test_simple_request(httpbin_node_config, httpbin): assert all(v == data["headers"][k] for k, v in request_headers.items()) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_node(httpbin_node_config, httpbin): def new_node(**kwargs): - return AiohttpHttpNode(dataclasses.replace(httpbin_node_config, **kwargs)) + return HttpxAsyncHttpNode(dataclasses.replace(httpbin_node_config, **kwargs)) node = new_node() resp, data = await node.perform_request("GET", "/anything") @@ -70,6 +70,8 @@ def new_node(**kwargs): parsed = parse_httpbin(data) assert parsed == { "headers": { + "Accept": "*/*", + "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Host": f"{httpbin.host}:{httpbin.port}", "User-Agent": DEFAULT_USER_AGENT, @@ -84,6 +86,7 @@ def new_node(**kwargs): parsed = parse_httpbin(data) assert parsed == { "headers": { + "Accept": "*/*", "Accept-Encoding": "gzip", "Connection": "keep-alive", "Host": f"{httpbin.host}:{httpbin.port}", @@ -98,9 +101,9 @@ def new_node(**kwargs): parsed = parse_httpbin(data) assert parsed == { "headers": { + "Accept": "*/*", "Accept-Encoding": "gzip", "Content-Encoding": "gzip", - "Content-Type": "application/octet-stream", "Content-Length": "33", "Connection": "keep-alive", "Host": f"{httpbin.host}:{httpbin.port}", @@ -120,6 +123,7 @@ def new_node(**kwargs): parsed = parse_httpbin(data) assert parsed == { "headers": { + "Accept": "*/*", "Accept-Encoding": "gzip", "Content-Encoding": "gzip", "Content-Length": "36", diff --git a/tests/node/test_http_httpx.py b/tests/node/test_http_httpx.py index ce6e7f4a..da114041 100644 --- a/tests/node/test_http_httpx.py +++ b/tests/node/test_http_httpx.py @@ -82,7 +82,7 @@ def test_ca_certs_with_verify_ssl_false_raises_error(self): ) -@pytest.mark.asyncio +@pytest.mark.anyio class TestHttpxAsyncNode: @respx.mock async def test_simple_request(self): diff --git a/tests/node/test_tls_versions.py b/tests/node/test_tls_versions.py index e687d9f6..2a188c73 100644 --- a/tests/node/test_tls_versions.py +++ b/tests/node/test_tls_versions.py @@ -28,6 +28,7 @@ RequestsHttpNode, TlsError, Urllib3HttpNode, + ConnectionError, ) from elastic_transport._compat import await_if_coro from elastic_transport.client_utils import url_to_node_config @@ -98,10 +99,15 @@ def tlsv1_1_supported() -> bool: ["url", "ssl_version"], supported_version_params, ) -@pytest.mark.asyncio -async def test_supported_tls_versions(node_class, url: str, ssl_version: int): +@pytest.mark.anyio +async def test_supported_tls_versions( + node_class, url: str, ssl_version: int, anyio_backend +): if url in (TLSv1_0_URL, TLSv1_1_URL) and not tlsv1_1_supported(): pytest.skip("TLSv1.1 isn't supported by this OpenSSL distribution") + if anyio_backend == "trio" and node_class is not HttpxAsyncHttpNode: + pytest.skip("only httpx supports trio") + node_config = url_to_node_config(url).replace(ssl_version=ssl_version) node = node_class(node_config) @@ -114,20 +120,27 @@ async def test_supported_tls_versions(node_class, url: str, ssl_version: int): ["url", "ssl_version"], unsupported_version_params, ) -@pytest.mark.asyncio -async def test_unsupported_tls_versions(node_class, url: str, ssl_version: int): +@pytest.mark.anyio +async def test_unsupported_tls_versions( + node_class, url: str, ssl_version: int, anyio_backend +): + if anyio_backend == "trio" and node_class is not HttpxAsyncHttpNode: + pytest.skip("only httpx supports trio") + node_config = url_to_node_config(url).replace(ssl_version=ssl_version) node = node_class(node_config) - with pytest.raises(TlsError) as e: + with pytest.raises((TlsError, ConnectionError)) as e: await await_if_coro(node.perform_request("GET", "/")) + if anyio_backend == "trio" and node_class is HttpxAsyncHttpNode: + return # Trio errors are not correctly bubbled up by httpx assert "unsupported protocol" in str(e.value) or "handshake failure" in str(e.value) @node_classes @pytest.mark.parametrize("ssl_version", [0, "TLSv1", object()]) def test_ssl_version_value_error(node_class, ssl_version): - with pytest.raises(ValueError) as e: + with pytest.raises((ValueError, ConnectionError)) as e: node_class(NodeConfig("https", "localhost", 9200, ssl_version=ssl_version)) assert str(e.value) == ( f"Unsupported value for 'ssl_version': {ssl_version!r}. Must be either " diff --git a/tests/test_logging.py b/tests/test_logging.py index 98e084c6..a2f939c1 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -25,6 +25,7 @@ ConnectionError, HttpHeaders, RequestsHttpNode, + HttpxAsyncHttpNode, Urllib3HttpNode, debug_logging, ) @@ -32,13 +33,17 @@ from elastic_transport._node._base import DEFAULT_USER_AGENT node_class = pytest.mark.parametrize( - "node_class", [Urllib3HttpNode, RequestsHttpNode, AiohttpHttpNode] + "node_class", + [Urllib3HttpNode, RequestsHttpNode, AiohttpHttpNode, HttpxAsyncHttpNode], ) @node_class -@pytest.mark.asyncio -async def test_debug_logging(node_class, httpbin_node_config, httpbin): +@pytest.mark.anyio +async def test_debug_logging(node_class, anyio_backend, httpbin_node_config, httpbin): + if anyio_backend == "trio" and node_class is not HttpxAsyncHttpNode: + pytest.skip("only httpx supports trio") + debug_logging() stream = io.StringIO() @@ -92,8 +97,13 @@ async def test_debug_logging(node_class, httpbin_node_config, httpbin): @node_class -@pytest.mark.asyncio -async def test_debug_logging_uncompressed_body(httpbin_node_config, node_class): +@pytest.mark.anyio +async def test_debug_logging_uncompressed_body( + httpbin_node_config, node_class, anyio_backend +): + if anyio_backend == "trio" and node_class is not HttpxAsyncHttpNode: + pytest.skip("only httpx supports trio") + debug_logging() stream = io.StringIO() logging.getLogger("elastic_transport.node").addHandler( @@ -116,8 +126,11 @@ async def test_debug_logging_uncompressed_body(httpbin_node_config, node_class): @node_class -@pytest.mark.asyncio -async def test_debug_logging_no_body(httpbin_node_config, node_class): +@pytest.mark.anyio +async def test_debug_logging_no_body(httpbin_node_config, node_class, anyio_backend): + if anyio_backend == "trio" and node_class is not HttpxAsyncHttpNode: + pytest.skip("only httpx supports trio") + debug_logging() stream = io.StringIO() logging.getLogger("elastic_transport.node").addHandler( @@ -137,8 +150,11 @@ async def test_debug_logging_no_body(httpbin_node_config, node_class): @node_class -@pytest.mark.asyncio -async def test_debug_logging_error(httpbin_node_config, node_class): +@pytest.mark.anyio +async def test_debug_logging_error(httpbin_node_config, node_class, anyio_backend): + if anyio_backend == "trio" and node_class is not HttpxAsyncHttpNode: + pytest.skip("only httpx supports trio") + debug_logging() stream = io.StringIO() logging.getLogger("elastic_transport.node").addHandler( From e8fffbc93043205953d1d5b8531f97019d0b00c1 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Mon, 22 Sep 2025 15:05:30 +0400 Subject: [PATCH 2/8] Fix lint --- elastic_transport/_async_transport.py | 3 ++- tests/async_/test_async_transport.py | 3 +-- tests/async_/test_httpbin.py | 2 +- tests/node/test_tls_versions.py | 2 +- tests/test_logging.py | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 50abb9cd..f56273bf 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -18,7 +18,6 @@ import asyncio import logging import time -import sniffio from typing import ( Any, Awaitable, @@ -32,6 +31,8 @@ Union, ) +import sniffio + from ._compat import await_if_coro from ._exceptions import ( ConnectionError, diff --git a/tests/async_/test_async_transport.py b/tests/async_/test_async_transport.py index 046507da..8868c455 100644 --- a/tests/async_/test_async_transport.py +++ b/tests/async_/test_async_transport.py @@ -25,9 +25,8 @@ from unittest import mock import anyio -import sniffio - import pytest +import sniffio from elastic_transport import ( AiohttpHttpNode, diff --git a/tests/async_/test_httpbin.py b/tests/async_/test_httpbin.py index a3d8e6ed..9113e802 100644 --- a/tests/async_/test_httpbin.py +++ b/tests/async_/test_httpbin.py @@ -20,7 +20,7 @@ import pytest -from elastic_transport import HttpxAsyncHttpNode, AsyncTransport +from elastic_transport import AsyncTransport, HttpxAsyncHttpNode from elastic_transport._node._base import DEFAULT_USER_AGENT from ..test_httpbin import parse_httpbin diff --git a/tests/node/test_tls_versions.py b/tests/node/test_tls_versions.py index 2a188c73..298e13a8 100644 --- a/tests/node/test_tls_versions.py +++ b/tests/node/test_tls_versions.py @@ -23,12 +23,12 @@ from elastic_transport import ( AiohttpHttpNode, + ConnectionError, HttpxAsyncHttpNode, NodeConfig, RequestsHttpNode, TlsError, Urllib3HttpNode, - ConnectionError, ) from elastic_transport._compat import await_if_coro from elastic_transport.client_utils import url_to_node_config diff --git a/tests/test_logging.py b/tests/test_logging.py index a2f939c1..24fd5d7c 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -24,8 +24,8 @@ AiohttpHttpNode, ConnectionError, HttpHeaders, - RequestsHttpNode, HttpxAsyncHttpNode, + RequestsHttpNode, Urllib3HttpNode, debug_logging, ) From 940d296eb2798f9d7cea005c33215f2ba7f3e897 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Mon, 22 Sep 2025 15:08:29 +0400 Subject: [PATCH 3/8] Add trio to develop dependencies --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 149033b7..280776a7 100644 --- a/setup.py +++ b/setup.py @@ -72,6 +72,7 @@ "opentelemetry-api", "opentelemetry-sdk", "orjson", + "trio", # Override Read the Docs default (sphinx<2) "sphinx>2", "furo", From 17ef8fad614e51838257c9937a719a504682137e Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 24 Sep 2025 09:52:42 +0400 Subject: [PATCH 4/8] Remove anyio from runtime dependencies --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 280776a7..208521fb 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,6 @@ "urllib3>=1.26.2, <3", "certifi", "sniffio", - "anyio", ], python_requires=">=3.8", extras_require={ @@ -72,6 +71,7 @@ "opentelemetry-api", "opentelemetry-sdk", "orjson", + "anyio", "trio", # Override Read the Docs default (sphinx<2) "sphinx>2", From 573cae16385af3a53b972e991d342273c092bf28 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 24 Sep 2025 16:38:23 +0400 Subject: [PATCH 5/8] Clarify asyncio/trio support --- elastic_transport/_node/_http_aiohttp.py | 5 ++++- elastic_transport/_node/_http_httpx.py | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/elastic_transport/_node/_http_aiohttp.py b/elastic_transport/_node/_http_aiohttp.py index 5ed17003..5fe69f7e 100644 --- a/elastic_transport/_node/_http_aiohttp.py +++ b/elastic_transport/_node/_http_aiohttp.py @@ -72,7 +72,10 @@ class RequestKwarg(TypedDict, total=False): class AiohttpHttpNode(BaseAsyncNode): - """Default asynchronous node class using the ``aiohttp`` library via HTTP""" + """Default asynchronous node class using the ``aiohttp`` library via HTTP. + + Supports asyncio. + """ _CLIENT_META_HTTP_CLIENT = ("ai", _AIOHTTP_META_VERSION) diff --git a/elastic_transport/_node/_http_httpx.py b/elastic_transport/_node/_http_httpx.py index 9ecc6c99..2ac6de26 100644 --- a/elastic_transport/_node/_http_httpx.py +++ b/elastic_transport/_node/_http_httpx.py @@ -46,6 +46,10 @@ class HttpxAsyncHttpNode(BaseAsyncNode): + """ + Async HTTP node using httpx. Supports both Trio and asyncio. + """ + _CLIENT_META_HTTP_CLIENT = ("hx", _HTTPX_META_VERSION) def __init__(self, config: NodeConfig): From 6da5464e3782d23946a1ac5ed576d7c4b74a600d Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 24 Sep 2025 18:41:20 +0400 Subject: [PATCH 6/8] Link to HTTPX issue in badssl tests --- tests/node/test_tls_versions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/node/test_tls_versions.py b/tests/node/test_tls_versions.py index 298e13a8..71f7c358 100644 --- a/tests/node/test_tls_versions.py +++ b/tests/node/test_tls_versions.py @@ -130,6 +130,8 @@ async def test_unsupported_tls_versions( node_config = url_to_node_config(url).replace(ssl_version=ssl_version) node = node_class(node_config) + # Remove ConnectionError when we have a fix or workaround for + # https://github.com/encode/httpx/discussions/3674 with pytest.raises((TlsError, ConnectionError)) as e: await await_if_coro(node.perform_request("GET", "/")) if anyio_backend == "trio" and node_class is HttpxAsyncHttpNode: @@ -140,6 +142,8 @@ async def test_unsupported_tls_versions( @node_classes @pytest.mark.parametrize("ssl_version", [0, "TLSv1", object()]) def test_ssl_version_value_error(node_class, ssl_version): + # Remove ConnectionError when we have a fix or workaround for + # https://github.com/encode/httpx/discussions/3674 with pytest.raises((ValueError, ConnectionError)) as e: node_class(NodeConfig("https", "localhost", 9200, ssl_version=ssl_version)) assert str(e.value) == ( From 76722f00cb96c507cafdd2238742a616fd5858ae Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 14 Oct 2025 13:12:59 +0400 Subject: [PATCH 7/8] Remove debug print --- tests/async_/test_async_transport.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/async_/test_async_transport.py b/tests/async_/test_async_transport.py index 8868c455..4ed25eed 100644 --- a/tests/async_/test_async_transport.py +++ b/tests/async_/test_async_transport.py @@ -663,8 +663,6 @@ async def sniff_callback(*_): "sniff_callback": sniff_callback, } - print(kwargs) - t = AsyncTransport( node_configs, retry_on_status=[500], From 83bdbf948ec34c461f4a77c3d897f1877b28876d Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Wed, 15 Oct 2025 16:18:55 +0400 Subject: [PATCH 8/8] Reverse asyncio/trio check per review feedback --- elastic_transport/_async_transport.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index f56273bf..b51120ce 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -381,9 +381,9 @@ async def perform_request( # type: ignore[override, return] ) async def sniff(self, is_initial_sniff: bool = False) -> None: # type: ignore[override] - if sniffio.current_async_library() != "asyncio": + if sniffio.current_async_library() == "trio": raise ValueError( - f"Asynchronous sniffing only works with the 'asyncio' library, got {sniffio.current_async_library}" + f"Asynchronous sniffing is not supported with the 'trio' library, got {sniffio.current_async_library}" ) await self._async_call() task = self._create_sniffing_task(is_initial_sniff) @@ -477,7 +477,7 @@ async def _async_call(self) -> None: return # Call at most once! self._async_library = sniffio.current_async_library() - if self._async_library != "asyncio": + if self._async_library == "trio": return self._loop = asyncio.get_running_loop()