@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 from collections import deque
 from datetime import datetime, timedelta, timezone
 from logging import getLogger
@@ -84,6 +85,9 @@ def __init__(
         self._assumed_handled_count = 0
         """The number of requests we assume have been handled (tracked manually for this instance)."""
 
+        self._fetch_lock = asyncio.Lock()
+        """Fetch lock to minimize race conditions when communicating with the API."""
+
     @override
     async def get_metadata(self) -> RequestQueueMetadata:
         total_count = self._initial_total_count + self._assumed_total_count
@@ -268,7 +272,6 @@ async def add_batch_of_requests(
                 self._cache_request(
                     unique_key_to_request_id(request.unique_key),
                     processed_request,
-                    forefront=False,
                 )
                 new_requests.append(request)
 
@@ -334,15 +337,17 @@ async def fetch_next_request(self) -> Request | None:
         Returns:
             The request or `None` if there are no more pending requests.
         """
-        # Ensure the queue head has requests if available
-        await self._ensure_head_is_non_empty()
+        # Ensure the queue head has requests if available. Fetch the head under the lock to prevent race conditions.
+        async with self._fetch_lock:
+            await self._ensure_head_is_non_empty()
 
-        # If queue head is empty after ensuring, there are no requests
-        if not self._queue_head:
-            return None
+            # If the queue head is empty after ensuring, there are no requests
+            if not self._queue_head:
+                return None
+
+            # Get the next request ID from the queue head
+            next_request_id = self._queue_head.popleft()
 
-        # Get the next request ID from the queue head
-        next_request_id = self._queue_head.popleft()
         request = await self._get_or_hydrate_request(next_request_id)
 
         # Handle potential inconsistency where request might not be in the main table yet
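To see what the new `_fetch_lock` buys here, consider a minimal standalone sketch (all names hypothetical, the API reduced to a stub): without the lock, two concurrent consumers can both observe an empty head, both fetch the same head from the API, and hand out the same request twice; with the lock, the refill-and-pop sequence is atomic.

    import asyncio
    from collections import deque

    queue_head: deque[str] = deque()
    fetch_lock = asyncio.Lock()

    async def list_head_from_api() -> list[str]:
        # Simulated API call; the await is the window where interleaving can happen.
        await asyncio.sleep(0)
        return ['req-1', 'req-2']

    async def fetch_next() -> str | None:
        # Refill and pop atomically. Without the lock, two concurrent callers could
        # both see an empty head, both call the API, and enqueue duplicate IDs.
        async with fetch_lock:
            if not queue_head:
                queue_head.extend(await list_head_from_api())
            return queue_head.popleft() if queue_head else None

    async def main() -> None:
        results = await asyncio.gather(fetch_next(), fetch_next())
        print(results)  # ['req-1', 'req-2'] - never the same ID twice

    asyncio.run(main())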
@@ -388,6 +393,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
 
+        if cached_request := self._requests_cache.get(request.id):
+            cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
             processed_request = await self._update_request(request)
@@ -402,7 +409,6 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             self._cache_request(
                 cache_key,
                 processed_request,
-                forefront=False,
                 hydrated_request=request,
             )
         except Exception as exc:
@@ -434,40 +440,41 @@ async def reclaim_request(
         if request.was_already_handled:
             request.handled_at = None
 
-        try:
-            # Update the request in the API.
-            processed_request = await self._update_request(request, forefront=forefront)
-            processed_request.unique_key = request.unique_key
+        # Reclaim under the lock to prevent race conditions that could lead to double processing of the same request.
+        async with self._fetch_lock:
+            try:
+                # Update the request in the API.
+                processed_request = await self._update_request(request, forefront=forefront)
+                processed_request.unique_key = request.unique_key
 
-            # If the request was previously handled, decrement our handled count since
-            # we're putting it back for processing.
-            if request.was_already_handled and not processed_request.was_already_handled:
-                self._assumed_handled_count -= 1
+                # If the request was previously handled, decrement our handled count since
+                # we're putting it back for processing.
+                if request.was_already_handled and not processed_request.was_already_handled:
+                    self._assumed_handled_count -= 1
 
-            # Update the cache
-            cache_key = unique_key_to_request_id(request.unique_key)
-            self._cache_request(
-                cache_key,
-                processed_request,
-                forefront=forefront,
-                hydrated_request=request,
-            )
+                # Update the cache
+                cache_key = unique_key_to_request_id(request.unique_key)
+                self._cache_request(
+                    cache_key,
+                    processed_request,
+                    hydrated_request=request,
+                )
 
-            # If we're adding to the forefront, we need to check for forefront requests
-            # in the next list_head call
-            if forefront:
-                self._should_check_for_forefront_requests = True
+                # If we're adding to the forefront, we need to check for forefront requests
+                # in the next list_head call
+                if forefront:
+                    self._should_check_for_forefront_requests = True
 
-            # Try to release the lock on the request
-            try:
-                await self._delete_request_lock(request.id, forefront=forefront)
-            except Exception as err:
-                logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
-        except Exception as exc:
-            logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
-            return None
-        else:
-            return processed_request
+                # Try to release the lock on the request
+                try:
+                    await self._delete_request_lock(request.id, forefront=forefront)
+                except Exception as err:
+                    logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err)
+            except Exception as exc:
+                logger.debug(f'Error reclaiming request {request.id}: {exc!s}')
+                return None
+            else:
+                return processed_request
 
     @override
     async def is_empty(self) -> bool:
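The reclaim path now shares the same lock as the fetch path. A simplified model of why that matters (hypothetical names, the API update reduced to a sleep, the forefront bookkeeping collapsed into a local appendleft): while a request is being reclaimed, no concurrent fetch can run, so a half-reclaimed request can never be handed out mid-update.

    import asyncio
    from collections import deque

    queue_head: deque[str] = deque(['req-1'])
    fetch_lock = asyncio.Lock()

    async def reclaim(request_id: str) -> None:
        # The simulated API update and the local head mutation are atomic with
        # respect to fetch_next(), because both coroutines hold the same lock.
        async with fetch_lock:
            await asyncio.sleep(0)             # stands in for the forefront API update
            queue_head.appendleft(request_id)  # the failed request goes back to the front

    async def fetch_next() -> str | None:
        async with fetch_lock:
            return queue_head.popleft() if queue_head else None

    async def main() -> None:
        await reclaim('req-0')
        print(await fetch_next())  # 'req-0' - the reclaimed request comes back first
        print(await fetch_next())  # 'req-1'

    asyncio.run(main())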
@@ -476,9 +483,11 @@ async def is_empty(self) -> bool:
         Returns:
             True if the queue is empty, False otherwise.
         """
-        head = await self._list_head(limit=1, lock_time=None)
-
-        return len(head.items) == 0 and not self._queue_has_locked_requests
+        # Check _list_head and self._queue_has_locked_requests under the lock to make sure they are consistent.
+        # Without the lock, `is_empty` could falsely report True due to a low-probability race condition.
+        async with self._fetch_lock:
+            head = await self._list_head(limit=1, lock_time=None)
+            return len(head.items) == 0 and not self._queue_has_locked_requests
 
     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
@@ -507,9 +516,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
             # Try to prolong the lock if it's expired
             try:
                 lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
-                response = await self._prolong_request_lock(
-                    request_id, forefront=cached_entry.forefront, lock_secs=lock_secs
-                )
+                response = await self._prolong_request_lock(request_id, lock_secs=lock_secs)
                 cached_entry.lock_expires_at = response.lock_expires_at
             except Exception:
                 # If prolonging the lock fails, we lost the request
@@ -522,7 +529,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
         try:
             # Try to acquire or prolong the lock
             lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds())
-            await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs)
+            await self._prolong_request_lock(request_id, lock_secs=lock_secs)
 
             # Fetch the request data
             request = await self.get_request(request_id)
@@ -542,7 +549,6 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
                     was_already_present=True,
                     was_already_handled=request.handled_at is not None,
                 ),
-                forefront=False,
                 hydrated_request=request,
             )
         except Exception as exc:
@@ -594,7 +600,6 @@ async def _list_head(
         # Return from cache if available and we're not checking for new forefront requests
         if self._queue_head and not self._should_check_for_forefront_requests:
             logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
-
             # Create a list of requests from the cached queue head
             items = []
             for request_id in list(self._queue_head)[:limit]:
@@ -612,6 +617,11 @@ async def _list_head(
                 queue_has_locked_requests=self._queue_has_locked_requests,
                 lock_time=lock_time,
             )
+        leftover_buffer = list[str]()
+        if self._should_check_for_forefront_requests:
+            leftover_buffer = list(self._queue_head)
+            self._queue_head.clear()
+            self._should_check_for_forefront_requests = False
 
         # Otherwise fetch from API
         lock_time = lock_time or self._DEFAULT_LOCK_TIME
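The `leftover_buffer` added above changes how a forefront check reorders the cached head: instead of tracking a per-request `forefront` flag, the cached head is drained before the API fetch and re-appended after the fresh items. A toy illustration of the resulting order (hypothetical IDs, the API call simulated by a literal list):

    from collections import deque

    queue_head = deque(['old-1', 'old-2'])  # requests already locked in the local head

    # Drain the cached head before fetching, as in the new _list_head logic.
    leftover_buffer = list(queue_head)
    queue_head.clear()

    # Append what the (simulated) list_head API call returns - forefront items first.
    for request_id in ['forefront-1', 'regular-1']:
        queue_head.append(request_id)

    # Leftover locked requests are kept at the end.
    for leftover_request_id in leftover_buffer:
        queue_head.append(leftover_request_id)

    print(list(queue_head))  # ['forefront-1', 'regular-1', 'old-1', 'old-2']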
@@ -625,15 +635,6 @@ async def _list_head(
         # Update the queue head cache
         self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
 
-        # Clear current queue head if we're checking for forefront requests
-        if self._should_check_for_forefront_requests:
-            self._queue_head.clear()
-            self._should_check_for_forefront_requests = False
-
-        # Process and cache the requests
-        head_id_buffer = list[str]()
-        forefront_head_id_buffer = list[str]()
-
         for request_data in response.get('items', []):
             request = Request.model_validate(request_data)
 
@@ -648,59 +649,44 @@ async def _list_head(
                 )
                 continue
 
-            # Check if this request was already cached and if it was added to forefront
-            cache_key = unique_key_to_request_id(request.unique_key)
-            cached_request = self._requests_cache.get(cache_key)
-            forefront = cached_request.forefront if cached_request else False
-
-            # Add to appropriate buffer based on forefront flag
-            if forefront:
-                forefront_head_id_buffer.insert(0, request.id)
-            else:
-                head_id_buffer.append(request.id)
-
             # Cache the request
             self._cache_request(
-                cache_key,
+                unique_key_to_request_id(request.unique_key),
                 ProcessedRequest(
                     id=request.id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=False,
                 ),
-                forefront=forefront,
                 hydrated_request=request,
             )
+            self._queue_head.append(request.id)
 
-        # Update the queue head deque
-        for request_id in head_id_buffer:
-            self._queue_head.append(request_id)
-
-        for request_id in forefront_head_id_buffer:
-            self._queue_head.appendleft(request_id)
-
+        for leftover_request_id in leftover_buffer:
+            # After adding new requests to the forefront, any leftover locked requests are kept at the end.
+            self._queue_head.append(leftover_request_id)
         return RequestQueueHead.model_validate(response)
 
     async def _prolong_request_lock(
         self,
         request_id: str,
         *,
-        forefront: bool = False,
         lock_secs: int,
     ) -> ProlongRequestLockResponse:
         """Prolong the lock on a specific request in the queue.
 
         Args:
             request_id: The identifier of the request whose lock is to be prolonged.
-            forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
             lock_secs: The additional amount of time, in seconds, that the request will remain locked.
 
         Returns:
             A response containing the time at which the lock will expire.
         """
         response = await self._api_client.prolong_request_lock(
             request_id=request_id,
-            forefront=forefront,
+            # All requests reaching this code were at the tip of the queue when they were fetched,
+            # so if their lock expires, they should go back to the forefront: their handling is long overdue.
+            forefront=True,
             lock_secs=lock_secs,
         )
 
@@ -747,7 +733,6 @@ def _cache_request(
         cache_key: str,
         processed_request: ProcessedRequest,
         *,
-        forefront: bool,
         hydrated_request: Request | None = None,
     ) -> None:
         """Cache a request for future use.
@@ -763,5 +748,4 @@ def _cache_request(
             was_already_handled=processed_request.was_already_handled,
             hydrated=hydrated_request,
             lock_expires_at=None,
-            forefront=forefront,
         )