Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/apify_client/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def call(
json: JSONSerializable | None = None,
stream: bool | None = None,
parse_response: bool | None = True,
timeout_secs: int | None = None,
) -> httpx.Response:
log_context.method.set(method)
log_context.url.set(url)
Expand Down Expand Up @@ -170,6 +171,16 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
params=params,
content=content,
)

# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
request.extensions['timeout'] = {
'connect': timeout,
'pool': timeout,
'read': timeout,
'write': timeout,
}

response = httpx_client.send(
request=request,
stream=stream or False,
Expand Down Expand Up @@ -225,6 +236,7 @@ async def call(
json: JSONSerializable | None = None,
stream: bool | None = None,
parse_response: bool | None = True,
timeout_secs: int | None = None,
) -> httpx.Response:
log_context.method.set(method)
log_context.url.set(url)
Expand All @@ -249,6 +261,16 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response
params=params,
content=content,
)

# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
request.extensions['timeout'] = {
'connect': timeout,
'pool': timeout,
'read': timeout,
'write': timeout,
}

response = await httpx_async_client.send(
request=request,
stream=stream or False,
Expand Down
9 changes: 5 additions & 4 deletions src/apify_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
)

DEFAULT_API_URL = 'https://api.apify.com'
DEFAULT_TIMEOUT = 360
API_VERSION = 'v2'


Expand All @@ -68,7 +69,7 @@ def __init__(
api_url: str | None = None,
max_retries: int | None = 8,
min_delay_between_retries_millis: int | None = 500,
timeout_secs: int | None = 360,
timeout_secs: int | None = DEFAULT_TIMEOUT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if you pass None, it will still use the default timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the same as in the previous implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I understand it's about backward compatibility (BC), and BC is sacred, but I would expect None to mean "no timeout".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To limit the scope of this change, I will avoid adding an unnecessary breaking change to this PR, but it could be done in its own PR if needed.

) -> None:
"""Initialize a new instance.

Expand All @@ -85,7 +86,7 @@ def __init__(
self.base_url = f'{api_url}/{API_VERSION}'
self.max_retries = max_retries or 8
self.min_delay_between_retries_millis = min_delay_between_retries_millis or 500
self.timeout_secs = timeout_secs or 360
self.timeout_secs = timeout_secs or DEFAULT_TIMEOUT

def _options(self) -> dict:
return {
Expand All @@ -107,7 +108,7 @@ def __init__(
api_url: str | None = None,
max_retries: int | None = 8,
min_delay_between_retries_millis: int | None = 500,
timeout_secs: int | None = 360,
timeout_secs: int | None = DEFAULT_TIMEOUT,
) -> None:
"""Initialize a new instance.

