Skip to content

Commit d96c3c4

Browse files
committed
Merge remote-tracking branch 'origin/master' into add-stats-to-apify-rq
2 parents dddef14 + 7e4e5da commit d96c3c4

File tree

8 files changed

+273
-85
lines changed

8 files changed

+273
-85
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 73 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
from datetime import datetime, timedelta, timezone
88
from hashlib import sha256
99
from logging import getLogger
10-
from typing import TYPE_CHECKING, Final, Annotated
10+
from typing import TYPE_CHECKING, Annotated, Final
1111

1212
from cachetools import LRUCache
1313
from pydantic import BaseModel, ConfigDict, Field
1414
from typing_extensions import override
1515

1616
from apify_client import ApifyClientAsync
17+
from crawlee._utils.crypto import crypto_random_object_id
1718
from crawlee.storage_clients._base import RequestQueueClient
1819
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1920

@@ -71,6 +72,7 @@ class RequestQueueStats(BaseModel):
7172
write_count: Annotated[int, Field(alias='writeCount', default=0)]
7273
"""The number of request queue writes."""
7374

75+
7476
class ApifyRequestQueueMetadata(RequestQueueMetadata):
7577
stats: Annotated[RequestQueueStats, Field(alias='stats', default_factory=RequestQueueStats)]
7678
"""Additional optional statistics about the request queue."""
@@ -89,10 +91,7 @@ def __init__(
8991
self,
9092
*,
9193
api_client: RequestQueueClientAsync,
92-
id: str,
93-
name: str | None,
94-
total_request_count: int,
95-
handled_request_count: int,
94+
metadata: RequestQueueMetadata,
9695
) -> None:
9796
"""Initialize a new instance.
9897
@@ -101,11 +100,8 @@ def __init__(
101100
self._api_client = api_client
102101
"""The Apify request queue client for API operations."""
103102

104-
self._id = id
105-
"""The ID of the request queue."""
106-
107-
self._name = name
108-
"""The name of the request queue."""
103+
self._metadata = metadata
104+
"""Additional data related to the RequestQueue."""
109105

110106
self._queue_head = deque[str]()
111107
"""A deque to store request unique keys in the queue head."""
@@ -119,41 +115,45 @@ def __init__(
119115
self._should_check_for_forefront_requests = False
120116
"""Whether to check for forefront requests in the next list_head call."""
121117

122-
self._had_multiple_clients = False
123-
"""Whether the request queue has been accessed by multiple clients."""
124-
125-
self._initial_total_count = total_request_count
126-
"""The initial total request count (from the API) when the queue was opened."""
127-
128-
self._initial_handled_count = handled_request_count
129-
"""The initial handled request count (from the API) when the queue was opened."""
130-
131-
self._assumed_total_count = 0
132-
"""The number of requests we assume are in the queue (tracked manually for this instance)."""
133-
134-
self._assumed_handled_count = 0
135-
"""The number of requests we assume have been handled (tracked manually for this instance)."""
136-
137118
self._fetch_lock = asyncio.Lock()
138119
"""Fetch lock to minimize race conditions when communicating with API."""
139120

121+
async def _get_metadata_estimate(self) -> RequestQueueMetadata:
122+
"""Try to get cached metadata first. If multiple clients, fuse with global metadata.
123+
124+
This method is used internally to avoid an unnecessary API call unless it is needed (multiple clients).
125+
The local estimation of metadata has no delay, unlike metadata from the API. In situations where there is only one
126+
client, it is the better choice.
127+
"""
128+
if self._metadata.had_multiple_clients:
129+
return await self.get_metadata()
130+
# Get local estimation (will not include changes done by another client)
131+
return self._metadata
132+
140133
@override
141134
async def get_metadata(self) -> ApifyRequestQueueMetadata:
142-
total_count = self._initial_total_count + self._assumed_total_count
143-
handled_count = self._initial_handled_count + self._assumed_handled_count
144-
pending_count = total_count - handled_count
135+
"""Get metadata about the request queue.
145136
137+
Returns:
138+
Metadata from the API, merged with local estimation, because in some cases, the data from the API can
139+
be delayed.
140+
"""
141+
response = await self._api_client.get()
142+
if response is None:
143+
raise ValueError('Failed to fetch request queue metadata from the API.')
144+
# Enhance the API response with local estimations (the API can be delayed by a few seconds, while the local estimation is not).
146145
return ApifyRequestQueueMetadata(
147-
id=self._id,
148-
name=self._name,
149-
total_request_count=total_count,
150-
handled_request_count=handled_count,
151-
pending_request_count=pending_count,
152-
created_at=datetime.now(timezone.utc),
153-
modified_at=datetime.now(timezone.utc),
154-
accessed_at=datetime.now(timezone.utc),
155-
had_multiple_clients=self._had_multiple_clients,
156-
stats=RequestQueueStats.model_validate({}) # TODO: update after https://github.com/apify/apify-sdk-python/pull/552
146+
id=response['id'],
147+
name=response['name'],
148+
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
149+
handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count),
150+
pending_request_count=response['pendingRequestCount'],
151+
created_at=min(response['createdAt'], self._metadata.created_at),
152+
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
153+
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
154+
had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients,
155+
stats=RequestQueueStats.model_validate({}),
156+
# TODO: update after https://github.com/apify/apify-sdk-python/pull/552
157157
)
158158

159159
@classmethod
@@ -212,27 +212,34 @@ async def open(
212212
)
213213
apify_rqs_client = apify_client_async.request_queues()
214214

215-
# If both id and name are provided, raise an error.
216-
if id and name:
217-
raise ValueError('Only one of "id" or "name" can be specified, not both.')
218-
219-
# If id is provided, get the storage by ID.
220-
if id and name is None:
221-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
215+
match (id, name):
216+
case (None, None):
217+
# If both id and name are None, try to get the default storage ID from environment variables.
218+
# The default storage ID environment variable is set by the Apify platform. It also contains
219+
# a new storage ID after Actor's reboot or migration.
220+
id = configuration.default_request_queue_id
221+
case (None, name):
222+
# If only name is provided, get or create the storage by name.
223+
id = RequestQueueMetadata.model_validate(
224+
await apify_rqs_client.get_or_create(name=name),
225+
).id
226+
case (_, None):
227+
# If only id is provided, use it.
228+
pass
229+
case (_, _):
230+
# If both id and name are provided, raise an error.
231+
raise ValueError('Only one of "id" or "name" can be specified, not both.')
232+
if id is None:
233+
raise RuntimeError('Unreachable code')
222234

223-
# If name is provided, get or create the storage by name.
224-
if name and id is None:
225-
id = RequestQueueMetadata.model_validate(
226-
await apify_rqs_client.get_or_create(name=name),
227-
).id
228-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
235+
# Use a suitable client_key to make the `hadMultipleClients` response of the Apify API useful.
236+
# It should persist across migrated or resurrected Actor runs on the Apify platform.
237+
_api_max_client_key_length = 32
238+
client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[
239+
:_api_max_client_key_length
240+
]
229241

230-
# If both id and name are None, try to get the default storage ID from environment variables.
231-
# The default storage ID environment variable is set by the Apify platform. It also contains
232-
# a new storage ID after Actor's reboot or migration.
233-
if id is None and name is None:
234-
id = configuration.default_request_queue_id
235-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
242+
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
236243

237244
# Fetch its metadata.
238245
metadata = await apify_rq_client.get()
@@ -242,27 +249,18 @@ async def open(
242249
id = RequestQueueMetadata.model_validate(
243250
await apify_rqs_client.get_or_create(),
244251
).id
245-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
252+
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
246253

247254
# Verify that the storage exists by fetching its metadata again.
248255
metadata = await apify_rq_client.get()
249256
if metadata is None:
250257
raise ValueError(f'Opening request queue with id={id} and name={name} failed.')
251258

252-
metadata_model = RequestQueueMetadata.model_validate(
253-
await apify_rqs_client.get_or_create(),
254-
)
255-
256-
# Ensure we have a valid ID.
257-
if id is None:
258-
raise ValueError('Request queue ID cannot be None.')
259+
metadata_model = RequestQueueMetadata.model_validate(metadata)
259260

260261
return cls(
261262
api_client=apify_rq_client,
262-
id=id,
263-
name=name,
264-
total_request_count=metadata_model.total_request_count,
265-
handled_request_count=metadata_model.handled_request_count,
263+
metadata=metadata_model,
266264
)
267265

268266
@override
@@ -366,7 +364,7 @@ async def add_batch_of_requests(
366364
if not processed_request.was_already_present and not processed_request.was_already_handled:
367365
new_request_count += 1
368366

369-
self._assumed_total_count += new_request_count
367+
self._metadata.total_request_count += new_request_count
370368

371369
return api_response
372370

@@ -464,7 +462,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
464462

465463
# Update assumed handled count if this wasn't already handled
466464
if not processed_request.was_already_handled:
467-
self._assumed_handled_count += 1
465+
self._metadata.handled_request_count += 1
468466

469467
# Update the cache with the handled request
470468
cache_key = request.unique_key
@@ -512,7 +510,7 @@ async def reclaim_request(
512510
# If the request was previously handled, decrement our handled count since
513511
# we're putting it back for processing.
514512
if request.was_already_handled and not processed_request.was_already_handled:
515-
self._assumed_handled_count -= 1
513+
self._metadata.handled_request_count -= 1
516514

517515
# Update the cache
518516
cache_key = request.unique_key
@@ -670,7 +668,7 @@ async def _list_head(
670668
if cached_request and cached_request.hydrated:
671669
items.append(cached_request.hydrated)
672670

673-
metadata = await self.get_metadata()
671+
metadata = await self._get_metadata_estimate()
674672

675673
return RequestQueueHead(
676674
limit=limit,
@@ -697,6 +695,8 @@ async def _list_head(
697695

698696
# Update the queue head cache
699697
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
698+
# Check if there is another client working with the RequestQueue
699+
self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
700700

701701
for request_data in response.get('items', []):
702702
request = Request.model_validate(request_data)

tests/integration/_utils.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,13 @@
55

66
def generate_unique_resource_name(label: str) -> str:
77
"""Generates a unique resource name, which will contain the given label."""
8+
name_template = 'python-sdk-tests-{}-generated-{}'
9+
template_length = len(name_template.format('', ''))
10+
api_name_limit = 63
11+
generated_random_id_length = 8
12+
label_length_limit = api_name_limit - template_length - generated_random_id_length
13+
814
label = label.replace('_', '-')
9-
return f'python-sdk-tests-{label}-generated-{crypto_random_object_id(8)}'
15+
assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}'
16+
17+
return name_template.format(label, crypto_random_object_id(generated_random_id_length))

tests/integration/conftest.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import apify._actor
2020
from ._utils import generate_unique_resource_name
21+
from apify import Actor
2122
from apify._models import ActorRun
2223

2324
if TYPE_CHECKING:
24-
from collections.abc import Awaitable, Callable, Coroutine, Iterator, Mapping
25+
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping
2526
from decimal import Decimal
2627

2728
from apify_client.clients.resource_clients import ActorClientAsync
29+
from crawlee.storages import RequestQueue
2830

2931
_TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
3032
_API_URL_ENV_VAR = 'APIFY_INTEGRATION_TESTS_API_URL'
@@ -104,6 +106,18 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync:
104106
return ApifyClientAsync(apify_token, api_url=api_url)
105107

106108

109+
@pytest.fixture
110+
async def request_queue_force_cloud(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> AsyncGenerator[RequestQueue]:
111+
"""Create an instance of the Apify request queue on the platform and drop it when the test is finished."""
112+
request_queue_name = generate_unique_resource_name('request_queue')
113+
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)
114+
115+
async with Actor:
116+
rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
117+
yield rq
118+
await rq.drop()
119+
120+
107121
@pytest.fixture(scope='session')
108122
def sdk_wheel_path(tmp_path_factory: pytest.TempPathFactory, testrun_uid: str) -> Path:
109123
"""Build the package wheel if it hasn't been built yet, and return the path to the wheel."""

0 commit comments

Comments
 (0)