Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: "actions/checkout@v4"
- uses: "actions/setup-python@v4"
- uses: "actions/setup-python@v5"
with:
python-version: "${{ matrix.python-version }}"
allow-prereleases: true
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]

- Drop Python 3.8 support
- Explicitly close all async generators to ensure predictable behavior

## Version 1.0.9 (April 24th, 2025)

- Resolve https://github.com/advisories/GHSA-vqfr-h8mv-ghfj with h11 dependency update. (#1008)
Expand Down
13 changes: 6 additions & 7 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import sys
import types
import typing
from collections.abc import AsyncGenerator

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Proxy, Request, Response
from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .._utils import safe_async_iterate
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

Expand Down Expand Up @@ -398,13 +400,10 @@ def __init__(
self._pool = pool
self._closed = False

async def __aiter__(self) -> typing.AsyncIterator[bytes]:
try:
async for part in self._stream:
yield part
except BaseException as exc:
await self.aclose()
raise exc from None
async def __aiter__(self) -> AsyncGenerator[bytes]:
async with safe_async_iterate(self._stream) as iterator:
async for chunk in iterator:
yield chunk

async def aclose(self) -> None:
if not self._closed:
Expand Down
22 changes: 13 additions & 9 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import types
import typing
from collections.abc import AsyncGenerator

import h11

Expand All @@ -20,6 +21,7 @@
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncShieldCancellation
from .._trace import Trace
from .._utils import safe_async_iterate
from .interfaces import AsyncConnectionInterface

logger = logging.getLogger("httpcore.http11")
Expand Down Expand Up @@ -154,9 +156,10 @@ async def _send_request_body(self, request: Request) -> None:
timeout = timeouts.get("write", None)

assert isinstance(request.stream, typing.AsyncIterable)
async for chunk in request.stream:
event = h11.Data(data=chunk)
await self._send_event(event, timeout=timeout)
async with safe_async_iterate(request.stream) as iterator:
async for chunk in iterator:
event = h11.Data(data=chunk)
await self._send_event(event, timeout=timeout)

await self._send_event(h11.EndOfMessage(), timeout=timeout)

Expand Down Expand Up @@ -193,9 +196,7 @@ async def _receive_response_headers(

return http_version, event.status_code, event.reason, headers, trailing_data

async def _receive_response_body(
self, request: Request
) -> typing.AsyncIterator[bytes]:
async def _receive_response_body(self, request: Request) -> AsyncGenerator[bytes]:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("read", None)

Expand Down Expand Up @@ -327,12 +328,15 @@ def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
self._request = request
self._closed = False

async def __aiter__(self) -> typing.AsyncIterator[bytes]:
async def __aiter__(self) -> AsyncGenerator[bytes]:
kwargs = {"request": self._request}
try:
async with Trace("receive_response_body", logger, self._request, kwargs):
async for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
async with safe_async_iterate(
self._connection._receive_response_body(**kwargs)
) as iterator:
async for chunk in iterator:
yield chunk
except BaseException as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
Expand Down
23 changes: 15 additions & 8 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import types
import typing
from collections.abc import AsyncGenerator

import h2.config
import h2.connection
Expand All @@ -21,6 +22,7 @@
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation
from .._trace import Trace
from .._utils import safe_async_iterate
from .interfaces import AsyncConnectionInterface

logger = logging.getLogger("httpcore.http2")
Expand Down Expand Up @@ -258,8 +260,10 @@ async def _send_request_body(self, request: Request, stream_id: int) -> None:
return

assert isinstance(request.stream, typing.AsyncIterable)
async for data in request.stream:
await self._send_stream_data(request, stream_id, data)
async with safe_async_iterate(request.stream) as iterator:
async for chunk in iterator:
await self._send_stream_data(request, stream_id, chunk)

await self._send_end_stream(request, stream_id)

async def _send_stream_data(
Expand Down Expand Up @@ -308,7 +312,7 @@ async def _receive_response(

async def _receive_response_body(
self, request: Request, stream_id: int
) -> typing.AsyncIterator[bytes]:
) -> AsyncGenerator[bytes]:
"""
Iterator that returns the bytes of the response body for a given stream ID.
"""
Expand Down Expand Up @@ -568,14 +572,17 @@ def __init__(
self._stream_id = stream_id
self._closed = False

async def __aiter__(self) -> typing.AsyncIterator[bytes]:
async def __aiter__(self) -> AsyncGenerator[bytes]:
kwargs = {"request": self._request, "stream_id": self._stream_id}
try:
async with Trace("receive_response_body", logger, self._request, kwargs):
async for chunk in self._connection._receive_response_body(
request=self._request, stream_id=self._stream_id
):
yield chunk
async with safe_async_iterate(
self._connection._receive_response_body(
request=self._request, stream_id=self._stream_id
)
) as iterator:
async for chunk in iterator:
yield chunk
except BaseException as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
Expand Down
3 changes: 2 additions & 1 deletion httpcore/_async/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import contextlib
import typing
from collections.abc import AsyncGenerator

from .._models import (
URL,
Expand Down Expand Up @@ -58,7 +59,7 @@ async def stream(
headers: HeaderTypes = None,
content: bytes | typing.AsyncIterator[bytes] | None = None,
extensions: Extensions | None = None,
) -> typing.AsyncIterator[Response]:
) -> AsyncGenerator[Response]:
# Strict type checking on our parameters.
method = enforce_bytes(method, name="method")
url = enforce_url(url, name="url")
Expand Down
15 changes: 10 additions & 5 deletions httpcore/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import ssl
import typing
import urllib.parse
from collections.abc import AsyncGenerator

from ._utils import safe_async_iterate

# Functions for typechecking...

Expand Down Expand Up @@ -151,7 +154,7 @@ def __init__(self, content: bytes) -> None:
def __iter__(self) -> typing.Iterator[bytes]:
yield self._content

async def __aiter__(self) -> typing.AsyncIterator[bytes]:
async def __aiter__(self) -> AsyncGenerator[bytes]:
yield self._content

def __repr__(self) -> str:
Expand Down Expand Up @@ -463,10 +466,11 @@ async def aread(self) -> bytes:
"You should use 'response.read()' instead."
)
if not hasattr(self, "_content"):
self._content = b"".join([part async for part in self.aiter_stream()])
async with safe_async_iterate(self.aiter_stream()) as parts:
self._content = b"".join([part async for part in parts])
return self._content

async def aiter_stream(self) -> typing.AsyncIterator[bytes]:
async def aiter_stream(self) -> AsyncGenerator[bytes]:
if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover
raise RuntimeError(
"Attempted to stream an synchronous response using 'async for ... in "
Expand All @@ -479,8 +483,9 @@ async def aiter_stream(self) -> typing.AsyncIterator[bytes]:
"more than once."
)
self._stream_consumed = True
async for chunk in self.stream:
yield chunk
async with safe_async_iterate(self.stream) as iterator:
async for chunk in iterator:
yield chunk

async def aclose(self) -> None:
if not isinstance(self.stream, typing.AsyncIterable): # pragma: nocover
Expand Down
13 changes: 6 additions & 7 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import sys
import types
import typing
from collections.abc import Generator

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Proxy, Request, Response
from .._synchronization import Event, ShieldCancellation, ThreadLock
from .._utils import safe_iterate
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

Expand Down Expand Up @@ -398,13 +400,10 @@ def __init__(
self._pool = pool
self._closed = False

def __iter__(self) -> typing.Iterator[bytes]:
try:
for part in self._stream:
yield part
except BaseException as exc:
self.close()
raise exc from None
def __iter__(self) -> Generator[bytes]:
with safe_iterate(self._stream) as iterator:
for chunk in iterator:
yield chunk

def close(self) -> None:
if not self._closed:
Expand Down
22 changes: 13 additions & 9 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import types
import typing
from collections.abc import Generator

import h11

Expand All @@ -20,6 +21,7 @@
from .._models import Origin, Request, Response
from .._synchronization import Lock, ShieldCancellation
from .._trace import Trace
from .._utils import safe_iterate
from .interfaces import ConnectionInterface

logger = logging.getLogger("httpcore.http11")
Expand Down Expand Up @@ -154,9 +156,10 @@ def _send_request_body(self, request: Request) -> None:
timeout = timeouts.get("write", None)

assert isinstance(request.stream, typing.Iterable)
for chunk in request.stream:
event = h11.Data(data=chunk)
self._send_event(event, timeout=timeout)
with safe_iterate(request.stream) as iterator:
for chunk in iterator:
event = h11.Data(data=chunk)
self._send_event(event, timeout=timeout)

self._send_event(h11.EndOfMessage(), timeout=timeout)

Expand Down Expand Up @@ -193,9 +196,7 @@ def _receive_response_headers(

return http_version, event.status_code, event.reason, headers, trailing_data

def _receive_response_body(
self, request: Request
) -> typing.Iterator[bytes]:
def _receive_response_body(self, request: Request) -> Generator[bytes]:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("read", None)

Expand Down Expand Up @@ -327,12 +328,15 @@ def __init__(self, connection: HTTP11Connection, request: Request) -> None:
self._request = request
self._closed = False

def __iter__(self) -> typing.Iterator[bytes]:
def __iter__(self) -> Generator[bytes]:
kwargs = {"request": self._request}
try:
with Trace("receive_response_body", logger, self._request, kwargs):
for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
with safe_iterate(
self._connection._receive_response_body(**kwargs)
) as iterator:
for chunk in iterator:
yield chunk
except BaseException as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
Expand Down
23 changes: 15 additions & 8 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import types
import typing
from collections.abc import Generator

import h2.config
import h2.connection
Expand All @@ -21,6 +22,7 @@
from .._models import Origin, Request, Response
from .._synchronization import Lock, Semaphore, ShieldCancellation
from .._trace import Trace
from .._utils import safe_iterate
from .interfaces import ConnectionInterface

logger = logging.getLogger("httpcore.http2")
Expand Down Expand Up @@ -258,8 +260,10 @@ def _send_request_body(self, request: Request, stream_id: int) -> None:
return

assert isinstance(request.stream, typing.Iterable)
for data in request.stream:
self._send_stream_data(request, stream_id, data)
with safe_iterate(request.stream) as iterator:
for chunk in iterator:
self._send_stream_data(request, stream_id, chunk)

self._send_end_stream(request, stream_id)

def _send_stream_data(
Expand Down Expand Up @@ -308,7 +312,7 @@ def _receive_response(

def _receive_response_body(
self, request: Request, stream_id: int
) -> typing.Iterator[bytes]:
) -> Generator[bytes]:
"""
Iterator that returns the bytes of the response body for a given stream ID.
"""
Expand Down Expand Up @@ -568,14 +572,17 @@ def __init__(
self._stream_id = stream_id
self._closed = False

def __iter__(self) -> typing.Iterator[bytes]:
def __iter__(self) -> Generator[bytes]:
kwargs = {"request": self._request, "stream_id": self._stream_id}
try:
with Trace("receive_response_body", logger, self._request, kwargs):
for chunk in self._connection._receive_response_body(
request=self._request, stream_id=self._stream_id
):
yield chunk
with safe_iterate(
self._connection._receive_response_body(
request=self._request, stream_id=self._stream_id
)
) as iterator:
for chunk in iterator:
yield chunk
except BaseException as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
Expand Down
Loading