Commit 666260d

implement sync version as well

1 parent: a66c249

File tree

1 file changed: +67 −13 lines changed

src/apify_client/clients/resource_clients/request_queue.py

Lines changed: 67 additions & 13 deletions
@@ -5,6 +5,8 @@
 import math
 from dataclasses import dataclass
 from datetime import timedelta
+from queue import Queue
+from time import sleep
 from typing import TYPE_CHECKING, Any, TypedDict
 
 from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
@@ -280,28 +282,78 @@ def delete_request_lock(self: RequestQueueClient, request_id: str, *, forefront:
         )
 
     def batch_add_requests(
-        self: RequestQueueClient,
+        self,
         requests: list[dict],
         *,
-        forefront: bool | None = None,
-    ) -> dict:
-        """Add requests to the queue.
+        forefront: bool = False,
+        max_unprocessed_requests_retries: int = 3,
+        min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500),
+    ) -> BatchAddRequestsResult:
+        """Add requests to the request queue in batches.
+
+        Requests are split into batches based on size and processed sequentially.
 
         https://docs.apify.com/api/v2#/reference/request-queues/batch-request-operations/add-requests
 
         Args:
-            requests (list[dict]): list of the requests to add
-            forefront (bool, optional): Whether to add the requests to the head or the end of the queue
+            requests: List of requests to be added to the queue.
+            forefront: Whether to add requests to the front of the queue.
+            max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests.
+            min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed requests.
+
+        Returns:
+            Result containing lists of processed and unprocessed requests.
         """
         request_params = self._params(clientKey=self.client_key, forefront=forefront)
 
-        response = self.http_client.call(
-            url=self._url('requests/batch'),
-            method='POST',
-            params=request_params,
-            json=requests,
+        # Compute the payload size limit to ensure it doesn't exceed the maximum allowed size.
+        payload_size_limit_bytes = _MAX_PAYLOAD_SIZE_BYTES - math.ceil(_MAX_PAYLOAD_SIZE_BYTES * _SAFETY_BUFFER_PERCENT)
+
+        # Split the requests into batches, constrained by the max payload size and max requests per batch.
+        batches = constrained_batches(
+            requests,
+            max_size=payload_size_limit_bytes,
+            max_count=_RQ_MAX_REQUESTS_PER_BATCH,
         )
-        return parse_date_fields(pluck_data(response.json()))
+
+        # Put the batches into the queue for processing.
+        queue = Queue[AddRequestsBatch]()
+
+        for b in batches:
+            queue.put(AddRequestsBatch(b))
+
+        processed_requests = list[dict]()
+        unprocessed_requests = list[dict]()
+
+        # Process all batches in the queue sequentially.
+        while not queue.empty():
+            batch = queue.get()
+
+            # Send the batch to the API.
+            response = self.http_client.call(
+                url=self._url('requests/batch'),
+                method='POST',
+                params=request_params,
+                json=list(batch.requests),
+            )
+
+            response_parsed = parse_date_fields(pluck_data(response.json()))
+
+            # Retry if the request failed and the retry limit has not been reached.
+            if not response.is_success and batch.num_of_retries < max_unprocessed_requests_retries:
+                batch.num_of_retries += 1
+                sleep(min_delay_between_unprocessed_requests_retries.total_seconds())
+                queue.put(batch)
+
+            # Otherwise, add the processed/unprocessed requests to their respective lists.
+            else:
+                processed_requests.extend(response_parsed.get('processedRequests', []))
+                unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
+
+        return {
+            'processed_requests': processed_requests,
+            'unprocessed_requests': unprocessed_requests,
+        }
 
     def batch_delete_requests(self: RequestQueueClient, requests: list[dict]) -> dict:
         """Delete given requests from the queue.
@@ -638,7 +690,7 @@ async def _batch_add_requests_worker(
         }
 
     async def batch_add_requests(
-        self: RequestQueueClientAsync,
+        self,
         requests: list[dict],
         *,
         forefront: bool = False,
@@ -650,6 +702,8 @@ async def batch_add_requests(
 
         Requests are split into batches based on size and processed in parallel.
 
+        https://docs.apify.com/api/v2#/reference/request-queues/batch-request-operations/add-requests
+
         Args:
             requests: List of requests to be added to the queue.
             forefront: Whether to add requests to the front of the queue.