Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/apify_client/_dynamic_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from __future__ import annotations

from collections.abc import AsyncIterable, Iterable
from typing import Protocol, Union

# Forms the `content` argument of a `DynamicTimeoutFunction` may take: text, raw bytes,
# or a synchronous / asynchronous stream of byte chunks.
# NOTE(review): written with `Union[...]` instead of `|` because this is a runtime expression, which
# `from __future__ import annotations` does not defer — presumably to keep pre-3.10 interpreters working.
RequestContent = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]


class DynamicTimeoutFunction(Protocol):
    """A function for dynamically creating suitable timeout for an http request.

    The HTTP clients invoke the callable with the method, URL and content of a request right before
    sending it. A truthy result is applied to all four timeout phases of that request (connect, pool,
    read, write); a falsy result (`None` or `0`) makes the client fall back to its fixed `timeout_secs`.
    """

    def __call__(self, method: str, url: str, content: RequestContent) -> None | int:
        """Generate suitable timeout [s] for the request.

        Args:
            method: HTTP method of the request about to be sent.
            url: URL of the request about to be sent.
            content: Body of the request about to be sent.

        Returns:
            The timeout in seconds, or `None` to keep the client's default timeout.
        """
        ...
24 changes: 24 additions & 0 deletions src/apify_client/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
if TYPE_CHECKING:
from apify_shared.types import JSONSerializable

from apify_client._dynamic_timeout import DynamicTimeoutFunction


DEFAULT_BACKOFF_EXPONENTIAL_FACTOR = 2
DEFAULT_BACKOFF_RANDOM_FACTOR = 1
Expand All @@ -36,11 +38,13 @@ def __init__(
max_retries: int = 8,
min_delay_between_retries_millis: int = 500,
timeout_secs: int = 360,
get_dynamic_timeout: DynamicTimeoutFunction | None = None,
stats: Statistics | None = None,
) -> None:
self.max_retries = max_retries
self.min_delay_between_retries_millis = min_delay_between_retries_millis
self.timeout_secs = timeout_secs
self._get_dynamic_timeout = get_dynamic_timeout

headers = {'Accept': 'application/json, */*'}

Expand Down Expand Up @@ -170,6 +174,16 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
params=params,
content=content,
)

if self._get_dynamic_timeout:
timeout = self._get_dynamic_timeout(method, url, content) or self.timeout_secs
request.extensions['timeout'] = {
'connect': timeout,
'pool': timeout,
'read': timeout,
'write': timeout,
}

response = httpx_client.send(
request=request,
stream=stream or False,
Expand Down Expand Up @@ -249,6 +263,16 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response
params=params,
content=content,
)

if self._get_dynamic_timeout:
timeout = self._get_dynamic_timeout(method, url, content) or self.timeout_secs
request.extensions['timeout'] = {
'connect': timeout,
'pool': timeout,
'read': timeout,
'write': timeout,
}

response = await httpx_async_client.send(
request=request,
stream=stream or False,
Expand Down
11 changes: 11 additions & 0 deletions src/apify_client/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from apify_shared.utils import ignore_docs

from apify_client._http_client import HTTPClient, HTTPClientAsync
Expand Down Expand Up @@ -53,6 +55,9 @@
WebhookDispatchCollectionClientAsync,
)

if TYPE_CHECKING:
from apify_client._dynamic_timeout import DynamicTimeoutFunction

DEFAULT_API_URL = 'https://api.apify.com'
API_VERSION = 'v2'

