diff --git a/docs/installation.rst b/docs/installation.rst index a6025e1a..01f1c9a9 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -22,6 +22,7 @@ The following HTTP libraries are supported: - ``urllib2`` - ``urllib3`` - ``httpx`` +- ``httpcore`` Speed ----- diff --git a/setup.py b/setup.py index f3f06c11..a3291b02 100644 --- a/setup.py +++ b/setup.py @@ -48,6 +48,7 @@ def find_version(*file_paths): "tests": [ "aiohttp", "boto3", + "httpcore", "httplib2", "httpx", "pytest-aiohttp", diff --git a/vcr/patch.py b/vcr/patch.py index 256a58c6..448c1e1c 100644 --- a/vcr/patch.py +++ b/vcr/patch.py @@ -92,12 +92,12 @@ try: - import httpx + import httpcore except ImportError: # pragma: no cover pass else: - _HttpxSyncClient_send_single_request = httpx.Client._send_single_request - _HttpxAsyncClient_send_single_request = httpx.AsyncClient._send_single_request + _HttpcoreConnectionPool_handle_request = httpcore.ConnectionPool.handle_request + _HttpcoreAsyncConnectionPool_handle_async_request = httpcore.AsyncConnectionPool.handle_async_request class CassettePatcherBuilder: @@ -121,7 +121,7 @@ def build(self): self._httplib2(), self._tornado(), self._aiohttp(), - self._httpx(), + self._httpcore(), self._build_patchers_from_mock_triples(self._cassette.custom_patches), ) @@ -304,19 +304,22 @@ def _aiohttp(self): yield client.ClientSession, "_request", new_request @_build_patchers_from_mock_triples_decorator - def _httpx(self): + def _httpcore(self): try: - import httpx + import httpcore except ImportError: # pragma: no cover return else: - from .stubs.httpx_stubs import async_vcr_send, sync_vcr_send + from .stubs.httpcore_stubs import vcr_handle_async_request, vcr_handle_request - new_async_client_send = async_vcr_send(self._cassette, _HttpxAsyncClient_send_single_request) - yield httpx.AsyncClient, "_send_single_request", new_async_client_send + new_handle_async_request = vcr_handle_async_request( + self._cassette, + 
_HttpcoreAsyncConnectionPool_handle_async_request, + ) + yield httpcore.AsyncConnectionPool, "handle_async_request", new_handle_async_request - new_sync_client_send = sync_vcr_send(self._cassette, _HttpxSyncClient_send_single_request) - yield httpx.Client, "_send_single_request", new_sync_client_send + new_handle_request = vcr_handle_request(self._cassette, _HttpcoreConnectionPool_handle_request) + yield httpcore.ConnectionPool, "handle_request", new_handle_request def _urllib3_patchers(self, cpool, conn, stubs): http_connection_remover = ConnectionRemover( diff --git a/vcr/stubs/httpcore_stubs.py b/vcr/stubs/httpcore_stubs.py new file mode 100644 index 00000000..3b9fd9fb --- /dev/null +++ b/vcr/stubs/httpcore_stubs.py @@ -0,0 +1,215 @@ +import asyncio +import functools +import logging +from collections import defaultdict +from collections.abc import AsyncIterable, Iterable + +from httpcore import Response +from httpcore._models import ByteStream + +from vcr.errors import CannotOverwriteExistingCassetteException +from vcr.filters import decode_response +from vcr.request import Request as VcrRequest +from vcr.serializers.compat import convert_body_to_bytes + +_logger = logging.getLogger(__name__) + + +async def _convert_byte_stream(stream): + if isinstance(stream, Iterable): + return list(stream) + + if isinstance(stream, AsyncIterable): + return [part async for part in stream] + + raise TypeError( + f"_convert_byte_stream: stream must be Iterable or AsyncIterable, got {type(stream).__name__}", + ) + + +def _serialize_headers(real_response): + """ + Some headers can appear multiple times, like "Set-Cookie". + Therefore serialize every header key to a list of values. 
+ """ + + headers = defaultdict(list) + + for name, value in real_response.headers: + headers[name.decode("ascii")].append(value.decode("ascii")) + + return dict(headers) + + +async def _serialize_response(real_response): + # The reason_phrase may not exist + try: + reason_phrase = real_response.extensions["reason_phrase"].decode("ascii") + except KeyError: + reason_phrase = None + + # Reading the response stream consumes the iterator, so we need to restore it afterwards + content = b"".join(await _convert_byte_stream(real_response.stream)) + real_response.stream = ByteStream(content) + + return { + "status": {"code": real_response.status, "message": reason_phrase}, + "headers": _serialize_headers(real_response), + "body": {"string": content}, + } + + +def _deserialize_headers(headers): + """ + httpcore accepts headers as list of tuples of header key and value. + """ + + return [ + (name.encode("ascii"), value.encode("ascii")) for name, values in headers.items() for value in values + ] + + +def _deserialize_response(vcr_response): + # Cassette format generated for HTTPX requests by older versions of + # vcrpy. We restructure the content to resemble what a regular + # cassette looks like. 
+ if "status_code" in vcr_response: + vcr_response = decode_response( + convert_body_to_bytes( + { + "headers": vcr_response["headers"], + "body": {"string": vcr_response["content"]}, + "status": {"code": vcr_response["status_code"]}, + }, + ), + ) + extensions = None + else: + extensions = ( + {"reason_phrase": vcr_response["status"]["message"].encode("ascii")} + if vcr_response["status"]["message"] + else None + ) + + return Response( + vcr_response["status"]["code"], + headers=_deserialize_headers(vcr_response["headers"]), + content=vcr_response["body"]["string"], + extensions=extensions, + ) + + +async def _make_vcr_request(real_request): + # Reading the request stream consumes the iterator, so we need to restore it afterwards + body = b"".join(await _convert_byte_stream(real_request.stream)) + real_request.stream = ByteStream(body) + + uri = bytes(real_request.url).decode("ascii") + + # As per HTTPX: If there are multiple headers with the same key, then we concatenate them with commas + headers = defaultdict(list) + + for name, value in real_request.headers: + headers[name.decode("ascii")].append(value.decode("ascii")) + + headers = {name: ", ".join(values) for name, values in headers.items()} + + return VcrRequest(real_request.method.decode("ascii"), uri, body, headers) + + +async def _vcr_request(cassette, real_request): + vcr_request = await _make_vcr_request(real_request) + + if cassette.can_play_response_for(vcr_request): + return vcr_request, _play_responses(cassette, vcr_request) + + if cassette.write_protected and cassette.filter_request(vcr_request): + raise CannotOverwriteExistingCassetteException( + cassette=cassette, + failed_request=vcr_request, + ) + + _logger.info("%s not in cassette, sending to real server", vcr_request) + + return vcr_request, None + + +async def _record_responses(cassette, vcr_request, real_response): + cassette.append(vcr_request, await _serialize_response(real_response)) + + +def _play_responses(cassette, vcr_request): + 
vcr_response = cassette.play_response(vcr_request) + real_response = _deserialize_response(vcr_response) + + return real_response + + +async def _vcr_handle_async_request( + cassette, + real_handle_async_request, + self, + real_request, +): + vcr_request, vcr_response = await _vcr_request(cassette, real_request) + + if vcr_response: + return vcr_response + + real_response = await real_handle_async_request(self, real_request) + await _record_responses(cassette, vcr_request, real_response) + + return real_response + + +def vcr_handle_async_request(cassette, real_handle_async_request): + @functools.wraps(real_handle_async_request) + def _inner_handle_async_request(self, real_request): + return _vcr_handle_async_request( + cassette, + real_handle_async_request, + self, + real_request, + ) + + return _inner_handle_async_request + + +def _run_async_function(sync_func, *args, **kwargs): + """ + Safely run an asynchronous function from a synchronous context. + Handles both cases: + - An event loop is already running. + - No event loop exists yet. 
+ """ + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(sync_func(*args, **kwargs)) + else: + # Inside a running loop: schedule a task and return it un-awaited (a Task, not its result) + return asyncio.ensure_future(sync_func(*args, **kwargs)) + + +def _vcr_handle_request(cassette, real_handle_request, self, real_request): + vcr_request, vcr_response = _run_async_function( + _vcr_request, + cassette, + real_request, + ) + + if vcr_response: + return vcr_response + + real_response = real_handle_request(self, real_request) + _run_async_function(_record_responses, cassette, vcr_request, real_response) + + return real_response + + +def vcr_handle_request(cassette, real_handle_request): + @functools.wraps(real_handle_request) + def _inner_handle_request(self, real_request): + return _vcr_handle_request(cassette, real_handle_request, self, real_request) + + return _inner_handle_request diff --git a/vcr/stubs/httpx_stubs.py b/vcr/stubs/httpx_stubs.py deleted file mode 100644 index 508996b1..00000000 --- a/vcr/stubs/httpx_stubs.py +++ /dev/null @@ -1,202 +0,0 @@ -import asyncio -import functools -import inspect -import logging -from unittest.mock import MagicMock, patch - -import httpx - -from vcr.errors import CannotOverwriteExistingCassetteException -from vcr.filters import decode_response -from vcr.request import Request as VcrRequest -from vcr.serializers.compat import convert_body_to_bytes - -_httpx_signature = inspect.signature(httpx.Client.request) - -try: - HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["follow_redirects"] -except KeyError: - HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["allow_redirects"] - - -_logger = logging.getLogger(__name__) - - -def _transform_headers(httpx_response): - """ - Some headers can appear multiple times, like "Set-Cookie". - Therefore transform to every header key to list of values. 
- """ - - out = {} - for key, var in httpx_response.headers.raw: - decoded_key = key.decode("utf-8") - out.setdefault(decoded_key, []) - out[decoded_key].append(var.decode("utf-8")) - return out - - -async def _to_serialized_response(resp, aread): - # The content shouldn't already have been read in by HTTPX. - assert not hasattr(resp, "_decoder") - - # Retrieve the content, but without decoding it. - with patch.dict(resp.headers, {"Content-Encoding": ""}): - if aread: - await resp.aread() - else: - resp.read() - - result = { - "status": {"code": resp.status_code, "message": resp.reason_phrase}, - "headers": _transform_headers(resp), - "body": {"string": resp.content}, - } - - # As the content wasn't decoded, we restore the response to a state which - # will be capable of decoding the content for the consumer. - del resp._decoder - resp._content = resp._get_content_decoder().decode(resp.content) - return result - - -def _from_serialized_headers(headers): - """ - httpx accepts headers as list of tuples of header key and value. - """ - - header_list = [] - for key, values in headers.items(): - for v in values: - header_list.append((key, v)) - return header_list - - -@patch("httpx.Response.close", MagicMock()) -@patch("httpx.Response.read", MagicMock()) -def _from_serialized_response(request, serialized_response, history=None): - # Cassette format generated for HTTPX requests by older versions of - # vcrpy. We restructure the content to resemble what a regular - # cassette looks like. 
- if "status_code" in serialized_response: - serialized_response = decode_response( - convert_body_to_bytes( - { - "headers": serialized_response["headers"], - "body": {"string": serialized_response["content"]}, - "status": {"code": serialized_response["status_code"]}, - }, - ), - ) - extensions = None - else: - extensions = {"reason_phrase": serialized_response["status"]["message"].encode()} - - response = httpx.Response( - status_code=serialized_response["status"]["code"], - request=request, - headers=_from_serialized_headers(serialized_response["headers"]), - content=serialized_response["body"]["string"], - history=history or [], - extensions=extensions, - ) - - return response - - -def _make_vcr_request(httpx_request, **kwargs): - body = httpx_request.read().decode("utf-8") - uri = str(httpx_request.url) - headers = dict(httpx_request.headers) - return VcrRequest(httpx_request.method, uri, body, headers) - - -def _shared_vcr_send(cassette, real_send, *args, **kwargs): - real_request = args[1] - - vcr_request = _make_vcr_request(real_request, **kwargs) - - if cassette.can_play_response_for(vcr_request): - return vcr_request, _play_responses(cassette, real_request, vcr_request, args[0], kwargs) - - if cassette.write_protected and cassette.filter_request(vcr_request): - raise CannotOverwriteExistingCassetteException(cassette=cassette, failed_request=vcr_request) - - _logger.info("%s not in cassette, sending to real server", vcr_request) - return vcr_request, None - - -async def _record_responses(cassette, vcr_request, real_response, aread): - for past_real_response in real_response.history: - past_vcr_request = _make_vcr_request(past_real_response.request) - cassette.append(past_vcr_request, await _to_serialized_response(past_real_response, aread)) - - if real_response.history: - # If there was a redirection keep we want the request which will hold the - # final redirect value - vcr_request = _make_vcr_request(real_response.request) - - 
cassette.append(vcr_request, await _to_serialized_response(real_response, aread)) - return real_response - - -def _play_responses(cassette, request, vcr_request, client, kwargs): - vcr_response = cassette.play_response(vcr_request) - response = _from_serialized_response(request, vcr_response) - return response - - -async def _async_vcr_send(cassette, real_send, *args, **kwargs): - vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs) - if response: - # add cookies from response to session cookie store - args[0].cookies.extract_cookies(response) - return response - - real_response = await real_send(*args, **kwargs) - await _record_responses(cassette, vcr_request, real_response, aread=True) - return real_response - - -def async_vcr_send(cassette, real_send): - @functools.wraps(real_send) - def _inner_send(*args, **kwargs): - return _async_vcr_send(cassette, real_send, *args, **kwargs) - - return _inner_send - - -def _run_async_function(sync_func, *args, **kwargs): - """ - Safely run an asynchronous function from a synchronous context. - Handles both cases: - - An event loop is already running. - - No event loop exists yet. 
- """ - try: - asyncio.get_running_loop() - except RuntimeError: - return asyncio.run(sync_func(*args, **kwargs)) - else: - # If inside a running loop, create a task and wait for it - return asyncio.ensure_future(sync_func(*args, **kwargs)) - - -def _sync_vcr_send(cassette, real_send, *args, **kwargs): - vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs) - if response: - # add cookies from response to session cookie store - args[0].cookies.extract_cookies(response) - return response - - real_response = real_send(*args, **kwargs) - _run_async_function(_record_responses, cassette, vcr_request, real_response, aread=False) - return real_response - - -def sync_vcr_send(cassette, real_send): - @functools.wraps(real_send) - def _inner_send(*args, **kwargs): - return _sync_vcr_send(cassette, real_send, *args, **kwargs) - - return _inner_send