From 9c3db0cc6f530051b08b3e2dfe645ab32129dc06 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Mon, 11 Aug 2025 15:03:40 +0200 Subject: [PATCH 01/11] Disable `test_actor_on_platform_max_request_retries` --- tests/integration/test_crawlers_with_storages.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index cb1f7e2b..a1f78bbc 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING +import pytest + if TYPE_CHECKING: from .conftest import MakeActorFunction, RunActorFunction @@ -76,6 +78,7 @@ async def default_handler(context: ParselCrawlingContext) -> None: assert run_result.status == 'SUCCEEDED' +@pytest.mark.skip(reason='https://github.com/apify/apify-sdk-python/issues/540') async def test_actor_on_platform_max_request_retries( make_actor: MakeActorFunction, run_actor: RunActorFunction, From ea9607ef8fd34c582168eac779d4c6f7fd0cdddf Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 10:23:27 +0200 Subject: [PATCH 02/11] Try to solve race conditions by lock Use this commit for testing --- .../_apify/_request_queue_client.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 95af78a1..ecd2d3b7 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from collections import deque from datetime import datetime, timedelta, timezone from logging import getLogger @@ -84,6 +85,9 @@ def __init__( self._assumed_handled_count = 0 """The number of requests we assume have been handled (tracked manually for this instance).""" + self._fetch_lock = 
asyncio.Lock() + """Fetch lock to minimize race conditions when communicationg with API.""" + @override async def get_metadata(self) -> RequestQueueMetadata: total_count = self._initial_total_count + self._assumed_total_count @@ -291,15 +295,16 @@ async def fetch_next_request(self) -> Request | None: The request or `None` if there are no more pending requests. """ # Ensure the queue head has requests if available - await self._ensure_head_is_non_empty() + async with self._fetch_lock: + await self._ensure_head_is_non_empty() - # If queue head is empty after ensuring, there are no requests - if not self._queue_head: - return None + # If queue head is empty after ensuring, there are no requests + if not self._queue_head: + return None - # Get the next request ID from the queue head - next_request_id = self._queue_head.popleft() - request = await self._get_or_hydrate_request(next_request_id) + # Get the next request ID from the queue head + next_request_id = self._queue_head.popleft() + request = await self._get_or_hydrate_request(next_request_id) # Handle potential inconsistency where request might not be in the main table yet if request is None: @@ -344,6 +349,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | if request.handled_at is None: request.handled_at = datetime.now(tz=timezone.utc) + if cached_request := self._requests_cache[request.id]: + cached_request.was_already_handled = request.was_already_handled try: # Update the request in the API processed_request = await self._update_request(request) From e3c9fc27403f8960a735ec5d8dede4320c952472 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 10:34:21 +0200 Subject: [PATCH 03/11] Reduce the scope of the lock to the minimal needed size --- src/apify/storage_clients/_apify/_request_queue_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py 
b/src/apify/storage_clients/_apify/_request_queue_client.py index ecd2d3b7..8e73d4a2 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -294,7 +294,7 @@ async def fetch_next_request(self) -> Request | None: Returns: The request or `None` if there are no more pending requests. """ - # Ensure the queue head has requests if available + # Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions. async with self._fetch_lock: await self._ensure_head_is_non_empty() @@ -304,7 +304,8 @@ async def fetch_next_request(self) -> Request | None: # Get the next request ID from the queue head next_request_id = self._queue_head.popleft() - request = await self._get_or_hydrate_request(next_request_id) + + request = await self._get_or_hydrate_request(next_request_id) # Handle potential inconsistency where request might not be in the main table yet if request is None: From 134e8beb5349b640a8c5937794f9951247c55bcf Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 13:49:07 +0200 Subject: [PATCH 04/11] Add with debug stuff --- .../_apify/_request_queue_client.py | 100 +++++++++++------- tests/integration/test_actor_request_queue.py | 45 ++++++-- .../test_crawlers_with_storages.py | 8 +- 3 files changed, 103 insertions(+), 50 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 8e73d4a2..c34ee59e 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -26,6 +26,7 @@ logger = getLogger(__name__) +COUNTER = iter(range(10000)) class ApifyRequestQueueClient(RequestQueueClient): """An Apify platform implementation of the request queue client.""" @@ -294,18 +295,25 @@ async def fetch_next_request(self) -> Request | None: Returns: The request or `None` if there are no 
more pending requests. """ + call_time = next(COUNTER) # Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions. + logger.debug(f'Before _fetch_lock, {call_time}') async with self._fetch_lock: + logger.debug(f'Fetching, {call_time}') await self._ensure_head_is_non_empty() # If queue head is empty after ensuring, there are no requests if not self._queue_head: + logger.debug(f'Empty, {call_time}') return None # Get the next request ID from the queue head next_request_id = self._queue_head.popleft() + logger.debug(f'New request, {call_time}') + logger.debug(f'Before hydrate, {call_time}') request = await self._get_or_hydrate_request(next_request_id) + logger.debug(f'After hydrate, {call_time}') # Handle potential inconsistency where request might not be in the main table yet if request is None: @@ -324,7 +332,7 @@ async def fetch_next_request(self) -> Request | None: return None # Use get request to ensure we have the full request object. - request = await self.get_request(request.id) + #request = await self.get_request(request.id) This seems redundant if request is None: logger.debug( 'Request fetched from the beginning of queue was not found in the RQ', @@ -332,6 +340,7 @@ async def fetch_next_request(self) -> Request | None: ) return None + logger.debug(f'{request.retry_count=}, {call_time}') return request @override @@ -394,42 +403,48 @@ async def reclaim_request( """ # Check if the request was marked as handled and clear it. When reclaiming, # we want to put the request back for processing. + call_time = next(COUNTER) if request.was_already_handled: request.handled_at = None - try: - # Update the request in the API. - processed_request = await self._update_request(request, forefront=forefront) - processed_request.unique_key = request.unique_key - - # If the request was previously handled, decrement our handled count since - # we're putting it back for processing. 
- if request.was_already_handled and not processed_request.was_already_handled: - self._assumed_handled_count -= 1 - - # Update the cache - cache_key = unique_key_to_request_id(request.unique_key) - self._cache_request( - cache_key, - processed_request, - hydrated_request=request, - ) + async with self._fetch_lock: + try: + # Update the request in the API. + logger.debug(f'Before _update_request reclaiming, {call_time}') + processed_request = await self._update_request(request, forefront=forefront) + logger.debug(f'After _update_request reclaiming, {call_time}') + processed_request.unique_key = request.unique_key + + # If the request was previously handled, decrement our handled count since + # we're putting it back for processing. + if request.was_already_handled and not processed_request.was_already_handled: + self._assumed_handled_count -= 1 + + # Update the cache + cache_key = unique_key_to_request_id(request.unique_key) + self._cache_request( + cache_key, + processed_request, + hydrated_request=request, + ) - # If we're adding to the forefront, we need to check for forefront requests - # in the next list_head call - if forefront: - self._should_check_for_forefront_requests = True + # If we're adding to the forefront, we need to check for forefront requests + # in the next list_head call + if forefront: + self._should_check_for_forefront_requests = True - # Try to release the lock on the request - try: - await self._delete_request_lock(request.id, forefront=forefront) - except Exception as err: - logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err) - except Exception as exc: - logger.debug(f'Error reclaiming request {request.id}: {exc!s}') - return None - else: - return processed_request + # Try to release the lock on the request + try: + logger.debug(f'Before _delete_request_lock reclaiming, {call_time}') + await self._delete_request_lock(request.id, forefront=forefront) + logger.debug(f'After _delete_request_lock reclaiming, 
{call_time}') + except Exception as err: + logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err) + except Exception as exc: + logger.debug(f'Error reclaiming request {request.id}: {exc!s}') + return None + else: + return processed_request @override async def is_empty(self) -> bool: @@ -438,9 +453,14 @@ async def is_empty(self) -> bool: Returns: True if the queue is empty, False otherwise. """ - head = await self._list_head(limit=1, lock_time=None) - - return len(head.items) == 0 and not self._queue_has_locked_requests + call_time = next(COUNTER) + logger.debug(f'Before _list_head is_empty, {call_time}') + async with self._fetch_lock: + logger.debug(f'During _list_head is_empty, {call_time}') + head = await self._list_head(limit=1, lock_time=None) + logger.debug(f'After _list_head is_empty, {call_time}') + logger.debug(f'Finish _list_head is_empty, {call_time}') + return len(head.items) == 0 and not self._queue_has_locked_requests async def _ensure_head_is_non_empty(self) -> None: """Ensure that the queue head has requests if they are available in the queue.""" @@ -551,8 +571,9 @@ async def _list_head( A collection of requests from the beginning of the queue. 
""" # Return from cache if available and we're not checking for new forefront requests + call_time = next(COUNTER) if self._queue_head and not self._should_check_for_forefront_requests: - logger.debug(f'Using cached queue head with {len(self._queue_head)} requests') + logger.debug(f'Using cached queue head with {len(self._queue_head)} requests, {call_time}') # Create a list of requests from the cached queue head items = [] @@ -571,7 +592,7 @@ async def _list_head( queue_has_locked_requests=self._queue_has_locked_requests, lock_time=lock_time, ) - + logger.debug(f'Updating cached queue head with {len(self._queue_head)} requests, {call_time}') leftover_buffer = list[str]() if self._should_check_for_forefront_requests: leftover_buffer = list(self._queue_head) @@ -615,13 +636,14 @@ async def _list_head( ), hydrated_request=request, ) - + logger.debug(f'Adding to head, {call_time}') self._queue_head.append(request.id) + logger.debug(f'Cached queue head with {len(self._queue_head)} requests, {call_time}') for leftover_request_id in leftover_buffer: # After adding new requests to the forefront, any existing leftover locked request is kept in the end. 
self._queue_head.append(leftover_request_id) - + logger.debug(f'Cached queue head with {len(self._queue_head)} requests, {call_time}') return RequestQueueHead.model_validate(response) async def _prolong_request_lock( diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py index d4730b00..1b596d3c 100644 --- a/tests/integration/test_actor_request_queue.py +++ b/tests/integration/test_actor_request_queue.py @@ -98,15 +98,40 @@ async def test_request_queue_is_finished( request_queue_name = generate_unique_resource_name('request_queue') async with Actor: - request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True) - await request_queue.add_request(Request.from_url('http://example.com')) - assert not await request_queue.is_finished() + try: + request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True) + await request_queue.add_request(Request.from_url('http://example.com')) + assert not await request_queue.is_finished() + + request = await request_queue.fetch_next_request() + assert request is not None + assert not await request_queue.is_finished(), ( + 'RequestQueue should not be finished unless the request is marked as handled.' + ) + + await request_queue.mark_request_as_handled(request) + assert await request_queue.is_finished() + finally: + await request_queue.drop() + - request = await request_queue.fetch_next_request() - assert request is not None - assert not await request_queue.is_finished(), ( - 'RequestQueue should not be finished unless the request is marked as handled.' 
- ) +async def test_same_request_fetched_twice( + apify_client_async: ApifyClientAsync, + monkeypatch: pytest.MonkeyPatch): + """Test that the same request can be fetched twice from the request queue.""" + monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token) - await request_queue.mark_request_as_handled(request) - assert await request_queue.is_finished() + request_queue_name = generate_unique_resource_name('request_queue') + async with Actor: + try: + request_queue = await Actor.open_request_queue(name='same-request-fetch', force_cloud=request_queue_name) + + request = Request.from_url('http://example.com') + await request_queue.add_request(request) + + fetched_request_1 = await request_queue.fetch_next_request() + assert fetched_request_1 is not None + assert fetched_request_1.url == 'http://example.com' + await request_queue.reclaim_request(fetched_request_1) + finally: + await request_queue.drop() diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index cb1f7e2b..e6af7d99 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING +import pytest + if TYPE_CHECKING: from .conftest import MakeActorFunction, RunActorFunction @@ -76,7 +78,8 @@ async def default_handler(context: ParselCrawlingContext) -> None: assert run_result.status == 'SUCCEEDED' -async def test_actor_on_platform_max_request_retries( +@pytest.mark.parametrize('_', range(10)) +async def test_actor_on_platform_max_request_retries(_, make_actor: MakeActorFunction, run_actor: RunActorFunction, ) -> None: @@ -84,11 +87,14 @@ async def test_actor_on_platform_max_request_retries( async def main() -> None: """The crawler entry point.""" + import logging + from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext from apify import Actor async with Actor: + 
logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG) max_retries = 3 crawler = ParselCrawler(max_request_retries=max_retries) failed_counter = 0 From dde4b46d770fbc600091f79f08ceb5e6472146f0 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 13:51:55 +0200 Subject: [PATCH 05/11] Without debug --- .../_apify/_request_queue_client.py | 35 ------------------- tests/integration/test_actor_request_queue.py | 22 ------------ .../test_crawlers_with_storages.py | 7 +--- 3 files changed, 1 insertion(+), 63 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index c34ee59e..e7f4206a 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -26,7 +26,6 @@ logger = getLogger(__name__) -COUNTER = iter(range(10000)) class ApifyRequestQueueClient(RequestQueueClient): """An Apify platform implementation of the request queue client.""" @@ -295,25 +294,18 @@ async def fetch_next_request(self) -> Request | None: Returns: The request or `None` if there are no more pending requests. """ - call_time = next(COUNTER) # Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions. 
- logger.debug(f'Before _fetch_lock, {call_time}') async with self._fetch_lock: - logger.debug(f'Fetching, {call_time}') await self._ensure_head_is_non_empty() # If queue head is empty after ensuring, there are no requests if not self._queue_head: - logger.debug(f'Empty, {call_time}') return None # Get the next request ID from the queue head next_request_id = self._queue_head.popleft() - logger.debug(f'New request, {call_time}') - logger.debug(f'Before hydrate, {call_time}') request = await self._get_or_hydrate_request(next_request_id) - logger.debug(f'After hydrate, {call_time}') # Handle potential inconsistency where request might not be in the main table yet if request is None: @@ -331,16 +323,6 @@ async def fetch_next_request(self) -> Request | None: ) return None - # Use get request to ensure we have the full request object. - #request = await self.get_request(request.id) This seems redundant - if request is None: - logger.debug( - 'Request fetched from the beginning of queue was not found in the RQ', - extra={'nextRequestId': next_request_id}, - ) - return None - - logger.debug(f'{request.retry_count=}, {call_time}') return request @override @@ -403,16 +385,13 @@ async def reclaim_request( """ # Check if the request was marked as handled and clear it. When reclaiming, # we want to put the request back for processing. - call_time = next(COUNTER) if request.was_already_handled: request.handled_at = None async with self._fetch_lock: try: # Update the request in the API. 
- logger.debug(f'Before _update_request reclaiming, {call_time}') processed_request = await self._update_request(request, forefront=forefront) - logger.debug(f'After _update_request reclaiming, {call_time}') processed_request.unique_key = request.unique_key # If the request was previously handled, decrement our handled count since @@ -435,9 +414,7 @@ async def reclaim_request( # Try to release the lock on the request try: - logger.debug(f'Before _delete_request_lock reclaiming, {call_time}') await self._delete_request_lock(request.id, forefront=forefront) - logger.debug(f'After _delete_request_lock reclaiming, {call_time}') except Exception as err: logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err) except Exception as exc: @@ -453,13 +430,8 @@ async def is_empty(self) -> bool: Returns: True if the queue is empty, False otherwise. """ - call_time = next(COUNTER) - logger.debug(f'Before _list_head is_empty, {call_time}') async with self._fetch_lock: - logger.debug(f'During _list_head is_empty, {call_time}') head = await self._list_head(limit=1, lock_time=None) - logger.debug(f'After _list_head is_empty, {call_time}') - logger.debug(f'Finish _list_head is_empty, {call_time}') return len(head.items) == 0 and not self._queue_has_locked_requests async def _ensure_head_is_non_empty(self) -> None: @@ -571,10 +543,7 @@ async def _list_head( A collection of requests from the beginning of the queue. 
""" # Return from cache if available and we're not checking for new forefront requests - call_time = next(COUNTER) if self._queue_head and not self._should_check_for_forefront_requests: - logger.debug(f'Using cached queue head with {len(self._queue_head)} requests, {call_time}') - # Create a list of requests from the cached queue head items = [] for request_id in list(self._queue_head)[:limit]: @@ -592,7 +561,6 @@ async def _list_head( queue_has_locked_requests=self._queue_has_locked_requests, lock_time=lock_time, ) - logger.debug(f'Updating cached queue head with {len(self._queue_head)} requests, {call_time}') leftover_buffer = list[str]() if self._should_check_for_forefront_requests: leftover_buffer = list(self._queue_head) @@ -636,14 +604,11 @@ async def _list_head( ), hydrated_request=request, ) - logger.debug(f'Adding to head, {call_time}') self._queue_head.append(request.id) - logger.debug(f'Cached queue head with {len(self._queue_head)} requests, {call_time}') for leftover_request_id in leftover_buffer: # After adding new requests to the forefront, any existing leftover locked request is kept in the end. 
self._queue_head.append(leftover_request_id) - logger.debug(f'Cached queue head with {len(self._queue_head)} requests, {call_time}') return RequestQueueHead.model_validate(response) async def _prolong_request_lock( diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py index 1b596d3c..64a846b5 100644 --- a/tests/integration/test_actor_request_queue.py +++ b/tests/integration/test_actor_request_queue.py @@ -113,25 +113,3 @@ async def test_request_queue_is_finished( assert await request_queue.is_finished() finally: await request_queue.drop() - - -async def test_same_request_fetched_twice( - apify_client_async: ApifyClientAsync, - monkeypatch: pytest.MonkeyPatch): - """Test that the same request can be fetched twice from the request queue.""" - monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token) - - request_queue_name = generate_unique_resource_name('request_queue') - async with Actor: - try: - request_queue = await Actor.open_request_queue(name='same-request-fetch', force_cloud=request_queue_name) - - request = Request.from_url('http://example.com') - await request_queue.add_request(request) - - fetched_request_1 = await request_queue.fetch_next_request() - assert fetched_request_1 is not None - assert fetched_request_1.url == 'http://example.com' - await request_queue.reclaim_request(fetched_request_1) - finally: - await request_queue.drop() diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index e6af7d99..a2ba1e4d 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -2,8 +2,6 @@ from typing import TYPE_CHECKING -import pytest - if TYPE_CHECKING: from .conftest import MakeActorFunction, RunActorFunction @@ -78,8 +76,7 @@ async def default_handler(context: ParselCrawlingContext) -> None: assert run_result.status == 'SUCCEEDED' -@pytest.mark.parametrize('_', range(10)) -async 
def test_actor_on_platform_max_request_retries(_, +async def test_actor_on_platform_max_request_retries( make_actor: MakeActorFunction, run_actor: RunActorFunction, ) -> None: @@ -87,14 +84,12 @@ async def test_actor_on_platform_max_request_retries(_, async def main() -> None: """The crawler entry point.""" - import logging from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext from apify import Actor async with Actor: - logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG) max_retries = 3 crawler = ParselCrawler(max_request_retries=max_retries) failed_counter = 0 From 6c7f128ec4edb6721b1f7186a68bf9c1e0a6c26c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20Proch=C3=A1zka?= Date: Tue, 12 Aug 2025 14:10:27 +0200 Subject: [PATCH 06/11] Update _request_queue_client.py Revert one log deletion --- src/apify/storage_clients/_apify/_request_queue_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index e7f4206a..34f030d0 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -544,6 +544,7 @@ async def _list_head( """ # Return from cache if available and we're not checking for new forefront requests if self._queue_head and not self._should_check_for_forefront_requests: + logger.debug(f'Using cached queue head with {len(self._queue_head)} requests, {call_time}') # Create a list of requests from the cached queue head items = [] for request_id in list(self._queue_head)[:limit]: From 9a5c81215681fd0d42225e12b1b2bb52e0f8480a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20Proch=C3=A1zka?= Date: Tue, 12 Aug 2025 14:15:03 +0200 Subject: [PATCH 07/11] Update _request_queue_client.py --- src/apify/storage_clients/_apify/_request_queue_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 34f030d0..2abff29d 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -544,7 +544,7 @@ async def _list_head( """ # Return from cache if available and we're not checking for new forefront requests if self._queue_head and not self._should_check_for_forefront_requests: - logger.debug(f'Using cached queue head with {len(self._queue_head)} requests, {call_time}') + logger.debug(f'Using cached queue head with {len(self._queue_head)} requests') # Create a list of requests from the cached queue head items = [] for request_id in list(self._queue_head)[:limit]: From f2637d5d1dc7fb801ea016c0e201fc531baf807c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20Proch=C3=A1zka?= Date: Tue, 12 Aug 2025 14:32:38 +0200 Subject: [PATCH 08/11] Return actually useful code --- .../storage_clients/_apify/_request_queue_client.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 2abff29d..e7012120 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -323,6 +323,15 @@ async def fetch_next_request(self) -> Request | None: ) return None + # Use get request to ensure we have the full request object. 
+ request = await self.get_request(request.id) + if request is None: + logger.debug( + 'Request fetched from the beginning of queue was not found in the RQ', + extra={'nextRequestId': next_request_id}, + ) + return None + return request @override From 060d9d5bb86317c0cecd1e60f23bc7f813b3839c Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 14:38:55 +0200 Subject: [PATCH 09/11] Remove skip from test_actor_on_platform_max_request_retries --- tests/integration/test_crawlers_with_storages.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index 62554e8b..a2ba1e4d 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -2,8 +2,6 @@ from typing import TYPE_CHECKING -import pytest - if TYPE_CHECKING: from .conftest import MakeActorFunction, RunActorFunction @@ -78,7 +76,6 @@ async def default_handler(context: ParselCrawlingContext) -> None: assert run_result.status == 'SUCCEEDED' -@pytest.mark.skip(reason='https://github.com/apify/apify-sdk-python/issues/540') async def test_actor_on_platform_max_request_retries( make_actor: MakeActorFunction, run_actor: RunActorFunction, From 7aa82847a8db041787e0e1e213871e7eb1bb8f80 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 15:10:50 +0200 Subject: [PATCH 10/11] Remove the skipped decorator again --- tests/integration/test_crawlers_with_storages.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index c1238167..a2ba1e4d 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -2,8 +2,6 @@ from typing import TYPE_CHECKING -import pytest - if TYPE_CHECKING: from .conftest import MakeActorFunction, RunActorFunction @@ -78,7 +76,6 @@ async def 
default_handler(context: ParselCrawlingContext) -> None: assert run_result.status == 'SUCCEEDED' -@pytest.mark.skip(reason='Sometimes crawler does not respect max_request_retries argument, see issue #540') async def test_actor_on_platform_max_request_retries( make_actor: MakeActorFunction, run_actor: RunActorFunction, From 5395029598d204875ca871eb5ef16b98dddf7625 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 12 Aug 2025 16:47:24 +0200 Subject: [PATCH 11/11] Add more explanation comments for the locks --- src/apify/storage_clients/_apify/_request_queue_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index e7012120..519cd95a 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -86,7 +86,7 @@ def __init__( """The number of requests we assume have been handled (tracked manually for this instance).""" self._fetch_lock = asyncio.Lock() - """Fetch lock to minimize race conditions when communicationg with API.""" + """Fetch lock to minimize race conditions when communicating with API.""" @override async def get_metadata(self) -> RequestQueueMetadata: @@ -397,6 +397,7 @@ async def reclaim_request( if request.was_already_handled: request.handled_at = None + # Reclaim with lock to prevent race conditions that could lead to double processing of the same request. async with self._fetch_lock: try: # Update the request in the API. @@ -439,6 +440,8 @@ async def is_empty(self) -> bool: Returns: True if the queue is empty, False otherwise. """ + # Check _list_head and self._queue_has_locked_requests with lock to make sure they are consistent. + # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. 
async with self._fetch_lock: head = await self._list_head(limit=1, lock_time=None) return len(head.items) == 0 and not self._queue_has_locked_requests