5
5
import math
6
6
from dataclasses import dataclass
7
7
from datetime import timedelta
8
+ from multiprocessing import process
8
9
from typing import Any , TypedDict
9
10
10
11
from apify_shared .utils import filter_out_none_values_recursively , ignore_docs , parse_date_fields
@@ -586,10 +587,13 @@ async def _batch_add_requests_worker(
586
587
processed_requests = list [dict ]()
587
588
unprocessed_requests = list [dict ]()
588
589
589
- try :
590
- while True :
590
+ while True :
591
+ try :
591
592
batch = await queue .get ()
593
+ except asyncio .CancelledError :
594
+ break
592
595
596
+ try :
593
597
response = await self .http_client .call (
594
598
url = self ._url ('requests/batch' ),
595
599
method = 'POST' ,
@@ -599,29 +603,23 @@ async def _batch_add_requests_worker(
599
603
600
604
response_parsed = parse_date_fields (pluck_data (response .json ()))
601
605
602
- # If the request was successful, add it to the processed requests.
603
- if 200 <= response .status_code <= 299 :
604
- processed_requests .append (response_parsed )
605
-
606
606
# If the request was not successful and the number of retries is less than the maximum,
607
- # retry the request.
608
- elif batch .num_of_retries < max_unprocessed_requests_retries :
607
+ # put the batch back into the queue and retry the request later .
608
+ if ( not response . is_success ) and batch .num_of_retries < max_unprocessed_requests_retries :
609
609
batch .num_of_retries += 1
610
610
await asyncio .sleep (min_delay_between_unprocessed_requests_retries .total_seconds ())
611
611
await queue .put (batch )
612
612
613
- # Otherwise, add the request to the unprocessed requests.
613
+ # Otherwise, extract the processed and unprocessed requests from the response .
614
614
else :
615
- unprocessed_requests .append (response_parsed )
615
+ processed_requests .extend (response_parsed .get ('processedRequests' , []))
616
+ unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
616
617
617
- except asyncio . CancelledError :
618
- logger .debug ( 'Worker task was cancelled. ' )
618
+ except Exception as exc :
619
+ logger .warning ( f'Error occurred while processing a batch of requests: { exc } ' )
619
620
620
- except Exception as exc :
621
- logger .warning ('Worker task failed with an exception.' , exc_info = exc )
622
-
623
- finally :
624
- queue .task_done ()
621
+ finally :
622
+ queue .task_done ()
625
623
626
624
return {
627
625
'processed_requests' : processed_requests ,
@@ -670,31 +668,35 @@ async def batch_add_requests(
670
668
671
669
# Start the worker tasks.
672
670
for i in range (max_parallel ):
673
- task = asyncio .create_task (
674
- self ._batch_add_requests_worker (
675
- queue ,
676
- request_params ,
677
- max_unprocessed_requests_retries ,
678
- min_delay_between_unprocessed_requests_retries ,
679
- ),
680
- name = f'batch_add_requests_worker_{ i } ' ,
671
+ coro = self ._batch_add_requests_worker (
672
+ queue ,
673
+ request_params ,
674
+ max_unprocessed_requests_retries ,
675
+ min_delay_between_unprocessed_requests_retries ,
681
676
)
677
+ task = asyncio .create_task (coro , name = f'batch_add_requests_worker_{ i } ' )
682
678
tasks .add (task )
683
679
684
680
# Wait for all batches to be processed.
685
681
await queue .join ()
686
682
687
- # Send cancel signals to all worker tasks.
683
+ # Send cancel signals to all worker tasks and wait for them to finish .
688
684
for task in tasks :
689
685
task .cancel ()
690
686
691
- # Wait for all worker tasks to finish.
692
687
results : list [BatchAddRequestsResult ] = await asyncio .gather (* tasks )
693
688
694
- # Combine the results from all worker tasks.
689
+ # Combine the results from all worker tasks and return them.
690
+ processed_requests = []
691
+ unprocessed_requests = []
692
+
693
+ for result in results :
694
+ processed_requests .extend (result ['processed_requests' ])
695
+ unprocessed_requests .extend (result ['unprocessed_requests' ])
696
+
695
697
return {
696
- 'processed_requests' : [ req for result in results for req in result [ ' processed_requests' ]] ,
697
- 'unprocessed_requests' : [ req for result in results for req in result [ ' unprocessed_requests' ]] ,
698
+ 'processed_requests' : processed_requests ,
699
+ 'unprocessed_requests' : unprocessed_requests ,
698
700
}
699
701
700
702
async def batch_delete_requests (self : RequestQueueClientAsync , requests : list [dict ]) -> dict :
0 commit comments