|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
| 3 | +import asyncio |
3 | 4 | from collections import deque
|
4 | 5 | from datetime import datetime, timedelta, timezone
|
5 | 6 | from logging import getLogger
|
@@ -84,6 +85,9 @@ def __init__(
|
84 | 85 | self._assumed_handled_count = 0
|
85 | 86 | """The number of requests we assume have been handled (tracked manually for this instance)."""
|
86 | 87 |
|
| 88 | + self._fetch_lock = asyncio.Lock() |
| 89 | + """Fetch lock to minimize race conditions when communicationg with API.""" |
| 90 | + |
87 | 91 | @override
|
88 | 92 | async def get_metadata(self) -> RequestQueueMetadata:
|
89 | 93 | total_count = self._initial_total_count + self._assumed_total_count
|
@@ -291,15 +295,16 @@ async def fetch_next_request(self) -> Request | None:
|
291 | 295 | The request or `None` if there are no more pending requests.
|
292 | 296 | """
|
293 | 297 | # Ensure the queue head has requests if available
|
294 |
| - await self._ensure_head_is_non_empty() |
| 298 | + async with self._fetch_lock: |
| 299 | + await self._ensure_head_is_non_empty() |
295 | 300 |
|
296 |
| - # If queue head is empty after ensuring, there are no requests |
297 |
| - if not self._queue_head: |
298 |
| - return None |
| 301 | + # If queue head is empty after ensuring, there are no requests |
| 302 | + if not self._queue_head: |
| 303 | + return None |
299 | 304 |
|
300 |
| - # Get the next request ID from the queue head |
301 |
| - next_request_id = self._queue_head.popleft() |
302 |
| - request = await self._get_or_hydrate_request(next_request_id) |
| 305 | + # Get the next request ID from the queue head |
| 306 | + next_request_id = self._queue_head.popleft() |
| 307 | + request = await self._get_or_hydrate_request(next_request_id) |
303 | 308 |
|
304 | 309 | # Handle potential inconsistency where request might not be in the main table yet
|
305 | 310 | if request is None:
|
@@ -344,6 +349,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
|
344 | 349 | if request.handled_at is None:
|
345 | 350 | request.handled_at = datetime.now(tz=timezone.utc)
|
346 | 351 |
|
| 352 | + if cached_request := self._requests_cache[request.id]: |
| 353 | + cached_request.was_already_handled = request.was_already_handled |
347 | 354 | try:
|
348 | 355 | # Update the request in the API
|
349 | 356 | processed_request = await self._update_request(request)
|
|
0 commit comments