Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/02_concepts/09_streaming.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Supported streaming methods:
- [`KeyValueStoreClient.stream_record`](/reference/class/KeyValueStoreClient#stream_record) - Stream key-value store records as raw data.
- [`LogClient.stream`](/reference/class/LogClient#stream) - Stream logs in real time.

These methods return a raw, context-managed `httpx.Response` object. The response must be consumed within a with block to ensure that the connection is closed automatically, preventing memory leaks or unclosed connections.
These methods return a raw, context-managed `impit.Response` object. The response must be consumed within a `with` (or `async with`) block to ensure that the connection is closed automatically, preventing memory leaks or unclosed connections.

The following example demonstrates how to stream the logs of an Actor run incrementally:

Expand Down
4 changes: 2 additions & 2 deletions docs/02_concepts/code/01_async_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ async def main() -> None:
# Stream the logs
async with log_client.stream() as async_log_stream:
if async_log_stream:
async for line in async_log_stream.aiter_lines():
print(line)
async for bytes_chunk in async_log_stream.aiter_bytes():
print(bytes_chunk)


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions docs/02_concepts/code/09_streaming_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ async def main() -> None:

async with log_client.stream() as log_stream:
if log_stream:
for line in log_stream.iter_lines():
print(line)
async for bytes_chunk in log_stream.aiter_bytes():
print(bytes_chunk)
4 changes: 2 additions & 2 deletions docs/02_concepts/code/09_streaming_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ def main() -> None:

with log_client.stream() as log_stream:
if log_stream:
for line in log_stream.iter_lines():
print(line)
for bytes_chunk in log_stream.iter_bytes():
print(bytes_chunk)
18 changes: 18 additions & 0 deletions docs/04_upgrading/upgrading_to_v2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
id: upgrading-to-v2
title: Upgrading to v2
---

This page summarizes the breaking changes between Apify Python API client v1.x and v2.0.

## Python version support

<!-- TODO -->

## Change underlying HTTP library

In v2.0, the Apify Python API client switched from using `httpx` to [`impit`](https://github.com/apify/impit) as the underlying HTTP library.

## Update signature of methods

<!-- TODO -->
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ keywords = ["apify", "api", "client", "automation", "crawling", "scraping"]
dependencies = [
"apify-shared<2.0.0",
"colorama>=0.4.0",
"httpx>=0.25",
"impit>=0.5.2",
"more_itertools>=10.0.0",
]

Expand All @@ -54,7 +54,6 @@ dev = [
"pytest~=8.4.0",
"pytest-httpserver>=1.1.3",
"redbaron~=0.9.0",
"respx~=0.22.0",
"ruff~=0.12.0",
"setuptools", # setuptools are used by pytest but not explicitly required
"types-colorama~=0.4.15.20240106",
Expand Down Expand Up @@ -175,7 +174,7 @@ warn_unused_ignores = true
exclude = []

[[tool.mypy.overrides]]
module = ["pandas", "respx"]
module = ["pandas"]
ignore_missing_imports = true

[tool.basedpyright]
Expand Down
19 changes: 11 additions & 8 deletions src/apify_client/_errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import httpx
import json as jsonlib

import impit
from apify_shared.utils import ignore_docs


Expand All @@ -17,20 +19,21 @@ class ApifyApiError(ApifyClientError):
"""

@ignore_docs
def __init__(self, response: httpx.Response, attempt: int) -> None:
def __init__(self, response: impit.Response, attempt: int, method: str = 'GET') -> None:
"""Initialize a new instance.

Args:
response: The response to the failed API call.
attempt: Which attempt was the request that failed.
method: The HTTP method used for the request.
"""
self.message: str | None = None
self.type: str | None = None
self.data = dict[str, str]()

self.message = f'Unexpected error: {response.text}'
try:
response_data = response.json()
response_data = jsonlib.loads(response.text)
if 'error' in response_data:
self.message = response_data['error']['message']
self.type = response_data['error']['type']
Expand All @@ -44,7 +47,7 @@ def __init__(self, response: httpx.Response, attempt: int) -> None:
self.name = 'ApifyApiError'
self.status_code = response.status_code
self.attempt = attempt
self.http_method = response.request.method
self.http_method = method

# TODO: self.client_method # noqa: TD003
# TODO: self.original_stack # noqa: TD003
Expand All @@ -61,7 +64,7 @@ class InvalidResponseBodyError(ApifyClientError):
"""

@ignore_docs
def __init__(self, response: httpx.Response) -> None:
def __init__(self, response: impit.Response) -> None:
"""Initialize a new instance.

Args:
Expand All @@ -80,8 +83,8 @@ def is_retryable_error(exc: Exception) -> bool:
exc,
(
InvalidResponseBodyError,
httpx.NetworkError,
httpx.TimeoutException,
httpx.RemoteProtocolError,
impit.NetworkError,
impit.TimeoutException,
impit.RemoteProtocolError,
),
)
90 changes: 45 additions & 45 deletions src/apify_client/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from http import HTTPStatus
from importlib import metadata
from typing import TYPE_CHECKING, Any
from urllib.parse import urlencode

import httpx
import impit
from apify_shared.utils import ignore_docs, is_content_type_json, is_content_type_text, is_content_type_xml

from apify_client._errors import ApifyApiError, InvalidResponseBodyError, is_retryable_error
Expand Down Expand Up @@ -59,13 +60,13 @@ def __init__(
if token is not None:
headers['Authorization'] = f'Bearer {token}'

self.httpx_client = httpx.Client(headers=headers, follow_redirects=True, timeout=timeout_secs)
self.httpx_async_client = httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=timeout_secs)
self.impit_client = impit.Client(headers=headers, follow_redirects=True, timeout=timeout_secs)
self.impit_async_client = impit.AsyncClient(headers=headers, follow_redirects=True, timeout=timeout_secs)

self.stats = stats or Statistics()

@staticmethod
def _maybe_parse_response(response: httpx.Response) -> Any:
def _maybe_parse_response(response: impit.Response) -> Any:
if response.status_code == HTTPStatus.NO_CONTENT:
return None

Expand All @@ -75,7 +76,7 @@ def _maybe_parse_response(response: httpx.Response) -> Any:

try:
if is_content_type_json(content_type):
return response.json()
return jsonlib.loads(response.text)
elif is_content_type_xml(content_type) or is_content_type_text(content_type): # noqa: RET505
return response.text
else:
Expand Down Expand Up @@ -131,6 +132,21 @@ def _prepare_request_call(
data,
)

def _build_url_with_params(self, url: str, params: dict | None = None) -> str:
if not params:
return url

param_pairs: list[tuple[str, str]] = []
for key, value in params.items():
if isinstance(value, list):
param_pairs.extend((key, str(v)) for v in value)
else:
param_pairs.append((key, str(value)))

query_string = urlencode(param_pairs)

return f'{url}?{query_string}'


class HTTPClient(_BaseHTTPClient):
def call(
Expand All @@ -145,7 +161,7 @@ def call(
stream: bool | None = None,
parse_response: bool | None = True,
timeout_secs: int | None = None,
) -> httpx.Response:
) -> impit.Response:
log_context.method.set(method)
log_context.url.set(url)

Expand All @@ -156,34 +172,26 @@ def call(

headers, params, content = self._prepare_request_call(headers, params, data, json)

httpx_client = self.httpx_client
impit_client = self.impit_client

def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response:
log_context.attempt.set(attempt)
logger.debug('Sending request')

self.stats.requests += 1

try:
request = httpx_client.build_request(
# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))

url_with_params = self._build_url_with_params(url, params)

response = impit_client.request(
method=method,
url=url,
url=url_with_params,
headers=headers,
params=params,
content=content,
)

# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
request.extensions['timeout'] = {
'connect': timeout,
'pool': timeout,
'read': timeout,
'write': timeout,
}

response = httpx_client.send(
request=request,
timeout=timeout,
stream=stream or False,
)

Expand Down Expand Up @@ -217,7 +225,7 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:

# Read the response in case it is a stream, so we can raise the error properly
response.read()
raise ApifyApiError(response, attempt)
raise ApifyApiError(response, attempt, method=method)

return retry_with_exp_backoff(
_make_request,
Expand All @@ -241,7 +249,7 @@ async def call(
stream: bool | None = None,
parse_response: bool | None = True,
timeout_secs: int | None = None,
) -> httpx.Response:
) -> impit.Response:
log_context.method.set(method)
log_context.url.set(url)

Expand All @@ -252,31 +260,23 @@ async def call(

headers, params, content = self._prepare_request_call(headers, params, data, json)

httpx_async_client = self.httpx_async_client
impit_async_client = self.impit_async_client

async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
async def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response:
log_context.attempt.set(attempt)
logger.debug('Sending request')
try:
request = httpx_async_client.build_request(
# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))

url_with_params = self._build_url_with_params(url, params)

response = await impit_async_client.request(
method=method,
url=url,
url=url_with_params,
headers=headers,
params=params,
content=content,
)

# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
request.extensions['timeout'] = {
'connect': timeout,
'pool': timeout,
'read': timeout,
'write': timeout,
}

response = await httpx_async_client.send(
request=request,
timeout=timeout,
stream=stream or False,
)

Expand Down Expand Up @@ -310,7 +310,7 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response

# Read the response in case it is a stream, so we can raise the error properly
await response.aread()
raise ApifyApiError(response, attempt)
raise ApifyApiError(response, attempt, method=method)

return await retry_with_exp_backoff_async(
_make_request,
Expand Down
9 changes: 5 additions & 4 deletions src/apify_client/clients/base/actor_job_base_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import json as jsonlib
import math
import time
from datetime import datetime, timezone
Expand Down Expand Up @@ -39,7 +40,7 @@ def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
method='GET',
params=self._params(waitForFinish=wait_for_finish),
)
job = parse_date_fields(pluck_data(response.json()))
job = parse_date_fields(pluck_data(jsonlib.loads(response.text)))

seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
if ActorJobStatus(job['status']).is_terminal or (
Expand Down Expand Up @@ -70,7 +71,7 @@ def _abort(self, *, gracefully: bool | None = None) -> dict:
method='POST',
params=self._params(gracefully=gracefully),
)
return parse_date_fields(pluck_data(response.json()))
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))


@ignore_docs
Expand All @@ -94,7 +95,7 @@ async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
method='GET',
params=self._params(waitForFinish=wait_for_finish),
)
job = parse_date_fields(pluck_data(response.json()))
job = parse_date_fields(pluck_data(jsonlib.loads(response.text)))

seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
if ActorJobStatus(job['status']).is_terminal or (
Expand Down Expand Up @@ -125,4 +126,4 @@ async def _abort(self, *, gracefully: bool | None = None) -> dict:
method='POST',
params=self._params(gracefully=gracefully),
)
return parse_date_fields(pluck_data(response.json()))
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))
Loading
Loading