33import asyncio
44import logging
55import math
6- from dataclasses import dataclass
7- from datetime import timedelta
6+ from collections .abc import Iterable
87from queue import Queue
9- from time import sleep
108from typing import TYPE_CHECKING , Any , TypedDict
119
1210from apify_shared .utils import filter_out_none_values_recursively , ignore_docs , parse_date_fields
1715from apify_client .clients .base import ResourceClient , ResourceClientAsync
1816
1917if TYPE_CHECKING :
20- from collections . abc import Iterable
18+ from datetime import timedelta
2119
2220 from apify_shared .consts import StorageGeneralAccess
2321
@@ -43,19 +41,6 @@ class BatchAddRequestsResult(TypedDict):
4341 unprocessedRequests : list [dict ]
4442
4543
46- @dataclass
47- class AddRequestsBatch :
48- """Batch of requests to add to the request queue.
49-
50- Args:
51- requests: List of requests to be added to the request queue.
52- num_of_retries: Number of times this batch has been retried.
53- """
54-
55- requests : Iterable [dict ]
56- num_of_retries : int = 0
57-
58-
5944class RequestQueueClient (ResourceClient ):
6045 """Sub-client for manipulating a single request queue."""
6146
@@ -301,8 +286,8 @@ def batch_add_requests(
301286 * ,
302287 forefront : bool = False ,
303288 max_parallel : int = 1 ,
304- max_unprocessed_requests_retries : int = 3 ,
305- min_delay_between_unprocessed_requests_retries : timedelta = timedelta ( milliseconds = 500 ) ,
289+ max_unprocessed_requests_retries : int | None = None ,
290+ min_delay_between_unprocessed_requests_retries : timedelta | None = None ,
306291 ) -> BatchAddRequestsResult :
307292 """Add requests to the request queue in batches.
308293
@@ -316,13 +301,17 @@ def batch_add_requests(
316301 max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
317302 to the async client. For the sync client, this value must be set to 1, as parallel execution
318303 is not supported.
319- max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests.
320- min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed
321- requests.
304+ max_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
305+ min_delay_between_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
322306
323307 Returns:
324308 Result containing lists of processed and unprocessed requests.
325309 """
310+ if max_unprocessed_requests_retries :
311+ logger .warning ('`max_unprocessed_requests_retries` is deprecated and not used anymore.' )
312+ if min_delay_between_unprocessed_requests_retries :
313+ logger .warning ('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.' )
314+
326315 if max_parallel != 1 :
327316 raise NotImplementedError ('max_parallel is only supported in async client' )
328317
@@ -339,38 +328,30 @@ def batch_add_requests(
339328 )
340329
341330 # Put the batches into the queue for processing.
342- queue = Queue [AddRequestsBatch ]()
331+ queue = Queue [Iterable [ dict ] ]()
343332
344- for b in batches :
345- queue .put (AddRequestsBatch ( b ) )
333+ for batch in batches :
334+ queue .put (batch )
346335
347336 processed_requests = list [dict ]()
348337 unprocessed_requests = list [dict ]()
349338
350339 # Process all batches in the queue sequentially.
351340 while not queue .empty ():
352- batch = queue .get ()
341+ request_batch = queue .get ()
353342
354343 # Send the batch to the API.
355344 response = self .http_client .call (
356345 url = self ._url ('requests/batch' ),
357346 method = 'POST' ,
358347 params = request_params ,
359- json = list (batch . requests ),
348+ json = list (request_batch ),
360349 timeout_secs = _MEDIUM_TIMEOUT ,
361350 )
362351
363- # Retry if the request failed and the retry limit has not been reached.
364- if not response .is_success and batch .num_of_retries < max_unprocessed_requests_retries :
365- batch .num_of_retries += 1
366- sleep (min_delay_between_unprocessed_requests_retries .total_seconds ())
367- queue .put (batch )
368-
369- # Otherwise, add the processed/unprocessed requests to their respective lists.
370- else :
371- response_parsed = parse_date_fields (pluck_data (response .json ()))
372- processed_requests .extend (response_parsed .get ('processedRequests' , []))
373- unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
352+ response_parsed = parse_date_fields (pluck_data (response .json ()))
353+ processed_requests .extend (response_parsed .get ('processedRequests' , []))
354+ unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
374355
375356 return {
376357 'processedRequests' : processed_requests ,
@@ -667,14 +648,12 @@ async def delete_request_lock(
667648
668649 async def _batch_add_requests_worker (
669650 self ,
670- queue : asyncio .Queue [AddRequestsBatch ],
651+ queue : asyncio .Queue [Iterable [ dict ] ],
671652 request_params : dict ,
672- max_unprocessed_requests_retries : int ,
673- min_delay_between_unprocessed_requests_retries : timedelta ,
674653 ) -> BatchAddRequestsResult :
675654 """Worker function to process a batch of requests.
676655
677- This worker will process batches from the queue, retrying requests that fail until the retry limit is reached .
656+ This worker will process batches from the queue.
678657
679658 Return result containing lists of processed and unprocessed requests by the worker.
680659 """
@@ -684,7 +663,7 @@ async def _batch_add_requests_worker(
684663 while True :
685664 # Get the next batch from the queue.
686665 try :
687- batch = await queue .get ()
666+ request_batch = await queue .get ()
688667 except asyncio .CancelledError :
689668 break
690669
@@ -694,25 +673,13 @@ async def _batch_add_requests_worker(
694673 url = self ._url ('requests/batch' ),
695674 method = 'POST' ,
696675 params = request_params ,
697- json = list (batch . requests ),
676+ json = list (request_batch ),
698677 timeout_secs = _MEDIUM_TIMEOUT ,
699678 )
700679
701680 response_parsed = parse_date_fields (pluck_data (response .json ()))
702-
703- # Retry if the request failed and the retry limit has not been reached.
704- if not response .is_success and batch .num_of_retries < max_unprocessed_requests_retries :
705- batch .num_of_retries += 1
706- await asyncio .sleep (min_delay_between_unprocessed_requests_retries .total_seconds ())
707- await queue .put (batch )
708-
709- # Otherwise, add the processed/unprocessed requests to their respective lists.
710- else :
711- processed_requests .extend (response_parsed .get ('processedRequests' , []))
712- unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
713-
714- except Exception as exc :
715- logger .warning (f'Error occurred while processing a batch of requests: { exc } ' )
681+ processed_requests .extend (response_parsed .get ('processedRequests' , []))
682+ unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
716683
717684 finally :
718685 # Mark the batch as done whether it succeeded or failed.
@@ -729,8 +696,8 @@ async def batch_add_requests(
729696 * ,
730697 forefront : bool = False ,
731698 max_parallel : int = 5 ,
732- max_unprocessed_requests_retries : int = 3 ,
733- min_delay_between_unprocessed_requests_retries : timedelta = timedelta ( milliseconds = 500 ) ,
699+ max_unprocessed_requests_retries : int | None = None ,
700+ min_delay_between_unprocessed_requests_retries : timedelta | None = None ,
734701 ) -> BatchAddRequestsResult :
735702 """Add requests to the request queue in batches.
736703
@@ -744,15 +711,19 @@ async def batch_add_requests(
744711 max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
745712 to the async client. For the sync client, this value must be set to 1, as parallel execution
746713 is not supported.
747- max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests.
748- min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed
749- requests.
714+ max_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
715+ min_delay_between_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
750716
751717 Returns:
752718 Result containing lists of processed and unprocessed requests.
753719 """
720+ if max_unprocessed_requests_retries :
721+ logger .warning ('`max_unprocessed_requests_retries` is deprecated and not used anymore.' )
722+ if min_delay_between_unprocessed_requests_retries :
723+ logger .warning ('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.' )
724+
754725 tasks = set [asyncio .Task ]()
755- queue : asyncio .Queue [AddRequestsBatch ] = asyncio .Queue ()
726+ queue : asyncio .Queue [Iterable [ dict ] ] = asyncio .Queue ()
756727 request_params = self ._params (clientKey = self .client_key , forefront = forefront )
757728
758729 # Compute the payload size limit to ensure it doesn't exceed the maximum allowed size.
@@ -766,15 +737,13 @@ async def batch_add_requests(
766737 )
767738
768739 for batch in batches :
769- await queue .put (AddRequestsBatch ( batch ) )
740+ await queue .put (batch )
770741
771742 # Start a required number of worker tasks to process the batches.
772743 for i in range (max_parallel ):
773744 coro = self ._batch_add_requests_worker (
774745 queue ,
775746 request_params ,
776- max_unprocessed_requests_retries ,
777- min_delay_between_unprocessed_requests_retries ,
778747 )
779748 task = asyncio .create_task (coro , name = f'batch_add_requests_worker_{ i } ' )
780749 tasks .add (task )