@@ -36,11 +36,6 @@ class BatchAddRequestsResult(TypedDict):
3636 unprocessedRequests : list [dict ]
3737
3838
39- def _get_unprocessed_request_from_request (request : dict [str , str ]) -> dict [str , str ]:
40- relevant_keys = {'url' , 'uniqueKey' , 'method' }
41- return {key : value for key , value in request .items () if key in relevant_keys }
42-
43-
4439class RequestQueueClient (ResourceClient ):
4540 """Sub-client for manipulating a single request queue."""
4641
@@ -323,14 +318,11 @@ def batch_add_requests(
323318 queue .put (batch )
324319
325320 processed_requests = list [dict ]()
326- unprocessed_requests = dict [ str , dict ]()
321+ unprocessed_requests = list [ dict ]()
327322
328323 # Process all batches in the queue sequentially.
329324 while not queue .empty ():
330325 request_batch = queue .get ()
331- # All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response.
332- for request in request_batch :
333- unprocessed_requests [request ['uniqueKey' ]] = _get_unprocessed_request_from_request (request )
334326
335327 # Send the batch to the API.
336328 response = self .http_client .call (
@@ -343,13 +335,11 @@ def batch_add_requests(
343335
344336 response_parsed = parse_date_fields (pluck_data (response .json ()))
345337 processed_requests .extend (response_parsed .get ('processedRequests' , []))
346-
347- for processed_request in response_parsed .get ('processedRequests' , []):
348- unprocessed_requests .pop (processed_request ['uniqueKey' ], None )
338+ unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
349339
350340 return {
351341 'processedRequests' : processed_requests ,
352- 'unprocessedRequests' : list ( unprocessed_requests . values ()) ,
342+ 'unprocessedRequests' : unprocessed_requests ,
353343 }
354344
355345 def batch_delete_requests (self , requests : list [dict ]) -> dict :
@@ -650,39 +640,36 @@ async def _batch_add_requests_worker(
650640 Return result containing lists of processed and unprocessed requests by the worker.
651641 """
652642 processed_requests = list [dict ]()
653- unprocessed_requests = dict [ str , dict ]()
643+ unprocessed_requests = list [ dict ]()
654644
655645 while True :
656646 # Get the next batch from the queue.
657647 try :
658648 request_batch = await queue .get ()
659- # All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response.
660- for request in request_batch :
661- unprocessed_requests [request ['uniqueKey' ]] = _get_unprocessed_request_from_request (request )
662-
663649 except asyncio .CancelledError :
664650 break
665651
666- # Send the batch to the API.
667- response = await self .http_client .call (
668- url = self ._url ('requests/batch' ),
669- method = 'POST' ,
670- params = request_params ,
671- json = list (request_batch ),
672- timeout_secs = _MEDIUM_TIMEOUT ,
673- )
674-
675- response_parsed = parse_date_fields (pluck_data (response .json ()))
676- processed_requests .extend (response_parsed .get ('processedRequests' , []))
677-
678- for processed_request in response_parsed .get ('processedRequests' , []):
679- unprocessed_requests .pop (processed_request ['uniqueKey' ], None )
680-
681- queue .task_done ()
652+ try :
653+ # Send the batch to the API.
654+ response = await self .http_client .call (
655+ url = self ._url ('requests/batch' ),
656+ method = 'POST' ,
657+ params = request_params ,
658+ json = list (request_batch ),
659+ timeout_secs = _MEDIUM_TIMEOUT ,
660+ )
661+
662+ response_parsed = parse_date_fields (pluck_data (response .json ()))
663+ processed_requests .extend (response_parsed .get ('processedRequests' , []))
664+ unprocessed_requests .extend (response_parsed .get ('unprocessedRequests' , []))
665+
666+ finally :
667+ # Mark the batch as done whether it succeeded or failed.
668+ queue .task_done ()
682669
683670 return {
684671 'processedRequests' : processed_requests ,
685- 'unprocessedRequests' : list ( unprocessed_requests . values ()) ,
672+ 'unprocessedRequests' : unprocessed_requests ,
686673 }
687674
688675 async def batch_add_requests (
0 commit comments