From 7ee8af4f20ba94bbc31f76e1272c09a2ecb8ba7c Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 8 Aug 2025 10:54:42 +0200 Subject: [PATCH 1/3] Suggest solving the forefront handling of reclaimed requests Drop forefront info from the local cache as it is unreliable and complicates the flow. Only the platform knows the real forefront; do not try to keep an imperfect copy of it locally and rather design the system to work without being aware of the cached forefront. --- src/apify/storage_clients/_apify/_models.py | 3 -- .../_apify/_request_queue_client.py | 54 +++++-------------- .../test_crawlers_with_storages.py | 49 +++++++++++++++++ 3 files changed, 62 insertions(+), 44 deletions(-) diff --git a/src/apify/storage_clients/_apify/_models.py b/src/apify/storage_clients/_apify/_models.py index d41e33b2..df981121 100644 --- a/src/apify/storage_clients/_apify/_models.py +++ b/src/apify/storage_clients/_apify/_models.py @@ -105,6 +105,3 @@ class CachedRequest(BaseModel): lock_expires_at: datetime | None = None """The expiration time of the lock on the request.""" - - forefront: bool = False - """Whether the request was added to the forefront of the queue.""" diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 8a6dfa89..2ac9b037 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -358,7 +358,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | self._cache_request( cache_key, processed_request, - forefront=False, hydrated_request=request, ) except Exception as exc: @@ -405,7 +404,6 @@ async def reclaim_request( self._cache_request( cache_key, processed_request, - forefront=forefront, hydrated_request=request, ) @@ -463,9 +461,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None: # Try to prolong the lock if it's expired try: lock_secs = 
int(self._DEFAULT_LOCK_TIME.total_seconds()) - response = await self._prolong_request_lock( - request_id, forefront=cached_entry.forefront, lock_secs=lock_secs - ) + response = await self._prolong_request_lock(request_id, lock_secs=lock_secs) cached_entry.lock_expires_at = response.lock_expires_at except Exception: # If prolonging the lock fails, we lost the request @@ -478,7 +474,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None: try: # Try to acquire or prolong the lock lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds()) - await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs) + await self._prolong_request_lock(request_id, lock_secs=lock_secs) # Fetch the request data request = await self.get_request(request_id) @@ -498,7 +494,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None: was_already_present=True, was_already_handled=request.handled_at is not None, ), - forefront=False, hydrated_request=request, ) except Exception as exc: @@ -568,6 +563,8 @@ async def _list_head( queue_has_locked_requests=self._queue_has_locked_requests, lock_time=lock_time, ) + if self._should_check_for_forefront_requests: + self._should_check_for_forefront_requests = False # Otherwise fetch from API lock_time = lock_time or self._DEFAULT_LOCK_TIME @@ -581,16 +578,9 @@ async def _list_head( # Update the queue head cache self._queue_has_locked_requests = response.get('queueHasLockedRequests', False) - # Clear current queue head if we're checking for forefront requests - if self._should_check_for_forefront_requests: - self._queue_head.clear() - self._should_check_for_forefront_requests = False - - # Process and cache the requests - head_id_buffer = list[str]() - forefront_head_id_buffer = list[str]() - - for request_data in response.get('items', []): + # Iterate over new requests and push them to the front of the queue. 
+ # Since we push to the front of the queue, we have to iterate in reverse order to preserve the intended order. + for request_data in reversed(response.get('items', [])): request = Request.model_validate(request_data) # Skip requests without ID or unique key @@ -604,36 +594,20 @@ async def _list_head( ) continue - # Check if this request was already cached and if it was added to forefront - cache_key = unique_key_to_request_id(request.unique_key) - cached_request = self._requests_cache.get(cache_key) - forefront = cached_request.forefront if cached_request else False - - # Add to appropriate buffer based on forefront flag - if forefront: - forefront_head_id_buffer.insert(0, request.id) - else: - head_id_buffer.append(request.id) - # Cache the request self._cache_request( - cache_key, + unique_key_to_request_id(request.unique_key), ProcessedRequest( id=request.id, unique_key=request.unique_key, was_already_present=True, was_already_handled=False, ), - forefront=forefront, hydrated_request=request, ) - # Update the queue head deque - for request_id in head_id_buffer: - self._queue_head.append(request_id) - - for request_id in forefront_head_id_buffer: - self._queue_head.appendleft(request_id) + # All new requests are added to the forefront, existing leftover locked requests kept in the end. + self._queue_head.appendleft(request.id) return RequestQueueHead.model_validate(response) @@ -641,14 +615,12 @@ async def _prolong_request_lock( self, request_id: str, *, - forefront: bool = False, lock_secs: int, ) -> ProlongRequestLockResponse: """Prolong the lock on a specific request in the queue. Args: request_id: The identifier of the request whose lock is to be prolonged. - forefront: Whether to put the request in the beginning or the end of the queue after lock expires. lock_secs: The additional amount of time, in seconds, that the request will remain locked. 
Returns: @@ -656,7 +628,9 @@ async def _prolong_request_lock( """ response = await self._api_client.prolong_request_lock( request_id=request_id, - forefront=forefront, + # All requests reaching this code were the tip of the queue at the moment when they were fetched, + # so if their lock expires, they should be put back to the forefront as their handling is long overdue. + forefront=True, lock_secs=lock_secs, ) @@ -703,7 +677,6 @@ def _cache_request( cache_key: str, processed_request: ProcessedRequest, *, - forefront: bool, hydrated_request: Request | None = None, ) -> None: """Cache a request for future use. @@ -719,5 +692,4 @@ def _cache_request( was_already_handled=processed_request.was_already_handled, hydrated=hydrated_request, lock_expires_at=None, - forefront=forefront, ) diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index cb1f7e2b..525a13d8 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -109,3 +109,52 @@ async def default_handler(_: ParselCrawlingContext) -> None: run_result = await run_actor(actor) assert run_result.status == 'SUCCEEDED' + + +async def test_a() -> None: + """Test that the actor respects max_request_retries.""" + + from apify import Actor + + async with Actor: + rq = await Actor.open_request_queue(name='asdasdd', force_cloud=True) + await rq.drop() + rq = await Actor.open_request_queue(name='asdasdd', force_cloud=True) + Actor.log.info('Request queue opened') + + # Add initial requests + await rq.add_request('https://example.com/1') + await rq.add_request('https://example.com/2') + Actor.log.info('Added initial requests') + + # Fetch one and reclaim to forefront + request1 = await rq.fetch_next_request() + assert request1 is not None, f'request1={request1}' + assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' + Actor.log.info(f'Fetched request: {request1.url}') + + await 
rq.reclaim_request(request1, forefront=True) + Actor.log.info('Reclaimed request to forefront') + + # Add forefront request + await rq.add_request('https://example.com/priority', forefront=True) + Actor.log.info('Added new forefront request') + + # Fetch all requests and verify forefront behavior + urls_ordered = list[str]() + while next_request := await rq.fetch_next_request(): + urls_ordered.append(next_request.url) + await rq.mark_request_as_handled(next_request) + + Actor.log.info(f'Final order of fetched URLs: {urls_ordered}') + + # Verify that we got all 3 requests + assert len(urls_ordered) == 3, f'len(urls_ordered)={len(urls_ordered)}' + + assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}' + assert urls_ordered[1] == request1.url, ( + f'urls_ordered[1]={urls_ordered[1]}', + f'request1.url={request1.url}', + ) + assert urls_ordered[2] == 'https://example.com/2', f'urls_ordered[2]={urls_ordered[2]}' + Actor.log.info('Request ordering verified successfully') From fd2b804d9b53f5c776dfddde449737dd4c11ee1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20Proch=C3=A1zka?= Date: Fri, 8 Aug 2025 11:02:05 +0200 Subject: [PATCH 2/3] Update test_crawlers_with_storages.py --- .../test_crawlers_with_storages.py | 49 ------------------- 1 file changed, 49 deletions(-) diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py index 525a13d8..cb1f7e2b 100644 --- a/tests/integration/test_crawlers_with_storages.py +++ b/tests/integration/test_crawlers_with_storages.py @@ -109,52 +109,3 @@ async def default_handler(_: ParselCrawlingContext) -> None: run_result = await run_actor(actor) assert run_result.status == 'SUCCEEDED' - - -async def test_a() -> None: - """Test that the actor respects max_request_retries.""" - - from apify import Actor - - async with Actor: - rq = await Actor.open_request_queue(name='asdasdd', force_cloud=True) - await rq.drop() - rq = await 
Actor.open_request_queue(name='asdasdd', force_cloud=True) - Actor.log.info('Request queue opened') - - # Add initial requests - await rq.add_request('https://example.com/1') - await rq.add_request('https://example.com/2') - Actor.log.info('Added initial requests') - - # Fetch one and reclaim to forefront - request1 = await rq.fetch_next_request() - assert request1 is not None, f'request1={request1}' - assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' - Actor.log.info(f'Fetched request: {request1.url}') - - await rq.reclaim_request(request1, forefront=True) - Actor.log.info('Reclaimed request to forefront') - - # Add forefront request - await rq.add_request('https://example.com/priority', forefront=True) - Actor.log.info('Added new forefront request') - - # Fetch all requests and verify forefront behavior - urls_ordered = list[str]() - while next_request := await rq.fetch_next_request(): - urls_ordered.append(next_request.url) - await rq.mark_request_as_handled(next_request) - - Actor.log.info(f'Final order of fetched URLs: {urls_ordered}') - - # Verify that we got all 3 requests - assert len(urls_ordered) == 3, f'len(urls_ordered)={len(urls_ordered)}' - - assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}' - assert urls_ordered[1] == request1.url, ( - f'urls_ordered[1]={urls_ordered[1]}', - f'request1.url={request1.url}', - ) - assert urls_ordered[2] == 'https://example.com/2', f'urls_ordered[2]={urls_ordered[2]}' - Actor.log.info('Request ordering verified successfully') From 373b0118c9cfa2a40f65e6d547288d028036ab11 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Mon, 11 Aug 2025 09:17:30 +0200 Subject: [PATCH 3/3] Prevent requests from lingering at the end of the queue unless new forefront requests were added --- .../_apify/_request_queue_client.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py 
b/src/apify/storage_clients/_apify/_request_queue_client.py index 2ac9b037..95af78a1 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -563,7 +563,11 @@ async def _list_head( queue_has_locked_requests=self._queue_has_locked_requests, lock_time=lock_time, ) + + leftover_buffer = list[str]() if self._should_check_for_forefront_requests: + leftover_buffer = list(self._queue_head) + self._queue_head.clear() self._should_check_for_forefront_requests = False # Otherwise fetch from API @@ -578,9 +582,7 @@ async def _list_head( # Update the queue head cache self._queue_has_locked_requests = response.get('queueHasLockedRequests', False) - # Iterate over new requests and push them to the front of the queue. - # Since we push to the front of the queue, we have to iterate in reverse order to preserve the intended order. - for request_data in reversed(response.get('items', [])): + for request_data in response.get('items', []): request = Request.model_validate(request_data) # Skip requests without ID or unique key @@ -606,8 +608,11 @@ async def _list_head( hydrated_request=request, ) - # All new requests are added to the forefront, existing leftover locked requests kept in the end. - self._queue_head.appendleft(request.id) + self._queue_head.append(request.id) + + for leftover_request_id in leftover_buffer: + # After adding new requests to the forefront, any existing leftover locked request is kept in the end. + self._queue_head.append(leftover_request_id) return RequestQueueHead.model_validate(response)