diff --git a/src/apify/storage_clients/_apify/_models.py b/src/apify/storage_clients/_apify/_models.py
index d41e33b2..df981121 100644
--- a/src/apify/storage_clients/_apify/_models.py
+++ b/src/apify/storage_clients/_apify/_models.py
@@ -105,6 +105,3 @@ class CachedRequest(BaseModel):

     lock_expires_at: datetime | None = None
     """The expiration time of the lock on the request."""
-
-    forefront: bool = False
-    """Whether the request was added to the forefront of the queue."""
diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py
index 8a6dfa89..95af78a1 100644
--- a/src/apify/storage_clients/_apify/_request_queue_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_client.py
@@ -358,7 +358,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
                 self._cache_request(
                     cache_key,
                     processed_request,
-                    forefront=False,
                     hydrated_request=request,
                 )
             except Exception as exc:
@@ -405,7 +404,6 @@ async def reclaim_request(
                 self._cache_request(
                     cache_key,
                     processed_request,
-                    forefront=forefront,
                     hydrated_request=request,
                 )

@@ -463,9 +461,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
             # Try to prolong the lock if it's expired
             try:
                 lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
-                response = await self._prolong_request_lock(
-                    request_id, forefront=cached_entry.forefront, lock_secs=lock_secs
-                )
+                response = await self._prolong_request_lock(request_id, lock_secs=lock_secs)
                 cached_entry.lock_expires_at = response.lock_expires_at
             except Exception:
                 # If prolonging the lock fails, we lost the request
@@ -478,7 +474,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
         try:
             # Try to acquire or prolong the lock
             lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
-            await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs)
+            await self._prolong_request_lock(request_id, lock_secs=lock_secs)

             # Fetch the request data
             request = await self.get_request(request_id)
@@ -498,7 +494,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
                     was_already_present=True,
                     was_already_handled=request.handled_at is not None,
                 ),
-                forefront=False,
                 hydrated_request=request,
             )
         except Exception as exc:
@@ -569,6 +564,12 @@ async def _list_head(
                 lock_time=lock_time,
             )

+        leftover_buffer = list[str]()
+        if self._should_check_for_forefront_requests:
+            leftover_buffer = list(self._queue_head)
+            self._queue_head.clear()
+            self._should_check_for_forefront_requests = False
+
         # Otherwise fetch from API
         lock_time = lock_time or self._DEFAULT_LOCK_TIME
         lock_secs = int(lock_time.total_seconds())
@@ -581,15 +582,6 @@ async def _list_head(
         # Update the queue head cache
         self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)

-        # Clear current queue head if we're checking for forefront requests
-        if self._should_check_for_forefront_requests:
-            self._queue_head.clear()
-            self._should_check_for_forefront_requests = False
-
-        # Process and cache the requests
-        head_id_buffer = list[str]()
-        forefront_head_id_buffer = list[str]()
-
         for request_data in response.get('items', []):
             request = Request.model_validate(request_data)

@@ -604,36 +596,23 @@ async def _list_head(
                 )
                 continue

-            # Check if this request was already cached and if it was added to forefront
-            cache_key = unique_key_to_request_id(request.unique_key)
-            cached_request = self._requests_cache.get(cache_key)
-            forefront = cached_request.forefront if cached_request else False
-
-            # Add to appropriate buffer based on forefront flag
-            if forefront:
-                forefront_head_id_buffer.insert(0, request.id)
-            else:
-                head_id_buffer.append(request.id)
-
             # Cache the request
             self._cache_request(
-                cache_key,
+                unique_key_to_request_id(request.unique_key),
                 ProcessedRequest(
                     id=request.id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=False,
                 ),
-                forefront=forefront,
                 hydrated_request=request,
             )

-        # Update the queue head deque
-        for request_id in head_id_buffer:
-            self._queue_head.append(request_id)
+            self._queue_head.append(request.id)

-        for request_id in forefront_head_id_buffer:
-            self._queue_head.appendleft(request_id)
+        for leftover_request_id in leftover_buffer:
+            # After adding new requests to the forefront, any leftover locked requests are kept at the end.
+            self._queue_head.append(leftover_request_id)

         return RequestQueueHead.model_validate(response)
@@ -641,14 +620,12 @@ async def _prolong_request_lock(
         self,
         request_id: str,
         *,
-        forefront: bool = False,
         lock_secs: int,
     ) -> ProlongRequestLockResponse:
         """Prolong the lock on a specific request in the queue.

         Args:
             request_id: The identifier of the request whose lock is to be prolonged.
-            forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
             lock_secs: The additional amount of time, in seconds, that the request will remain locked.

         Returns:
@@ -656,7 +633,9 @@ async def _prolong_request_lock(
         """
         response = await self._api_client.prolong_request_lock(
            request_id=request_id,
-            forefront=forefront,
+            # All requests reaching this code were at the tip of the queue when they were fetched,
+            # so if their lock expires, they should be put back to the forefront as their handling is long overdue.
+            forefront=True,
             lock_secs=lock_secs,
         )
@@ -703,7 +682,6 @@ def _cache_request(
         cache_key: str,
         processed_request: ProcessedRequest,
         *,
-        forefront: bool,
         hydrated_request: Request | None = None,
     ) -> None:
         """Cache a request for future use.
@@ -719,5 +697,4 @@ def _cache_request(
             was_already_handled=processed_request.was_already_handled,
             hydrated=hydrated_request,
             lock_expires_at=None,
-            forefront=forefront,
         )
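
Review note: the reordering in `_list_head` is easiest to see in isolation. Below is a minimal, self-contained sketch of the new behavior, not the SDK code itself; the deque, flag, and buffer names mirror the diff, while the request IDs and the fetched payload are made up for illustration. When a forefront re-check is pending, the locally cached head is set aside, freshly fetched items (forefront ones first) are enqueued, and the leftover locked requests are re-appended at the back.

```python
from collections import deque

# Local queue-head cache as it might look before a forefront re-check
# (hypothetical IDs; in the SDK these come from the Apify API).
queue_head: deque[str] = deque(['old-1', 'old-2'])
should_check_for_forefront_requests = True

# Mirrors the new logic at the top of _list_head: set the cached head
# aside instead of discarding it.
leftover_buffer: list[str] = []
if should_check_for_forefront_requests:
    leftover_buffer = list(queue_head)
    queue_head.clear()
    should_check_for_forefront_requests = False

# Stand-in for the API response; forefront items arrive first.
fetched_ids = ['forefront-1', 'regular-1']
for request_id in fetched_ids:
    queue_head.append(request_id)

# Leftover (likely still locked) requests go to the back, as in the diff.
for leftover_request_id in leftover_buffer:
    queue_head.append(leftover_request_id)

print(list(queue_head))  # ['forefront-1', 'regular-1', 'old-1', 'old-2']
```

Compared to the old per-request `forefront` bookkeeping in `CachedRequest`, ordering is now decided once per fetch, which is why `_prolong_request_lock` can unconditionally pass `forefront=True`: anything it touches was already at the head of the queue when it was fetched.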