Commit 08df986

Try to use list_head
1 parent 1d869a4 commit 08df986


src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 121 additions & 46 deletions
@@ -4,7 +4,7 @@
 import re
 from base64 import b64encode
 from collections import deque
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timezone
 from hashlib import sha256
 from logging import getLogger
 from typing import TYPE_CHECKING, Final
@@ -61,21 +61,24 @@ class ApifyRequestQueueClient(RequestQueueClient):
     - Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to be handled
       sooner. (Explanation below)
     - This client always consumes first own requests and only if no local requests exists it tries to get requests from
-      the global queue.
+      the global queue. ???
+    - Requests are only added to the queue, never deleted. (Marking as handled is ok.)
 
     If the constraints are not met, the client might work in an unpredictable way.
 
     Optimization notes:
     - The client aggressively caches requests to avoid unnecessary API calls.
-    - The client adds requests to the global queue if they are handled.
+    - The client adds requests to the global queue if they are handled. (Potential optimization, but problematic,
+      probably not worth it)
     - The client adds unhandled requests to the global queue only if local cache size reaches some threshold or based on
       external callback. (To prevent double API call per request - adding request to the global queue and marking it as
-      handled. The client tries to do that in one step if possible.)
+      handled. The client tries to do that in one step if possible.) (Potential optimization, but problematic,
+      probably not worth it)
     - The client tracks own forefront (priority requests), that does not have to be in sync with the global forefront.
     """
 
-    _DEFAULT_LOCK_TIME: Final[timedelta] = timedelta(minutes=3)
-    """The default lock time for requests in the queue."""
+    _MAX_HEAD_ITEMS: Final[int] = 200
+    """The maximum head items read count limited by API."""
 
     _MAX_CACHED_REQUESTS: Final[int] = 1_000_000
     """Maximum number of requests that can be cached."""
@@ -102,8 +105,11 @@ def __init__(
         self._head_requests: deque[str] = deque()
         """Ordered unique keys of requests that that represents queue head."""
 
-        self._requests_on_platform: set[str] = set()
-        """Set of requests unique keys that are already present on the platform. To enable local deduplication."""
+        self._requests_already_handled: set[str] = set()
+        """Local estimation of requests unique keys that are already present and handled on the platform.
+
+        (Could be persisted to optimize migrations)
+        To enhance local deduplication and track handled requests to reduce amount of API calls."""
 
         self._requests_in_progress: set[str] = set()
         """Set of requests unique keys that are being processed locally.
@@ -279,12 +285,27 @@ async def add_batch_of_requests(
             Response containing information about the added requests.
         """
         # Do not try to add previously added requests to avoid pointless expensive calls to API
+        # Check if request is known to be already handled (it has to be present as well.)
+        # Check if request is known to be already present, but unhandled
+        # Push to the platform. Probably not there, or we are not aware of it
+        # (added by another producer or before migration).
+
 
         new_requests: list[ProcessedRequest] = []
         already_present_requests: list[ProcessedRequest] = []
 
         for request in requests:
-            if self._requests_cache.get(request.unique_key):
+            if request.unique_key in self._requests_already_handled:
+                already_present_requests.append(
+                    ProcessedRequest.model_validate(
+                        {
+                            'uniqueKey': request.unique_key,
+                            'wasAlreadyPresent': True,
+                            'wasAlreadyHandled': True,
+                        }
+                    )
+                )
+            elif self._requests_cache.get(request.unique_key):
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
@@ -294,7 +315,6 @@ async def add_batch_of_requests(
                         }
                     )
                 )
-
             else:
                 new_requests.append(
                     ProcessedRequest.model_validate(
@@ -314,14 +334,38 @@ async def add_batch_of_requests(
             else:
                 self._head_requests.appendleft(request.unique_key)
 
+        if new_requests:
+            # Prepare requests for API by converting to dictionaries.
+            requests_dict = [
+                request.model_dump(
+                    by_alias=True,
+                )
+                for request in new_requests
+            ]
 
-        api_response = AddRequestsResponse.model_validate(
-            {'unprocessedRequests': [], 'processedRequests': already_present_requests+new_requests}
-        )
+            # Send requests to API.
+            api_response = AddRequestsResponse.model_validate(
+                await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+            )
+            # Add the locally known already present processed requests based on the local cache.
+            api_response.processed_requests.extend(already_present_requests)
+            # Remove unprocessed requests from the cache
+            for unprocessed_request in api_response.unprocessed_requests:
+                self._requests_cache.pop(unprocessed_request.unique_key, None)
+
+        else:
+            api_response = AddRequestsResponse.model_validate(
+                {'unprocessedRequests': [], 'processedRequests': already_present_requests}
+            )
 
 
         # Update assumed total count for newly added requests.
-        self._metadata.total_request_count += len(new_requests)
+        new_request_count = 0
+        for processed_request in api_response.processed_requests:
+            if not processed_request.was_already_present and not processed_request.was_already_handled:
+                new_request_count += 1
+        self._metadata.total_request_count += new_request_count
+
         return api_response
 
     @override
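
One consequence of routing everything through the API response is visible in the total-count update above: the count is now derived from the wasAlreadyPresent / wasAlreadyHandled flags of the processed requests rather than from len(new_requests). A tiny self-contained illustration with made-up records (plain dicts, not the client's Pydantic models):

    # Hypothetical processed-request records.
    processed_requests = [
        {'uniqueKey': 'r1', 'wasAlreadyPresent': False, 'wasAlreadyHandled': False},  # genuinely new
        {'uniqueKey': 'r2', 'wasAlreadyPresent': True, 'wasAlreadyHandled': False},   # duplicate, unhandled
        {'uniqueKey': 'r3', 'wasAlreadyPresent': True, 'wasAlreadyHandled': True},    # duplicate, handled
    ]

    # Only requests that were neither present nor handled before count towards the total.
    new_request_count = sum(
        1
        for r in processed_requests
        if not r['wasAlreadyPresent'] and not r['wasAlreadyHandled']
    )
    assert new_request_count == 1
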
@@ -361,29 +405,50 @@ async def fetch_next_request(self) -> Request | None:
 
         while self._head_requests:
             request_unique_key = self._head_requests.pop()
-            if request_unique_key not in self._requests_in_progress:
+            if (
+                request_unique_key not in self._requests_in_progress and
+                request_unique_key not in self._requests_already_handled
+            ):
                 self._requests_in_progress.add(request_unique_key)
                 return await self.get_request(request_unique_key)
         # No request locally and the ones returned from the platform are already in progress.
         return None
 
     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
-        if not self._head_requests:
-            response = await self._api_client.list_and_lock_head(limit=25,
-                                                                 lock_secs=int(self._DEFAULT_LOCK_TIME.total_seconds()))
-            # Update the queue head cache
-            self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
-            # Check if there is another client working with the RequestQueue
-            self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
-            if modified_at:= response.get('queueModifiedAt'):
-                self._metadata.modified_at = max(self._metadata.modified_at, modified_at)
-
-            for request_data in response.get('items', []):
-                request = Request.model_validate(request_data)
+        if len(self._head_requests)<=1:
+            await self._list_head()
+
+
+    async def _list_head(self) -> None:
+        desired_new_head_items = 100
+        # The head will contain in progress requests as well, so we need to fetch more, to get some new ones.
+        requested_head_items = max(self._MAX_HEAD_ITEMS, desired_new_head_items + len(self._requests_in_progress))
+        response = await self._api_client.list_head(limit=requested_head_items)
+
+        # Update metadata
+        self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
+        # Check if there is another client working with the RequestQueue
+        self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
+        # Should warn once? This might be outside expected context if the other consumers consumes at the same time
+
+        if modified_at := response.get('queueModifiedAt'):
+            self._metadata.modified_at = max(self._metadata.modified_at, modified_at)
+
+        # Update the cached data
+        for request_data in response.get('items', []):
+            request = Request.model_validate(request_data)
+
+            if request.unique_key in self._requests_in_progress:
+                # Ignore requests that are already in progress, we will not process them again.
+                continue
+            if request.was_already_handled:
+                # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
+                self._requests_already_handled.add(request.unique_key)
+            else:
                 self._requests_cache[request.unique_key] = request
-                self._head_requests.append(request.unique_key)
-                self._requests_on_platform.add(request.unique_key)
+                # Add new requests to the end of the head
+                self._head_requests.appendleft(request.unique_key)
 
 
     @override
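
The _list_head changes rely on a single ordering convention for the head deque: pop() takes the next request from the right, append() puts a request at the top of the head (used for forefront/reclaimed requests elsewhere in the diff), and appendleft() puts newly listed requests at the back. A quick standalone demonstration of that convention with plain collections.deque and made-up keys (not the client code):

    from collections import deque

    head: deque[str] = deque()

    head.appendleft('listed-1')  # newly listed request goes to the back of the head
    head.appendleft('listed-2')
    head.append('urgent')        # forefront request goes to the top of the head

    assert head.pop() == 'urgent'    # consumed first
    assert head.pop() == 'listed-1'  # then the earlier-listed requests, in listing order
    assert head.pop() == 'listed-2'
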
@@ -406,21 +471,24 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
 
         if cached_request := self._requests_cache[request.unique_key]:
             cached_request.handled_at = request.handled_at
-        try:
-            # Update the request in the API
-            # Works as upsert - adds the request if it does not exist yet. (Local request that was handled before adding
-            # to the queue.)
-            processed_request = await self._update_request(request)
-            # Remove request from cache. It will no longer bee needed.
-            self._requests_cache.pop(request.unique_key)
-            self._requests_in_progress.discard(request.unique_key)
-            self._requests_on_platform.add(request.unique_key)
-
-        except Exception as exc:
-            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
-            return None
-        else:
-            return processed_request
+
+        async with self._fetch_lock:
+            try:
+                # Update the request in the API
+                # Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
+                # adding to the queue.)
+                processed_request = await self._update_request(request)
+                # Remove request from cache. It will most likely not be needed.
+                self._requests_cache.pop(request.unique_key)
+                self._requests_in_progress.discard(request.unique_key)
+                # Remember that we handled this request, to optimize local deduplication.
+                self._requests_already_handled.add(request.unique_key)
+
+            except Exception as exc:
+                logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
+                return None
+            else:
+                return processed_request
 
     @override
     async def reclaim_request(
@@ -448,10 +516,17 @@ async def reclaim_request(
         # Reclaim with lock to prevent race conditions that could lead to double processing of the same request.
         async with self._fetch_lock:
             try:
-                # Update the request in the API.
+                # Make sure request is in the local cache. We might need it.
                 self._requests_cache[request.unique_key] = request
+
+                # No longer in progress
                 self._requests_in_progress.discard(request.unique_key)
-                self._head_requests.append(request.unique_key)
+                # No longer handled
+                self._requests_already_handled.discard(request.unique_key)
+
+                if forefront:
+                    # Append to top of the local head estimation
+                    self._head_requests.append(request.unique_key)
 
                 processed_request = await self._update_request(request, forefront=forefront)
                 processed_request.unique_key = request.unique_key