3 changes: 0 additions & 3 deletions src/apify/storage_clients/_apify/_models.py
@@ -105,6 +105,3 @@ class CachedRequest(BaseModel):

    lock_expires_at: datetime | None = None
    """The expiration time of the lock on the request."""
-
-    forefront: bool = False
-    """Whether the request was added to the forefront of the queue."""
55 changes: 16 additions & 39 deletions src/apify/storage_clients/_apify/_request_queue_client.py
@@ -358,7 +358,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
            self._cache_request(
                cache_key,
                processed_request,
-                forefront=False,
                hydrated_request=request,
            )
        except Exception as exc:
@@ -405,7 +404,6 @@ async def reclaim_request(
            self._cache_request(
                cache_key,
                processed_request,
-                forefront=forefront,
                hydrated_request=request,
            )

@@ -463,9 +461,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
                # Try to prolong the lock if it's expired
                try:
                    lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
-                    response = await self._prolong_request_lock(
-                        request_id, forefront=cached_entry.forefront, lock_secs=lock_secs
-                    )
+                    response = await self._prolong_request_lock(request_id, lock_secs=lock_secs)
                    cached_entry.lock_expires_at = response.lock_expires_at
                except Exception:
                    # If prolonging the lock fails, we lost the request
@@ -478,7 +474,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
        try:
            # Try to acquire or prolong the lock
            lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
-            await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs)
+            await self._prolong_request_lock(request_id, lock_secs=lock_secs)

            # Fetch the request data
            request = await self.get_request(request_id)
@@ -498,7 +494,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
                    was_already_present=True,
                    was_already_handled=request.handled_at is not None,
                ),
-                forefront=False,
                hydrated_request=request,
            )
        except Exception as exc:
@@ -569,6 +564,12 @@ async def _list_head(
                lock_time=lock_time,
            )

+        leftover_buffer = list[str]()
+        if self._should_check_for_forefront_requests:
+            leftover_buffer = list(self._queue_head)
+            self._queue_head.clear()
+            self._should_check_for_forefront_requests = False
+
        # Otherwise fetch from API
        lock_time = lock_time or self._DEFAULT_LOCK_TIME
        lock_secs = int(lock_time.total_seconds())
@@ -581,15 +582,6 @@
        # Update the queue head cache
        self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)

-        # Clear current queue head if we're checking for forefront requests
-        if self._should_check_for_forefront_requests:
-            self._queue_head.clear()
-            self._should_check_for_forefront_requests = False
-
-        # Process and cache the requests
-        head_id_buffer = list[str]()
-        forefront_head_id_buffer = list[str]()
-
        for request_data in response.get('items', []):
            request = Request.model_validate(request_data)

@@ -604,59 +596,46 @@
                )
                continue

-            # Check if this request was already cached and if it was added to forefront
-            cache_key = unique_key_to_request_id(request.unique_key)
-            cached_request = self._requests_cache.get(cache_key)
-            forefront = cached_request.forefront if cached_request else False
-
-            # Add to appropriate buffer based on forefront flag
-            if forefront:
-                forefront_head_id_buffer.insert(0, request.id)
-            else:
-                head_id_buffer.append(request.id)
-
            # Cache the request
            self._cache_request(
-                cache_key,
+                unique_key_to_request_id(request.unique_key),
                ProcessedRequest(
                    id=request.id,
                    unique_key=request.unique_key,
                    was_already_present=True,
                    was_already_handled=False,
                ),
-                forefront=forefront,
                hydrated_request=request,
            )

-        # Update the queue head deque
-        for request_id in head_id_buffer:
-            self._queue_head.append(request_id)
+            self._queue_head.append(request.id)

-        for request_id in forefront_head_id_buffer:
-            self._queue_head.appendleft(request_id)
+        for leftover_request_id in leftover_buffer:
+            # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
+            self._queue_head.append(leftover_request_id)

        return RequestQueueHead.model_validate(response)

    async def _prolong_request_lock(
        self,
        request_id: str,
        *,
-        forefront: bool = False,
        lock_secs: int,
    ) -> ProlongRequestLockResponse:
        """Prolong the lock on a specific request in the queue.

        Args:
            request_id: The identifier of the request whose lock is to be prolonged.
-            forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
            lock_secs: The additional amount of time, in seconds, that the request will remain locked.

        Returns:
            A response containing the time at which the lock will expire.
        """
        response = await self._api_client.prolong_request_lock(
            request_id=request_id,
-            forefront=forefront,
+            # All requests reaching this code were the tip of the queue at the moment when they were fetched,
+            # so if their lock expires, they should be put back to the forefront as their handling is long overdue.
+            forefront=True,
            lock_secs=lock_secs,
        )

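Taken together, the `_list_head` changes above replace per-request forefront bookkeeping with a simple rotation: when a forefront check is pending, the locally buffered head ids are set aside, freshly fetched ids are appended in API order, and the leftovers are re-appended at the back. A standalone toy of that ordering (plain Python, made-up ids; it assumes, as the added comment suggests, that the list-head API returns forefront requests first):

from collections import deque

# Ids already buffered locally before the fetch.
queue_head: deque[str] = deque(['old-1', 'old-2'])
should_check_for_forefront_requests = True

# Step 1: a pending forefront check sets the current head aside.
leftover_buffer: list[str] = []
if should_check_for_forefront_requests:
    leftover_buffer = list(queue_head)
    queue_head.clear()
    should_check_for_forefront_requests = False

# Step 2: freshly fetched ids are appended in API order; forefront
# requests arrive first, so a plain append preserves their priority.
for request_id in ['fore-1', 'new-1']:
    queue_head.append(request_id)

# Step 3: leftover (possibly still locked) ids go to the back.
for leftover_request_id in leftover_buffer:
    queue_head.append(leftover_request_id)

print(list(queue_head))  # ['fore-1', 'new-1', 'old-1', 'old-2']

This is why the `forefront_head_id_buffer` and the `appendleft()` juggling could be deleted: ordering now comes from the API response plus this rotation rather than from flags cached next to each request.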
@@ -703,7 +682,6 @@ def _cache_request(
        cache_key: str,
        processed_request: ProcessedRequest,
        *,
-        forefront: bool,
        hydrated_request: Request | None = None,
    ) -> None:
        """Cache a request for future use.
@@ -719,5 +697,4 @@
            was_already_handled=processed_request.was_already_handled,
            hydrated=hydrated_request,
            lock_expires_at=None,
-            forefront=forefront,
        )
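
For completeness, the cache write as it ends up after the removal, reconstructed from the visible lines; the docstring body and the middle of the method are collapsed in this view, and the `self._requests_cache[cache_key] = ...` assignment target is inferred from the `_requests_cache.get(cache_key)` lookup removed above, so this is a sketch rather than the verbatim result:

def _cache_request(
    self,
    cache_key: str,
    processed_request: ProcessedRequest,
    *,
    hydrated_request: Request | None = None,
) -> None:
    """Cache a request for future use."""  # docstring body collapsed in the diff
    self._requests_cache[cache_key] = CachedRequest(  # assignment target inferred
        # ...fields collapsed in the diff view...
        was_already_handled=processed_request.was_already_handled,
        hydrated=hydrated_request,
        lock_expires_at=None,
    )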