Commit eb1c0ac

add retry logic
1 parent f49ac31 commit eb1c0ac

1 file changed: src/apify_client/clients/resource_clients/request_queue.py

Lines changed: 37 additions & 9 deletions
@@ -3,6 +3,7 @@
 import asyncio
 import logging
 import math
+from dataclasses import dataclass
 from datetime import timedelta
 from typing import Any, TypedDict

@@ -20,12 +21,30 @@


 class BatchAddRequestsResult(TypedDict):
-    """Result of the batch add requests operation."""
+    """Result of the batch add requests operation.
+
+    Args:
+        processed_requests: List of requests that were added.
+        unprocessed_requests: List of requests that failed to be added.
+    """

     processed_requests: list[dict]
     unprocessed_requests: list[dict]


+@dataclass
+class AddRequestsBatch:
+    """Batch of requests to add to the request queue.
+
+    Args:
+        requests: List of requests to be added to the request queue.
+        num_of_retries: Number of retries for the batch.
+    """
+
+    requests: list[dict]
+    num_of_retries: int = 0
+
+
 class RequestQueueClient(ResourceClient):
     """Sub-client for manipulating a single request queue."""


@@ -559,15 +578,13 @@ async def delete_request_lock(

     async def _batch_add_requests_worker(
         self,
-        queue: asyncio.Queue,
+        queue: asyncio.Queue[AddRequestsBatch],
         request_params: dict,
         max_unprocessed_requests_retries: int,
         min_delay_between_unprocessed_requests_retries: timedelta,
     ) -> BatchAddRequestsResult:
-        processed_requests = []
-        unprocessed_requests = []
-
-        # TODO: add retry logic
+        processed_requests = list[dict]()
+        unprocessed_requests = list[dict]()

         try:
             while True:
@@ -577,13 +594,23 @@ async def _batch_add_requests_worker(
                     url=self._url('requests/batch'),
                     method='POST',
                     params=request_params,
-                    json=batch,
+                    json=batch.requests,
                 )

                 response_parsed = parse_date_fields(pluck_data(response.json()))

+                # If the request was successful, add it to the processed requests.
                 if 200 <= response.status_code <= 299:
                     processed_requests.append(response_parsed)
+
+                # If the request was not successful and the number of retries is less than the maximum,
+                # retry the request.
+                elif batch.num_of_retries < max_unprocessed_requests_retries:
+                    batch.num_of_retries += 1
+                    await asyncio.sleep(min_delay_between_unprocessed_requests_retries.total_seconds())
+                    await queue.put(batch)
+
+                # Otherwise, add the request to the unprocessed requests.
                 else:
                     unprocessed_requests.append(response_parsed)

@@ -625,10 +652,11 @@ async def batch_add_requests(
             Result of the operation with processed and unprocessed requests.
         """
         payload_size_limit_bytes = _MAX_PAYLOAD_SIZE_BYTES - math.ceil(_MAX_PAYLOAD_SIZE_BYTES * _SAFETY_BUFFER_PERCENT)
+        # TODO: payload size limit bytes

         request_params = self._params(clientKey=self.client_key, forefront=forefront)
         tasks = set[asyncio.Task]()
-        queue: asyncio.Queue[list[dict]] = asyncio.Queue()
+        queue: asyncio.Queue[AddRequestsBatch] = asyncio.Queue()

         # Get the number of request batches.
         number_of_batches = math.ceil(len(requests) / _RQ_MAX_REQUESTS_PER_BATCH)
@@ -637,7 +665,7 @@ async def batch_add_requests(
         for i in range(number_of_batches):
             start = i * _RQ_MAX_REQUESTS_PER_BATCH
             end = (i + 1) * _RQ_MAX_REQUESTS_PER_BATCH
-            batch = requests[start:end]
+            batch = AddRequestsBatch(requests[start:end])
             await queue.put(batch)

         # Start the worker tasks.
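
The change above makes the worker re-enqueue a failed batch with an incremented retry counter instead of giving up immediately, sleeping for the configured delay before putting it back on the shared asyncio.Queue. Below is a minimal, self-contained sketch of that re-enqueue pattern, not the library code itself; Batch, send_batch and run_worker are illustrative names, and the failure condition is faked so the retry path is exercised.

import asyncio
from dataclasses import dataclass


@dataclass
class Batch:
    """Illustrative stand-in for AddRequestsBatch; not part of apify_client."""
    items: list[dict]
    num_of_retries: int = 0


async def send_batch(batch: Batch) -> bool:
    """Hypothetical transport call standing in for the HTTP request; succeeds on the first retry."""
    await asyncio.sleep(0)
    return batch.num_of_retries >= 1


async def run_worker(queue: asyncio.Queue, max_retries: int, delay: float) -> tuple[list, list]:
    processed: list[list[dict]] = []
    unprocessed: list[list[dict]] = []
    try:
        while True:
            batch = await queue.get()
            try:
                if await send_batch(batch):
                    processed.append(batch.items)
                elif batch.num_of_retries < max_retries:
                    # Failed batch: bump the counter, wait, and put it back on the queue
                    # so that an idle worker retries it later.
                    batch.num_of_retries += 1
                    await asyncio.sleep(delay)
                    await queue.put(batch)
                else:
                    # Out of retries: report the batch as unprocessed.
                    unprocessed.append(batch.items)
            finally:
                queue.task_done()
    except asyncio.CancelledError:
        # The driver cancels the worker once the queue is drained; return what was collected.
        return processed, unprocessed


async def main() -> None:
    queue: asyncio.Queue[Batch] = asyncio.Queue()
    for i in range(4):
        await queue.put(Batch(items=[{'url': f'https://example.com/{i}'}]))

    worker = asyncio.create_task(run_worker(queue, max_retries=3, delay=0.01))
    await queue.join()      # every batch is either processed or out of retries
    worker.cancel()
    processed, unprocessed = await worker
    print(f'{len(processed)} processed, {len(unprocessed)} unprocessed')


asyncio.run(main())

Because a retried batch is put back on the queue before task_done() is called, the queue's unfinished-task count never reaches zero while a retry is pending, so queue.join() in the driver only returns once every batch has either succeeded or exhausted its retries.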

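For completeness, a hedged usage sketch of the public batch_add_requests call whose worker this commit changes, assuming it is reached through ApifyClientAsync as elsewhere in the client; the token, queue ID and request fields below are placeholders, and only the forefront keyword visible in this diff is shown.

import asyncio

from apify_client import ApifyClientAsync


async def main() -> None:
    client = ApifyClientAsync(token='<APIFY_API_TOKEN>')    # placeholder token
    rq_client = client.request_queue('<QUEUE_ID>')          # placeholder queue ID

    # Requests follow the Apify request format (url + uniqueKey).
    result = await rq_client.batch_add_requests(
        requests=[
            {'url': 'https://example.com', 'uniqueKey': 'https://example.com'},
            {'url': 'https://example.com/about', 'uniqueKey': 'https://example.com/about'},
        ],
        forefront=False,
    )

    print(len(result['processed_requests']), 'processed')
    print(len(result['unprocessed_requests']), 'unprocessed')


asyncio.run(main())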