From 87911c413ea41f18d08cb1a4fefba7484c173f59 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 9 Apr 2025 11:47:57 +0200 Subject: [PATCH 1/5] Updated exp backoff to increse timeout as well. Updated some clients endpoints with spefici timeouts. Added tests. TODO: Add async tests. Make some deal with Mypy --- src/apify_client/_http_client.py | 22 +++ .../clients/base/resource_client.py | 18 ++- .../clients/resource_clients/dataset.py | 19 ++- .../resource_clients/key_value_store.py | 15 +- .../clients/resource_clients/request_queue.py | 37 ++++- tests/unit/test_client_timeouts.py | 131 ++++++++++++++++++ 6 files changed, 220 insertions(+), 22 deletions(-) create mode 100644 tests/unit/test_client_timeouts.py diff --git a/src/apify_client/_http_client.py b/src/apify_client/_http_client.py index 2a23516f..a3fb20bc 100644 --- a/src/apify_client/_http_client.py +++ b/src/apify_client/_http_client.py @@ -143,6 +143,7 @@ def call( json: JSONSerializable | None = None, stream: bool | None = None, parse_response: bool | None = True, + timeout_secs: int | None = None, ) -> httpx.Response: log_context.method.set(method) log_context.url.set(url) @@ -170,6 +171,16 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response: params=params, content=content, ) + + # Increase timeout with each attempt. Max timeout is bounded by the client timeout. + timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1)) + request.extensions['timeout'] = { + 'connect': timeout, + 'pool': timeout, + 'read': timeout, + 'write': timeout, + } + response = httpx_client.send( request=request, stream=stream or False, @@ -225,6 +236,7 @@ async def call( json: JSONSerializable | None = None, stream: bool | None = None, parse_response: bool | None = True, + timeout_secs: int | None = None, ) -> httpx.Response: log_context.method.set(method) log_context.url.set(url) @@ -249,6 +261,16 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response params=params, content=content, ) + + # Increase timeout with each attempt. Max timeout is bounded by the client timeout. 
+ timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1)) + request.extensions['timeout'] = { + 'connect': timeout, + 'pool': timeout, + 'read': timeout, + 'write': timeout, + } + response = await httpx_async_client.send( request=request, stream=stream or False, diff --git a/src/apify_client/clients/base/resource_client.py b/src/apify_client/clients/base/resource_client.py index f0d39401..37e2d3b0 100644 --- a/src/apify_client/clients/base/resource_client.py +++ b/src/apify_client/clients/base/resource_client.py @@ -11,12 +11,13 @@ class ResourceClient(BaseClient): """Base class for sub-clients manipulating a single resource.""" - def _get(self) -> dict | None: + def _get(self, timeout_secs: int | None = None) -> dict | None: try: response = self.http_client.call( url=self.url, method='GET', params=self._params(), + timeout_secs=timeout_secs, ) return parse_date_fields(pluck_data(response.json())) @@ -26,22 +27,24 @@ def _get(self) -> dict | None: return None - def _update(self, updated_fields: dict) -> dict: + def _update(self, updated_fields: dict, timeout_secs: int | None = None) -> dict: response = self.http_client.call( url=self._url(), method='PUT', params=self._params(), json=updated_fields, + timeout_secs=timeout_secs, ) return parse_date_fields(pluck_data(response.json())) - def _delete(self) -> None: + def _delete(self, timeout_secs: int | None = None) -> None: try: self.http_client.call( url=self._url(), method='DELETE', params=self._params(), + timeout_secs=timeout_secs, ) except ApifyApiError as exc: @@ -52,12 +55,13 @@ def _delete(self) -> None: class ResourceClientAsync(BaseClientAsync): """Base class for async sub-clients manipulating a single resource.""" - async def _get(self) -> dict | None: + async def _get(self, timeout_secs: int | None = None) -> dict | None: try: response = await self.http_client.call( url=self.url, method='GET', params=self._params(), + timeout_secs=timeout_secs, ) return parse_date_fields(pluck_data(response.json())) @@ -67,22 +71,24 @@ async def _get(self) -> dict | None: return None - async def _update(self, updated_fields: dict) -> dict: + async def _update(self, updated_fields: dict, timeout_secs: int | None = None) -> dict: response = await self.http_client.call( url=self._url(), method='PUT', params=self._params(), json=updated_fields, + timeout_secs=timeout_secs, ) return parse_date_fields(pluck_data(response.json())) - async def _delete(self) -> None: + async def _delete(self, timeout_secs: int | None = None) -> None: try: await self.http_client.call( url=self._url(), method='DELETE', params=self._params(), + timeout_secs=timeout_secs, ) except ApifyApiError as exc: diff --git a/src/apify_client/clients/resource_clients/dataset.py b/src/apify_client/clients/resource_clients/dataset.py index 433ccb7c..adf7134b 100644 --- a/src/apify_client/clients/resource_clients/dataset.py +++ b/src/apify_client/clients/resource_clients/dataset.py @@ -17,6 +17,9 @@ import httpx from apify_shared.types import JSONSerializable +_SMALL_TIMEOUT = 5 # For fast and common actions. Suitable for idempotent actions. +_MEDIUM_TIMEOUT = 30 # For actions that may take longer. + class DatasetClient(ResourceClient): """Sub-client for manipulating a single dataset.""" @@ -34,7 +37,7 @@ def get(self) -> dict | None: Returns: The retrieved dataset, or None, if it does not exist. 
""" - return self._get() + return self._get(timeout_secs=_SMALL_TIMEOUT) def update(self, *, name: str | None = None) -> dict: """Update the dataset with specified fields. @@ -49,14 +52,14 @@ def update(self, *, name: str | None = None) -> dict: """ updated_fields = {'name': name} - return self._update(filter_out_none_values_recursively(updated_fields)) + return self._update(filter_out_none_values_recursively(updated_fields), timeout_secs=_SMALL_TIMEOUT) def delete(self) -> None: """Delete the dataset. https://docs.apify.com/api/v2#/reference/datasets/dataset/delete-dataset """ - return self._delete() + return self._delete(timeout_secs=_SMALL_TIMEOUT) def list_items( self, @@ -539,6 +542,7 @@ def push_items(self, items: JSONSerializable) -> None: params=self._params(), data=data, json=json, + timeout_secs=_MEDIUM_TIMEOUT, ) def get_statistics(self) -> dict | None: @@ -554,6 +558,7 @@ def get_statistics(self) -> dict | None: url=self._url('statistics'), method='GET', params=self._params(), + timeout_secs=_SMALL_TIMEOUT, ) return pluck_data(response.json()) except ApifyApiError as exc: @@ -578,7 +583,7 @@ async def get(self) -> dict | None: Returns: The retrieved dataset, or None, if it does not exist. """ - return await self._get() + return await self._get(timeout_secs=_SMALL_TIMEOUT) async def update(self, *, name: str | None = None) -> dict: """Update the dataset with specified fields. @@ -593,14 +598,14 @@ async def update(self, *, name: str | None = None) -> dict: """ updated_fields = {'name': name} - return await self._update(filter_out_none_values_recursively(updated_fields)) + return await self._update(filter_out_none_values_recursively(updated_fields), timeout_secs=_SMALL_TIMEOUT) async def delete(self) -> None: """Delete the dataset. https://docs.apify.com/api/v2#/reference/datasets/dataset/delete-dataset """ - return await self._delete() + return await self._delete(timeout_secs=_SMALL_TIMEOUT) async def list_items( self, @@ -990,6 +995,7 @@ async def push_items(self, items: JSONSerializable) -> None: params=self._params(), data=data, json=json, + timeout_secs=_MEDIUM_TIMEOUT, ) async def get_statistics(self) -> dict | None: @@ -1005,6 +1011,7 @@ async def get_statistics(self) -> dict | None: url=self._url('statistics'), method='GET', params=self._params(), + timeout_secs=_SMALL_TIMEOUT, ) return pluck_data(response.json()) except ApifyApiError as exc: diff --git a/src/apify_client/clients/resource_clients/key_value_store.py b/src/apify_client/clients/resource_clients/key_value_store.py index 22796882..6175a96a 100644 --- a/src/apify_client/clients/resource_clients/key_value_store.py +++ b/src/apify_client/clients/resource_clients/key_value_store.py @@ -13,6 +13,9 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterator +_SMALL_TIMEOUT = 5 # For fast and common actions. Suitable for idempotent actions. +_MEDIUM_TIMEOUT = 30 # For actions that may take longer. + class KeyValueStoreClient(ResourceClient): """Sub-client for manipulating a single key-value store.""" @@ -30,7 +33,7 @@ def get(self) -> dict | None: Returns: The retrieved key-value store, or None if it does not exist. """ - return self._get() + return self._get(timeout_secs=_SMALL_TIMEOUT) def update(self, *, name: str | None = None) -> dict: """Update the key-value store with specified fields. 
@@ -54,7 +57,7 @@ def delete(self) -> None: https://docs.apify.com/api/v2#/reference/key-value-stores/store-object/delete-store """ - return self._delete() + return self._delete(timeout_secs=_SMALL_TIMEOUT) def list_keys(self, *, limit: int | None = None, exclusive_start_key: str | None = None) -> dict: """List the keys in the key-value store. @@ -74,6 +77,7 @@ def list_keys(self, *, limit: int | None = None, exclusive_start_key: str | None url=self._url('keys'), method='GET', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -236,6 +240,7 @@ def delete_record(self, key: str) -> None: url=self._url(f'records/{key}'), method='DELETE', params=self._params(), + timeout_secs=_SMALL_TIMEOUT, ) @@ -255,7 +260,7 @@ async def get(self) -> dict | None: Returns: The retrieved key-value store, or None if it does not exist. """ - return await self._get() + return await self._get(timeout_secs=_SMALL_TIMEOUT) async def update(self, *, name: str | None = None) -> dict: """Update the key-value store with specified fields. @@ -279,7 +284,7 @@ async def delete(self) -> None: https://docs.apify.com/api/v2#/reference/key-value-stores/store-object/delete-store """ - return await self._delete() + return await self._delete(timeout_secs=_SMALL_TIMEOUT) async def list_keys(self, *, limit: int | None = None, exclusive_start_key: str | None = None) -> dict: """List the keys in the key-value store. @@ -299,6 +304,7 @@ async def list_keys(self, *, limit: int | None = None, exclusive_start_key: str url=self._url('keys'), method='GET', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -440,4 +446,5 @@ async def delete_record(self, key: str) -> None: url=self._url(f'records/{key}'), method='DELETE', params=self._params(), + timeout_secs=_SMALL_TIMEOUT, ) diff --git a/src/apify_client/clients/resource_clients/request_queue.py b/src/apify_client/clients/resource_clients/request_queue.py index d9bc8325..bb028b0a 100644 --- a/src/apify_client/clients/resource_clients/request_queue.py +++ b/src/apify_client/clients/resource_clients/request_queue.py @@ -25,6 +25,9 @@ _MAX_PAYLOAD_SIZE_BYTES = 9 * 1024 * 1024 # 9 MB _SAFETY_BUFFER_PERCENT = 0.01 / 100 # 0.01% +_SMALL_TIMEOUT = 5 # For fast and common actions. Suitable for idempotent actions. +_MEDIUM_TIMEOUT = 30 # For actions that may take longer. + class BatchAddRequestsResult(TypedDict): """Result of the batch add requests operation. @@ -78,7 +81,7 @@ def get(self) -> dict | None: Returns: The retrieved request queue, or None, if it does not exist. """ - return self._get() + return self._get(timeout_secs=_SMALL_TIMEOUT) def update(self, *, name: str | None = None) -> dict: """Update the request queue with specified fields. @@ -95,14 +98,14 @@ def update(self, *, name: str | None = None) -> dict: 'name': name, } - return self._update(filter_out_none_values_recursively(updated_fields)) + return self._update(filter_out_none_values_recursively(updated_fields), timeout_secs=_SMALL_TIMEOUT) def delete(self) -> None: """Delete the request queue. https://docs.apify.com/api/v2#/reference/request-queues/queue/delete-request-queue """ - return self._delete() + return self._delete(timeout_secs=_SMALL_TIMEOUT) def list_head(self, *, limit: int | None = None) -> dict: """Retrieve a given number of requests from the beginning of the queue. 
@@ -121,6 +124,7 @@ def list_head(self, *, limit: int | None = None) -> dict: url=self._url('head'), method='GET', params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -143,6 +147,7 @@ def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dic url=self._url('head/lock'), method='POST', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -166,6 +171,7 @@ def add_request(self, request: dict, *, forefront: bool | None = None) -> dict: method='POST', json=request, params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -186,6 +192,7 @@ def get_request(self, request_id: str) -> dict | None: url=self._url(f'requests/{request_id}'), method='GET', params=self._params(), + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -215,6 +222,7 @@ def update_request(self, request: dict, *, forefront: bool | None = None) -> dic method='PUT', json=request, params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -235,6 +243,7 @@ def delete_request(self, request_id: str) -> None: url=self._url(f'requests/{request_id}'), method='DELETE', params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) def prolong_request_lock( @@ -259,6 +268,7 @@ def prolong_request_lock( url=self._url(f'requests/{request_id}/lock'), method='PUT', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -278,6 +288,7 @@ def delete_request_lock(self, request_id: str, *, forefront: bool | None = None) url=self._url(f'requests/{request_id}/lock'), method='DELETE', params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) def batch_add_requests( @@ -342,6 +353,7 @@ def batch_add_requests( method='POST', params=request_params, json=list(batch.requests), + timeout_secs=_MEDIUM_TIMEOUT, ) # Retry if the request failed and the retry limit has not been reached. @@ -376,6 +388,7 @@ def batch_delete_requests(self, requests: list[dict]) -> dict: method='DELETE', params=request_params, json=requests, + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -400,6 +413,7 @@ def list_requests( url=self._url('requests'), method='GET', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -432,7 +446,7 @@ async def get(self) -> dict | None: Returns: The retrieved request queue, or None, if it does not exist. """ - return await self._get() + return await self._get(timeout_secs=_SMALL_TIMEOUT) async def update(self, *, name: str | None = None) -> dict: """Update the request queue with specified fields. @@ -449,14 +463,14 @@ async def update(self, *, name: str | None = None) -> dict: 'name': name, } - return await self._update(filter_out_none_values_recursively(updated_fields)) + return await self._update(filter_out_none_values_recursively(updated_fields), timeout_secs=_SMALL_TIMEOUT) async def delete(self) -> None: """Delete the request queue. https://docs.apify.com/api/v2#/reference/request-queues/queue/delete-request-queue """ - return await self._delete() + return await self._delete(timeout_secs=_SMALL_TIMEOUT) async def list_head(self, *, limit: int | None = None) -> dict: """Retrieve a given number of requests from the beginning of the queue. 
@@ -475,6 +489,7 @@ async def list_head(self, *, limit: int | None = None) -> dict: url=self._url('head'), method='GET', params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -497,6 +512,7 @@ async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) url=self._url('head/lock'), method='POST', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -520,6 +536,7 @@ async def add_request(self, request: dict, *, forefront: bool | None = None) -> method='POST', json=request, params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -540,6 +557,7 @@ async def get_request(self, request_id: str) -> dict | None: url=self._url(f'requests/{request_id}'), method='GET', params=self._params(), + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -569,6 +587,7 @@ async def update_request(self, request: dict, *, forefront: bool | None = None) method='PUT', json=request, params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -587,6 +606,7 @@ async def delete_request(self, request_id: str) -> None: url=self._url(f'requests/{request_id}'), method='DELETE', params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) async def prolong_request_lock( @@ -611,6 +631,7 @@ async def prolong_request_lock( url=self._url(f'requests/{request_id}/lock'), method='PUT', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -635,6 +656,7 @@ async def delete_request_lock( url=self._url(f'requests/{request_id}/lock'), method='DELETE', params=request_params, + timeout_secs=_SMALL_TIMEOUT, ) async def _batch_add_requests_worker( @@ -667,6 +689,7 @@ async def _batch_add_requests_worker( method='POST', params=request_params, json=list(batch.requests), + timeout_secs=_MEDIUM_TIMEOUT, ) response_parsed = parse_date_fields(pluck_data(response.json())) @@ -787,6 +810,7 @@ async def batch_delete_requests(self, requests: list[dict]) -> dict: method='DELETE', params=request_params, json=requests, + timeout_secs=_SMALL_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) @@ -810,6 +834,7 @@ async def list_requests( url=self._url('requests'), method='GET', params=request_params, + timeout_secs=_MEDIUM_TIMEOUT, ) return parse_date_fields(pluck_data(response.json())) diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py new file mode 100644 index 00000000..43cc3058 --- /dev/null +++ b/tests/unit/test_client_timeouts.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from functools import partial + +import pytest +import respx +from httpx import Request, Response, TimeoutException + +from apify_client import ApifyClient +from apify_client._http_client import HTTPClient, HTTPClientAsync +from apify_client.clients import DatasetClient, KeyValueStoreClient, RequestQueueClient +from apify_client.clients.resource_clients import dataset, request_queue +from apify_client.clients.resource_clients import key_value_store as kvs + + +class EndOfTestError(Exception): + """Custom exception that is raised after the relevant part of the code is executed to stop the test.""" + + +def assert_timeout(expected_timeout: int, request: Request) -> Response: + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': 
expected_timeout, + 'write': expected_timeout, + } + raise EndOfTestError + + +@respx.mock +async def test_dynamic_timeout_async_client() -> None: + """Tests timeout values for request with retriable errors. + + Values should increase with each attempt, starting from initial call value and bounded by the client timeout value. + """ + should_raise_error = iter((True, True, True, False)) + call_timeout = 1 + client_timeout = 5 + expected_timeouts = iter((call_timeout, 2, 4, client_timeout)) + + def check_timeout(request: Request) -> Response: + expected_timeout = next(expected_timeouts) + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + if next(should_raise_error): + raise TimeoutException('This error can be retired') + return Response(200) + + respx.get('https://example.com').mock(side_effect=check_timeout) + await HTTPClientAsync(timeout_secs=client_timeout).call( + method='GET', url='https://example.com', timeout_secs=call_timeout + ) + + +@respx.mock +def test_dynamic_timeout_sync_client() -> None: + """Tests timeout values for request with retriable errors. + + Values should increase with each attempt, starting from initial call value and bounded by the client timeout value. + """ + should_raise_error = iter((True, True, True, False)) + call_timeout = 1 + client_timeout = 5 + expected_timeouts = iter((call_timeout, 2, 4, client_timeout)) + + def check_timeout(request: Request) -> Response: + expected_timeout = next(expected_timeouts) + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + if next(should_raise_error): + raise TimeoutException('This error can be retired') + return Response(200) + + respx.get('https://example.com').mock(side_effect=check_timeout) + HTTPClient(timeout_secs=client_timeout).call(method='GET', url='https://example.com', timeout_secs=call_timeout) + + +@pytest.mark.parametrize( + ('client_type', 'method', 'expected_timeout', 'kwargs'), + [ + (DatasetClient, 'get', dataset._SMALL_TIMEOUT, {}), + (DatasetClient, 'update', dataset._SMALL_TIMEOUT, {}), + (DatasetClient, 'delete', dataset._SMALL_TIMEOUT, {}), + (DatasetClient, 'list_items', 360, {}), + (DatasetClient, 'download_items', 360, {}), + (DatasetClient, 'get_items_as_bytes', 360, {}), + (DatasetClient, 'push_items', dataset._MEDIUM_TIMEOUT, {'items': {}}), + (DatasetClient, 'get_statistics', dataset._SMALL_TIMEOUT, {}), + (KeyValueStoreClient, 'get', kvs._SMALL_TIMEOUT, {}), + (KeyValueStoreClient, 'update', 360, {}), + (KeyValueStoreClient, 'delete', kvs._SMALL_TIMEOUT, {}), + (KeyValueStoreClient, 'list_keys', kvs._MEDIUM_TIMEOUT, {}), + (KeyValueStoreClient, 'get_record', 360, {'key': 'some_key'}), + (KeyValueStoreClient, 'get_record_as_bytes', 360, {'key': 'some_key'}), + (KeyValueStoreClient, 'set_record', 360, {'key': 'some_key', 'value': 'some_value'}), + (KeyValueStoreClient, 'delete_record', kvs._SMALL_TIMEOUT, {'key': 'some_key'}), + (RequestQueueClient, 'get', request_queue._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'update', kvs._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'delete', kvs._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'list_head', kvs._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'list_and_lock_head', kvs._MEDIUM_TIMEOUT, {'lock_secs': 1}), + (RequestQueueClient, 'add_request', kvs._SMALL_TIMEOUT, {'request': {}}), + (RequestQueueClient, 'get_request', 
kvs._SMALL_TIMEOUT, {'request_id': 'some_id'}), + (RequestQueueClient, 'update_request', kvs._MEDIUM_TIMEOUT, {'request': {'id': 123}}), + (RequestQueueClient, 'delete_request', kvs._SMALL_TIMEOUT, {'request_id': 123}), + (RequestQueueClient, 'prolong_request_lock', kvs._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), + (RequestQueueClient, 'delete_request_lock', kvs._SMALL_TIMEOUT, {'request_id': 123}), + (RequestQueueClient, 'batch_add_requests', kvs._MEDIUM_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'batch_delete_requests', kvs._SMALL_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'list_requests', kvs._MEDIUM_TIMEOUT, {}), + ], +) +@respx.mock +def test_specific_timeouts_for_specific_endpoints_sync( + client_type: type[DatasetClient, KeyValueStoreClient, RequestQueueClient], + method: str, + kwargs: dict, + expected_timeout: int, +) -> None: + respx.route(host='example.com').mock(side_effect=partial(assert_timeout, expected_timeout)) + client = client_type(base_url='https://example.com', root_client=ApifyClient(), http_client=HTTPClient()) + with pytest.raises(EndOfTestError): + getattr(client, method)(**kwargs) From 2aa165f4243393936f22d3cee4e30f6bc9620264 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 9 Apr 2025 11:57:02 +0200 Subject: [PATCH 2/5] Add async test, fix Mypy issues --- tests/unit/test_client_timeouts.py | 86 ++++++++++++++++++------------ 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index 43cc3058..5c3d58cb 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -83,44 +83,47 @@ def check_timeout(request: Request) -> Response: HTTPClient(timeout_secs=client_timeout).call(method='GET', url='https://example.com', timeout_secs=call_timeout) +_timeout_params = [ + (DatasetClient, 'get', dataset._SMALL_TIMEOUT, {}), + (DatasetClient, 'update', dataset._SMALL_TIMEOUT, {}), + (DatasetClient, 'delete', dataset._SMALL_TIMEOUT, {}), + (DatasetClient, 'list_items', 360, {}), + (DatasetClient, 'download_items', 360, {}), + (DatasetClient, 'get_items_as_bytes', 360, {}), + (DatasetClient, 'push_items', dataset._MEDIUM_TIMEOUT, {'items': {}}), + (DatasetClient, 'get_statistics', dataset._SMALL_TIMEOUT, {}), + (KeyValueStoreClient, 'get', kvs._SMALL_TIMEOUT, {}), + (KeyValueStoreClient, 'update', 360, {}), + (KeyValueStoreClient, 'delete', kvs._SMALL_TIMEOUT, {}), + (KeyValueStoreClient, 'list_keys', kvs._MEDIUM_TIMEOUT, {}), + (KeyValueStoreClient, 'get_record', 360, {'key': 'some_key'}), + (KeyValueStoreClient, 'get_record_as_bytes', 360, {'key': 'some_key'}), + (KeyValueStoreClient, 'set_record', 360, {'key': 'some_key', 'value': 'some_value'}), + (KeyValueStoreClient, 'delete_record', kvs._SMALL_TIMEOUT, {'key': 'some_key'}), + (RequestQueueClient, 'get', request_queue._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'update', kvs._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'delete', kvs._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'list_head', kvs._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'list_and_lock_head', kvs._MEDIUM_TIMEOUT, {'lock_secs': 1}), + (RequestQueueClient, 'add_request', kvs._SMALL_TIMEOUT, {'request': {}}), + (RequestQueueClient, 'get_request', kvs._SMALL_TIMEOUT, {'request_id': 'some_id'}), + (RequestQueueClient, 'update_request', kvs._MEDIUM_TIMEOUT, {'request': {'id': 123}}), + (RequestQueueClient, 'delete_request', kvs._SMALL_TIMEOUT, {'request_id': 123}), + (RequestQueueClient, 'prolong_request_lock', 
kvs._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), + (RequestQueueClient, 'delete_request_lock', kvs._SMALL_TIMEOUT, {'request_id': 123}), + (RequestQueueClient, 'batch_add_requests', kvs._MEDIUM_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'batch_delete_requests', kvs._SMALL_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'list_requests', kvs._MEDIUM_TIMEOUT, {}), +] + + @pytest.mark.parametrize( ('client_type', 'method', 'expected_timeout', 'kwargs'), - [ - (DatasetClient, 'get', dataset._SMALL_TIMEOUT, {}), - (DatasetClient, 'update', dataset._SMALL_TIMEOUT, {}), - (DatasetClient, 'delete', dataset._SMALL_TIMEOUT, {}), - (DatasetClient, 'list_items', 360, {}), - (DatasetClient, 'download_items', 360, {}), - (DatasetClient, 'get_items_as_bytes', 360, {}), - (DatasetClient, 'push_items', dataset._MEDIUM_TIMEOUT, {'items': {}}), - (DatasetClient, 'get_statistics', dataset._SMALL_TIMEOUT, {}), - (KeyValueStoreClient, 'get', kvs._SMALL_TIMEOUT, {}), - (KeyValueStoreClient, 'update', 360, {}), - (KeyValueStoreClient, 'delete', kvs._SMALL_TIMEOUT, {}), - (KeyValueStoreClient, 'list_keys', kvs._MEDIUM_TIMEOUT, {}), - (KeyValueStoreClient, 'get_record', 360, {'key': 'some_key'}), - (KeyValueStoreClient, 'get_record_as_bytes', 360, {'key': 'some_key'}), - (KeyValueStoreClient, 'set_record', 360, {'key': 'some_key', 'value': 'some_value'}), - (KeyValueStoreClient, 'delete_record', kvs._SMALL_TIMEOUT, {'key': 'some_key'}), - (RequestQueueClient, 'get', request_queue._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'update', kvs._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'delete', kvs._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'list_head', kvs._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'list_and_lock_head', kvs._MEDIUM_TIMEOUT, {'lock_secs': 1}), - (RequestQueueClient, 'add_request', kvs._SMALL_TIMEOUT, {'request': {}}), - (RequestQueueClient, 'get_request', kvs._SMALL_TIMEOUT, {'request_id': 'some_id'}), - (RequestQueueClient, 'update_request', kvs._MEDIUM_TIMEOUT, {'request': {'id': 123}}), - (RequestQueueClient, 'delete_request', kvs._SMALL_TIMEOUT, {'request_id': 123}), - (RequestQueueClient, 'prolong_request_lock', kvs._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), - (RequestQueueClient, 'delete_request_lock', kvs._SMALL_TIMEOUT, {'request_id': 123}), - (RequestQueueClient, 'batch_add_requests', kvs._MEDIUM_TIMEOUT, {'requests': [{}]}), - (RequestQueueClient, 'batch_delete_requests', kvs._SMALL_TIMEOUT, {'requests': [{}]}), - (RequestQueueClient, 'list_requests', kvs._MEDIUM_TIMEOUT, {}), - ], + _timeout_params, ) @respx.mock def test_specific_timeouts_for_specific_endpoints_sync( - client_type: type[DatasetClient, KeyValueStoreClient, RequestQueueClient], + client_type: type[DatasetClient | KeyValueStoreClient | RequestQueueClient], method: str, kwargs: dict, expected_timeout: int, @@ -129,3 +132,20 @@ def test_specific_timeouts_for_specific_endpoints_sync( client = client_type(base_url='https://example.com', root_client=ApifyClient(), http_client=HTTPClient()) with pytest.raises(EndOfTestError): getattr(client, method)(**kwargs) + + +@pytest.mark.parametrize( + ('client_type', 'method', 'expected_timeout', 'kwargs'), + _timeout_params, +) +@respx.mock +async def test_specific_timeouts_for_specific_endpoints_async( + client_type: type[DatasetClient | KeyValueStoreClient | RequestQueueClient], + method: str, + kwargs: dict, + expected_timeout: int, +) -> None: + respx.route(host='example.com').mock(side_effect=partial(assert_timeout, expected_timeout)) + client = 
client_type(base_url='https://example.com', root_client=ApifyClient(), http_client=HTTPClient()) + with pytest.raises(EndOfTestError): + await getattr(client, method)(**kwargs) From bdb814ae0b5b8b8ac4f8b4bc15fdb0f453890da6 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 9 Apr 2025 13:10:44 +0200 Subject: [PATCH 3/5] Introduce default client timeout --- src/apify_client/client.py | 9 ++++--- tests/unit/test_client_timeouts.py | 39 +++++++++++++++++------------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/apify_client/client.py b/src/apify_client/client.py index 7aae143b..435ef546 100644 --- a/src/apify_client/client.py +++ b/src/apify_client/client.py @@ -54,6 +54,7 @@ ) DEFAULT_API_URL = 'https://api.apify.com' +DEFAULT_TIMEOUT = 360 API_VERSION = 'v2' @@ -68,7 +69,7 @@ def __init__( api_url: str | None = None, max_retries: int | None = 8, min_delay_between_retries_millis: int | None = 500, - timeout_secs: int | None = 360, + timeout_secs: int | None = DEFAULT_TIMEOUT, ) -> None: """Initialize a new instance. @@ -85,7 +86,7 @@ def __init__( self.base_url = f'{api_url}/{API_VERSION}' self.max_retries = max_retries or 8 self.min_delay_between_retries_millis = min_delay_between_retries_millis or 500 - self.timeout_secs = timeout_secs or 360 + self.timeout_secs = timeout_secs or DEFAULT_TIMEOUT def _options(self) -> dict: return { @@ -107,7 +108,7 @@ def __init__( api_url: str | None = None, max_retries: int | None = 8, min_delay_between_retries_millis: int | None = 500, - timeout_secs: int | None = 360, + timeout_secs: int | None = DEFAULT_TIMEOUT, ) -> None: """Initialize a new instance. @@ -290,7 +291,7 @@ def __init__( api_url: str | None = None, max_retries: int | None = 8, min_delay_between_retries_millis: int | None = 500, - timeout_secs: int | None = 360, + timeout_secs: int | None = DEFAULT_TIMEOUT, ) -> None: """Initialize a new instance. diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index 5c3d58cb..a3ceacb5 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -8,6 +8,7 @@ from apify_client import ApifyClient from apify_client._http_client import HTTPClient, HTTPClientAsync +from apify_client.client import DEFAULT_TIMEOUT from apify_client.clients import DatasetClient, KeyValueStoreClient, RequestQueueClient from apify_client.clients.resource_clients import dataset, request_queue from apify_client.clients.resource_clients import key_value_store as kvs @@ -17,16 +18,6 @@ class EndOfTestError(Exception): """Custom exception that is raised after the relevant part of the code is executed to stop the test.""" -def assert_timeout(expected_timeout: int, request: Request) -> Response: - assert request.extensions['timeout'] == { - 'connect': expected_timeout, - 'pool': expected_timeout, - 'read': expected_timeout, - 'write': expected_timeout, - } - raise EndOfTestError - - @respx.mock async def test_dynamic_timeout_async_client() -> None: """Tests timeout values for request with retriable errors. @@ -83,22 +74,36 @@ def check_timeout(request: Request) -> Response: HTTPClient(timeout_secs=client_timeout).call(method='GET', url='https://example.com', timeout_secs=call_timeout) +def assert_timeout(expected_timeout: int, request: Request) -> Response: + """Assert that correct timeouts are set on the request and raise `EndOfTestError`. + + This is intended for tests that are only testing timeout value and further execution of the code is not desired. 
+ """ + assert request.extensions['timeout'] == { + 'connect': expected_timeout, + 'pool': expected_timeout, + 'read': expected_timeout, + 'write': expected_timeout, + } + raise EndOfTestError + + _timeout_params = [ (DatasetClient, 'get', dataset._SMALL_TIMEOUT, {}), (DatasetClient, 'update', dataset._SMALL_TIMEOUT, {}), (DatasetClient, 'delete', dataset._SMALL_TIMEOUT, {}), - (DatasetClient, 'list_items', 360, {}), - (DatasetClient, 'download_items', 360, {}), - (DatasetClient, 'get_items_as_bytes', 360, {}), + (DatasetClient, 'list_items', DEFAULT_TIMEOUT, {}), + (DatasetClient, 'download_items', DEFAULT_TIMEOUT, {}), + (DatasetClient, 'get_items_as_bytes', DEFAULT_TIMEOUT, {}), (DatasetClient, 'push_items', dataset._MEDIUM_TIMEOUT, {'items': {}}), (DatasetClient, 'get_statistics', dataset._SMALL_TIMEOUT, {}), (KeyValueStoreClient, 'get', kvs._SMALL_TIMEOUT, {}), - (KeyValueStoreClient, 'update', 360, {}), + (KeyValueStoreClient, 'update', DEFAULT_TIMEOUT, {}), (KeyValueStoreClient, 'delete', kvs._SMALL_TIMEOUT, {}), (KeyValueStoreClient, 'list_keys', kvs._MEDIUM_TIMEOUT, {}), - (KeyValueStoreClient, 'get_record', 360, {'key': 'some_key'}), - (KeyValueStoreClient, 'get_record_as_bytes', 360, {'key': 'some_key'}), - (KeyValueStoreClient, 'set_record', 360, {'key': 'some_key', 'value': 'some_value'}), + (KeyValueStoreClient, 'get_record', DEFAULT_TIMEOUT, {'key': 'some_key'}), + (KeyValueStoreClient, 'get_record_as_bytes', DEFAULT_TIMEOUT, {'key': 'some_key'}), + (KeyValueStoreClient, 'set_record', DEFAULT_TIMEOUT, {'key': 'some_key', 'value': 'some_value'}), (KeyValueStoreClient, 'delete_record', kvs._SMALL_TIMEOUT, {'key': 'some_key'}), (RequestQueueClient, 'get', request_queue._SMALL_TIMEOUT, {}), (RequestQueueClient, 'update', kvs._SMALL_TIMEOUT, {}), From 431a45833e3f4521897e818bf2297df5a0af87b7 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 9 Apr 2025 14:34:53 +0200 Subject: [PATCH 4/5] Use correct timeout consts in tests --- tests/unit/test_client_timeouts.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index a3ceacb5..c485c378 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -106,19 +106,19 @@ def assert_timeout(expected_timeout: int, request: Request) -> Response: (KeyValueStoreClient, 'set_record', DEFAULT_TIMEOUT, {'key': 'some_key', 'value': 'some_value'}), (KeyValueStoreClient, 'delete_record', kvs._SMALL_TIMEOUT, {'key': 'some_key'}), (RequestQueueClient, 'get', request_queue._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'update', kvs._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'delete', kvs._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'list_head', kvs._SMALL_TIMEOUT, {}), - (RequestQueueClient, 'list_and_lock_head', kvs._MEDIUM_TIMEOUT, {'lock_secs': 1}), - (RequestQueueClient, 'add_request', kvs._SMALL_TIMEOUT, {'request': {}}), - (RequestQueueClient, 'get_request', kvs._SMALL_TIMEOUT, {'request_id': 'some_id'}), - (RequestQueueClient, 'update_request', kvs._MEDIUM_TIMEOUT, {'request': {'id': 123}}), - (RequestQueueClient, 'delete_request', kvs._SMALL_TIMEOUT, {'request_id': 123}), - (RequestQueueClient, 'prolong_request_lock', kvs._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), - (RequestQueueClient, 'delete_request_lock', kvs._SMALL_TIMEOUT, {'request_id': 123}), - (RequestQueueClient, 'batch_add_requests', kvs._MEDIUM_TIMEOUT, {'requests': [{}]}), - (RequestQueueClient, 
'batch_delete_requests', kvs._SMALL_TIMEOUT, {'requests': [{}]}), - (RequestQueueClient, 'list_requests', kvs._MEDIUM_TIMEOUT, {}), + (RequestQueueClient, 'update', request_queue._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'delete', request_queue._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'list_head', request_queue._SMALL_TIMEOUT, {}), + (RequestQueueClient, 'list_and_lock_head', request_queue._MEDIUM_TIMEOUT, {'lock_secs': 1}), + (RequestQueueClient, 'add_request', request_queue._SMALL_TIMEOUT, {'request': {}}), + (RequestQueueClient, 'get_request', request_queue._SMALL_TIMEOUT, {'request_id': 'some_id'}), + (RequestQueueClient, 'update_request', request_queue._MEDIUM_TIMEOUT, {'request': {'id': 123}}), + (RequestQueueClient, 'delete_request', request_queue._SMALL_TIMEOUT, {'request_id': 123}), + (RequestQueueClient, 'prolong_request_lock', request_queue._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), + (RequestQueueClient, 'delete_request_lock', request_queue._SMALL_TIMEOUT, {'request_id': 123}), + (RequestQueueClient, 'batch_add_requests', request_queue._MEDIUM_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'batch_delete_requests', request_queue._SMALL_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'list_requests', request_queue._MEDIUM_TIMEOUT, {}), ] From b1d81db3a358f3c4abf04e365256f2e9d2561934 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 22 Apr 2025 10:50:22 +0200 Subject: [PATCH 5/5] Fix typo --- tests/unit/test_client_timeouts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index c485c378..82362644 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -38,7 +38,7 @@ def check_timeout(request: Request) -> Response: 'write': expected_timeout, } if next(should_raise_error): - raise TimeoutException('This error can be retired') + raise TimeoutException('This error can be retried') return Response(200) respx.get('https://example.com').mock(side_effect=check_timeout)
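
For reference, the timeout-doubling rule this series adds to _make_request can be shown in isolation. The sketch below is illustrative only: the helper name and parameter names are not part of the patch, but the formula is the one added to both the sync and async clients, and the asserted progression matches the values checked in test_dynamic_timeout_sync_client / test_dynamic_timeout_async_client.

def effective_timeout(attempt: int, call_timeout_secs: int | None, client_timeout_secs: int) -> int:
    # Double the timeout with each retry attempt; never exceed the client-wide timeout.
    # Mirrors: min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
    return min(client_timeout_secs, (call_timeout_secs or client_timeout_secs) * 2 ** (attempt - 1))

# With a per-call timeout of 1 s and a client timeout of 5 s, four attempts use
# 1, 2, 4 and 5 seconds - the same sequence the respx-based tests assert via
# request.extensions['timeout'].
assert [effective_timeout(a, 1, 5) for a in range(1, 5)] == [1, 2, 4, 5]

# When no per-call timeout is given, the first attempt already uses the client
# timeout and the cap keeps every later attempt at that value.
assert [effective_timeout(a, None, 5) for a in range(1, 5)] == [5, 5, 5, 5]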