26
26
27
27
logger = getLogger (__name__ )
28
28
29
- COUNTER = iter (range (10000 ))
30
29
31
30
class ApifyRequestQueueClient (RequestQueueClient ):
32
31
"""An Apify platform implementation of the request queue client."""
@@ -295,25 +294,18 @@ async def fetch_next_request(self) -> Request | None:
295
294
Returns:
296
295
The request or `None` if there are no more pending requests.
297
296
"""
298
- call_time = next (COUNTER )
299
297
# Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions.
300
- logger .debug (f'Before _fetch_lock, { call_time } ' )
301
298
async with self ._fetch_lock :
302
- logger .debug (f'Fetching, { call_time } ' )
303
299
await self ._ensure_head_is_non_empty ()
304
300
305
301
# If queue head is empty after ensuring, there are no requests
306
302
if not self ._queue_head :
307
- logger .debug (f'Empty, { call_time } ' )
308
303
return None
309
304
310
305
# Get the next request ID from the queue head
311
306
next_request_id = self ._queue_head .popleft ()
312
- logger .debug (f'New request, { call_time } ' )
313
307
314
- logger .debug (f'Before hydrate, { call_time } ' )
315
308
request = await self ._get_or_hydrate_request (next_request_id )
316
- logger .debug (f'After hydrate, { call_time } ' )
317
309
318
310
# Handle potential inconsistency where request might not be in the main table yet
319
311
if request is None :
@@ -331,16 +323,6 @@ async def fetch_next_request(self) -> Request | None:
331
323
)
332
324
return None
333
325
334
- # Use get request to ensure we have the full request object.
335
- #request = await self.get_request(request.id) This seems redundant
336
- if request is None :
337
- logger .debug (
338
- 'Request fetched from the beginning of queue was not found in the RQ' ,
339
- extra = {'nextRequestId' : next_request_id },
340
- )
341
- return None
342
-
343
- logger .debug (f'{ request .retry_count = } , { call_time } ' )
344
326
return request
345
327
346
328
@override
@@ -403,16 +385,13 @@ async def reclaim_request(
403
385
"""
404
386
# Check if the request was marked as handled and clear it. When reclaiming,
405
387
# we want to put the request back for processing.
406
- call_time = next (COUNTER )
407
388
if request .was_already_handled :
408
389
request .handled_at = None
409
390
410
391
async with self ._fetch_lock :
411
392
try :
412
393
# Update the request in the API.
413
- logger .debug (f'Before _update_request reclaiming, { call_time } ' )
414
394
processed_request = await self ._update_request (request , forefront = forefront )
415
- logger .debug (f'After _update_request reclaiming, { call_time } ' )
416
395
processed_request .unique_key = request .unique_key
417
396
418
397
# If the request was previously handled, decrement our handled count since
@@ -435,9 +414,7 @@ async def reclaim_request(
435
414
436
415
# Try to release the lock on the request
437
416
try :
438
- logger .debug (f'Before _delete_request_lock reclaiming, { call_time } ' )
439
417
await self ._delete_request_lock (request .id , forefront = forefront )
440
- logger .debug (f'After _delete_request_lock reclaiming, { call_time } ' )
441
418
except Exception as err :
442
419
logger .debug (f'Failed to delete request lock for request { request .id } ' , exc_info = err )
443
420
except Exception as exc :
@@ -453,13 +430,8 @@ async def is_empty(self) -> bool:
453
430
Returns:
454
431
True if the queue is empty, False otherwise.
455
432
"""
456
- call_time = next (COUNTER )
457
- logger .debug (f'Before _list_head is_empty, { call_time } ' )
458
433
async with self ._fetch_lock :
459
- logger .debug (f'During _list_head is_empty, { call_time } ' )
460
434
head = await self ._list_head (limit = 1 , lock_time = None )
461
- logger .debug (f'After _list_head is_empty, { call_time } ' )
462
- logger .debug (f'Finish _list_head is_empty, { call_time } ' )
463
435
return len (head .items ) == 0 and not self ._queue_has_locked_requests
464
436
465
437
async def _ensure_head_is_non_empty (self ) -> None :
@@ -571,10 +543,7 @@ async def _list_head(
571
543
A collection of requests from the beginning of the queue.
572
544
"""
573
545
# Return from cache if available and we're not checking for new forefront requests
574
- call_time = next (COUNTER )
575
546
if self ._queue_head and not self ._should_check_for_forefront_requests :
576
- logger .debug (f'Using cached queue head with { len (self ._queue_head )} requests, { call_time } ' )
577
-
578
547
# Create a list of requests from the cached queue head
579
548
items = []
580
549
for request_id in list (self ._queue_head )[:limit ]:
@@ -592,7 +561,6 @@ async def _list_head(
592
561
queue_has_locked_requests = self ._queue_has_locked_requests ,
593
562
lock_time = lock_time ,
594
563
)
595
- logger .debug (f'Updating cached queue head with { len (self ._queue_head )} requests, { call_time } ' )
596
564
leftover_buffer = list [str ]()
597
565
if self ._should_check_for_forefront_requests :
598
566
leftover_buffer = list (self ._queue_head )
@@ -636,14 +604,11 @@ async def _list_head(
636
604
),
637
605
hydrated_request = request ,
638
606
)
639
- logger .debug (f'Adding to head, { call_time } ' )
640
607
self ._queue_head .append (request .id )
641
- logger .debug (f'Cached queue head with { len (self ._queue_head )} requests, { call_time } ' )
642
608
643
609
for leftover_request_id in leftover_buffer :
644
610
# After adding new requests to the forefront, any existing leftover locked request is kept in the end.
645
611
self ._queue_head .append (leftover_request_id )
646
- logger .debug (f'Cached queue head with { len (self ._queue_head )} requests, { call_time } ' )
647
612
return RequestQueueHead .model_validate (response )
648
613
649
614
async def _prolong_request_lock (
0 commit comments