Skip to content

Commit b159d3e

Browse files
committed
Draft, unique key enables better metadata handling
1 parent 8ba084d commit b159d3e

File tree

3 files changed

+193
-87
lines changed

3 files changed

+193
-87
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 81 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing_extensions import override
1111

1212
from apify_client import ApifyClientAsync
13+
from crawlee._utils.crypto import crypto_random_object_id
1314
from crawlee._utils.requests import unique_key_to_request_id
1415
from crawlee.storage_clients._base import RequestQueueClient
1516
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
@@ -40,10 +41,7 @@ def __init__(
4041
self,
4142
*,
4243
api_client: RequestQueueClientAsync,
43-
id: str,
44-
name: str | None,
45-
total_request_count: int,
46-
handled_request_count: int,
44+
metadata: RequestQueueMetadata,
4745
) -> None:
4846
"""Initialize a new instance.
4947
@@ -52,11 +50,8 @@ def __init__(
5250
self._api_client = api_client
5351
"""The Apify request queue client for API operations."""
5452

55-
self._id = id
56-
"""The ID of the request queue."""
57-
58-
self._name = name
59-
"""The name of the request queue."""
53+
self._metadata = metadata
54+
"""Additional data related to the RequestQueue."""
6055

6156
self._queue_head = deque[str]()
6257
"""A deque to store request IDs in the queue head."""
@@ -70,41 +65,55 @@ def __init__(
7065
self._should_check_for_forefront_requests = False
7166
"""Whether to check for forefront requests in the next list_head call."""
7267

73-
self._had_multiple_clients = False
74-
"""Whether the request queue has been accessed by multiple clients."""
75-
76-
self._initial_total_count = total_request_count
77-
"""The initial total request count (from the API) when the queue was opened."""
78-
79-
self._initial_handled_count = handled_request_count
80-
"""The initial handled request count (from the API) when the queue was opened."""
81-
82-
self._assumed_total_count = 0
83-
"""The number of requests we assume are in the queue (tracked manually for this instance)."""
84-
85-
self._assumed_handled_count = 0
86-
"""The number of requests we assume have been handled (tracked manually for this instance)."""
87-
8868
self._fetch_lock = asyncio.Lock()
8969
"""Fetch lock to minimize race conditions when communicating with API."""
9070

9171
@override
9272
async def get_metadata(self) -> RequestQueueMetadata:
93-
total_count = self._initial_total_count + self._assumed_total_count
94-
handled_count = self._initial_handled_count + self._assumed_handled_count
95-
pending_count = total_count - handled_count
96-
97-
return RequestQueueMetadata(
98-
id=self._id,
99-
name=self._name,
100-
total_request_count=total_count,
101-
handled_request_count=handled_count,
102-
pending_request_count=pending_count,
103-
created_at=datetime.now(timezone.utc),
104-
modified_at=datetime.now(timezone.utc),
105-
accessed_at=datetime.now(timezone.utc),
106-
had_multiple_clients=self._had_multiple_clients,
107-
)
73+
"""Get metadata about the request queue."""
74+
if self._metadata.had_multiple_clients:
75+
# Enhanced from API (can be delayed few seconds)
76+
response = await self._api_client.get()
77+
if response is None:
78+
raise ValueError('Failed to fetch request queue metadata from the API.')
79+
return RequestQueueMetadata(
80+
id=response['id'],
81+
name=response['name'],
82+
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
83+
handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count),
84+
pending_request_count=response['pendingRequestCount'],
85+
created_at=response['createdAt'],
86+
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
87+
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
88+
had_multiple_clients=response['hadMultipleClients'],
89+
)
90+
# Update local estimation?
91+
# Get local estimation (will not include changes done bo another client)
92+
return self._metadata
93+
94+
95+
@override
96+
async def get_metadata(self) -> RequestQueueMetadata:
97+
"""Get metadata about the request queue."""
98+
if self._metadata.had_multiple_clients:
99+
# Enhanced from API (can be delayed few seconds)
100+
response = await self._api_client.get()
101+
if response is None:
102+
raise ValueError('Failed to fetch request queue metadata from the API.')
103+
return RequestQueueMetadata(
104+
id=response['id'],
105+
name=response['name'],
106+
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
107+
handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count),
108+
pending_request_count=response['pendingRequestCount'],
109+
created_at=response['createdAt'],
110+
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
111+
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
112+
had_multiple_clients=response['hadMultipleClients'],
113+
)
114+
# Update local estimation?
115+
# Get local estimation (will not include changes done bo another client)
116+
return self._metadata
108117

109118
@classmethod
110119
async def open(
@@ -162,27 +171,33 @@ async def open(
162171
)
163172
apify_rqs_client = apify_client_async.request_queues()
164173

165-
# If both id and name are provided, raise an error.
166-
if id and name:
167-
raise ValueError('Only one of "id" or "name" can be specified, not both.')
168-
169-
# If id is provided, get the storage by ID.
170-
if id and name is None:
171-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
174+
match (id, name):
175+
case (None, None):
176+
# If both id and name are None, try to get the default storage ID from environment variables.
177+
# The default storage ID environment variable is set by the Apify platform. It also contains
178+
# a new storage ID after Actor's reboot or migration.
179+
id = configuration.default_request_queue_id
180+
case (None, name):
181+
# If name is provided, get or create the storage by name.
182+
id = RequestQueueMetadata.model_validate(
183+
await apify_rqs_client.get_or_create(name=name),
184+
).id
185+
case (_, None):
186+
pass
187+
case (_, _):
188+
# If both id and name are provided, raise an error.
189+
raise ValueError('Only one of "id" or "name" can be specified, not both.')
190+
if id is None:
191+
raise RuntimeError('Unreachable code')
172192

173-
# If name is provided, get or create the storage by name.
174-
if name and id is None:
175-
id = RequestQueueMetadata.model_validate(
176-
await apify_rqs_client.get_or_create(name=name),
177-
).id
178-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
193+
# Use suitable client_key to make `hadMultipleClients` response of Apify API useful.
194+
# It should persist across migrated Actor runs on the Apify platform.
195+
_api_max_client_key_length = 32
196+
client_key = (configuration.actor_run_id or crypto_random_object_id(length=_api_max_client_key_length))[
197+
:_api_max_client_key_length
198+
]
179199

180-
# If both id and name are None, try to get the default storage ID from environment variables.
181-
# The default storage ID environment variable is set by the Apify platform. It also contains
182-
# a new storage ID after Actor's reboot or migration.
183-
if id is None and name is None:
184-
id = configuration.default_request_queue_id
185-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
200+
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
186201

187202
# Fetch its metadata.
188203
metadata = await apify_rq_client.get()
@@ -192,27 +207,18 @@ async def open(
192207
id = RequestQueueMetadata.model_validate(
193208
await apify_rqs_client.get_or_create(),
194209
).id
195-
apify_rq_client = apify_client_async.request_queue(request_queue_id=id)
210+
apify_rq_client = apify_client_async.request_queue(request_queue_id=id, client_key=client_key)
196211

197212
# Verify that the storage exists by fetching its metadata again.
198213
metadata = await apify_rq_client.get()
199214
if metadata is None:
200215
raise ValueError(f'Opening request queue with id={id} and name={name} failed.')
201216

202-
metadata_model = RequestQueueMetadata.model_validate(
203-
await apify_rqs_client.get_or_create(),
204-
)
205-
206-
# Ensure we have a valid ID.
207-
if id is None:
208-
raise ValueError('Request queue ID cannot be None.')
217+
metadata_model = RequestQueueMetadata.model_validate(metadata)
209218

210219
return cls(
211220
api_client=apify_rq_client,
212-
id=id,
213-
name=name,
214-
total_request_count=metadata_model.total_request_count,
215-
handled_request_count=metadata_model.handled_request_count,
221+
metadata=metadata_model,
216222
)
217223

218224
@override
@@ -261,7 +267,7 @@ async def add_batch_of_requests(
261267
if not processed_request.was_already_present and not processed_request.was_already_handled:
262268
new_request_count += 1
263269

264-
self._assumed_total_count += new_request_count
270+
self._metadata.total_request_count += new_request_count
265271

266272
return api_response
267273

@@ -359,7 +365,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
359365

360366
# Update assumed handled count if this wasn't already handled
361367
if not processed_request.was_already_handled:
362-
self._assumed_handled_count += 1
368+
self._metadata.handled_request_count += 1
363369

364370
# Update the cache with the handled request
365371
cache_key = unique_key_to_request_id(request.unique_key)
@@ -407,7 +413,7 @@ async def reclaim_request(
407413
# If the request was previously handled, decrement our handled count since
408414
# we're putting it back for processing.
409415
if request.was_already_handled and not processed_request.was_already_handled:
410-
self._assumed_handled_count -= 1
416+
self._metadata.handled_request_count -= 1
411417

412418
# Update the cache
413419
cache_key = unique_key_to_request_id(request.unique_key)
@@ -591,6 +597,8 @@ async def _list_head(
591597

592598
# Update the queue head cache
593599
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
600+
# Check if there is another client working with the RequestQueue
601+
self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
594602

595603
for request_data in response.get('items', []):
596604
request = Request.model_validate(request_data)

tests/integration/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ def apify_token() -> str:
9696
return api_token
9797

9898

99+
@pytest.fixture(autouse=True)
100+
def set_token(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> None:
101+
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)
102+
103+
99104
@pytest.fixture
100105
def apify_client_async(apify_token: str) -> ApifyClientAsync:
101106
"""Create an instance of the ApifyClientAsync.

0 commit comments

Comments
 (0)