Skip to content

Commit 8b7fc77

Browse files
authored
fix: Suggest solving the forefront handling of reclaimed requests (#537)
Drop the forefront info from the local cache, as it is unreliable and complicates the flow. Only the platform knows the real forefront; do not try to keep an imperfect copy of it locally, and instead design the system to work without being aware of the cached forefront.
1 parent 1e63518 commit 8b7fc77

File tree

2 files changed

+16
-42
lines changed

2 files changed

+16
-42
lines changed

src/apify/storage_clients/_apify/_models.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,3 @@ class CachedRequest(BaseModel):
105105

106106
lock_expires_at: datetime | None = None
107107
"""The expiration time of the lock on the request."""
108-
109-
forefront: bool = False
110-
"""Whether the request was added to the forefront of the queue."""

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
358358
self._cache_request(
359359
cache_key,
360360
processed_request,
361-
forefront=False,
362361
hydrated_request=request,
363362
)
364363
except Exception as exc:
@@ -405,7 +404,6 @@ async def reclaim_request(
405404
self._cache_request(
406405
cache_key,
407406
processed_request,
408-
forefront=forefront,
409407
hydrated_request=request,
410408
)
411409

@@ -463,9 +461,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
463461
# Try to prolong the lock if it's expired
464462
try:
465463
lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
466-
response = await self._prolong_request_lock(
467-
request_id, forefront=cached_entry.forefront, lock_secs=lock_secs
468-
)
464+
response = await self._prolong_request_lock(request_id, lock_secs=lock_secs)
469465
cached_entry.lock_expires_at = response.lock_expires_at
470466
except Exception:
471467
# If prolonging the lock fails, we lost the request
@@ -478,7 +474,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
478474
try:
479475
# Try to acquire or prolong the lock
480476
lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
481-
await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs)
477+
await self._prolong_request_lock(request_id, lock_secs=lock_secs)
482478

483479
# Fetch the request data
484480
request = await self.get_request(request_id)
@@ -498,7 +494,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
498494
was_already_present=True,
499495
was_already_handled=request.handled_at is not None,
500496
),
501-
forefront=False,
502497
hydrated_request=request,
503498
)
504499
except Exception as exc:
@@ -569,6 +564,12 @@ async def _list_head(
569564
lock_time=lock_time,
570565
)
571566

567+
leftover_buffer = list[str]()
568+
if self._should_check_for_forefront_requests:
569+
leftover_buffer = list(self._queue_head)
570+
self._queue_head.clear()
571+
self._should_check_for_forefront_requests = False
572+
572573
# Otherwise fetch from API
573574
lock_time = lock_time or self._DEFAULT_LOCK_TIME
574575
lock_secs = int(lock_time.total_seconds())
@@ -581,15 +582,6 @@ async def _list_head(
581582
# Update the queue head cache
582583
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
583584

584-
# Clear current queue head if we're checking for forefront requests
585-
if self._should_check_for_forefront_requests:
586-
self._queue_head.clear()
587-
self._should_check_for_forefront_requests = False
588-
589-
# Process and cache the requests
590-
head_id_buffer = list[str]()
591-
forefront_head_id_buffer = list[str]()
592-
593585
for request_data in response.get('items', []):
594586
request = Request.model_validate(request_data)
595587

@@ -604,59 +596,46 @@ async def _list_head(
604596
)
605597
continue
606598

607-
# Check if this request was already cached and if it was added to forefront
608-
cache_key = unique_key_to_request_id(request.unique_key)
609-
cached_request = self._requests_cache.get(cache_key)
610-
forefront = cached_request.forefront if cached_request else False
611-
612-
# Add to appropriate buffer based on forefront flag
613-
if forefront:
614-
forefront_head_id_buffer.insert(0, request.id)
615-
else:
616-
head_id_buffer.append(request.id)
617-
618599
# Cache the request
619600
self._cache_request(
620-
cache_key,
601+
unique_key_to_request_id(request.unique_key),
621602
ProcessedRequest(
622603
id=request.id,
623604
unique_key=request.unique_key,
624605
was_already_present=True,
625606
was_already_handled=False,
626607
),
627-
forefront=forefront,
628608
hydrated_request=request,
629609
)
630610

631-
# Update the queue head deque
632-
for request_id in head_id_buffer:
633-
self._queue_head.append(request_id)
611+
self._queue_head.append(request.id)
634612

635-
for request_id in forefront_head_id_buffer:
636-
self._queue_head.appendleft(request_id)
613+
for leftover_request_id in leftover_buffer:
614+
# After adding new requests to the forefront, any existing leftover locked request is kept at the end.
615+
self._queue_head.append(leftover_request_id)
637616

638617
return RequestQueueHead.model_validate(response)
639618

640619
async def _prolong_request_lock(
641620
self,
642621
request_id: str,
643622
*,
644-
forefront: bool = False,
645623
lock_secs: int,
646624
) -> ProlongRequestLockResponse:
647625
"""Prolong the lock on a specific request in the queue.
648626
649627
Args:
650628
request_id: The identifier of the request whose lock is to be prolonged.
651-
forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
652629
lock_secs: The additional amount of time, in seconds, that the request will remain locked.
653630
654631
Returns:
655632
A response containing the time at which the lock will expire.
656633
"""
657634
response = await self._api_client.prolong_request_lock(
658635
request_id=request_id,
659-
forefront=forefront,
636+
# All requests reaching this code were the tip of the queue at the moment when they were fetched,
637+
# so if their lock expires, they should be put back to the forefront as their handling is long overdue.
638+
forefront=True,
660639
lock_secs=lock_secs,
661640
)
662641

@@ -703,7 +682,6 @@ def _cache_request(
703682
cache_key: str,
704683
processed_request: ProcessedRequest,
705684
*,
706-
forefront: bool,
707685
hydrated_request: Request | None = None,
708686
) -> None:
709687
"""Cache a request for future use.
@@ -719,5 +697,4 @@ def _cache_request(
719697
was_already_handled=processed_request.was_already_handled,
720698
hydrated=hydrated_request,
721699
lock_expires_at=None,
722-
forefront=forefront,
723700
)

0 commit comments

Comments
 (0)