Expand Down Expand Up @@ -108,6 +113,7 @@ def __init__(
max_retries: int | None = 8,
min_delay_between_retries_millis: int | None = 500,
timeout_secs: int | None = 360,
get_dynamic_timeout: DynamicTimeoutFunction | None = None,
) -> None:
"""Initialize a new instance.

Expand All @@ -118,6 +124,7 @@ def __init__(
min_delay_between_retries_millis: How long will the client wait between retrying requests
(increases exponentially from this value).
timeout_secs: The socket timeout of the HTTP requests sent to the Apify API.
get_dynamic_timeout: A function that can be called for each request to get suitable custom timeout for it.
"""
super().__init__(
token,
Expand All @@ -133,6 +140,7 @@ def __init__(
max_retries=self.max_retries,
min_delay_between_retries_millis=self.min_delay_between_retries_millis,
timeout_secs=self.timeout_secs,
get_dynamic_timeout=get_dynamic_timeout,
stats=self.stats,
)

Expand Down Expand Up @@ -291,6 +299,7 @@ def __init__(
max_retries: int | None = 8,
min_delay_between_retries_millis: int | None = 500,
timeout_secs: int | None = 360,
get_dynamic_timeout: DynamicTimeoutFunction | None = None,
) -> None:
"""Initialize a new instance.

Expand All @@ -301,6 +310,7 @@ def __init__(
min_delay_between_retries_millis: How long will the client wait between retrying requests
(increases exponentially from this value).
timeout_secs: The socket timeout of the HTTP requests sent to the Apify API.
get_dynamic_timeout: A function that can be called for each request to get suitable custom timeout for it.
"""
super().__init__(
token,
Expand All @@ -316,6 +326,7 @@ def __init__(
max_retries=self.max_retries,
min_delay_between_retries_millis=self.min_delay_between_retries_millis,
timeout_secs=self.timeout_secs,
get_dynamic_timeout=get_dynamic_timeout,
stats=self.stats,
)

Expand Down
117 changes: 117 additions & 0 deletions tests/unit/test_dynamic_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pytest
import respx
from httpx import Request, Response

from apify_client import ApifyClient, ApifyClientAsync

if TYPE_CHECKING:
from apify_client._dynamic_timeout import DynamicTimeoutFunction, RequestContent


@pytest.fixture
def get_dynamic_timeout_function() -> DynamicTimeoutFunction:
"""Example of a dynamic timeout function."""

def get_dynamic_timeout(method: str, url: str, content: RequestContent) -> int | None:
"""Return suitable timeout.

For POST on endpoint v2/datasets/whatever/items timeout is proportional to the size of the content.
For everything else return fixed 30."""
if isinstance(content, bytes) and method == 'POST' and url.endswith('v2/datasets/whatever/items'):
Copy link

Copilot AI Apr 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dynamic timeout function only handles content that is of type bytes, but the tests pass strings (e.g., 'abcd'). To ensure the expected custom timeouts (e.g., 5, 9), update the type check or convert the content to bytes.

Copilot uses AI. Check for mistakes.

dynamic_timeout_based_on_size = int(len(content) / 10)
return min(360, max(5, dynamic_timeout_based_on_size)) # Saturate in range 5-360 seconds
return 30

return get_dynamic_timeout


@respx.mock
@pytest.mark.parametrize(
('content', 'expected_timeout'),
[
pytest.param('abcd', 5, id='Small payload'),
pytest.param('abcd' * 10000, 9, id='Payload in the dynamic timeout interval interval'),
Copy link

Copilot AI Apr 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test case id contains a repeated phrase ('interval interval'); consider revising it to remove the duplication.

Suggested change
pytest.param('abcd' * 10000, 9, id='Payload in the dynamic timeout interval interval'),
pytest.param('abcd' * 10000, 9, id='Payload in the dynamic timeout interval'),

Copilot uses AI. Check for mistakes.

pytest.param('abcd' * 1000000, 360, id='Large payload'),
],
)
async def test_dynamic_timeout_async_client(
get_dynamic_timeout_function: DynamicTimeoutFunction, content: str, expected_timeout: int
) -> None:
def check_timeout(request: Request) -> Response:
assert request.extensions['timeout'] == {
'connect': expected_timeout,
'pool': expected_timeout,
'read': expected_timeout,
'write': expected_timeout,
}
return Response(201)

respx.post('https://api.apify.com/v2/datasets/whatever/items').mock(side_effect=check_timeout)
client = ApifyClientAsync(get_dynamic_timeout=get_dynamic_timeout_function)
await client.dataset(dataset_id='whatever').push_items({'some_key': content})


@respx.mock
async def test_dynamic_timeout_async_client_default() -> None:
    """Without a dynamic timeout function the async client sends the fixed 360 s timeout."""
    default_timeout = 360
    # The same value is expected for every timeout phase of the request.
    expected_extension = dict.fromkeys(('connect', 'pool', 'read', 'write'), default_timeout)

    def check_timeout(request: Request) -> Response:
        # Runs as the mocked route's side effect, i.e. against the request the client actually built.
        assert request.extensions['timeout'] == expected_extension
        return Response(201)

    route = respx.post('https://api.apify.com/v2/datasets/whatever/items')
    route.mock(side_effect=check_timeout)

    dataset_client = ApifyClientAsync().dataset(dataset_id='whatever')
    await dataset_client.push_items({'some_key': 'abcd'})


@respx.mock
@pytest.mark.parametrize(
    ('content', 'expected_timeout'),
    [
        pytest.param('abcd', 5, id='Small payload'),
        pytest.param('abcd' * 10000, 9, id='Payload in the dynamic timeout interval'),
        pytest.param('abcd' * 1000000, 360, id='Large payload'),
    ],
)
def test_dynamic_timeout_sync_client(
    get_dynamic_timeout_function: DynamicTimeoutFunction, content: str, expected_timeout: int
) -> None:
    """Check that the sync client applies the timeout produced by the dynamic timeout function.

    The expected values follow the fixture's rule: a tenth of the body length, saturated to 5-360 seconds.

    NOTE(review): 40 000 characters mapping to 9 s implies the callback receives a much smaller
    (presumably compressed) body than the raw string — confirm against the request-preparation code.

    Args:
        get_dynamic_timeout_function: Fixture providing the example dynamic timeout function.
        content: String stored under `some_key` in the pushed item.
        expected_timeout: Timeout in seconds expected for every timeout phase of the request.
    """

    def check_timeout(request: Request) -> Response:
        # Runs as the mocked route's side effect, i.e. against the request the client actually built.
        assert request.extensions['timeout'] == {
            'connect': expected_timeout,
            'pool': expected_timeout,
            'read': expected_timeout,
            'write': expected_timeout,
        }
        return Response(201)

    respx.post('https://api.apify.com/v2/datasets/whatever/items').mock(side_effect=check_timeout)
    client = ApifyClient(get_dynamic_timeout=get_dynamic_timeout_function)
    client.dataset(dataset_id='whatever').push_items({'some_key': content})


@respx.mock
def test_dynamic_timeout_sync_client_default() -> None:
    """Without a dynamic timeout function the sync client sends the fixed 360 s timeout."""
    default_timeout = 360
    # The same value is expected for every timeout phase of the request.
    expected_extension = dict.fromkeys(('connect', 'pool', 'read', 'write'), default_timeout)

    def check_timeout(request: Request) -> Response:
        # Runs as the mocked route's side effect, i.e. against the request the client actually built.
        assert request.extensions['timeout'] == expected_extension
        return Response(201)

    route = respx.post('https://api.apify.com/v2/datasets/whatever/items')
    route.mock(side_effect=check_timeout)

    dataset_client = ApifyClient().dataset(dataset_id='whatever')
    dataset_client.push_items({'some_key': 'abcd'})
Loading