98 changes: 53 additions & 45 deletions src/apify/storage_clients/_apify/_request_queue_client.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 from collections import deque
 from datetime import datetime, timedelta, timezone
 from logging import getLogger
@@ -84,6 +85,9 @@ def __init__(
         self._assumed_handled_count = 0
         """The number of requests we assume have been handled (tracked manually for this instance)."""

+        self._fetch_lock = asyncio.Lock()
+        """Fetch lock to minimize race conditions when communicating with the API."""
+
     @override
     async def get_metadata(self) -> RequestQueueMetadata:
         total_count = self._initial_total_count + self._assumed_total_count
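The lock guards the check-then-act sequence in `fetch_next_request` below: without it, two concurrent consumers can both observe a non-empty head before either pops it. A minimal sketch of that race under assumed conditions; `UnsafeQueue` and its methods are hypothetical illustrations, not part of this PR:

```python
from __future__ import annotations

import asyncio
from collections import deque


class UnsafeQueue:
    """Toy queue reproducing the check-then-act race that `_fetch_lock` closes."""

    def __init__(self) -> None:
        self._head: deque[str] = deque(['req-1'])  # A single pending request.
        self._lock = asyncio.Lock()

    async def fetch_unsafe(self) -> str | None:
        if not self._head:
            return None
        await asyncio.sleep(0)  # Stand-in for an awaited API call between check and pop.
        return self._head.popleft()  # A second concurrent task raises IndexError here.

    async def fetch_safe(self) -> str | None:
        async with self._lock:  # Check and pop become atomic with respect to other tasks.
            if not self._head:
                return None
            await asyncio.sleep(0)
            return self._head.popleft()


async def main() -> None:
    q = UnsafeQueue()
    print(await asyncio.gather(q.fetch_unsafe(), q.fetch_unsafe(), return_exceptions=True))
    # Typically: ['req-1', IndexError('pop from an empty deque')]


asyncio.run(main())
```

With `fetch_safe`, the same two concurrent calls yield `['req-1', None]`: one request fetched exactly once, no error.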
@@ -290,15 +294,17 @@ async def fetch_next_request(self) -> Request | None:
         Returns:
             The request or `None` if there are no more pending requests.
         """
-        # Ensure the queue head has requests if available
-        await self._ensure_head_is_non_empty()
+        # Ensure the queue head has requests if any are available. Fetch the head under the lock to prevent race conditions.
+        async with self._fetch_lock:
+            await self._ensure_head_is_non_empty()

-        # If queue head is empty after ensuring, there are no requests
-        if not self._queue_head:
-            return None
+            # If the queue head is empty after ensuring, there are no requests
+            if not self._queue_head:
+                return None

+            # Get the next request ID from the queue head
+            next_request_id = self._queue_head.popleft()

-        # Get the next request ID from the queue head
-        next_request_id = self._queue_head.popleft()
         request = await self._get_or_hydrate_request(next_request_id)

         # Handle potential inconsistency where request might not be in the main table yet
@@ -344,6 +350,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)

+        if cached_request := self._requests_cache.get(request.id):
+            cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
             processed_request = await self._update_request(request)
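The two added lines write the handled state through to the request cache, so a later cache hit for the same request does not resurrect a stale `was_already_handled=False`. The shape of the problem in isolation, with a hypothetical `CachedEntry` standing in for the real cached record:

```python
from dataclasses import dataclass


@dataclass
class CachedEntry:
    was_already_handled: bool = False


cache: dict[str, CachedEntry] = {'req-1': CachedEntry()}

# Without the write-through, this cached entry would still claim the
# request is unhandled after mark_request_as_handled() succeeds.
if cached := cache.get('req-1'):
    cached.was_already_handled = True

assert cache['req-1'].was_already_handled
```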
@@ -389,39 +397,41 @@ async def reclaim_request(
         if request.was_already_handled:
             request.handled_at = None

-        try:
-            # Update the request in the API.
-            processed_request = await self._update_request(request, forefront=forefront)
-            processed_request.unique_key = request.unique_key
-
-            # If the request was previously handled, decrement our handled count since
-            # we're putting it back for processing.
-            if request.was_already_handled and not processed_request.was_already_handled:
-                self._assumed_handled_count -= 1
-
-            # Update the cache
-            cache_key = unique_key_to_request_id(request.unique_key)
-            self._cache_request(
-                cache_key,
-                processed_request,
-                hydrated_request=request,
-            )
+        # Reclaim under the lock to prevent race conditions that could lead to double-processing of the same request.
+        async with self._fetch_lock:
+            try:
+                # Update the request in the API.
+                processed_request = await self._update_request(request, forefront=forefront)
+                processed_request.unique_key = request.unique_key
+
+                # If the request was previously handled, decrement our handled count since
+                # we're putting it back for processing.
+                if request.was_already_handled and not processed_request.was_already_handled:
+                    self._assumed_handled_count -= 1
+
+                # Update the cache
+                cache_key = unique_key_to_request_id(request.unique_key)
+                self._cache_request(
+                    cache_key,
+                    processed_request,
+                    hydrated_request=request,
+                )

-            # If we're adding to the forefront, we need to check for forefront requests
-            # in the next list_head call
-            if forefront:
-                self._should_check_for_forefront_requests = True
+                # If we're adding to the forefront, we need to check for forefront requests
+                # in the next list_head call
+                if forefront:
+                    self._should_check_for_forefront_requests = True

-            # Try to release the lock on the request
-            try:
-                await self._delete_request_lock(request.id, forefront=forefront)
-            except Exception as err:
-                logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
-        except Exception as exc:
-            logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
-            return None
-        else:
-            return processed_request
+                # Try to release the lock on the request
+                try:
+                    await self._delete_request_lock(request.id, forefront=forefront)
+                except Exception as err:
+                    logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
+            except Exception as exc:
+                logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
+                return None
+            else:
+                return processed_request

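The nested `try`/`except`/`else` above does three distinct jobs: the inner `try` makes lock release best-effort (failures are logged, never raised), the outer `except` turns any API failure into a `None` return, and the `else` branch returns the processed request only when nothing raised. A condensed, self-contained sketch of the same control flow; `update_in_api` and `delete_request_lock` are hypothetical stand-ins for the real calls:

```python
from __future__ import annotations

import asyncio
import logging

logger = logging.getLogger(__name__)


async def update_in_api(request_id: str) -> str:
    """Hypothetical stand-in for the API update call; may raise."""
    return f'processed-{request_id}'


async def delete_request_lock(request_id: str) -> None:
    """Hypothetical stand-in for the lock-release call; may raise."""


async def reclaim_sketch(request_id: str) -> str | None:
    try:
        processed = await update_in_api(request_id)
        try:
            await delete_request_lock(request_id)
        except Exception as err:
            # Best-effort: a failed lock release is logged, never propagated.
            logger.debug(f'Failed to delete request lock for {request_id}', exc_info=err)
    except Exception as exc:
        logger.debug(f'Error reclaiming request {request_id}: {exc!s}')
        return None
    else:
        # Runs only when the outer try block raised nothing.
        return processed


print(asyncio.run(reclaim_sketch('req-1')))  # -> processed-req-1
```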
     @override
     async def is_empty(self) -> bool:
@@ -430,9 +440,11 @@ async def is_empty(self) -> bool:
         Returns:
             True if the queue is empty, False otherwise.
         """
-        head = await self._list_head(limit=1, lock_time=None)
-
-        return len(head.items) == 0 and not self._queue_has_locked_requests
+        # Check `_list_head` and `self._queue_has_locked_requests` under the lock so the two reads are consistent.
+        # Without the lock, `is_empty` could falsely report True due to a low-probability race condition.
+        async with self._fetch_lock:
+            head = await self._list_head(limit=1, lock_time=None)
+            return len(head.items) == 0 and not self._queue_has_locked_requests
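Why both reads need the same lock: `fetch_next_request` pops the head first and only afterwards observes the request as locked on the platform. An unlocked `is_empty` running in that window would see an empty head and no locked requests, and falsely report an exhausted queue. A toy model of the race under assumed conditions (`QueueState` and its fields are hypothetical, not the real client):

```python
from __future__ import annotations

import asyncio


class QueueState:
    """Toy model of the two fields `is_empty` has to read consistently."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.head_items = 1
        self.has_locked_requests = False

    async def fetch(self) -> None:
        async with self._lock:
            self.head_items -= 1             # Request leaves the head...
            await asyncio.sleep(0)           # ...an API round-trip happens...
            self.has_locked_requests = True  # ...and it is now locked, not done.

    async def is_empty(self) -> bool:
        async with self._lock:  # Snapshot both fields atomically.
            return self.head_items == 0 and not self.has_locked_requests


async def main() -> None:
    state = QueueState()
    _, empty = await asyncio.gather(state.fetch(), state.is_empty())
    print(empty)  # False: is_empty cannot run inside fetch's critical section.


asyncio.run(main())
```

Remove the `async with self._lock:` from `is_empty` and the same interleaving can return `True` while a fetched request is still in flight.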

     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
@@ -545,7 +557,6 @@ async def _list_head(
         # Return from cache if available and we're not checking for new forefront requests
         if self._queue_head and not self._should_check_for_forefront_requests:
             logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
-
             # Create a list of requests from the cached queue head
             items = []
             for request_id in list(self._queue_head)[:limit]:
@@ -563,7 +574,6 @@
                 queue_has_locked_requests=self._queue_has_locked_requests,
                 lock_time=lock_time,
             )
-
         leftover_buffer = list[str]()
         if self._should_check_for_forefront_requests:
             leftover_buffer = list(self._queue_head)
@@ -607,13 +617,11 @@ async def _list_head(
                 ),
                 hydrated_request=request,
             )
-
             self._queue_head.append(request.id)

         for leftover_request_id in leftover_buffer:
             # After adding new requests to the forefront, any existing leftover locked requests are kept at the end.
             self._queue_head.append(leftover_request_id)
-
         return RequestQueueHead.model_validate(response)
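The `leftover_buffer` dance above is a re-ordering pass: when forefront requests may exist, the cached head is set aside, freshly listed requests are appended first, and the previously buffered (possibly still locked) IDs are re-appended behind them. A sketch of the effect under assumed sample data; the exact bookkeeping, including clearing the head, lives in the elided lines:

```python
from collections import deque

queue_head: deque[str] = deque(['old-1', 'old-2'])  # Cached head before the check.

# Set the current head aside so forefront requests can take priority.
leftover_buffer = list(queue_head)
queue_head.clear()

# Fresh items returned by the list_head call, forefront requests first.
for request_id in ['forefront-1', 'regular-1']:
    queue_head.append(request_id)

# Leftover (possibly locked) requests are kept at the end.
for leftover_request_id in leftover_buffer:
    queue_head.append(leftover_request_id)

print(queue_head)  # deque(['forefront-1', 'regular-1', 'old-1', 'old-2'])
```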

     async def _prolong_request_lock(
27 changes: 15 additions & 12 deletions tests/integration/test_actor_request_queue.py
@@ -98,15 +98,18 @@ async def test_request_queue_is_finished(
     request_queue_name = generate_unique_resource_name('request_queue')

     async with Actor:
-        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-        await request_queue.add_request(Request.from_url('http://example.com'))
-        assert not await request_queue.is_finished()
-
-        request = await request_queue.fetch_next_request()
-        assert request is not None
-        assert not await request_queue.is_finished(), (
-            'RequestQueue should not be finished unless the request is marked as handled.'
-        )
-
-        await request_queue.mark_request_as_handled(request)
-        assert await request_queue.is_finished()
+        try:
+            request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
+            await request_queue.add_request(Request.from_url('http://example.com'))
+            assert not await request_queue.is_finished()
+
+            request = await request_queue.fetch_next_request()
+            assert request is not None
+            assert not await request_queue.is_finished(), (
+                'RequestQueue should not be finished unless the request is marked as handled.'
+            )
+
+            await request_queue.mark_request_as_handled(request)
+            assert await request_queue.is_finished()
+        finally:
+            await request_queue.drop()
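The `try`/`finally` wrap guarantees the named cloud queue is dropped even when an assertion fails mid-test, so reruns do not collide with leftover resources. The pattern in isolation, with a hypothetical `FakeQueue` standing in for the real queue:

```python
import asyncio


class FakeQueue:
    """Hypothetical stand-in for a named cloud request queue."""

    async def drop(self) -> None:
        print('queue dropped')


async def main() -> None:
    queue = FakeQueue()
    try:
        raise AssertionError('test body failed')  # Any assert may raise here.
    finally:
        await queue.drop()  # Cleanup runs on success and failure alike.


# asyncio.run(main())  # Prints 'queue dropped', then the AssertionError propagates.
```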
4 changes: 1 addition & 3 deletions tests/integration/test_crawlers_with_storages.py
@@ -2,8 +2,6 @@

 from typing import TYPE_CHECKING

-import pytest
-
 if TYPE_CHECKING:
     from .conftest import MakeActorFunction, RunActorFunction

@@ -78,7 +76,6 @@ async def default_handler(context: ParselCrawlingContext) -> None:
     assert run_result.status == 'SUCCEEDED'


-@pytest.mark.skip(reason='Sometimes crawler does not respect max_request_retries argument, see issue #540')
 async def test_actor_on_platform_max_request_retries(
     make_actor: MakeActorFunction,
     run_actor: RunActorFunction,
@@ -87,6 +84,7 @@ async def test_actor_on_platform_max_request_retries(

     async def main() -> None:
         """The crawler entry point."""
+
         from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext

         from apify import Actor