Skip to content

Commit 7ee8af4

Browse files
committed
Suggest a solution for the forefront handling of reclaimed requests
Drop forefront info from the local cache as it is unreliable and complicates the flow. Only the platform knows the real forefront order, so do not try to keep an imperfect copy of it locally; instead, design the system to work without being aware of any cached forefront state.
1 parent 662fbd5 commit 7ee8af4

File tree

3 files changed

+62
-44
lines changed

3 files changed

+62
-44
lines changed

src/apify/storage_clients/_apify/_models.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,3 @@ class CachedRequest(BaseModel):
105105

106106
lock_expires_at: datetime | None = None
107107
"""The expiration time of the lock on the request."""
108-
109-
forefront: bool = False
110-
"""Whether the request was added to the forefront of the queue."""

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
358358
self._cache_request(
359359
cache_key,
360360
processed_request,
361-
forefront=False,
362361
hydrated_request=request,
363362
)
364363
except Exception as exc:
@@ -405,7 +404,6 @@ async def reclaim_request(
405404
self._cache_request(
406405
cache_key,
407406
processed_request,
408-
forefront=forefront,
409407
hydrated_request=request,
410408
)
411409

@@ -463,9 +461,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
463461
# Try to prolong the lock if it's expired
464462
try:
465463
lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
466-
response = await self._prolong_request_lock(
467-
request_id, forefront=cached_entry.forefront, lock_secs=lock_secs
468-
)
464+
response = await self._prolong_request_lock(request_id, lock_secs=lock_secs)
469465
cached_entry.lock_expires_at = response.lock_expires_at
470466
except Exception:
471467
# If prolonging the lock fails, we lost the request
@@ -478,7 +474,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
478474
try:
479475
# Try to acquire or prolong the lock
480476
lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
481-
await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs)
477+
await self._prolong_request_lock(request_id, lock_secs=lock_secs)
482478

483479
# Fetch the request data
484480
request = await self.get_request(request_id)
@@ -498,7 +494,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
498494
was_already_present=True,
499495
was_already_handled=request.handled_at is not None,
500496
),
501-
forefront=False,
502497
hydrated_request=request,
503498
)
504499
except Exception as exc:
@@ -568,6 +563,8 @@ async def _list_head(
568563
queue_has_locked_requests=self._queue_has_locked_requests,
569564
lock_time=lock_time,
570565
)
566+
if self._should_check_for_forefront_requests:
567+
self._should_check_for_forefront_requests = False
571568

