From 73cec629ff70603d319f9dfdaaa4748c6dce60a8 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 11:04:18 +0200 Subject: [PATCH 1/4] fix: skip already-handled requests in single RQ client's fetch_next_request --- .../_apify/_request_queue_single_client.py | 6 +++ .../test_apify_request_queue_client.py | 44 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index a42f72522..c84e2ce27 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -198,6 +198,12 @@ async def fetch_next_request(self) -> Request | None: # head reconciliations, with no recovery path for the caller. self._requests_in_progress.discard(request_id) continue + if request.handled_at is not None: + # The request was already handled on the platform (e.g. by another producer). Skip it and + # remember its id for local deduplication, mirroring the shared client's guard. + self._requests_in_progress.discard(request_id) + self._requests_already_handled.add(request_id) + continue return request # No request locally and the ones returned from the platform are already in progress. return None diff --git a/tests/unit/storage_clients/test_apify_request_queue_client.py b/tests/unit/storage_clients/test_apify_request_queue_client.py index ae110adfa..f4d080f4c 100644 --- a/tests/unit/storage_clients/test_apify_request_queue_client.py +++ b/tests/unit/storage_clients/test_apify_request_queue_client.py @@ -5,6 +5,7 @@ import pytest +from apify_client._models import Request as ClientRequest from apify_client._models import RequestQueueHead from crawlee.storage_clients.models import RequestQueueMetadata @@ -92,3 +93,46 @@ async def test_list_head_limit(in_progress_count: int, expected_limit: int) -> N await client._list_head() api_client.list_head.assert_awaited_once_with(limit=expected_limit) + + +async def test_fetch_next_request_skips_already_handled() -> None: + """A request the platform reports as already handled must not be returned by `fetch_next_request`.""" + client, api_client = _make_single_client() + + unique_key = 'https://example.com' + request_id = unique_key_to_request_id(unique_key) + + # Head reconciliation returns nothing new. + api_client.list_head = AsyncMock( + return_value=RequestQueueHead( + limit=200, + queue_modified_at=datetime.now(tz=UTC), + had_multiple_clients=False, + items=[], + ) + ) + # The platform reports this request as already handled. + api_client.get_request = AsyncMock( + return_value=ClientRequest.model_validate( + { + 'id': request_id, + 'uniqueKey': unique_key, + 'url': unique_key, + 'method': 'GET', + 'headers': {}, + 'userData': {}, + 'retryCount': 0, + 'noRetry': False, + 'handledAt': datetime.now(tz=UTC), + } + ) + ) + + # Seed the local head estimate with the request id. + client._head_requests.append(request_id) + + result = await client.fetch_next_request() + + assert result is None, 'Already-handled request must not be fetched.' + assert request_id not in client._requests_in_progress, 'Handled request must not be left in progress.' + assert request_id in client._requests_already_handled, 'Handled request id should be cached for deduplication.' From 79a9ad5446a13deb5d31eaa408ad3b2ff27f6455 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 11:49:42 +0200 Subject: [PATCH 2/4] fix: correct handled/pending counts when reclaiming a previously handled request --- .../_apify/_request_queue_shared_client.py | 7 +- .../_apify/_request_queue_single_client.py | 7 +- .../test_apify_request_queue_client.py | 70 ++++++++++++++++--- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 496ec4f19..d7bfb2957 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -250,7 +250,10 @@ async def reclaim_request( """Specific implementation of this method for the RQ shared access mode.""" # Check if the request was marked as handled and clear it. When reclaiming, # we want to put the request back for processing. - if request.was_already_handled: + # Capture this before clearing `handled_at`, otherwise the computed `was_already_handled` property + # would always be False below and the metadata counters would never be adjusted. + was_already_handled = request.was_already_handled + if was_already_handled: request.handled_at = None # Reclaim with lock to prevent race conditions that could lead to double processing of the same request. @@ -262,7 +265,7 @@ async def reclaim_request( # If the request was previously handled, decrement our handled count since # we're putting it back for processing. - if request.was_already_handled and not processed_request.was_already_handled: + if was_already_handled and not processed_request.was_already_handled: self.metadata.handled_request_count -= 1 self.metadata.pending_request_count += 1 diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index c84e2ce27..3658adc0b 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -250,7 +250,10 @@ async def reclaim_request( request_id = unique_key_to_request_id(request.unique_key) - if request.was_already_handled: + # Capture this before clearing `handled_at`, otherwise the computed `was_already_handled` property + # would always be False below and the metadata counters would never be adjusted. + was_already_handled = request.was_already_handled + if was_already_handled: request.handled_at = None try: @@ -271,7 +274,7 @@ async def reclaim_request( processed_request.unique_key = request.unique_key # If the request was previously handled, decrement our handled count since # we're putting it back for processing. - if request.was_already_handled and not processed_request.was_already_handled: + if was_already_handled and not processed_request.was_already_handled: self.metadata.handled_request_count -= 1 self.metadata.pending_request_count += 1 diff --git a/tests/unit/storage_clients/test_apify_request_queue_client.py b/tests/unit/storage_clients/test_apify_request_queue_client.py index f4d080f4c..4d28c000f 100644 --- a/tests/unit/storage_clients/test_apify_request_queue_client.py +++ b/tests/unit/storage_clients/test_apify_request_queue_client.py @@ -1,25 +1,27 @@ from __future__ import annotations from datetime import UTC, datetime +from typing import TYPE_CHECKING from unittest.mock import AsyncMock import pytest from apify_client._models import Request as ClientRequest -from apify_client._models import RequestQueueHead +from apify_client._models import RequestQueueHead, RequestRegistration from crawlee.storage_clients.models import RequestQueueMetadata +from apify import Request +from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient from apify.storage_clients._apify._request_queue_single_client import ApifyRequestQueueSingleClient from apify.storage_clients._apify._utils import unique_key_to_request_id +if TYPE_CHECKING: + from collections.abc import Callable -def _make_single_client( - api_client: AsyncMock | None = None, -) -> tuple[ApifyRequestQueueSingleClient, AsyncMock]: - if api_client is None: - api_client = AsyncMock() + +def _make_metadata() -> RequestQueueMetadata: now = datetime.now(tz=UTC) - metadata = RequestQueueMetadata( + return RequestQueueMetadata( id='test-rq-id', name='test-rq', accessed_at=now, @@ -30,7 +32,28 @@ def _make_single_client( pending_request_count=0, total_request_count=0, ) - client = ApifyRequestQueueSingleClient(api_client=api_client, metadata=metadata, cache_size=100) + + +def _make_single_client( + api_client: AsyncMock | None = None, +) -> tuple[ApifyRequestQueueSingleClient, AsyncMock]: + if api_client is None: + api_client = AsyncMock() + client = ApifyRequestQueueSingleClient(api_client=api_client, metadata=_make_metadata(), cache_size=100) + return client, api_client + + +def _make_shared_client( + api_client: AsyncMock | None = None, +) -> tuple[ApifyRequestQueueSharedClient, AsyncMock]: + if api_client is None: + api_client = AsyncMock() + client = ApifyRequestQueueSharedClient( + api_client=api_client, + metadata=_make_metadata(), + cache_size=100, + metadata_getter=AsyncMock(), + ) return client, api_client @@ -136,3 +159,34 @@ async def test_fetch_next_request_skips_already_handled() -> None: assert result is None, 'Already-handled request must not be fetched.' assert request_id not in client._requests_in_progress, 'Handled request must not be left in progress.' assert request_id in client._requests_already_handled, 'Handled request id should be cached for deduplication.' + + +@pytest.mark.parametrize( + 'make_client', + [_make_single_client, _make_shared_client], + ids=['single_client', 'shared_client'], +) +async def test_reclaim_previously_handled_adjusts_counts( + make_client: Callable[[], tuple[ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient, AsyncMock]], +) -> None: + """Reclaiming a previously handled request must move it from handled back to pending in the metadata.""" + client, api_client = make_client() + client.metadata.handled_request_count = 1 + client.metadata.pending_request_count = 0 + + unique_key = 'https://example.com' + request_id = unique_key_to_request_id(unique_key) + request = Request.from_url(unique_key, unique_key=unique_key) + request.handled_at = datetime.now(tz=UTC) + + # After reclaiming, the platform reports the request as no longer handled. + api_client.update_request = AsyncMock( + return_value=RequestRegistration.model_validate( + {'requestId': request_id, 'wasAlreadyPresent': True, 'wasAlreadyHandled': False} + ) + ) + + await client.reclaim_request(request) + + assert client.metadata.handled_request_count == 0, 'Reclaimed request must be removed from the handled count.' + assert client.metadata.pending_request_count == 1, 'Reclaimed request must be added back to the pending count.' From 04b0339a4b97d863766d2bd20aad465fb801ffcb Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 19:28:37 +0200 Subject: [PATCH 3/4] refactor: drop redundant already-handled guard from single RQ client --- .../_apify/_request_queue_single_client.py | 6 --- .../test_apify_request_queue_client.py | 44 ------------------- 2 files changed, 50 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 3658adc0b..e086cb338 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -198,12 +198,6 @@ async def fetch_next_request(self) -> Request | None: # head reconciliations, with no recovery path for the caller. self._requests_in_progress.discard(request_id) continue - if request.handled_at is not None: - # The request was already handled on the platform (e.g. by another producer). Skip it and - # remember its id for local deduplication, mirroring the shared client's guard. - self._requests_in_progress.discard(request_id) - self._requests_already_handled.add(request_id) - continue return request # No request locally and the ones returned from the platform are already in progress. return None diff --git a/tests/unit/storage_clients/test_apify_request_queue_client.py b/tests/unit/storage_clients/test_apify_request_queue_client.py index bc88bb903..d58f3d9b9 100644 --- a/tests/unit/storage_clients/test_apify_request_queue_client.py +++ b/tests/unit/storage_clients/test_apify_request_queue_client.py @@ -6,7 +6,6 @@ import pytest -from apify_client._models import Request as ClientRequest from apify_client._models import RequestQueueHead, RequestQueueStats, RequestRegistration from crawlee.storage_clients.models import RequestQueueMetadata @@ -145,49 +144,6 @@ async def test_list_head_limit(in_progress_count: int, expected_limit: int) -> N api_client.list_head.assert_awaited_once_with(limit=expected_limit) -async def test_fetch_next_request_skips_already_handled() -> None: - """A request the platform reports as already handled must not be returned by `fetch_next_request`.""" - client, api_client = _make_single_client() - - unique_key = 'https://example.com' - request_id = unique_key_to_request_id(unique_key) - - # Head reconciliation returns nothing new. - api_client.list_head = AsyncMock( - return_value=RequestQueueHead( - limit=200, - queue_modified_at=datetime.now(tz=UTC), - had_multiple_clients=False, - items=[], - ) - ) - # The platform reports this request as already handled. - api_client.get_request = AsyncMock( - return_value=ClientRequest.model_validate( - { - 'id': request_id, - 'uniqueKey': unique_key, - 'url': unique_key, - 'method': 'GET', - 'headers': {}, - 'userData': {}, - 'retryCount': 0, - 'noRetry': False, - 'handledAt': datetime.now(tz=UTC), - } - ) - ) - - # Seed the local head estimate with the request id. - client._head_requests.append(request_id) - - result = await client.fetch_next_request() - - assert result is None, 'Already-handled request must not be fetched.' - assert request_id not in client._requests_in_progress, 'Handled request must not be left in progress.' - assert request_id in client._requests_already_handled, 'Handled request id should be cached for deduplication.' - - @pytest.mark.parametrize( 'make_client', [_make_single_client, _make_shared_client], From 29967d1de0077e2151e02cc22939cadc3438d7ac Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Sat, 13 Jun 2026 10:01:32 +0200 Subject: [PATCH 4/4] fix: base reclaim_request counter adjustment on platform response --- .../_apify/_request_queue_shared_client.py | 11 ++- .../_apify/_request_queue_single_client.py | 11 ++- tests/integration/test_request_queue.py | 40 +++++++++++ .../test_apify_request_queue_client.py | 70 +++---------------- 4 files changed, 56 insertions(+), 76 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index d7bfb2957..48d282a99 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -250,10 +250,7 @@ async def reclaim_request( """Specific implementation of this method for the RQ shared access mode.""" # Check if the request was marked as handled and clear it. When reclaiming, # we want to put the request back for processing. - # Capture this before clearing `handled_at`, otherwise the computed `was_already_handled` property - # would always be False below and the metadata counters would never be adjusted. - was_already_handled = request.was_already_handled - if was_already_handled: + if request.was_already_handled: request.handled_at = None # Reclaim with lock to prevent race conditions that could lead to double processing of the same request. @@ -263,9 +260,9 @@ async def reclaim_request( processed_request = await self._update_request(request, forefront=forefront) processed_request.unique_key = request.unique_key - # If the request was previously handled, decrement our handled count since - # we're putting it back for processing. - if was_already_handled and not processed_request.was_already_handled: + # The platform reports the request's state before this update via `was_already_handled`. If it was + # handled, this update moved it from handled back to pending, so mirror that in the local metadata. + if processed_request.was_already_handled: self.metadata.handled_request_count -= 1 self.metadata.pending_request_count += 1 diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index e086cb338..4029300d2 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -244,10 +244,7 @@ async def reclaim_request( request_id = unique_key_to_request_id(request.unique_key) - # Capture this before clearing `handled_at`, otherwise the computed `was_already_handled` property - # would always be False below and the metadata counters would never be adjusted. - was_already_handled = request.was_already_handled - if was_already_handled: + if request.was_already_handled: request.handled_at = None try: @@ -266,9 +263,9 @@ async def reclaim_request( processed_request = await self._update_request(request, forefront=forefront) processed_request.id = request_id processed_request.unique_key = request.unique_key - # If the request was previously handled, decrement our handled count since - # we're putting it back for processing. - if was_already_handled and not processed_request.was_already_handled: + # The platform reports the request's state before this update via `was_already_handled`. If it was + # handled, this update moved it from handled back to pending, so mirror that in the local metadata. + if processed_request.was_already_handled: self.metadata.handled_request_count -= 1 self.metadata.pending_request_count += 1 diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 1c4b56186..6a9ba3cb5 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -356,6 +356,46 @@ async def test_request_reclaim_with_forefront( Actor.log.info(f'Test completed - processed {remaining_count} additional requests') +async def test_reclaim_handled_request_moves_back_to_pending( + request_queue_apify: RequestQueue, + rq_poll_timeout: int, +) -> None: + """Reclaiming an already-handled request must move it from handled back to pending in the queue metadata.""" + rq = request_queue_apify + + # Add a request and mark it as handled. + await rq.add_request('https://example.com/reclaim-handled') + request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) + assert request is not None + await rq.mark_request_as_handled(request) + + # The local estimate reflects the handled count immediately, even before the API metadata propagates. + metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.handled_request_count == 1, + timeout=rq_poll_timeout, + backoff_factor=2, + ) + assert metadata.handled_request_count == 1, f'handled={metadata.handled_request_count}' + + # Reclaim the already-handled request. The platform reports the prior (handled) state on the update. + reclaim_result = await rq.reclaim_request(request) + assert reclaim_result is not None + assert reclaim_result.was_already_handled is True + + # The counts must move handled -> pending. The decrement only becomes visible once the API-side metadata + # catches up, since the merged metadata takes max(api, local) for the handled count, so poll until it settles. + metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.handled_request_count == 0 and m.pending_request_count == 1, + timeout=60, + poll_interval=5, + ) + assert metadata.handled_request_count == 0, f'handled={metadata.handled_request_count}' + assert metadata.pending_request_count == 1, f'pending={metadata.pending_request_count}' + assert metadata.total_request_count == 1, f'total={metadata.total_request_count}' + + async def test_complex_request_objects( request_queue_apify: RequestQueue, rq_poll_timeout: int, diff --git a/tests/unit/storage_clients/test_apify_request_queue_client.py b/tests/unit/storage_clients/test_apify_request_queue_client.py index d58f3d9b9..cfdc0ed17 100644 --- a/tests/unit/storage_clients/test_apify_request_queue_client.py +++ b/tests/unit/storage_clients/test_apify_request_queue_client.py @@ -1,27 +1,25 @@ from __future__ import annotations from datetime import UTC, datetime -from typing import TYPE_CHECKING from unittest.mock import AsyncMock import pytest -from apify_client._models import RequestQueueHead, RequestQueueStats, RequestRegistration +from apify_client._models import RequestQueueHead, RequestQueueStats from crawlee.storage_clients.models import RequestQueueMetadata -from apify import Request from apify.storage_clients._apify._models import ApifyRequestQueueMetadata -from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient from apify.storage_clients._apify._request_queue_single_client import ApifyRequestQueueSingleClient from apify.storage_clients._apify._utils import unique_key_to_request_id -if TYPE_CHECKING: - from collections.abc import Callable - -def _make_metadata() -> RequestQueueMetadata: +def _make_single_client( + api_client: AsyncMock | None = None, +) -> tuple[ApifyRequestQueueSingleClient, AsyncMock]: + if api_client is None: + api_client = AsyncMock() now = datetime.now(tz=UTC) - return RequestQueueMetadata( + metadata = RequestQueueMetadata( id='test-rq-id', name='test-rq', accessed_at=now, @@ -32,28 +30,7 @@ def _make_metadata() -> RequestQueueMetadata: pending_request_count=0, total_request_count=0, ) - - -def _make_single_client( - api_client: AsyncMock | None = None, -) -> tuple[ApifyRequestQueueSingleClient, AsyncMock]: - if api_client is None: - api_client = AsyncMock() - client = ApifyRequestQueueSingleClient(api_client=api_client, metadata=_make_metadata(), cache_size=100) - return client, api_client - - -def _make_shared_client( - api_client: AsyncMock | None = None, -) -> tuple[ApifyRequestQueueSharedClient, AsyncMock]: - if api_client is None: - api_client = AsyncMock() - client = ApifyRequestQueueSharedClient( - api_client=api_client, - metadata=_make_metadata(), - cache_size=100, - metadata_getter=AsyncMock(), - ) + client = ApifyRequestQueueSingleClient(api_client=api_client, metadata=metadata, cache_size=100) return client, api_client @@ -142,34 +119,3 @@ async def test_list_head_limit(in_progress_count: int, expected_limit: int) -> N await client._list_head() api_client.list_head.assert_awaited_once_with(limit=expected_limit) - - -@pytest.mark.parametrize( - 'make_client', - [_make_single_client, _make_shared_client], - ids=['single_client', 'shared_client'], -) -async def test_reclaim_previously_handled_adjusts_counts( - make_client: Callable[[], tuple[ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient, AsyncMock]], -) -> None: - """Reclaiming a previously handled request must move it from handled back to pending in the metadata.""" - client, api_client = make_client() - client.metadata.handled_request_count = 1 - client.metadata.pending_request_count = 0 - - unique_key = 'https://example.com' - request_id = unique_key_to_request_id(unique_key) - request = Request.from_url(unique_key, unique_key=unique_key) - request.handled_at = datetime.now(tz=UTC) - - # After reclaiming, the platform reports the request as no longer handled. - api_client.update_request = AsyncMock( - return_value=RequestRegistration.model_validate( - {'requestId': request_id, 'wasAlreadyPresent': True, 'wasAlreadyHandled': False} - ) - ) - - await client.reclaim_request(request) - - assert client.metadata.handled_request_count == 0, 'Reclaimed request must be removed from the handled count.' - assert client.metadata.pending_request_count == 1, 'Reclaimed request must be added back to the pending count.'