Expand Down Expand Up @@ -290,7 +291,7 @@ def __init__(
api_url: str | None = None,
max_retries: int | None = 8,
min_delay_between_retries_millis: int | None = 500,
timeout_secs: int | None = 360,
timeout_secs: int | None = DEFAULT_TIMEOUT,
) -> None:
"""Initialize a new instance.

Expand Down
18 changes: 12 additions & 6 deletions src/apify_client/clients/base/resource_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
class ResourceClient(BaseClient):
"""Base class for sub-clients manipulating a single resource."""

def _get(self) -> dict | None:
def _get(self, timeout_secs: int | None = None) -> dict | None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could have the default timeout constant as the default as well, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could. I guess this only makes a difference if someone decides to use the client directly and not through ApifyClient(Async). I am not strongly against this, but I lean towards not doing it, as having the same timeout constant used on several levels can create confusion about which one is actually being applied.

Currently it is set on ApifyClient(Async) and it falls through all the way down to HTTPClient(Async).

try:
response = self.http_client.call(
url=self.url,
method='GET',
params=self._params(),
timeout_secs=timeout_secs,
)

return parse_date_fields(pluck_data(response.json()))
Expand All @@ -26,22 +27,24 @@ def _get(self) -> dict | None:

return None

def _update(self, updated_fields: dict) -> dict:
def _update(self, updated_fields: dict, timeout_secs: int | None = None) -> dict:
response = self.http_client.call(
url=self._url(),
method='PUT',
params=self._params(),
json=updated_fields,
timeout_secs=timeout_secs,
)

return parse_date_fields(pluck_data(response.json()))

def _delete(self) -> None:
def _delete(self, timeout_secs: int | None = None) -> None:
try:
self.http_client.call(
url=self._url(),
method='DELETE',
params=self._params(),
timeout_secs=timeout_secs,
)

except ApifyApiError as exc:
Expand All @@ -52,12 +55,13 @@ def _delete(self) -> None:
class ResourceClientAsync(BaseClientAsync):
"""Base class for async sub-clients manipulating a single resource."""

async def _get(self) -> dict | None:
async def _get(self, timeout_secs: int | None = None) -> dict | None:
try:
response = await self.http_client.call(
url=self.url,
method='GET',
params=self._params(),
timeout_secs=timeout_secs,
)

return parse_date_fields(pluck_data(response.json()))
Expand All @@ -67,22 +71,24 @@ async def _get(self) -> dict | None:

return None

async def _update(self, updated_fields: dict) -> dict:
async def _update(self, updated_fields: dict, timeout_secs: int | None = None) -> dict:
response = await self.http_client.call(
url=self._url(),
method='PUT',
params=self._params(),
json=updated_fields,
timeout_secs=timeout_secs,
)

return parse_date_fields(pluck_data(response.json()))

async def _delete(self) -> None:
async def _delete(self, timeout_secs: int | None = None) -> None:
try:
await self.http_client.call(
url=self._url(),
method='DELETE',
params=self._params(),
timeout_secs=timeout_secs,
)

except ApifyApiError as exc:
Expand Down
19 changes: 13 additions & 6 deletions src/apify_client/clients/resource_clients/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import httpx
from apify_shared.types import JSONSerializable

_SMALL_TIMEOUT = 5 # For fast and common actions. Suitable for idempotent actions.
_MEDIUM_TIMEOUT = 30 # For actions that may take longer.


class DatasetClient(ResourceClient):
"""Sub-client for manipulating a single dataset."""
Expand All @@ -34,7 +37,7 @@ def get(self) -> dict | None:
Returns:
The retrieved dataset, or None, if it does not exist.
"""
return self._get()
return self._get(timeout_secs=_SMALL_TIMEOUT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange to have no way to override this... Could we have a timeout parameter here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any use-case for this?


def update(self, *, name: str | None = None) -> dict:
"""Update the dataset with specified fields.
Expand All @@ -49,14 +52,14 @@ def update(self, *, name: str | None = None) -> dict:
"""
updated_fields = {'name': name}

return self._update(filter_out_none_values_recursively(updated_fields))
return self._update(filter_out_none_values_recursively(updated_fields), timeout_secs=_SMALL_TIMEOUT)

def delete(self) -> None:
"""Delete the dataset.

https://docs.apify.com/api/v2#/reference/datasets/dataset/delete-dataset
"""
return self._delete()
return self._delete(timeout_secs=_SMALL_TIMEOUT)

def list_items(
self,
Expand Down Expand Up @@ -539,6 +542,7 @@ def push_items(self, items: JSONSerializable) -> None:
params=self._params(),
data=data,
json=json,
timeout_secs=_MEDIUM_TIMEOUT,
)

def get_statistics(self) -> dict | None:
Expand All @@ -554,6 +558,7 @@ def get_statistics(self) -> dict | None:
url=self._url('statistics'),
method='GET',
params=self._params(),
timeout_secs=_SMALL_TIMEOUT,
)
return pluck_data(response.json())
except ApifyApiError as exc:
Expand All @@ -578,7 +583,7 @@ async def get(self) -> dict | None:
Returns:
The retrieved dataset, or None, if it does not exist.
"""
return await self._get()
return await self._get(timeout_secs=_SMALL_TIMEOUT)

async def update(self, *, name: str | None = None) -> dict:
"""Update the dataset with specified fields.
Expand All @@ -593,14 +598,14 @@ async def update(self, *, name: str | None = None) -> dict:
"""
updated_fields = {'name': name}

return await self._update(filter_out_none_values_recursively(updated_fields))
return await self._update(filter_out_none_values_recursively(updated_fields), timeout_secs=_SMALL_TIMEOUT)

async def delete(self) -> None:
"""Delete the dataset.