572569
# Otherwise fetch from API
573570
lock_time = lock_time or self._DEFAULT_LOCK_TIME
@@ -581,16 +578,9 @@ async def _list_head(
581578
# Update the queue head cache
582579
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
583580

584-
# Clear current queue head if we're checking for forefront requests
585-
if self._should_check_for_forefront_requests:
586-
self._queue_head.clear()
587-
self._should_check_for_forefront_requests = False
588-
589-
# Process and cache the requests
590-
head_id_buffer = list[str]()
591-
forefront_head_id_buffer = list[str]()
592-
593-
for request_data in response.get('items', []):
581+
# Iterate over new requests and push them to the front of the queue.
582+
# Since we push to the front of the queue, we have to iterate in reverse order to preserve the intended order.
583+
for request_data in reversed(response.get('items', [])):
594584
request = Request.model_validate(request_data)
595585

596586
# Skip requests without ID or unique key
@@ -604,59 +594,43 @@ async def _list_head(
604594
)
605595
continue
606596

607-
# Check if this request was already cached and if it was added to forefront
608-
cache_key = unique_key_to_request_id(request.unique_key)
609-
cached_request = self._requests_cache.get(cache_key)
610-
forefront = cached_request.forefront if cached_request else False
611-
612-
# Add to appropriate buffer based on forefront flag
613-
if forefront:
614-
forefront_head_id_buffer.insert(0, request.id)
615-
else:
616-
head_id_buffer.append(request.id)
617-
618597
# Cache the request
619598
self._cache_request(
620-
cache_key,
599+
unique_key_to_request_id(request.unique_key),
621600
ProcessedRequest(
622601
id=request.id,
623602
unique_key=request.unique_key,
624603
was_already_present=True,
625604
was_already_handled=False,
626605
),
627-
forefront=forefront,
628606
hydrated_request=request,
629607
)
630608

631-
# Update the queue head deque
632-
for request_id in head_id_buffer:
633-
self._queue_head.append(request_id)
634-
635-
for request_id in forefront_head_id_buffer:
636-
self._queue_head.appendleft(request_id)
609+
# All new requests are added to the forefront, existing leftover locked requests kept in the end.
610+
self._queue_head.appendleft(request.id)
637611

638612
return RequestQueueHead.model_validate(response)
639613

640614
async def _prolong_request_lock(
641615
self,
642616
request_id: str,
643617
*,
644-
forefront: bool = False,
645618
lock_secs: int,
646619
) -> ProlongRequestLockResponse:
647620
"""Prolong the lock on a specific request in the queue.
648621
649622
Args:
650623
request_id: The identifier of the request whose lock is to be prolonged.
651-
forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
652624
lock_secs: The additional amount of time, in seconds, that the request will remain locked.
653625
654626
Returns:
655627
A response containing the time at which the lock will expire.
656628
"""
657629
response = await self._api_client.prolong_request_lock(
658630
request_id=request_id,
659-
forefront=forefront,
631+
# All requests reaching this code were the tip of the queue at the moment when they were fetched,
632+
# so if their lock expires, they should be put back to the forefront as their handling is long overdue.
633+
forefront=True,
660634
lock_secs=lock_secs,
661635
)
662636

@@ -703,7 +677,6 @@ def _cache_request(
703677
cache_key: str,
704678
processed_request: ProcessedRequest,
705679
*,
706-
forefront: bool,
707680
hydrated_request: Request | None = None,
708681
) -> None:
709682
"""Cache a request for future use.
@@ -719,5 +692,4 @@ def _cache_request(
719692
was_already_handled=processed_request.was_already_handled,
720693
hydrated=hydrated_request,
721694
lock_expires_at=None,
722-
forefront=forefront,
723695
)

tests/integration/test_crawlers_with_storages.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,52 @@ async def default_handler(_: ParselCrawlingContext) -> None:
109109
run_result = await run_actor(actor)
110110

111111
assert run_result.status == 'SUCCEEDED'
112+
113+
114+
async def test_a() -> None:
115+
"""Test that requests added or reclaimed with forefront=True are fetched before other requests."""
116+
117+
from apify import Actor
118+
119+
async with Actor:
120+
rq = await Actor.open_request_queue(name='asdasdd', force_cloud=True)
121+
await rq.drop()
122+
rq = await Actor.open_request_queue(name='asdasdd', force_cloud=True)
123+
Actor.log.info('Request queue opened')
124+
125+
# Add initial requests
126+
await rq.add_request('https://example.com/1')
127+
await rq.add_request('https://example.com/2')
128+
Actor.log.info('Added initial requests')
129+
130+
# Fetch one and reclaim to forefront
131+
request1 = await rq.fetch_next_request()
132+
assert request1 is not None, f'request1={request1}'
133+
assert request1.url == 'https://example.com/1', f'request1.url={request1.url}'
134+
Actor.log.info(f'Fetched request: {request1.url}')
135+
136+
await rq.reclaim_request(request1, forefront=True)
137+
Actor.log.info('Reclaimed request to forefront')
138+
139+
# Add forefront request
140+
await rq.add_request('https://example.com/priority', forefront=True)
141+
Actor.log.info('Added new forefront request')
142+
143+
# Fetch all requests and verify forefront behavior
144+
urls_ordered = list[str]()
145+
while next_request := await rq.fetch_next_request():
146+
urls_ordered.append(next_request.url)
147+
await rq.mark_request_as_handled(next_request)
148+
149+
Actor.log.info(f'Final order of fetched URLs: {urls_ordered}')
150+
151+
# Verify that we got all 3 requests
152+
assert len(urls_ordered) == 3, f'len(urls_ordered)={len(urls_ordered)}'
153+
154+
assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}'
155+
assert urls_ordered[1] == request1.url, (
156+
f'urls_ordered[1]={urls_ordered[1]}',
157+
f'request1.url={request1.url}',
158+
)
159+
assert urls_ordered[2] == 'https://example.com/2', f'urls_ordered[2]={urls_ordered[2]}'
160+
Actor.log.info('Request ordering verified successfully')

0 commit comments

Comments
 (0)