diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 496ec4f1..48d282a9 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -260,9 +260,9 @@ async def reclaim_request( processed_request = await self._update_request(request, forefront=forefront) processed_request.unique_key = request.unique_key - # If the request was previously handled, decrement our handled count since - # we're putting it back for processing. - if request.was_already_handled and not processed_request.was_already_handled: + # The platform reports the request's state before this update via `was_already_handled`. If it was + # handled, this update moved it from handled back to pending, so mirror that in the local metadata. + if processed_request.was_already_handled: self.metadata.handled_request_count -= 1 self.metadata.pending_request_count += 1 diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index a42f7252..4029300d 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -263,9 +263,9 @@ async def reclaim_request( processed_request = await self._update_request(request, forefront=forefront) processed_request.id = request_id processed_request.unique_key = request.unique_key - # If the request was previously handled, decrement our handled count since - # we're putting it back for processing. - if request.was_already_handled and not processed_request.was_already_handled: + # The platform reports the request's state before this update via `was_already_handled`. If it was + # handled, this update moved it from handled back to pending, so mirror that in the local metadata. 
+ if processed_request.was_already_handled: self.metadata.handled_request_count -= 1 self.metadata.pending_request_count += 1 diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 1c4b5618..6a9ba3cb 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -356,6 +356,46 @@ async def test_request_reclaim_with_forefront( Actor.log.info(f'Test completed - processed {remaining_count} additional requests') +async def test_reclaim_handled_request_moves_back_to_pending( + request_queue_apify: RequestQueue, + rq_poll_timeout: int, +) -> None: + """Reclaiming an already-handled request must move it from handled back to pending in the queue metadata.""" + rq = request_queue_apify + + # Add a request and mark it as handled. + await rq.add_request('https://example.com/reclaim-handled') + request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) + assert request is not None + await rq.mark_request_as_handled(request) + + # The local estimate reflects the handled count immediately, even before the API metadata propagates. + metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.handled_request_count == 1, + timeout=rq_poll_timeout, + backoff_factor=2, + ) + assert metadata.handled_request_count == 1, f'handled={metadata.handled_request_count}' + + # Reclaim the already-handled request. The platform reports the prior (handled) state on the update. + reclaim_result = await rq.reclaim_request(request) + assert reclaim_result is not None + assert reclaim_result.was_already_handled is True + + # The counts must move handled -> pending. The decrement only becomes visible once the API-side metadata + # catches up, since the merged metadata takes max(api, local) for the handled count, so poll until it settles. 
+ metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.handled_request_count == 0 and m.pending_request_count == 1, + timeout=60,  # NOTE(review): fixed value rather than `rq_poll_timeout` as in the polls above; confirm intentional. + poll_interval=5, + ) + assert metadata.handled_request_count == 0, f'handled={metadata.handled_request_count}' + assert metadata.pending_request_count == 1, f'pending={metadata.pending_request_count}' + assert metadata.total_request_count == 1, f'total={metadata.total_request_count}' + + async def test_complex_request_objects( request_queue_apify: RequestQueue, rq_poll_timeout: int,