File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed
src/apify/storage_clients/_apify Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change @@ -86,7 +86,7 @@ def __init__(
86
86
"""The number of requests we assume have been handled (tracked manually for this instance)."""
87
87
88
88
self ._fetch_lock = asyncio .Lock ()
89
- """Fetch lock to minimize race conditions when communicationg with API."""
89
+ """Fetch lock to minimize race conditions when communicating with API."""
90
90
91
91
@override
92
92
async def get_metadata (self ) -> RequestQueueMetadata :
@@ -397,6 +397,7 @@ async def reclaim_request(
397
397
if request .was_already_handled :
398
398
request .handled_at = None
399
399
400
+ # Reclaim with lock to prevent race conditions that could lead to double processing of same the request.
400
401
async with self ._fetch_lock :
401
402
try :
402
403
# Update the request in the API.
@@ -439,6 +440,8 @@ async def is_empty(self) -> bool:
439
440
Returns:
440
441
True if the queue is empty, False otherwise.
441
442
"""
443
+ # Check _list_head and self._queue_has_locked_requests with lock to make sure they are consistent.
444
+ # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
442
445
async with self ._fetch_lock :
443
446
head = await self ._list_head (limit = 1 , lock_time = None )
444
447
return len (head .items ) == 0 and not self ._queue_has_locked_requests
You can’t perform that action at this time.
0 commit comments