https://docs.apify.com/api/v2#/reference/datasets/dataset/delete-dataset
"""
return await self._delete()
return await self._delete(timeout_secs=_SMALL_TIMEOUT)

async def list_items(
self,
Expand Down Expand Up @@ -990,6 +995,7 @@ async def push_items(self, items: JSONSerializable) -> None:
params=self._params(),
data=data,
json=json,
timeout_secs=_MEDIUM_TIMEOUT,
)

async def get_statistics(self) -> dict | None:
Expand All @@ -1005,6 +1011,7 @@ async def get_statistics(self) -> dict | None:
url=self._url('statistics'),
method='GET',
params=self._params(),
timeout_secs=_SMALL_TIMEOUT,
)
return pluck_data(response.json())
except ApifyApiError as exc:
Expand Down
15 changes: 11 additions & 4 deletions src/apify_client/clients/resource_clients/key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterator

_SMALL_TIMEOUT = 5 # For fast and common actions. Suitable for idempotent actions.
_MEDIUM_TIMEOUT = 30 # For actions that may take longer.


class KeyValueStoreClient(ResourceClient):
"""Sub-client for manipulating a single key-value store."""
Expand All @@ -30,7 +33,7 @@ def get(self) -> dict | None:
Returns:
The retrieved key-value store, or None if it does not exist.
"""
return self._get()
return self._get(timeout_secs=_SMALL_TIMEOUT)

def update(self, *, name: str | None = None) -> dict:
"""Update the key-value store with specified fields.
Expand All @@ -54,7 +57,7 @@ def delete(self) -> None:

https://docs.apify.com/api/v2#/reference/key-value-stores/store-object/delete-store
"""
return self._delete()
return self._delete(timeout_secs=_SMALL_TIMEOUT)

def list_keys(self, *, limit: int | None = None, exclusive_start_key: str | None = None) -> dict:
"""List the keys in the key-value store.
Expand All @@ -74,6 +77,7 @@ def list_keys(self, *, limit: int | None = None, exclusive_start_key: str | None
url=self._url('keys'),
method='GET',
params=request_params,
timeout_secs=_MEDIUM_TIMEOUT,
)

return parse_date_fields(pluck_data(response.json()))
Expand Down Expand Up @@ -236,6 +240,7 @@ def delete_record(self, key: str) -> None:
url=self._url(f'records/{key}'),
method='DELETE',
params=self._params(),
timeout_secs=_SMALL_TIMEOUT,
)


Expand All @@ -255,7 +260,7 @@ async def get(self) -> dict | None:
Returns:
The retrieved key-value store, or None if it does not exist.
"""
return await self._get()
return await self._get(timeout_secs=_SMALL_TIMEOUT)

async def update(self, *, name: str | None = None) -> dict:
"""Update the key-value store with specified fields.
Expand All @@ -279,7 +284,7 @@ async def delete(self) -> None:

https://docs.apify.com/api/v2#/reference/key-value-stores/store-object/delete-store
"""
return await self._delete()
return await self._delete(timeout_secs=_SMALL_TIMEOUT)

async def list_keys(self, *, limit: int | None = None, exclusive_start_key: str | None = None) -> dict:
"""List the keys in the key-value store.
Expand All @@ -299,6 +304,7 @@ async def list_keys(self, *, limit: int | None = None, exclusive_start_key: str
url=self._url('keys'),
method='GET',
params=request_params,
timeout_secs=_MEDIUM_TIMEOUT,
)

return parse_date_fields(pluck_data(response.json()))
Expand Down Expand Up @@ -440,4 +446,5 @@ async def delete_record(self, key: str) -> None:
url=self._url(f'records/{key}'),
method='DELETE',
params=self._params(),
timeout_secs=_SMALL_TIMEOUT,
)
Loading