From d4651f371b10bfb7ea5358c55305d3edad34494b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 13 Jun 2025 13:17:49 +0300 Subject: [PATCH 01/17] Ensure that all async generators are explicitly closed --- CHANGELOG.md | 4 ++++ httpcore/_async/connection_pool.py | 22 ++++++++++++---------- httpcore/_async/http11.py | 13 +++++++------ httpcore/_async/http2.py | 5 +++-- httpcore/_async/interfaces.py | 5 +++-- httpcore/_models.py | 15 ++++++++++----- httpcore/_utils.py | 13 +++++++++++++ tests/test_cancellations.py | 3 --- tests/test_models.py | 3 +++ 9 files changed, 55 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b43d7f01a..9f67da38e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [Unreleased] + +- Explicitly close all async generators to ensure predictable behavior + ## Version 1.0.9 (April 24th, 2025) - Resolve https://github.com/advisories/GHSA-vqfr-h8mv-ghfj with h11 dependency update. (#1008) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 96e973d0c..d8f405a7a 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,5 +1,7 @@ from __future__ import annotations +from collections.abc import AsyncGenerator + import ssl import sys import types @@ -10,9 +12,13 @@ from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Proxy, Request, Response from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock +from .._utils import aclosing from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface +if typing.TYPE_CHECKING: + from .http11 import HTTP11ConnectionByteStream + from .http2 import HTTP2ConnectionByteStream class AsyncPoolRequest: def __init__(self, request: Request) -> None: @@ -389,7 +395,7 @@ def __repr__(self) -> str: class PoolByteStream: def __init__( self, - stream: typing.AsyncIterable[bytes], + stream: HTTP11ConnectionByteStream | HTTP2ConnectionByteStream, pool_request: AsyncPoolRequest, pool: AsyncConnectionPool, ) -> None: @@ -398,20 +404,16 @@ def __init__( self._pool = pool self._closed = False - async def __aiter__(self) -> typing.AsyncIterator[bytes]: - try: - async for part in self._stream: - yield part - except BaseException as exc: - await self.aclose() - raise exc from None + async def __aiter__(self) -> AsyncGenerator[bytes]: + async with aclosing(self._stream.__aiter__()) as iterator: + async for chunk in iterator: + yield chunk async def aclose(self) -> None: if not self._closed: self._closed = True with AsyncShieldCancellation(): - if hasattr(self._stream, "aclose"): - await self._stream.aclose() + await self._stream.aclose() with self._pool._optional_thread_lock: self._pool._requests.remove(self._pool_request) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index e6d6d7098..7b85f54c3 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -6,6 +6,7 @@ import time import types import typing +from collections.abc import AsyncGenerator import h11 @@ -21,6 +22,7 @@ from .._synchronization import AsyncLock, AsyncShieldCancellation from .._trace import Trace from .interfaces import AsyncConnectionInterface +from .._utils import aclosing logger = logging.getLogger("httpcore.http11") @@ -193,9 +195,7 @@ async def _receive_response_headers( return http_version, event.status_code, event.reason, headers, trailing_data - async def _receive_response_body( - self, request: Request - ) -> typing.AsyncIterator[bytes]: + async def _receive_response_body(self, request: Request) -> AsyncGenerator[bytes]: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("read", None) @@ -327,12 +327,13 @@ def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None: self._request = request self._closed = False - async def __aiter__(self) -> typing.AsyncIterator[bytes]: + async def __aiter__(self) -> AsyncGenerator[bytes]: kwargs = {"request": self._request} try: async with Trace("receive_response_body", logger, self._request, kwargs): - async for chunk in self._connection._receive_response_body(**kwargs): - yield chunk + async with aclosing(self._connection._receive_response_body(**kwargs)) as body: + async for chunk in body: + yield chunk except BaseException as exc: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index dbd0beeb4..473fc577c 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -5,6 +5,7 @@ import time import types import typing +from collections.abc import AsyncGenerator import h2.config import h2.connection @@ -308,7 +309,7 @@ async def _receive_response( async def _receive_response_body( self, request: Request, stream_id: int - ) -> typing.AsyncIterator[bytes]: + ) -> AsyncGenerator[bytes]: """ Iterator that returns the bytes of the response body for a given stream ID. """ @@ -568,7 +569,7 @@ def __init__( self._stream_id = stream_id self._closed = False - async def __aiter__(self) -> typing.AsyncIterator[bytes]: + async def __aiter__(self) -> AsyncGenerator[bytes]: kwargs = {"request": self._request, "stream_id": self._stream_id} try: async with Trace("receive_response_body", logger, self._request, kwargs): diff --git a/httpcore/_async/interfaces.py b/httpcore/_async/interfaces.py index 361583bed..55b98412e 100644 --- a/httpcore/_async/interfaces.py +++ b/httpcore/_async/interfaces.py @@ -2,6 +2,7 @@ import contextlib import typing +from collections.abc import AsyncGenerator from .._models import ( URL, @@ -56,9 +57,9 @@ async def stream( url: URL | bytes | str, *, headers: HeaderTypes = None, - content: bytes | typing.AsyncIterator[bytes] | None = None, + content: bytes | AsyncGenerator[bytes] | None = None, extensions: Extensions | None = None, - ) -> typing.AsyncIterator[Response]: + ) -> AsyncGenerator[Response]: # Strict type checking on our parameters. method = enforce_bytes(method, name="method") url = enforce_url(url, name="url") diff --git a/httpcore/_models.py b/httpcore/_models.py index 8a65f1334..b7e6dc77c 100644 --- a/httpcore/_models.py +++ b/httpcore/_models.py @@ -4,6 +4,9 @@ import ssl import typing import urllib.parse +from collections.abc import AsyncGenerator + +from ._utils import aclosing # Functions for typechecking... @@ -151,7 +154,7 @@ def __init__(self, content: bytes) -> None: def __iter__(self) -> typing.Iterator[bytes]: yield self._content - async def __aiter__(self) -> typing.AsyncIterator[bytes]: + async def __aiter__(self) -> AsyncGenerator[bytes]: yield self._content def __repr__(self) -> str: @@ -463,10 +466,11 @@ async def aread(self) -> bytes: "You should use 'response.read()' instead." ) if not hasattr(self, "_content"): - self._content = b"".join([part async for part in self.aiter_stream()]) + async with aclosing(self.aiter_stream()) as parts: + self._content = b"".join([part async for part in parts]) return self._content - async def aiter_stream(self) -> typing.AsyncIterator[bytes]: + async def aiter_stream(self) -> AsyncGenerator[bytes]: if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover raise RuntimeError( "Attempted to stream an synchronous response using 'async for ... in " @@ -479,8 +483,9 @@ async def aiter_stream(self) -> typing.AsyncIterator[bytes]: "more than once." ) self._stream_consumed = True - async for chunk in self.stream: - yield chunk + async with aclosing(self.stream) as parts: + async for chunk in parts: + yield chunk async def aclose(self) -> None: if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover diff --git a/httpcore/_utils.py b/httpcore/_utils.py index c44ff93cb..9cda24c5f 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -4,6 +4,19 @@ import socket import sys +if sys.version_info >= (3, 10): + from contextlib import aclosing as aclosing +else: + class aclosing(AbstractAsyncContextManager): + def __init__(self, thing): + self.thing = thing + + async def __aenter__(self): + return self.thing + + async def __aexit__(self, *exc_info): + await self.thing.aclose() + def is_socket_readable(sock: socket.socket | None) -> bool: """ diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index 033acef60..fe8d3c911 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -171,7 +171,6 @@ async def test_h11_timeout_during_response(): assert conn.is_closed() -@pytest.mark.xfail @pytest.mark.anyio async def test_h2_timeout_during_handshake(): """ @@ -186,7 +185,6 @@ async def test_h2_timeout_during_handshake(): assert conn.is_closed() -@pytest.mark.xfail @pytest.mark.anyio async def test_h2_timeout_during_request(): """ @@ -207,7 +205,6 @@ async def test_h2_timeout_during_request(): assert conn.is_idle() -@pytest.mark.xfail @pytest.mark.anyio async def test_h2_timeout_during_response(): """ diff --git a/tests/test_models.py b/tests/test_models.py index 7dd6e419d..eb0cfa411 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -163,6 +163,9 @@ async def __aiter__(self) -> typing.AsyncIterator[bytes]: for chunk in self._chunks: yield chunk + async def aclose(self) -> None: + pass + @pytest.mark.trio async def test_response_async_read(): From 838252c4a0d1384f39950c30cb3b486a03c1029c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 18 Jun 2025 18:21:05 +0300 Subject: [PATCH 02/17] Fixed linting/formatting --- httpcore/_async/connection_pool.py | 6 +-- httpcore/_async/http11.py | 6 ++- httpcore/_utils.py | 2 + scripts/unasync.py | 65 ++++++++++++++++-------------- 4 files changed, 43 insertions(+), 36 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index d8f405a7a..37d21ec38 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,11 +1,10 @@ from __future__ import annotations -from collections.abc import AsyncGenerator - import ssl import sys import types import typing +from collections.abc import AsyncGenerator from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend @@ -17,8 +16,9 @@ from .interfaces import AsyncConnectionInterface, AsyncRequestInterface if typing.TYPE_CHECKING: - from .http11 import HTTP11ConnectionByteStream from .http2 import HTTP2ConnectionByteStream + from .http11 import HTTP11ConnectionByteStream + class AsyncPoolRequest: def __init__(self, request: Request) -> None: diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 7b85f54c3..e868c6c98 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -21,8 +21,8 @@ from .._models import Origin, Request, Response from .._synchronization import AsyncLock, AsyncShieldCancellation from .._trace import Trace -from .interfaces import AsyncConnectionInterface from .._utils import aclosing +from .interfaces import AsyncConnectionInterface logger = logging.getLogger("httpcore.http11") @@ -331,7 +331,9 @@ async def __aiter__(self) -> AsyncGenerator[bytes]: kwargs = {"request": self._request} try: async with Trace("receive_response_body", logger, self._request, kwargs): - async with aclosing(self._connection._receive_response_body(**kwargs)) as body: + async with aclosing( + self._connection._receive_response_body(**kwargs) + ) as body: async for chunk in body: yield chunk except BaseException as exc: diff --git a/httpcore/_utils.py b/httpcore/_utils.py index 9cda24c5f..959d834e9 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -7,6 +7,8 @@ if sys.version_info >= (3, 10): from contextlib import aclosing as aclosing else: + from contextlib import AbstractAsyncContextManager + class aclosing(AbstractAsyncContextManager): def __init__(self, thing): self.thing = thing diff --git a/scripts/unasync.py b/scripts/unasync.py index 5a5627d71..b6a54b96d 100644 --- a/scripts/unasync.py +++ b/scripts/unasync.py @@ -5,33 +5,36 @@ from pprint import pprint SUBS = [ - ('from .._backends.auto import AutoBackend', 'from .._backends.sync import SyncBackend'), - ('import trio as concurrency', 'from tests import concurrency'), - ('AsyncIterator', 'Iterator'), - ('Async([A-Z][A-Za-z0-9_]*)', r'\2'), - ('async def', 'def'), - ('async with', 'with'), - ('async for', 'for'), - ('await ', ''), - ('handle_async_request', 'handle_request'), - ('aclose', 'close'), - ('aiter_stream', 'iter_stream'), - ('aread', 'read'), - ('asynccontextmanager', 'contextmanager'), - ('__aenter__', '__enter__'), - ('__aexit__', '__exit__'), - ('__aiter__', '__iter__'), - ('@pytest.mark.anyio', ''), - ('@pytest.mark.trio', ''), - ('AutoBackend', 'SyncBackend'), + ( + "from .._backends.auto import AutoBackend", + "from .._backends.sync import SyncBackend", + ), + ("import trio as concurrency", "from tests import concurrency"), + ("AsyncIterator", "Iterator"), + ("Async([A-Z][A-Za-z0-9_]*)", r"\2"), + ("async def", "def"), + ("async with", "with"), + ("async for", "for"), + ("await ", ""), + ("handle_async_request", "handle_request"), + ("aclose", "close"), + ("aiter_stream", "iter_stream"), + ("aread", "read"), + ("asynccontextmanager", "contextmanager"), + ("__aenter__", "__enter__"), + ("__aexit__", "__exit__"), + ("__aiter__", "__iter__"), + ("@pytest.mark.anyio", ""), + ("@pytest.mark.trio", ""), + ("AutoBackend", "SyncBackend"), ] COMPILED_SUBS = [ - (re.compile(r'(^|\b)' + regex + r'($|\b)'), repl) - for regex, repl in SUBS + (re.compile(r"(^|\b)" + regex + r"($|\b)"), repl) for regex, repl in SUBS ] USED_SUBS = set() + def unasync_line(line): for index, (regex, repl) in enumerate(COMPILED_SUBS): old_line = line @@ -55,22 +58,22 @@ def unasync_file_check(in_path, out_path): for in_line, out_line in zip(in_file.readlines(), out_file.readlines()): expected = unasync_line(in_line) if out_line != expected: - print(f'unasync mismatch between {in_path!r} and {out_path!r}') - print(f'Async code: {in_line!r}') - print(f'Expected sync code: {expected!r}') - print(f'Actual sync code: {out_line!r}') + print(f"unasync mismatch between {in_path!r} and {out_path!r}") + print(f"Async code: {in_line!r}") + print(f"Expected sync code: {expected!r}") + print(f"Actual sync code: {out_line!r}") sys.exit(1) def unasync_dir(in_dir, out_dir, check_only=False): for dirpath, dirnames, filenames in os.walk(in_dir): for filename in filenames: - if not filename.endswith('.py'): + if not filename.endswith(".py"): continue rel_dir = os.path.relpath(dirpath, in_dir) in_path = os.path.normpath(os.path.join(in_dir, rel_dir, filename)) out_path = os.path.normpath(os.path.join(out_dir, rel_dir, filename)) - print(in_path, '->', out_path) + print(in_path, "->", out_path) if check_only: unasync_file_check(in_path, out_path) else: @@ -78,7 +81,7 @@ def unasync_dir(in_dir, out_dir, check_only=False): def main(): - check_only = '--check' in sys.argv + check_only = "--check" in sys.argv unasync_dir("httpcore/_async", "httpcore/_sync", check_only=check_only) unasync_dir("tests/_async", "tests/_sync", check_only=check_only) @@ -87,8 +90,8 @@ def main(): print("These patterns were not used:") pprint(unused_subs) - exit(1) - + exit(1) + -if __name__ == '__main__': +if __name__ == "__main__": main() From c3688d0e48d3e04268712724ddf7dd1aa34df5d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 19 Jun 2025 15:17:32 +0300 Subject: [PATCH 03/17] Updated mypy --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 880330c6f..b87315b8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ twine==6.1.0 # Tests & Linting coverage[toml]==7.5.4 ruff==0.5.0 -mypy==1.10.1 +mypy==1.16.1 trio-typing==0.10.0 pytest==8.2.2 pytest-httpbin==2.0.0 From b5abb0072014f487d3e35b9ba66aaca8b4f76107 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 19 Jun 2025 15:19:47 +0300 Subject: [PATCH 04/17] Use mypy 0.14.1 for py3.8 compat --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b87315b8f..e25fc3ad1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ twine==6.1.0 # Tests & Linting coverage[toml]==7.5.4 ruff==0.5.0 -mypy==1.16.1 +mypy==1.14.1 trio-typing==0.10.0 pytest==8.2.2 pytest-httpbin==2.0.0 From 2ee74ed7c9a82e4276c0cc95da7044eb7415d867 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 14:37:25 +0300 Subject: [PATCH 05/17] Fixed mypy errors --- httpcore/_async/connection_pool.py | 17 +++++++++-------- httpcore/_models.py | 9 +++++++-- httpcore/_utils.py | 14 ++++++++++---- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 37d21ec38..da42aa2be 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -5,20 +5,16 @@ import types import typing from collections.abc import AsyncGenerator +from contextlib import AsyncExitStack from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Proxy, Request, Response from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock -from .._utils import aclosing from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface -if typing.TYPE_CHECKING: - from .http2 import HTTP2ConnectionByteStream - from .http11 import HTTP11ConnectionByteStream - class AsyncPoolRequest: def __init__(self, request: Request) -> None: @@ -395,7 +391,7 @@ def __repr__(self) -> str: class PoolByteStream: def __init__( self, - stream: HTTP11ConnectionByteStream | HTTP2ConnectionByteStream, + stream: typing.AsyncIterable[bytes], pool_request: AsyncPoolRequest, pool: AsyncConnectionPool, ) -> None: @@ -405,7 +401,11 @@ def __init__( self._closed = False async def __aiter__(self) -> AsyncGenerator[bytes]: - async with aclosing(self._stream.__aiter__()) as iterator: + async with AsyncExitStack() as stack: + iterator = self._stream.__aiter__() + if hasattr(iterator, "aclose"): + stack.push_async_callback(iterator.aclose) + async for chunk in iterator: yield chunk @@ -413,7 +413,8 @@ async def aclose(self) -> None: if not self._closed: self._closed = True with AsyncShieldCancellation(): - await self._stream.aclose() + if hasattr(self._stream, "aclose"): + await self._stream.aclose() with self._pool._optional_thread_lock: self._pool._requests.remove(self._pool_request) diff --git a/httpcore/_models.py b/httpcore/_models.py index b7e6dc77c..caa194707 100644 --- a/httpcore/_models.py +++ b/httpcore/_models.py @@ -5,6 +5,8 @@ import typing import urllib.parse from collections.abc import AsyncGenerator +from contextlib import AsyncExitStack +from inspect import isasyncgen from ._utils import aclosing @@ -483,8 +485,11 @@ async def aiter_stream(self) -> AsyncGenerator[bytes]: "more than once." ) self._stream_consumed = True - async with aclosing(self.stream) as parts: - async for chunk in parts: + async with AsyncExitStack() as stack: + if isasyncgen(self.stream): + stack.push_async_callback(self.stream.aclose) + + async for chunk in self.stream: yield chunk async def aclose(self) -> None: diff --git a/httpcore/_utils.py b/httpcore/_utils.py index 959d834e9..f69dee6e5 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -8,15 +8,21 @@ from contextlib import aclosing as aclosing else: from contextlib import AbstractAsyncContextManager + from typing import Any, Awaitable, Protocol, TypeVar - class aclosing(AbstractAsyncContextManager): - def __init__(self, thing): + class _SupportsAclose(Protocol): + def aclose(self) -> Awaitable[object]: ... + + _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) + + class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]): + def __init__(self, thing: _SupportsAcloseT) -> None: self.thing = thing - async def __aenter__(self): + async def __aenter__(self) -> _SupportsAcloseT: return self.thing - async def __aexit__(self, *exc_info): + async def __aexit__(self, *exc_info: Any) -> None: await self.thing.aclose() From 9d03a6481918df5e57c7a812ea3f8ff6c2df5dbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 14:46:42 +0300 Subject: [PATCH 06/17] Ran unasync and fixed emerging problems --- httpcore/_sync/connection_pool.py | 17 ++++++++++------- httpcore/_sync/http11.py | 15 +++++++++------ httpcore/_sync/http2.py | 5 +++-- httpcore/_sync/interfaces.py | 5 +++-- scripts/unasync.py | 3 +++ 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 9ccfa53e5..2038bf3a2 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -4,6 +4,8 @@ import sys import types import typing +from collections.abc import Generator +from contextlib import ExitStack from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend @@ -398,13 +400,14 @@ def __init__( self._pool = pool self._closed = False - def __iter__(self) -> typing.Iterator[bytes]: - try: - for part in self._stream: - yield part - except BaseException as exc: - self.close() - raise exc from None + def __iter__(self) -> Generator[bytes]: + with ExitStack() as stack: + iterator = self._stream.__iter__() + if hasattr(iterator, "close"): + stack.callback(iterator.close) + + for chunk in iterator: + yield chunk def close(self) -> None: if not self._closed: diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index ebd3a9748..3a1dc8fc8 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -6,6 +6,7 @@ import time import types import typing +from collections.abc import Generator import h11 @@ -20,6 +21,7 @@ from .._models import Origin, Request, Response from .._synchronization import Lock, ShieldCancellation from .._trace import Trace +from contextlib import closing from .interfaces import ConnectionInterface logger = logging.getLogger("httpcore.http11") @@ -193,9 +195,7 @@ def _receive_response_headers( return http_version, event.status_code, event.reason, headers, trailing_data - def _receive_response_body( - self, request: Request - ) -> typing.Iterator[bytes]: + def _receive_response_body(self, request: Request) -> Generator[bytes]: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("read", None) @@ -327,12 +327,15 @@ def __init__(self, connection: HTTP11Connection, request: Request) -> None: self._request = request self._closed = False - def __iter__(self) -> typing.Iterator[bytes]: + def __iter__(self) -> Generator[bytes]: kwargs = {"request": self._request} try: with Trace("receive_response_body", logger, self._request, kwargs): - for chunk in self._connection._receive_response_body(**kwargs): - yield chunk + with closing( + self._connection._receive_response_body(**kwargs) + ) as body: + for chunk in body: + yield chunk except BaseException as exc: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index ddcc18900..e69ef7d2b 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -5,6 +5,7 @@ import time import types import typing +from collections.abc import Generator import h2.config import h2.connection @@ -308,7 +309,7 @@ def _receive_response( def _receive_response_body( self, request: Request, stream_id: int - ) -> typing.Iterator[bytes]: + ) -> Generator[bytes]: """ Iterator that returns the bytes of the response body for a given stream ID. """ @@ -568,7 +569,7 @@ def __init__( self._stream_id = stream_id self._closed = False - def __iter__(self) -> typing.Iterator[bytes]: + def __iter__(self) -> Generator[bytes]: kwargs = {"request": self._request, "stream_id": self._stream_id} try: with Trace("receive_response_body", logger, self._request, kwargs): diff --git a/httpcore/_sync/interfaces.py b/httpcore/_sync/interfaces.py index e673d4cc1..26468be87 100644 --- a/httpcore/_sync/interfaces.py +++ b/httpcore/_sync/interfaces.py @@ -2,6 +2,7 @@ import contextlib import typing +from collections.abc import Generator from .._models import ( URL, @@ -56,9 +57,9 @@ def stream( url: URL | bytes | str, *, headers: HeaderTypes = None, - content: bytes | typing.Iterator[bytes] | None = None, + content: bytes | Generator[bytes] | None = None, extensions: Extensions | None = None, - ) -> typing.Iterator[Response]: + ) -> Generator[Response]: # Strict type checking on our parameters. method = enforce_bytes(method, name="method") url = enforce_url(url, name="url") diff --git a/scripts/unasync.py b/scripts/unasync.py index b6a54b96d..ca2dc3882 100644 --- a/scripts/unasync.py +++ b/scripts/unasync.py @@ -9,6 +9,7 @@ "from .._backends.auto import AutoBackend", "from .._backends.sync import SyncBackend", ), + ("from .._utils import aclosing", "from contextlib import closing"), ("import trio as concurrency", "from tests import concurrency"), ("AsyncIterator", "Iterator"), ("Async([A-Z][A-Za-z0-9_]*)", r"\2"), @@ -21,6 +22,8 @@ ("aiter_stream", "iter_stream"), ("aread", "read"), ("asynccontextmanager", "contextmanager"), + ("push_async_callback", "callback"), + ("aclosing", "closing"), ("__aenter__", "__enter__"), ("__aexit__", "__exit__"), ("__aiter__", "__iter__"), From 239ba57ee33ed417a076d6dcfc65cd2595d76af1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 14:52:16 +0300 Subject: [PATCH 07/17] Fixed new mypy errors --- httpcore/_async/interfaces.py | 2 +- httpcore/_sync/interfaces.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/interfaces.py b/httpcore/_async/interfaces.py index 55b98412e..92859b6a6 100644 --- a/httpcore/_async/interfaces.py +++ b/httpcore/_async/interfaces.py @@ -57,7 +57,7 @@ async def stream( url: URL | bytes | str, *, headers: HeaderTypes = None, - content: bytes | AsyncGenerator[bytes] | None = None, + content: bytes | typing.AsyncIterator[bytes] | None = None, extensions: Extensions | None = None, ) -> AsyncGenerator[Response]: # Strict type checking on our parameters. diff --git a/httpcore/_sync/interfaces.py b/httpcore/_sync/interfaces.py index 26468be87..130cd532a 100644 --- a/httpcore/_sync/interfaces.py +++ b/httpcore/_sync/interfaces.py @@ -57,7 +57,7 @@ def stream( url: URL | bytes | str, *, headers: HeaderTypes = None, - content: bytes | Generator[bytes] | None = None, + content: bytes | typing.Iterator[bytes] | None = None, extensions: Extensions | None = None, ) -> Generator[Response]: # Strict type checking on our parameters. From c52711dcf83c9c0efcbafbc5b18ce0221e63f040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 14:56:27 +0300 Subject: [PATCH 08/17] Fixed issue on Python 3.8 --- httpcore/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httpcore/_utils.py b/httpcore/_utils.py index f69dee6e5..9900ff482 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -15,7 +15,7 @@ def aclose(self) -> Awaitable[object]: ... _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) - class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]): + class aclosing(AbstractAsyncContextManager): def __init__(self, thing: _SupportsAcloseT) -> None: self.thing = thing From ca3d6d0de6e6de487c39d3017c857a9b15fe0066 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 15:09:35 +0300 Subject: [PATCH 09/17] Dropped Python 3.8 support and added Python 3.13 to CI matrix --- .github/workflows/test-suite.yml | 2 +- CHANGELOG.md | 1 + httpcore/_utils.py | 2 +- pyproject.toml | 5 +++-- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index c3ad08f14..206e27f6a 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: "actions/checkout@v4" diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f67da38e..49ca238b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] +- Drop Python 3.8 support - Explicitly close all async generators to ensure predictable behavior ## Version 1.0.9 (April 24th, 2025) diff --git a/httpcore/_utils.py b/httpcore/_utils.py index 9900ff482..f69dee6e5 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -15,7 +15,7 @@ def aclose(self) -> Awaitable[object]: ... _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) - class aclosing(AbstractAsyncContextManager): + class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]): def __init__(self, thing: _SupportsAcloseT) -> None: self.thing = thing diff --git a/pyproject.toml b/pyproject.toml index 1bdd99eb9..fe143b2e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "httpcore" dynamic = ["readme", "version"] description = "A minimal low-level HTTP client." license = "BSD-3-Clause" -requires-python = ">=3.8" +requires-python = ">=3.9" authors = [ { name = "Tom Christie", email = "tom@tomchristie.com" }, ] @@ -26,6 +26,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Topic :: Internet :: WWW/HTTP", ] dependencies = [ @@ -96,7 +97,7 @@ filterwarnings = ["error"] [tool.coverage.run] omit = [ - "venv/*", + "venv/*", "httpcore/_sync/*" ] include = ["httpcore/*", "tests/*"] From 7db7d9be6fc0fc134cff7cc426ea3ae9f4488a5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 16:14:03 +0300 Subject: [PATCH 10/17] Fixed coverage problems --- httpcore/_models.py | 7 ++++--- httpcore/_utils.py | 4 ++-- tests/test_models.py | 3 --- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/httpcore/_models.py b/httpcore/_models.py index caa194707..46301e149 100644 --- a/httpcore/_models.py +++ b/httpcore/_models.py @@ -486,10 +486,11 @@ async def aiter_stream(self) -> AsyncGenerator[bytes]: ) self._stream_consumed = True async with AsyncExitStack() as stack: - if isasyncgen(self.stream): - stack.push_async_callback(self.stream.aclose) + iterator = self.stream.__aiter__() + if isasyncgen(iterator): + stack.push_async_callback(iterator.aclose) - async for chunk in self.stream: + async for chunk in iterator: yield chunk async def aclose(self) -> None: diff --git a/httpcore/_utils.py b/httpcore/_utils.py index f69dee6e5..4c57a1415 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -4,9 +4,9 @@ import socket import sys -if sys.version_info >= (3, 10): +if sys.version_info >= (3, 10): # pragma: no cover from contextlib import aclosing as aclosing -else: +else: # pragma: no cover from contextlib import AbstractAsyncContextManager from typing import Any, Awaitable, Protocol, TypeVar diff --git a/tests/test_models.py b/tests/test_models.py index eb0cfa411..7dd6e419d 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -163,9 +163,6 @@ async def __aiter__(self) -> typing.AsyncIterator[bytes]: for chunk in self._chunks: yield chunk - async def aclose(self) -> None: - pass - @pytest.mark.trio async def test_response_async_read(): From d090e3588dd126e2a1a733d4d92ab95a2bcee5bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 16:16:35 +0300 Subject: [PATCH 11/17] Added Python 3.14 to test matrix and updated to latest setup-python --- .github/workflows/test-suite.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 206e27f6a..eb8c8a670 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -14,11 +14,11 @@ jobs: strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] steps: - uses: "actions/checkout@v4" - - uses: "actions/setup-python@v4" + - uses: "actions/setup-python@v5" with: python-version: "${{ matrix.python-version }}" allow-prereleases: true From 55eaf401b8d20be2293bb409576ea671835d6d60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 23 Jun 2025 16:27:36 +0300 Subject: [PATCH 12/17] Removed Python 3.14 from the test matrix --- .github/workflows/test-suite.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index eb8c8a670..14567a26e 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: "actions/checkout@v4" From 04adadc962b6fa6413e3849200cd35c2b233566e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 25 Jun 2025 23:50:30 +0300 Subject: [PATCH 13/17] Covered the rest of cases where "async for" is used --- httpcore/_async/http11.py | 17 ++++++++++++----- httpcore/_async/http2.py | 24 ++++++++++++++++++------ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index e868c6c98..446be4601 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -7,6 +7,8 @@ import types import typing from collections.abc import AsyncGenerator +from contextlib import AsyncExitStack +from inspect import isasyncgen import h11 @@ -156,9 +158,14 @@ async def _send_request_body(self, request: Request) -> None: timeout = timeouts.get("write", None) assert isinstance(request.stream, typing.AsyncIterable) - async for chunk in request.stream: - event = h11.Data(data=chunk) - await self._send_event(event, timeout=timeout) + async with AsyncExitStack() as stack: + iterator = request.stream.__aiter__() + if isasyncgen(iterator): + stack.push_async_callback(iterator.aclose) + + async for chunk in iterator: + event = h11.Data(data=chunk) + await self._send_event(event, timeout=timeout) await self._send_event(h11.EndOfMessage(), timeout=timeout) @@ -333,8 +340,8 @@ async def __aiter__(self) -> AsyncGenerator[bytes]: async with Trace("receive_response_body", logger, self._request, kwargs): async with aclosing( self._connection._receive_response_body(**kwargs) - ) as body: - async for chunk in body: + ) as iterator: + async for chunk in iterator: yield chunk except BaseException as exc: # If we get an exception while streaming the response, diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 473fc577c..1cd66dcc9 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -6,6 +6,8 @@ import types import typing from collections.abc import AsyncGenerator +from contextlib import AsyncExitStack +from inspect import isasyncgen import h2.config import h2.connection @@ -22,6 +24,7 @@ from .._models import Origin, Request, Response from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation from .._trace import Trace +from .._utils import aclosing from .interfaces import AsyncConnectionInterface logger = logging.getLogger("httpcore.http2") @@ -259,8 +262,14 @@ async def _send_request_body(self, request: Request, stream_id: int) -> None: return assert isinstance(request.stream, typing.AsyncIterable) - async for data in request.stream: - await self._send_stream_data(request, stream_id, data) + async with AsyncExitStack() as stack: + iterator = request.stream.__aiter__() + if isasyncgen(iterator): + stack.push_async_callback(iterator.aclose) + + async for chunk in iterator: + await self._send_stream_data(request, stream_id, chunk) + await self._send_end_stream(request, stream_id) async def _send_stream_data( @@ -573,10 +582,13 @@ async def __aiter__(self) -> AsyncGenerator[bytes]: kwargs = {"request": self._request, "stream_id": self._stream_id} try: async with Trace("receive_response_body", logger, self._request, kwargs): - async for chunk in self._connection._receive_response_body( - request=self._request, stream_id=self._stream_id - ): - yield chunk + async with aclosing( + self._connection._receive_response_body( + request=self._request, stream_id=self._stream_id + ) + ) as iterator: + async for chunk in iterator: + yield chunk except BaseException as exc: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) From bcf7e9eacd2b14d762e44063555d9173741c246c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 25 Jun 2025 23:53:46 +0300 Subject: [PATCH 14/17] Ran unasync again --- httpcore/_sync/http11.py | 17 ++++++++++++----- httpcore/_sync/http2.py | 24 ++++++++++++++++++------ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index 3a1dc8fc8..57a575dec 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -7,6 +7,8 @@ import types import typing from collections.abc import Generator +from contextlib import ExitStack +from inspect import isasyncgen import h11 @@ -156,9 +158,14 @@ def _send_request_body(self, request: Request) -> None: timeout = timeouts.get("write", None) assert isinstance(request.stream, typing.Iterable) - for chunk in request.stream: - event = h11.Data(data=chunk) - self._send_event(event, timeout=timeout) + with ExitStack() as stack: + iterator = request.stream.__iter__() + if isasyncgen(iterator): + stack.callback(iterator.close) + + for chunk in iterator: + event = h11.Data(data=chunk) + self._send_event(event, timeout=timeout) self._send_event(h11.EndOfMessage(), timeout=timeout) @@ -333,8 +340,8 @@ def __iter__(self) -> Generator[bytes]: with Trace("receive_response_body", logger, self._request, kwargs): with closing( self._connection._receive_response_body(**kwargs) - ) as body: - for chunk in body: + ) as iterator: + for chunk in iterator: yield chunk except BaseException as exc: # If we get an exception while streaming the response, diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index e69ef7d2b..db7621b2f 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -6,6 +6,8 @@ import types import typing from collections.abc import Generator +from contextlib import ExitStack +from inspect import isasyncgen import h2.config import h2.connection @@ -22,6 +24,7 @@ from .._models import Origin, Request, Response from .._synchronization import Lock, Semaphore, ShieldCancellation from .._trace import Trace +from contextlib import closing from .interfaces import ConnectionInterface logger = logging.getLogger("httpcore.http2") @@ -259,8 +262,14 @@ def _send_request_body(self, request: Request, stream_id: int) -> None: return assert isinstance(request.stream, typing.Iterable) - for data in request.stream: - self._send_stream_data(request, stream_id, data) + with ExitStack() as stack: + iterator = request.stream.__iter__() + if isasyncgen(iterator): + stack.callback(iterator.close) + + for chunk in iterator: + self._send_stream_data(request, stream_id, chunk) + self._send_end_stream(request, stream_id) def _send_stream_data( @@ -573,10 +582,13 @@ def __iter__(self) -> Generator[bytes]: kwargs = {"request": self._request, "stream_id": self._stream_id} try: with Trace("receive_response_body", logger, self._request, kwargs): - for chunk in self._connection._receive_response_body( - request=self._request, stream_id=self._stream_id - ): - yield chunk + with closing( + self._connection._receive_response_body( + request=self._request, stream_id=self._stream_id + ) + ) as iterator: + for chunk in iterator: + yield chunk except BaseException as exc: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) From 2e528d3e84c6d03c9a39eb8846d70d4675038bb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 26 Jun 2025 18:22:08 +0300 Subject: [PATCH 15/17] Replaced AsyncExitStack with safe_async_iterate() --- httpcore/_async/connection_pool.py | 8 +--- httpcore/_async/http11.py | 4 +- httpcore/_async/http2.py | 4 +- httpcore/_models.py | 4 +- httpcore/_sync/connection_pool.py | 8 +--- httpcore/_sync/http11.py | 6 +-- httpcore/_sync/http2.py | 6 +-- httpcore/_utils.py | 61 ++++++++++++++++++++---------- scripts/unasync.py | 4 +- 9 files changed, 58 insertions(+), 47 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index da42aa2be..f95b7186d 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -5,13 +5,13 @@ import types import typing from collections.abc import AsyncGenerator -from contextlib import AsyncExitStack from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Proxy, Request, Response from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock +from .._utils import safe_async_iterate from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface @@ -401,11 +401,7 @@ def __init__( self._closed = False async def __aiter__(self) -> AsyncGenerator[bytes]: - async with AsyncExitStack() as stack: - iterator = self._stream.__aiter__() - if hasattr(iterator, "aclose"): - stack.push_async_callback(iterator.aclose) - + async with safe_async_iterate(self._stream) as iterator: async for chunk in iterator: yield chunk diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 446be4601..32de6fe05 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -23,7 +23,7 @@ from .._models import Origin, Request, Response from .._synchronization import AsyncLock, AsyncShieldCancellation from .._trace import Trace -from .._utils import aclosing +from .._utils import safe_async_iterate from .interfaces import AsyncConnectionInterface logger = logging.getLogger("httpcore.http11") @@ -338,7 +338,7 @@ async def __aiter__(self) -> AsyncGenerator[bytes]: kwargs = {"request": self._request} try: async with Trace("receive_response_body", logger, self._request, kwargs): - async with aclosing( + async with safe_async_iterate( self._connection._receive_response_body(**kwargs) ) as iterator: async for chunk in iterator: diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 1cd66dcc9..d989c9978 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -24,7 +24,7 @@ from .._models import Origin, Request, Response from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation from .._trace import Trace -from .._utils import aclosing +from .._utils import safe_async_iterate from .interfaces import AsyncConnectionInterface logger = logging.getLogger("httpcore.http2") @@ -582,7 +582,7 @@ async def __aiter__(self) -> AsyncGenerator[bytes]: kwargs = {"request": self._request, "stream_id": self._stream_id} try: async with Trace("receive_response_body", logger, self._request, kwargs): - async with aclosing( + async with safe_async_iterate( self._connection._receive_response_body( request=self._request, stream_id=self._stream_id ) diff --git a/httpcore/_models.py b/httpcore/_models.py index 46301e149..b4b95108d 100644 --- a/httpcore/_models.py +++ b/httpcore/_models.py @@ -8,7 +8,7 @@ from contextlib import AsyncExitStack from inspect import isasyncgen -from ._utils import aclosing +from ._utils import safe_async_iterate # Functions for typechecking... @@ -468,7 +468,7 @@ async def aread(self) -> bytes: "You should use 'response.read()' instead." ) if not hasattr(self, "_content"): - async with aclosing(self.aiter_stream()) as parts: + async with safe_async_iterate(self.aiter_stream()) as parts: self._content = b"".join([part async for part in parts]) return self._content diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 2038bf3a2..32c57ebc8 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -5,13 +5,13 @@ import types import typing from collections.abc import Generator -from contextlib import ExitStack from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Proxy, Request, Response from .._synchronization import Event, ShieldCancellation, ThreadLock +from .._utils import safe_iterate from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface @@ -401,11 +401,7 @@ def __init__( self._closed = False def __iter__(self) -> Generator[bytes]: - with ExitStack() as stack: - iterator = self._stream.__iter__() - if hasattr(iterator, "close"): - stack.callback(iterator.close) - + with safe_iterate(self._stream) as iterator: for chunk in iterator: yield chunk diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index 57a575dec..0f42aaf4f 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -23,7 +23,7 @@ from .._models import Origin, Request, Response from .._synchronization import Lock, ShieldCancellation from .._trace import Trace -from contextlib import closing +from .._utils import safe_iterate from .interfaces import ConnectionInterface logger = logging.getLogger("httpcore.http11") @@ -161,7 +161,7 @@ def _send_request_body(self, request: Request) -> None: with ExitStack() as stack: iterator = request.stream.__iter__() if isasyncgen(iterator): - stack.callback(iterator.close) + stack.push_async_callback(iterator.close) for chunk in iterator: event = h11.Data(data=chunk) @@ -338,7 +338,7 @@ def __iter__(self) -> Generator[bytes]: kwargs = {"request": self._request} try: with Trace("receive_response_body", logger, self._request, kwargs): - with closing( + with safe_iterate( self._connection._receive_response_body(**kwargs) ) as iterator: for chunk in iterator: diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index db7621b2f..a46b71e7d 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -24,7 +24,7 @@ from .._models import Origin, Request, Response from .._synchronization import Lock, Semaphore, ShieldCancellation from .._trace import Trace -from contextlib import closing +from .._utils import safe_iterate from .interfaces import ConnectionInterface logger = logging.getLogger("httpcore.http2") @@ -265,7 +265,7 @@ def _send_request_body(self, request: Request, stream_id: int) -> None: with ExitStack() as stack: iterator = request.stream.__iter__() if isasyncgen(iterator): - stack.callback(iterator.close) + stack.push_async_callback(iterator.close) for chunk in iterator: self._send_stream_data(request, stream_id, chunk) @@ -582,7 +582,7 @@ def __iter__(self) -> Generator[bytes]: kwargs = {"request": self._request, "stream_id": self._stream_id} try: with Trace("receive_response_body", logger, self._request, kwargs): - with closing( + with safe_iterate( self._connection._receive_response_body( request=self._request, stream_id=self._stream_id ) diff --git a/httpcore/_utils.py b/httpcore/_utils.py index 4c57a1415..6951457df 100644 --- a/httpcore/_utils.py +++ b/httpcore/_utils.py @@ -3,27 +3,19 @@ import select import socket import sys +import typing +from collections.abc import ( + AsyncGenerator, + AsyncIterable, + AsyncIterator, + Generator, + Iterable, + Iterator, +) +from contextlib import asynccontextmanager, contextmanager +from inspect import isasyncgen -if sys.version_info >= (3, 10): # pragma: no cover - from contextlib import aclosing as aclosing -else: # pragma: no cover - from contextlib import AbstractAsyncContextManager - from typing import Any, Awaitable, Protocol, TypeVar - - class _SupportsAclose(Protocol): - def aclose(self) -> Awaitable[object]: ... - - _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) - - class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]): - def __init__(self, thing: _SupportsAcloseT) -> None: - self.thing = thing - - async def __aenter__(self) -> _SupportsAcloseT: - return self.thing - - async def __aexit__(self, *exc_info: Any) -> None: - await self.thing.aclose() +T = typing.TypeVar("T") def is_socket_readable(sock: socket.socket | None) -> bool: @@ -56,3 +48,32 @@ def is_socket_readable(sock: socket.socket | None) -> bool: p = select.poll() p.register(sock_fd, select.POLLIN) return bool(p.poll(0)) + + +@asynccontextmanager +async def safe_async_iterate( + iterable_or_iterator: AsyncIterable[T] | AsyncIterator[T], / +) -> AsyncGenerator[AsyncIterator[T]]: + iterator = ( + iterable_or_iterator + if isinstance(iterable_or_iterator, AsyncIterator) + else iterable_or_iterator.__aiter__() + ) + try: + yield iterator + finally: + if isasyncgen(iterator): + await iterator.aclose() + + +@contextmanager +def safe_iterate( + iterable_or_iterator: Iterable[T] | Iterator[T], / +) -> Generator[Iterator[T], None, None]: + # This is boilerplate code, only needed to make unasync happy + iterator = ( + iterable_or_iterator + if isinstance(iterable_or_iterator, Iterator) + else iterable_or_iterator.__iter__() + ) + yield iterator diff --git a/scripts/unasync.py b/scripts/unasync.py index ca2dc3882..4dc173576 100644 --- a/scripts/unasync.py +++ b/scripts/unasync.py @@ -9,7 +9,6 @@ "from .._backends.auto import AutoBackend", "from .._backends.sync import SyncBackend", ), - ("from .._utils import aclosing", "from contextlib import closing"), ("import trio as concurrency", "from tests import concurrency"), ("AsyncIterator", "Iterator"), ("Async([A-Z][A-Za-z0-9_]*)", r"\2"), @@ -22,8 +21,7 @@ ("aiter_stream", "iter_stream"), ("aread", "read"), ("asynccontextmanager", "contextmanager"), - ("push_async_callback", "callback"), - ("aclosing", "closing"), + ("safe_async_iterate", "safe_iterate"), ("__aenter__", "__enter__"), ("__aexit__", "__exit__"), ("__aiter__", "__iter__"), From 2f242e782bfe1c87a86db6b2ca02e2932fa8f567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 26 Jun 2025 18:26:01 +0300 Subject: [PATCH 16/17] Replaced remaining AsyncExitStack uses --- httpcore/_async/http11.py | 8 +------- httpcore/_async/http2.py | 8 +------- httpcore/_models.py | 8 +------- httpcore/_sync/http11.py | 8 +------- httpcore/_sync/http2.py | 8 +------- 5 files changed, 5 insertions(+), 35 deletions(-) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 32de6fe05..20eb66188 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -7,8 +7,6 @@ import types import typing from collections.abc import AsyncGenerator -from contextlib import AsyncExitStack -from inspect import isasyncgen import h11 @@ -158,11 +156,7 @@ async def _send_request_body(self, request: Request) -> None: timeout = timeouts.get("write", None) assert isinstance(request.stream, typing.AsyncIterable) - async with AsyncExitStack() as stack: - iterator = request.stream.__aiter__() - if isasyncgen(iterator): - stack.push_async_callback(iterator.aclose) - + async with safe_async_iterate(request.stream) as iterator: async for chunk in iterator: event = h11.Data(data=chunk) await self._send_event(event, timeout=timeout) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index d989c9978..4e9d8fb5b 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -6,8 +6,6 @@ import types import typing from collections.abc import AsyncGenerator -from contextlib import AsyncExitStack -from inspect import isasyncgen import h2.config import h2.connection @@ -262,11 +260,7 @@ async def _send_request_body(self, request: Request, stream_id: int) -> None: return assert isinstance(request.stream, typing.AsyncIterable) - async with AsyncExitStack() as stack: - iterator = request.stream.__aiter__() - if isasyncgen(iterator): - stack.push_async_callback(iterator.aclose) - + async with safe_async_iterate(request.stream) as iterator: async for chunk in iterator: await self._send_stream_data(request, stream_id, chunk) diff --git a/httpcore/_models.py b/httpcore/_models.py index b4b95108d..cc5403f79 100644 --- a/httpcore/_models.py +++ b/httpcore/_models.py @@ -5,8 +5,6 @@ import typing import urllib.parse from collections.abc import AsyncGenerator -from contextlib import AsyncExitStack -from inspect import isasyncgen from ._utils import safe_async_iterate @@ -485,11 +483,7 @@ async def aiter_stream(self) -> AsyncGenerator[bytes]: "more than once." ) self._stream_consumed = True - async with AsyncExitStack() as stack: - iterator = self.stream.__aiter__() - if isasyncgen(iterator): - stack.push_async_callback(iterator.aclose) - + async with safe_async_iterate(self.stream) as iterator: async for chunk in iterator: yield chunk diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index 0f42aaf4f..a6763c5ff 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -7,8 +7,6 @@ import types import typing from collections.abc import Generator -from contextlib import ExitStack -from inspect import isasyncgen import h11 @@ -158,11 +156,7 @@ def _send_request_body(self, request: Request) -> None: timeout = timeouts.get("write", None) assert isinstance(request.stream, typing.Iterable) - with ExitStack() as stack: - iterator = request.stream.__iter__() - if isasyncgen(iterator): - stack.push_async_callback(iterator.close) - + with safe_iterate(request.stream) as iterator: for chunk in iterator: event = h11.Data(data=chunk) self._send_event(event, timeout=timeout) diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index a46b71e7d..238f805ec 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -6,8 +6,6 @@ import types import typing from collections.abc import Generator -from contextlib import ExitStack -from inspect import isasyncgen import h2.config import h2.connection @@ -262,11 +260,7 @@ def _send_request_body(self, request: Request, stream_id: int) -> None: return assert isinstance(request.stream, typing.Iterable) - with ExitStack() as stack: - iterator = request.stream.__iter__() - if isasyncgen(iterator): - stack.push_async_callback(iterator.close) - + with safe_iterate(request.stream) as iterator: for chunk in iterator: self._send_stream_data(request, stream_id, chunk) From 01cdac7ccf1d7e7b3685d521b5fb5e890c78b3d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 15 Jul 2025 02:00:32 +0300 Subject: [PATCH 17/17] Restored newer mypy --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index e25fc3ad1..b87315b8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ twine==6.1.0 # Tests & Linting coverage[toml]==7.5.4 ruff==0.5.0 -mypy==1.14.1 +mypy==1.16.1 trio-typing==0.10.0 pytest==8.2.2 pytest-httpbin==2.0.0