diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py
index 95af78a1..519cd95a 100644
--- a/src/apify/storage_clients/_apify/_request_queue_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_client.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 from collections import deque
 from datetime import datetime, timedelta, timezone
 from logging import getLogger
@@ -84,6 +85,9 @@ def __init__(
         self._assumed_handled_count = 0
         """The number of requests we assume have been handled (tracked manually for this instance)."""
 
+        self._fetch_lock = asyncio.Lock()
+        """Fetch lock to minimize race conditions when communicating with the API."""
+
     @override
     async def get_metadata(self) -> RequestQueueMetadata:
         total_count = self._initial_total_count + self._assumed_total_count
@@ -290,15 +294,17 @@ async def fetch_next_request(self) -> Request | None:
         Returns:
             The request or `None` if there are no more pending requests.
         """
-        # Ensure the queue head has requests if available
-        await self._ensure_head_is_non_empty()
+        # Ensure the queue head has requests if available. Fetch the head under the lock to prevent race conditions.
+        async with self._fetch_lock:
+            await self._ensure_head_is_non_empty()
 
-        # If queue head is empty after ensuring, there are no requests
-        if not self._queue_head:
-            return None
+            # If queue head is empty after ensuring, there are no requests
+            if not self._queue_head:
+                return None
+
+            # Get the next request ID from the queue head
+            next_request_id = self._queue_head.popleft()
 
-        # Get the next request ID from the queue head
-        next_request_id = self._queue_head.popleft()
         request = await self._get_or_hydrate_request(next_request_id)
 
         # Handle potential inconsistency where request might not be in the main table yet
@@ -344,6 +350,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
 
+        if cached_request := self._requests_cache.get(request.id):
+            cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
             processed_request = await self._update_request(request)
@@ -389,39 +397,41 @@ async def reclaim_request(
         if request.was_already_handled:
             request.handled_at = None
 
-        try:
-            # Update the request in the API.
-            processed_request = await self._update_request(request, forefront=forefront)
-            processed_request.unique_key = request.unique_key
-
-            # If the request was previously handled, decrement our handled count since
-            # we're putting it back for processing.
-            if request.was_already_handled and not processed_request.was_already_handled:
-                self._assumed_handled_count -= 1
-
-            # Update the cache
-            cache_key = unique_key_to_request_id(request.unique_key)
-            self._cache_request(
-                cache_key,
-                processed_request,
-                hydrated_request=request,
-            )
+        # Reclaim under the lock to prevent race conditions that could lead to double-processing of the same request.
+        async with self._fetch_lock:
+            try:
+                # Update the request in the API.
+                processed_request = await self._update_request(request, forefront=forefront)
+                processed_request.unique_key = request.unique_key
+
+                # If the request was previously handled, decrement our handled count since
+                # we're putting it back for processing.
+                if request.was_already_handled and not processed_request.was_already_handled:
+                    self._assumed_handled_count -= 1
+
+                # Update the cache
+                cache_key = unique_key_to_request_id(request.unique_key)
+                self._cache_request(
+                    cache_key,
+                    processed_request,
+                    hydrated_request=request,
+                )
 
-        # If we're adding to the forefront, we need to check for forefront requests
-        # in the next list_head call
-        if forefront:
-            self._should_check_for_forefront_requests = True
+                # If we're adding to the forefront, we need to check for forefront requests
+                # in the next list_head call
+                if forefront:
+                    self._should_check_for_forefront_requests = True
 
-        # Try to release the lock on the request
-        try:
-            await self._delete_request_lock(request.id, forefront=forefront)
-        except Exception as err:
-            logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
-        except Exception as exc:
-            logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
-            return None
-        else:
-            return processed_request
+                # Try to release the lock on the request
+                try:
+                    await self._delete_request_lock(request.id, forefront=forefront)
+                except Exception as err:
+                    logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
+            except Exception as exc:
+                logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
+                return None
+            else:
+                return processed_request
 
     @override
     async def is_empty(self) -> bool:
@@ -430,9 +440,11 @@ async def is_empty(self) -> bool:
         """Check if the queue is empty.
 
         Returns:
             True if the queue is empty, False otherwise.
         """
-        head = await self._list_head(limit=1, lock_time=None)
-
-        return len(head.items) == 0 and not self._queue_has_locked_requests
+        # Check _list_head and self._queue_has_locked_requests under the lock to keep them consistent.
+        # Without the lock, `is_empty` could falsely report True due to a low-probability race condition.
+        async with self._fetch_lock:
+            head = await self._list_head(limit=1, lock_time=None)
+            return len(head.items) == 0 and not self._queue_has_locked_requests
 
     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
@@ -545,7 +557,6 @@ async def _list_head(
         # Return from cache if available and we're not checking for new forefront requests
         if self._queue_head and not self._should_check_for_forefront_requests:
             logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
-
             # Create a list of requests from the cached queue head
             items = []
             for request_id in list(self._queue_head)[:limit]:
@@ -563,7 +574,6 @@ async def _list_head(
             queue_has_locked_requests=self._queue_has_locked_requests,
             lock_time=lock_time,
         )
-
         leftover_buffer = list[str]()
         if self._should_check_for_forefront_requests:
             leftover_buffer = list(self._queue_head)
@@ -607,13 +617,11 @@ async def _list_head(
                 ),
                 hydrated_request=request,
             )
-
             self._queue_head.append(request.id)
 
         for leftover_request_id in leftover_buffer:
             # After adding new requests to the forefront, any existing leftover locked request is kept at the end.
             self._queue_head.append(leftover_request_id)
-
         return RequestQueueHead.model_validate(response)
 
     async def _prolong_request_lock(
diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py
index d4730b00..64a846b5 100644
--- a/tests/integration/test_actor_request_queue.py
+++ b/tests/integration/test_actor_request_queue.py
@@ -98,15 +98,18 @@ async def test_request_queue_is_finished(
     request_queue_name = generate_unique_resource_name('request_queue')
 
     async with Actor:
-        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-        await request_queue.add_request(Request.from_url('http://example.com'))
-        assert not await request_queue.is_finished()
-
-        request = await request_queue.fetch_next_request()
-        assert request is not None
-        assert not await request_queue.is_finished(), (
-            'RequestQueue should not be finished unless the request is marked as handled.'
-        )
-
-        await request_queue.mark_request_as_handled(request)
-        assert await request_queue.is_finished()
+        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
+        try:
+            await request_queue.add_request(Request.from_url('http://example.com'))
+            assert not await request_queue.is_finished()
+
+            request = await request_queue.fetch_next_request()
+            assert request is not None
+            assert not await request_queue.is_finished(), (
+                'RequestQueue should not be finished unless the request is marked as handled.'
+            )
+
+            await request_queue.mark_request_as_handled(request)
+            assert await request_queue.is_finished()
+        finally:
+            await request_queue.drop()
diff --git a/tests/integration/test_crawlers_with_storages.py b/tests/integration/test_crawlers_with_storages.py
index 3dd32707..a2ba1e4d 100644
--- a/tests/integration/test_crawlers_with_storages.py
+++ b/tests/integration/test_crawlers_with_storages.py
@@ -2,8 +2,6 @@
 
 from typing import TYPE_CHECKING
 
-import pytest
-
 if TYPE_CHECKING:
     from .conftest import MakeActorFunction, RunActorFunction
 
@@ -78,7 +76,6 @@ async def default_handler(context: ParselCrawlingContext) -> None:
 
     assert run_result.status == 'SUCCEEDED'
 
-@pytest.mark.skip(reason='Sometimes crawler does not respect max_request_retries argument, see issue #540')
 async def test_actor_on_platform_max_request_retries(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
@@ -87,6 +84,7 @@
 ) -> None:
 
     async def main() -> None:
        """The crawler entry point."""
+        from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext
 
         from apify import Actor
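
Reviewer note: the sketch below is a minimal, self-contained illustration (toy names, not SDK code) of the concurrency pattern the new `_fetch_lock` enforces in `fetch_next_request()`. The emptiness check, the head refill, and the `popleft()` must happen atomically with respect to other consumers; otherwise two consumers can each observe an empty head, each trigger an API fetch, and hand out duplicate requests.

from __future__ import annotations

import asyncio
from collections import deque


class HeadConsumer:
    """Toy stand-in for the request queue client (illustrative only)."""

    def __init__(self) -> None:
        self._queue_head: deque[str] = deque()
        self._fetch_lock = asyncio.Lock()

    async def _ensure_head_is_non_empty(self) -> None:
        # Stand-in for the API round trip done by the real _list_head call.
        if not self._queue_head:
            await asyncio.sleep(0)  # Yield control: this is the race window.
            self._queue_head.extend(['req-1', 'req-2'])

    async def fetch_next(self) -> str | None:
        # Check, refill, and pop under a single lock, mirroring the patched
        # fetch_next_request(). Without the lock, two consumers could both see
        # an empty head, both refill it, and both receive the same request.
        async with self._fetch_lock:
            await self._ensure_head_is_non_empty()
            if not self._queue_head:
                return None
            return self._queue_head.popleft()


async def main() -> None:
    consumer = HeadConsumer()
    # Two concurrent consumers; the lock serializes the check-refill-pop sequence.
    print(await asyncio.gather(consumer.fetch_next(), consumer.fetch_next()))


asyncio.run(main())

The same lock is shared with `reclaim_request()` and `is_empty()` in the patch above, so a reclaim in flight cannot interleave with a head refill, and `is_empty` cannot report an empty head while another consumer is mid-refill.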
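
A second small illustration, for the cache update added to `mark_request_as_handled()`: the walrus operator is paired with `.get()` so that a cache miss yields `None` and is skipped, where plain indexing would raise `KeyError`. A plain dict stands in for the client's request cache here.

# Hypothetical data shapes; only the .get()-with-walrus pattern matters.
requests_cache: dict[str, dict[str, bool]] = {'req-1': {'was_already_handled': False}}

for request_id in ('req-1', 'req-unknown'):
    if cached := requests_cache.get(request_id):  # None on a miss, so no KeyError
        cached['was_already_handled'] = True

print(requests_cache)  # {'req-1': {'was_already_handled': True}}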