Skip to content

Commit 8e2f5d4

Browse files
committed
Draft for tests
1 parent 7e4e5da commit 8e2f5d4

File tree

1 file changed

+9
-98
lines changed

1 file changed

+9
-98
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 9 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from crawlee.storage_clients._base import RequestQueueClient
1818
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1919

20-
from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead
20+
from ._models import CachedRequest, RequestQueueHead
2121
from apify import Request
2222

2323
if TYPE_CHECKING:
@@ -498,11 +498,6 @@ async def reclaim_request(
498498
if forefront:
499499
self._should_check_for_forefront_requests = True
500500

501-
# Try to release the lock on the request
502-
try:
503-
await self._delete_request_lock(request.unique_key, forefront=forefront)
504-
except Exception as err:
505-
logger.debug(f'Failed to delete request lock for request {request.unique_key}', exc_info=err)
506501
except Exception as exc:
507502
logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
508503
return None
@@ -516,10 +511,10 @@ async def is_empty(self) -> bool:
516511
Returns:
517512
True if the queue is empty, False otherwise.
518513
"""
519-
# Check _list_head and self._queue_has_locked_requests with lock to make sure they are consistent.
514+
# Check _list_head.
520515
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
521516
async with self._fetch_lock:
522-
head = await self._list_head(limit=1, lock_time=None)
517+
head = await self._list_head(limit=1)
523518
return len(head.items) == 0 and not self._queue_has_locked_requests
524519

525520
async def _ensure_head_is_non_empty(self) -> None:
@@ -529,7 +524,7 @@ async def _ensure_head_is_non_empty(self) -> None:
529524
return
530525

531526
# Fetch requests from the API and populate the queue head
532-
await self._list_head(lock_time=self._DEFAULT_LOCK_TIME)
527+
await self._list_head()
533528

534529
async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
535530
"""Get a request by unique key, either from cache or by fetching from API.
@@ -544,32 +539,16 @@ async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
544539
cached_entry = self._requests_cache.get(unique_key)
545540

546541
if cached_entry and cached_entry.hydrated:
547-
# If we have the request hydrated in cache, check if lock is expired
548-
if cached_entry.lock_expires_at and cached_entry.lock_expires_at < datetime.now(tz=timezone.utc):
549-
# Try to prolong the lock if it's expired
550-
try:
551-
lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
552-
response = await self._prolong_request_lock(unique_key, lock_secs=lock_secs)
553-
cached_entry.lock_expires_at = response.lock_expires_at
554-
except Exception:
555-
# If prolonging the lock fails, we lost the request
556-
logger.debug(f'Failed to prolong lock for request {unique_key}, returning None')
557-
return None
558-
542+
# If we have the request hydrated in cache, return it
559543
return cached_entry.hydrated
560544

561545
# If not in cache or not hydrated, fetch the request
562546
try:
563-
# Try to acquire or prolong the lock
564-
lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
565-
await self._prolong_request_lock(unique_key, lock_secs=lock_secs)
566-
567547
# Fetch the request data
568548
request = await self.get_request(unique_key)
569549

570-
# If request is not found, release lock and return None
550+
# If request is not found, return None
571551
if not request:
572-
await self._delete_request_lock(unique_key)
573552
return None
574553

575554
# Update cache with hydrated request
@@ -584,7 +563,7 @@ async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
584563
hydrated_request=request,
585564
)
586565
except Exception as exc:
587-
logger.debug(f'Error fetching or locking request {unique_key}: {exc!s}')
566+
logger.debug(f'Error fetching request {unique_key}: {exc!s}')
588567
return None
589568
else:
590569
return request
@@ -618,14 +597,11 @@ async def _update_request(
618597
async def _list_head(
619598
self,
620599
*,
621-
lock_time: timedelta | None = None,
622600
limit: int = 25,
623601
) -> RequestQueueHead:
624602
"""Retrieve requests from the beginning of the queue.
625603
626604
Args:
627-
lock_time: Duration for which to lock the retrieved requests.
628-
If None, requests will not be locked.
629605
limit: Maximum number of requests to retrieve.
630606
631607
Returns:
@@ -648,8 +624,8 @@ async def _list_head(
648624
had_multiple_clients=metadata.had_multiple_clients,
649625
queue_modified_at=metadata.modified_at,
650626
items=items,
627+
lock_time=None,
651628
queue_has_locked_requests=self._queue_has_locked_requests,
652-
lock_time=lock_time,
653629
)
654630
leftover_buffer = list[str]()
655631
if self._should_check_for_forefront_requests:
@@ -658,11 +634,7 @@ async def _list_head(
658634
self._should_check_for_forefront_requests = False
659635

660636
# Otherwise fetch from API
661-
lock_time = lock_time or self._DEFAULT_LOCK_TIME
662-
lock_secs = int(lock_time.total_seconds())
663-
664-
response = await self._api_client.list_and_lock_head(
665-
lock_secs=lock_secs,
637+
response = await self._api_client.list_head(
666638
limit=limit,
667639
)
668640

@@ -701,67 +673,6 @@ async def _list_head(
701673
self._queue_head.append(leftover_unique_key)
702674
return RequestQueueHead.model_validate(response)
703675

704-
async def _prolong_request_lock(
705-
self,
706-
unique_key: str,
707-
*,
708-
lock_secs: int,
709-
) -> ProlongRequestLockResponse:
710-
"""Prolong the lock on a specific request in the queue.
711-
712-
Args:
713-
unique_key: Unique key of the request whose lock is to be prolonged.
714-
lock_secs: The additional amount of time, in seconds, that the request will remain locked.
715-
716-
Returns:
717-
A response containing the time at which the lock will expire.
718-
"""
719-
response = await self._api_client.prolong_request_lock(
720-
request_id=unique_key_to_request_id(unique_key),
721-
# All requests reaching this code were the tip of the queue at the moment when they were fetched,
722-
# so if their lock expires, they should be put back to the forefront as their handling is long overdue.
723-
forefront=True,
724-
lock_secs=lock_secs,
725-
)
726-
727-
result = ProlongRequestLockResponse(
728-
lock_expires_at=datetime.fromisoformat(response['lockExpiresAt'].replace('Z', '+00:00'))
729-
)
730-
731-
# Update the cache with the new lock expiration
732-
for cached_request in self._requests_cache.values():
733-
if cached_request.unique_key == unique_key:
734-
cached_request.lock_expires_at = result.lock_expires_at
735-
break
736-
737-
return result
738-
739-
async def _delete_request_lock(
740-
self,
741-
unique_key: str,
742-
*,
743-
forefront: bool = False,
744-
) -> None:
745-
"""Delete the lock on a specific request in the queue.
746-
747-
Args:
748-
unique_key: Unique key of the request to delete the lock.
749-
forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
750-
"""
751-
try:
752-
await self._api_client.delete_request_lock(
753-
request_id=unique_key_to_request_id(unique_key),
754-
forefront=forefront,
755-
)
756-
757-
# Update the cache to remove the lock
758-
for cached_request in self._requests_cache.values():
759-
if cached_request.unique_key == unique_key:
760-
cached_request.lock_expires_at = None
761-
break
762-
except Exception as err:
763-
logger.debug(f'Failed to delete request lock for request {unique_key}', exc_info=err)
764-
765676
def _cache_request(
766677
self,
767678
cache_key: str,

0 commit comments

Comments
 (0)