Commit 6131fff

Locks not needed with in_progress

1 parent 08df986

File tree

1 file changed (+53, -67 lines)


src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 53 additions & 67 deletions
```diff
@@ -291,7 +291,7 @@ async def add_batch_of_requests(
         # (added by another producer or before migration).
 
 
-        new_requests: list[ProcessedRequest] = []
+        new_requests: list[Request] = []
         already_present_requests: list[ProcessedRequest] = []
 
         for request in requests:
```
```diff
@@ -316,15 +316,7 @@ async def add_batch_of_requests(
                     )
                 )
             else:
-                new_requests.append(
-                    ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': False,
-                            'wasAlreadyHandled': request.was_already_handled,
-                        }
-                    )
-                )
+                new_requests.append(request)
 
 
         # Update local caches
```
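This and the previous hunk are one logical change: `new_requests` now collects the raw `Request` objects instead of eagerly validating a `ProcessedRequest` for each one. For reference, a hypothetical standalone helper equivalent to the deleted branch, with the field aliases copied verbatim from it (the import paths are assumptions, not taken from this file):

```python
# Hypothetical helper mirroring the deleted branch; the import paths are
# assumptions about crawlee's public modules, not taken from this diff.
from crawlee import Request
from crawlee.storage_clients.models import ProcessedRequest


def to_processed(request: Request) -> ProcessedRequest:
    # Field aliases copied verbatim from the removed code.
    return ProcessedRequest.model_validate(
        {
            'uniqueKey': request.unique_key,
            'wasAlreadyPresent': False,
            'wasAlreadyHandled': request.was_already_handled,
        }
    )
```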
```diff
@@ -400,19 +392,18 @@ async def fetch_next_request(self) -> Request | None:
         Returns:
             The request or `None` if there are no more pending requests.
         """
-        async with self._fetch_lock:
-            await self._ensure_head_is_non_empty()
-
-            while self._head_requests:
-                request_unique_key = self._head_requests.pop()
-                if (
-                    request_unique_key not in self._requests_in_progress and
-                    request_unique_key not in self._requests_already_handled
-                ):
-                    self._requests_in_progress.add(request_unique_key)
-                    return await self.get_request(request_unique_key)
-            # No request locally and the ones returned from the platform are already in progress.
-            return None
+        await self._ensure_head_is_non_empty()
+
+        while self._head_requests:
+            request_unique_key = self._head_requests.pop()
+            if (
+                request_unique_key not in self._requests_in_progress and
+                request_unique_key not in self._requests_already_handled
+            ):
+                self._requests_in_progress.add(request_unique_key)
+                return await self.get_request(request_unique_key)
+        # No request locally and the ones returned from the platform are already in progress.
+        return None
 
     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
```
```diff
@@ -427,7 +418,6 @@ async def _list_head(self) -> None:
         response = await self._api_client.list_head(limit=requested_head_items)
 
         # Update metadata
-        self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
         # Check if there is another client working with the RequestQueue
         self._metadata.had_multiple_clients = response.get('hadMultipleClients', False)
         # Should warn once? This might be outside expected context if the other consumers consumes at the same time
```
```diff
@@ -472,23 +462,22 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         if cached_request := self._requests_cache[request.unique_key]:
             cached_request.handled_at = request.handled_at
 
-        async with self._fetch_lock:
-            try:
-                # Update the request in the API
-                # Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
-                # adding to the queue.)
-                processed_request = await self._update_request(request)
-                # Remove request from cache. It will most likely not be needed.
-                self._requests_cache.pop(request.unique_key)
-                self._requests_in_progress.discard(request.unique_key)
-                # Remember that we handled this request, to optimize local deduplication.
-                self._requests_already_handled.add(request.unique_key)
-
-            except Exception as exc:
-                logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
-                return None
-            else:
-                return processed_request
+        try:
+            # Update the request in the API
+            # Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
+            # adding to the queue.)
+            processed_request = await self._update_request(request)
+            # Remember that we handled this request, to optimize local deduplication.
+            self._requests_already_handled.add(request.unique_key)
+            # Remove request from cache. It will most likely not be needed.
+            self._requests_cache.pop(request.unique_key)
+            self._requests_in_progress.discard(request.unique_key)
+
+        except Exception as exc:
+            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
+            return None
+        else:
+            return processed_request
 
     @override
     async def reclaim_request(
```
```diff
@@ -513,33 +502,31 @@ async def reclaim_request(
         if request.was_already_handled:
             request.handled_at = None
 
-        # Reclaim with lock to prevent race conditions that could lead to double processing of the same request.
-        async with self._fetch_lock:
-            try:
-                # Make sure request is in the local cache. We might need it.
-                self._requests_cache[request.unique_key] = request
+        try:
+            # Make sure request is in the local cache. We might need it.
+            self._requests_cache[request.unique_key] = request
 
-                # No longer in progress
-                self._requests_in_progress.discard(request.unique_key)
-                # No longer handled
-                self._requests_already_handled.discard(request.unique_key)
+            # No longer in progress
+            self._requests_in_progress.discard(request.unique_key)
+            # No longer handled
+            self._requests_already_handled.discard(request.unique_key)
 
-                if forefront:
-                    # Append to top of the local head estimation
-                    self._head_requests.append(request.unique_key)
+            if forefront:
+                # Append to top of the local head estimation
+                self._head_requests.append(request.unique_key)
 
-                processed_request = await self._update_request(request, forefront=forefront)
-                processed_request.unique_key = request.unique_key
-                # If the request was previously handled, decrement our handled count since
-                # we're putting it back for processing.
-                if request.was_already_handled and not processed_request.was_already_handled:
-                    self._metadata.handled_request_count -= 1
+            processed_request = await self._update_request(request, forefront=forefront)
+            processed_request.unique_key = request.unique_key
+            # If the request was previously handled, decrement our handled count since
+            # we're putting it back for processing.
+            if request.was_already_handled and not processed_request.was_already_handled:
+                self._metadata.handled_request_count -= 1
 
-            except Exception as exc:
-                logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
-                return None
-            else:
-                return processed_request
+        except Exception as exc:
+            logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
+            return None
+        else:
+            return processed_request
 
     @override
     async def is_empty(self) -> bool:
```
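Taken together, `fetch_next_request`, `mark_request_as_handled`, and `reclaim_request` form the consumer loop that makes the in-progress set sufficient. A hedged usage sketch, assuming `client` is an instance of this request queue client and `process` is a user-supplied coroutine (neither name appears in this file; the `Request` import path is assumed):

```python
from collections.abc import Awaitable, Callable

from crawlee import Request  # import path assumed


async def consume(client, process: Callable[[Request], Awaitable[None]]) -> None:
    # Sketch only: `client` stands for an instance of this request queue client.
    while not await client.is_empty():
        request = await client.fetch_next_request()
        if request is None:
            # Head items were all in progress or already handled; re-check.
            continue
        try:
            await process(request)
        except Exception:
            # Failure: reclaim removes the key from _requests_in_progress so
            # the request can be fetched and retried again.
            await client.reclaim_request(request)
        else:
            # Success: upserts via the API and records the unique key in
            # _requests_already_handled for local deduplication.
            await client.mark_request_as_handled(request)
```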
```diff
@@ -549,9 +536,8 @@ async def is_empty(self) -> bool:
             True if the queue is empty, False otherwise.
         """
         # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
-        async with self._fetch_lock:
-            await self._ensure_head_is_non_empty()
-            return not self._head_requests and not self._queue_has_locked_requests and not self._requests_in_progress
+        await self._ensure_head_is_non_empty()
+        return not self._head_requests and not self._requests_in_progress
 
     async def _update_request(
         self,
```