Skip to content

Commit 7e4e5da

Browse files
authored
fix: Use same client_key for Actor created request_queue and improve its metadata estimation (#552)
### Description - When creating `RequestQueue` from Actor on the platform or with `force_cloud=True` the `client_key` should be set to `run_id`. This ensures: - Each API call of the same `RequestQueue` instance that is using `ApifyRequestQueueClient` will be done with the same `client_key` and thus in metadata `had_multiple_clients=False` - Multiple instances of RequestQueue created by `Actor.open_request_queue()` on the platform share the same `client_key` and thus in metadata `had_multiple_clients=False` - On the platform, since the `client_key` is set to `run_id`, it remains the same for resurrected or migrated run, and thus in metadata `had_multiple_clients=False` - Improved reliability of `had_multiple_clients` allows better estimation of `RequestQueue` metadata. - When `had_multiple_clients=False`, it is possible to trust local estimation of the metadata. - When `had_multiple_clients=True`, local estimation is no longer valid, but still can, in some cases, improve estimation by being ahead of the delayed API update of the metadata. Therefore API-based metadata are fused with local metadata estimation to produce as good estimation as we can. - `ApifyRequestQueueClient` init changed to properly initialize from full metadata - to enable more reliable metadata after migration/resurrection or when using existing `RequestQueue` - During `_list_head`, if there is a call to API, use the available `had_multiple_clients` to update local estimation of this value. This is a cheap way of knowing if there is another client or not without the need to make a new API call. `_list_head` is called frequently enough to make the local estimation of `had_multiple_clients` decently good. ### Issues - Closes: #536 ### Testing - New tests added
1 parent 1eb11ef commit 7e4e5da

File tree

8 files changed

+269
-83
lines changed

8 files changed

+269
-83
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 69 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from typing_extensions import override
1414

1515
from apify_client import ApifyClientAsync
16+
from crawlee._utils.crypto import crypto_random_object_id
1617
from crawlee.storage_clients._base import RequestQueueClient
1718
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1819

@@ -65,10 +66,7 @@ def __init__(
6566
self,
6667
*,
6768
api_client: RequestQueueClientAsync,
68-
id: str,
69-
name: str | None,
70-
total_request_count: int,
71-
handled_request_count: int,
69+
metadata: RequestQueueMetadata,
7270
) -> None:
7371
"""Initialize a new instance.
7472
@@ -77,11 +75,8 @@ def __init__(
7775
self._api_client = api_client
7876
"""The Apify request queue client for API operations."""
7977

80-
self._id = id
81-
"""The ID of the request queue."""
82-
83-
self._name = name
84-
"""The name of the request queue."""
78+
self._metadata = metadata
79+
"""Additional data related to the RequestQueue."""
8580

8681
self._queue_head = deque[str]()
8782
"""A deque to store request unique keys in the queue head."""
@@ -95,40 +90,43 @@ def __init__(
9590
self._should_check_for_forefront_requests = False
9691
"""Whether to check for forefront requests in the next list_head call."""
9792

98-
self._had_multiple_clients = False
99-
"""Whether the request queue has been accessed by multiple clients."""
100-
101-
self._initial_total_count = total_request_count
102-
"""The initial total request count (from the API) when the queue was opened."""
103-
104-
self._initial_handled_count = handled_request_count
105-
"""The initial handled request count (from the API) when the queue was opened."""
106-
107-
self._assumed_total_count = 0
108-
"""The number of requests we assume are in the queue (tracked manually for this instance)."""
109-
110-
self._assumed_handled_count = 0
111-
"""The number of requests we assume have been handled (tracked manually for this instance)."""
112-
11393
self._fetch_lock = asyncio.Lock()
11494
"""Fetch lock to minimize race conditions when communicating with API."""
11595

96+
async def _get_metadata_estimate(self) -> RequestQueueMetadata:
97+
"""Try to get cached metadata first. If multiple clients, fuse with global metadata.
98+
99+
This method is used internally to avoid unnecessary API call unless needed (multiple clients).
100+
Local estimation of metadata is without delay, unlike metadata from API. In situation where there is only one
101+
client, it is the better choice.
102+
"""
103+
if self._metadata.had_multiple_clients:
104+
return await self.get_metadata()
105+
# Get local estimation (will not include changes done bo another client)
106+
return self._metadata
107+
116108
@override
117109
async def get_metadata(self) -> RequestQueueMetadata:
118-
total_count = self._initial_total_count + self._assumed_total_count
119-
handled_count = self._initial_handled_count + self._assumed_handled_count
120-
pending_count = total_count - handled_count
110+
"""Get metadata about the request queue.
121111
112+
Returns:
113+
Metadata from the API, merged with local estimation, because in some cases, the data from the API can
114+
be delayed.
115+
"""
116+
response = await self._api_client.get()
117+
if response is None:
118+
raise ValueError('Failed to fetch request queue metadata from the API.')
119+
# Enhance API response by local estimations (API can be delayed few seconds, while local estimation not.)
122120
return RequestQueueMetadata(
123-
id=self._id,
124-
name=self._name,
125-
total_request_count=total_count,
126-
handled_request_count=handled_count,
127-
pending_request_count=pending_count,
128-
created_at=datetime.now(timezone.utc),
129-
modified_at=datetime.now(timezone.utc),
130-
accessed_at=datetime.now(timezone.utc),
131-
had_multiple_clients=self._had_multiple_clients,
121+
id=response['id'],
122+
name=response['name'],
123+
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
124+
handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count),
125+
pending_request_count=response['pendingRequestCount'],
126+
created_at=min(response['createdAt'], self._metadata.created_at),
127+
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
128+
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
129+
had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients,
132130
)
133131

134132
@classmethod
@@ -187,27 +185,34 @@ async def open(
187185
)
188186
apify_rqs_client = apify_client_async.request_queues()
189187

190-
# If both id and name are provided, raise an error.
191-
if id and name:
192-
raise ValueError('Only one of "id" or "name" can be specified, not both.')
193-
194-
# If id is provided, get the storage by ID.
195-
if id and name is None:
196-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
188+
match (id, name):
189+
case (None, None):
190+
# If both id and name are None, try to get the default storage ID from environment variables.
191+
# The default storage ID environment variable is set by the Apify platform. It also contains
192+
# a new storage ID after Actor's reboot or migration.
193+
id = configuration.default_request_queue_id
194+
case (None, name):
195+
# If only name is provided, get or create the storage by name.
196+
id = RequestQueueMetadata.model_validate(
197+
await apify_rqs_client.get_or_create(name=name),
198+
).id
199+
case (_, None):
200+
# If only id is provided, use it.
201+
pass
202+
case (_, _):
203+
# If both id and name are provided, raise an error.
204+
raise ValueError('Only one of "id" or "name" can be specified, not both.')
205+
if id is None:
206+
raise RuntimeError('Unreachable code')
197207

198-
# If name is provided, get or create the storage by name.
199-
if name and id is None:
200-
id = RequestQueueMetadata.model_validate(
201-
await apify_rqs_client.get_or_create(name=name),
202-
).id
203-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
208+
# Use suitable client_key to make `hadMultipleClients` response of Apify API useful.
209+
# It should persist across migrated or resurrected Actor runs on the Apify platform.
210+
_api_max_client_key_length = 32
211+
client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[
212+
:_api_max_client_key_length
213+
]
204214

205-
# If both id and name are None, try to get the default storage ID from environment variables.
206-
# The default storage ID environment variable is set by the Apify platform. It also contains
207-
# a new storage ID after Actor's reboot or migration.
208-
if id is None and name is None:
209-
id = configuration.default_request_queue_id
210-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
215+
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
211216

212217
# Fetch its metadata.
213218
metadata = await apify_rq_client.get()
@@ -217,27 +222,18 @@ async def open(
217222
id = RequestQueueMetadata.model_validate(
218223
await apify_rqs_client.get_or_create(),
219224
).id
220-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
225+
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
221226

222227
# Verify that the storage exists by fetching its metadata again.
223228
metadata = await apify_rq_client.get()
224229
if metadata is None:
225230
raise ValueError(f'Opening request queue with id={id} and name={name} failed.')
226231

227-
metadata_model = RequestQueueMetadata.model_validate(
228-
await apify_rqs_client.get_or_create(),
229-
)
230-
231-
# Ensure we have a valid ID.
232-
if id is None:
233-
raise ValueError('Request queue ID cannot be None.')
232+
metadata_model = RequestQueueMetadata.model_validate(metadata)
234233

235234
return cls(
236235
api_client=apify_rq_client,
237-
id=id,
238-
name=name,
239-
total_request_count=metadata_model.total_request_count,
240-
handled_request_count=metadata_model.handled_request_count,
236+
metadata=metadata_model,
241237
)
242238

243239
@override
@@ -341,7 +337,7 @@ async def add_batch_of_requests(
341337
if not processed_request.was_already_present and not processed_request.was_already_handled:
342338
new_request_count += 1
343339

344-
self._assumed_total_count += new_request_count
340+
self._metadata.total_request_count += new_request_count
345341

346342
return api_response
347343

@@ -439,7 +435,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
439435

440436
# Update assumed handled count if this wasn't already handled
441437
if not processed_request.was_already_handled:
442-
self._assumed_handled_count += 1
438+
self._metadata.handled_request_count += 1
443439

444440
# Update the cache with the handled request
445441
cache_key = request.unique_key
@@ -487,7 +483,7 @@ async def reclaim_request(
487483
# If the request was previously handled, decrement our handled count since
488484
# we're putting it back for processing.
489485
if request.was_already_handled and not processed_request.was_already_handled:
490-
self._assumed_handled_count -= 1
486+
self._metadata.handled_request_count -= 1
491487

492488
# Update the cache
493489
cache_key = request.unique_key
@@ -645,7 +641,7 @@ async def _list_head(
645641
if cached_request and cached_request.hydrated:
646642
items.append(cached_request.hydrated)
647643

648-
metadata = await self.get_metadata()
644+
metadata = await self._get_metadata_estimate()
649645

650646
return RequestQueueHead(
651647
limit=limit,
@@ -672,6 +668,8 @@ async def _list_head(
672668

673669
# Update the queue head cache
674670
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
671+
# Check if there is another client working with the RequestQueue
672+
self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
675673

676674
for request_data in response.get('items', []):
677675
request = Request.model_validate(request_data)

tests/integration/_utils.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,13 @@
55

66
def generate_unique_resource_name(label: str) -> str:
77
"""Generates a unique resource name, which will contain the given label."""
8+
name_template = 'python-sdk-tests-{}-generated-{}'
9+
template_length = len(name_template.format('', ''))
10+
api_name_limit = 63
11+
generated_random_id_length = 8
12+
label_length_limit = api_name_limit - template_length - generated_random_id_length
13+
814
label = label.replace('_', '-')
9-
return f'python-sdk-tests-{label}-generated-{crypto_random_object_id(8)}'
15+
assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}'
16+
17+
return name_template.format(label, crypto_random_object_id(generated_random_id_length))

tests/integration/conftest.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import apify._actor
2020
from ._utils import generate_unique_resource_name
21+
from apify import Actor
2122
from apify._models import ActorRun
2223

2324
if TYPE_CHECKING:
24-
from collections.abc import Awaitable, Callable, Coroutine, Iterator, Mapping
25+
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping
2526
from decimal import Decimal
2627

2728
from apify_client.clients.resource_clients import ActorClientAsync
29+
from crawlee.storages import RequestQueue
2830

2931
_TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
3032
_API_URL_ENV_VAR = 'APIFY_INTEGRATION_TESTS_API_URL'
@@ -104,6 +106,18 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync:
104106
return ApifyClientAsync(apify_token, api_url=api_url)
105107

106108

109+
@pytest.fixture
110+
async def request_queue_force_cloud(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> AsyncGenerator[RequestQueue]:
111+
"""Create an instance of the Apify request queue on the platform and drop it when the test is finished."""
112+
request_queue_name = generate_unique_resource_name('request_queue')
113+
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)
114+
115+
async with Actor:
116+
rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
117+
yield rq
118+
await rq.drop()
119+
120+
107121
@pytest.fixture(scope='session')
108122
def sdk_wheel_path(tmp_path_factory: pytest.TempPathFactory, testrun_uid: str) -> Path:
109123
"""Build the package wheel if it hasn't been built yet, and return the path to the wheel."""

0 commit comments

Comments
 (0)