Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ async def reclaim_request(
"""Specific implementation of this method for the RQ shared access mode."""
# Check if the request was marked as handled and clear it. When reclaiming,
# we want to put the request back for processing.
if request.was_already_handled:
# Capture this before clearing `handled_at`, otherwise the computed `was_already_handled` property
# would always be False below and the metadata counters would never be adjusted.
was_already_handled = request.was_already_handled
if was_already_handled:
request.handled_at = None

# Reclaim with lock to prevent race conditions that could lead to double processing of the same request.
Expand All @@ -262,7 +265,7 @@ async def reclaim_request(

# If the request was previously handled, decrement our handled count since
# we're putting it back for processing.
if request.was_already_handled and not processed_request.was_already_handled:
if was_already_handled and not processed_request.was_already_handled:
self.metadata.handled_request_count -= 1
self.metadata.pending_request_count += 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ async def reclaim_request(

request_id = unique_key_to_request_id(request.unique_key)

if request.was_already_handled:
# Capture this before clearing `handled_at`, otherwise the computed `was_already_handled` property
# would always be False below and the metadata counters would never be adjusted.
was_already_handled = request.was_already_handled
if was_already_handled:
request.handled_at = None

try:
Expand All @@ -265,7 +268,7 @@ async def reclaim_request(
processed_request.unique_key = request.unique_key
# If the request was previously handled, decrement our handled count since
# we're putting it back for processing.
if request.was_already_handled and not processed_request.was_already_handled:
if was_already_handled and not processed_request.was_already_handled:
self.metadata.handled_request_count -= 1
self.metadata.pending_request_count += 1

Expand Down
70 changes: 62 additions & 8 deletions tests/unit/storage_clients/test_apify_request_queue_client.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
from __future__ import annotations

from datetime import UTC, datetime
from typing import TYPE_CHECKING
from unittest.mock import AsyncMock

import pytest

from apify_client._models import RequestQueueHead, RequestQueueStats
from apify_client._models import RequestQueueHead, RequestQueueStats, RequestRegistration
from crawlee.storage_clients.models import RequestQueueMetadata

from apify import Request
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata
from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient
from apify.storage_clients._apify._request_queue_single_client import ApifyRequestQueueSingleClient
from apify.storage_clients._apify._utils import unique_key_to_request_id

if TYPE_CHECKING:
from collections.abc import Callable

def _make_single_client(
api_client: AsyncMock | None = None,
) -> tuple[ApifyRequestQueueSingleClient, AsyncMock]:
if api_client is None:
api_client = AsyncMock()

def _make_metadata() -> RequestQueueMetadata:
now = datetime.now(tz=UTC)
metadata = RequestQueueMetadata(
return RequestQueueMetadata(
id='test-rq-id',
name='test-rq',
accessed_at=now,
Expand All @@ -30,7 +32,28 @@ def _make_single_client(
pending_request_count=0,
total_request_count=0,
)
client = ApifyRequestQueueSingleClient(api_client=api_client, metadata=metadata, cache_size=100)


def _make_single_client(
    api_client: AsyncMock | None = None,
) -> tuple[ApifyRequestQueueSingleClient, AsyncMock]:
    """Build an `ApifyRequestQueueSingleClient` wired to a mocked API client.

    Args:
        api_client: Mock to use as the API client. A fresh `AsyncMock` is created when omitted.

    Returns:
        The constructed client together with the API client mock it was given.
    """
    mock_api = AsyncMock() if api_client is None else api_client
    single_client = ApifyRequestQueueSingleClient(
        api_client=mock_api,
        metadata=_make_metadata(),
        cache_size=100,
    )
    return single_client, mock_api


def _make_shared_client(
    api_client: AsyncMock | None = None,
) -> tuple[ApifyRequestQueueSharedClient, AsyncMock]:
    """Build an `ApifyRequestQueueSharedClient` wired to a mocked API client.

    The `metadata_getter` dependency is replaced by its own `AsyncMock`.

    Args:
        api_client: Mock to use as the API client. A fresh `AsyncMock` is created when omitted.

    Returns:
        The constructed client together with the API client mock it was given.
    """
    mock_api = AsyncMock() if api_client is None else api_client
    shared_client = ApifyRequestQueueSharedClient(
        api_client=mock_api,
        metadata=_make_metadata(),
        cache_size=100,
        metadata_getter=AsyncMock(),
    )
    return shared_client, mock_api


Expand Down Expand Up @@ -119,3 +142,34 @@ async def test_list_head_limit(in_progress_count: int, expected_limit: int) -> N
await client._list_head()

api_client.list_head.assert_awaited_once_with(limit=expected_limit)


@pytest.mark.parametrize(
    'make_client',
    [_make_single_client, _make_shared_client],
    ids=['single_client', 'shared_client'],
)
async def test_reclaim_previously_handled_adjusts_counts(
    make_client: Callable[[], tuple[ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient, AsyncMock]],
) -> None:
    """Check that reclaiming an already-handled request shifts one unit from the handled to the pending count."""
    client, api_client = make_client()

    # Start from a state where exactly one request is counted as handled and none as pending.
    client.metadata.handled_request_count = 1
    client.metadata.pending_request_count = 0

    url = 'https://example.com'
    expected_request_id = unique_key_to_request_id(url)
    handled_request = Request.from_url(url, unique_key=url)
    handled_request.handled_at = datetime.now(tz=UTC)

    # The mocked `update_request` response marks the request as present but not handled.
    registration = RequestRegistration.model_validate(
        {
            'requestId': expected_request_id,
            'wasAlreadyPresent': True,
            'wasAlreadyHandled': False,
        }
    )
    api_client.update_request = AsyncMock(return_value=registration)

    await client.reclaim_request(handled_request)

    assert client.metadata.handled_request_count == 0, 'Reclaimed request must be removed from the handled count.'
    assert client.metadata.pending_request_count == 1, 'Reclaimed request must be added back to the pending count.'
Loading